From 644b54e5235280f27037372b75be2675d0b3ad7c Mon Sep 17 00:00:00 2001
From: trevorbenson
Date: Tue, 10 Oct 2023 11:50:10 -0700
Subject: [PATCH] RING-44425 - input structures, required fields

---
 scripts/S3_FSCK/s3_fsck_p0.py |  6 ++++--
 scripts/S3_FSCK/s3_fsck_p1.py | 10 +++++++---
 scripts/S3_FSCK/s3_fsck_p2.py | 24 ++++++++++++++++++++----
 scripts/S3_FSCK/s3_fsck_p3.py | 10 +++++++++-
 scripts/S3_FSCK/s3_fsck_p4.py |  8 +++++++-
 5 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index d913434..159f428 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -195,11 +195,13 @@ def blob(row):
 new_path = os.path.join(PATH, RING, "s3-bucketd")
 files = "%s://%s" % (PROTOCOL, new_path)
 
-# reading without a header, 
-# columns _c0, _c1, _c2 are the default column names of 
+# reading without a header,
+# columns _c0, _c1, _c2 are the default column names of
 # columns 1, 2, 3 for the csv
 # input structure: (bucket name, s3 object key, sproxyd input key)
 # e.g. test,48K_object.01,9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220
+# Required Fields:
+# - _c2 (sproxyd input key)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 
 # repartition the dataframe to have the same number of partitions as the number of executors * cores
diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index acaeb21..430c8b3 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -39,10 +39,14 @@
 
 files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
 
-# listkeys.csv structure:
-# { RING key, main chunk of the RING key, disk, flag }
+# reading without a header,
+# columns _c0, _c1, _c2, _c3 are the default column names of
+# columns 1, 2, 3, 4 for the csv
+# input structure: (RING key, main chunk, disk, flag)
 # e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0
-# reading listkeys.csv files without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
+# Required Fields:
+# - _c1 (main chunk)
+# - _c3 (flag)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 
 # list the ARC SPLIT main chunks with service ID 50 from column 2
diff --git a/scripts/S3_FSCK/s3_fsck_p2.py b/scripts/S3_FSCK/s3_fsck_p2.py
index d64aecf..b4e9fe7 100644
--- a/scripts/S3_FSCK/s3_fsck_p2.py
+++ b/scripts/S3_FSCK/s3_fsck_p2.py
@@ -39,14 +39,30 @@
         .getOrCreate()
 
 
-# s3keys are read from the verifySproxydKeys.js scripts output
+# s3keys are generated by the verifySproxydKeys.js script and processed by s3_fsck_p0.py
 s3keys = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
-# ringkeys are read from the listkeys.py (or ringsh dump) scripts output
+# ringkeys are generated by the listkeys.py (or ringsh dump) script and processed by s3_fsck_p1.py
 ringkeys = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
 
-# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
+# reading with a header, the columns are named.
+# columns digkey, sproxyd input key, subkey are the actual column names of
+# columns 1, 2, 3 for the csv
+# input structure: (digkey, sproxyd input key, subkey)
+# e.g. 7359114991482315D0A5890000,BDE4B9BBEB45711EC2F1A9C78F6BCD59E02C6220,SINGLE
+# Required Fields:
+# - digkey
+# - sproxyd input key
 dfs3keys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(s3keys)
-# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
+
+
+# reading with a header, the columns are named.
+# columns _c1, count, ringkey (main chunk) are the actual column names of
+# columns 1, 2, 3 for the csv
+# input structure: (digkey, count, ringkey (main chunk))
+# e.g. 7359114991482315D0A5890000,1,8000004F3F3A54FFEADF8C00000000511470C070
+# Required Fields:
+# - digkey
+# - ringkey (main chunk)
 dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys)
 
 # rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1
diff --git a/scripts/S3_FSCK/s3_fsck_p3.py b/scripts/S3_FSCK/s3_fsck_p3.py
index 358f75f..77712f9 100644
--- a/scripts/S3_FSCK/s3_fsck_p3.py
+++ b/scripts/S3_FSCK/s3_fsck_p3.py
@@ -84,8 +84,16 @@ def statkey(row):
 
 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
 
-# Create a dataframe from the csv file not using the header, the columns will be _c0, _c1, _c2
+
+# reading without a header,
+# column _c0 is the default column name of
+# column 1 for the csv
+# input structure: _c0 (main chunk)
+# e.g. 998C4DF2FC7389A7C82A9600000000512040C070
+# Required Fields:
+# - _c0 (main chunk)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
+
 # Create a resilient distributed dataset (RDD) from the dataframe (logical partitions of data)
 # The rdd is a collection of tuples returned from statkey (key, status_code, size)
 rdd = df.rdd.map(statkey)
diff --git a/scripts/S3_FSCK/s3_fsck_p4.py b/scripts/S3_FSCK/s3_fsck_p4.py
index 062a4f2..290ec20 100644
--- a/scripts/S3_FSCK/s3_fsck_p4.py
+++ b/scripts/S3_FSCK/s3_fsck_p4.py
@@ -69,7 +69,13 @@ def deletekey(row):
 
 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
 
-# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
+# reading without a header,
+# column _c0 is the default column name of
+# column 1 for the csv
+# input structure: _c0 (main chunk)
+# e.g. 998C4DF2FC7389A7C82A9600000000512040C070
+# Required Fields:
+# - _c0 (main chunk)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
 # rename the column _c0 (column 1) to ringkey
 df = df.withColumnRenamed("_c0","ringkey")
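
As a point of reference only (not part of the patch), here is a minimal PySpark sketch of how the listkeys.csv layout documented in s3_fsck_p1.py could be read with an explicit schema instead of inferSchema. The named columns (ringkey, mainchunk, disk, flag) and the input path are illustrative assumptions; the scripts themselves keep the default _c0.._c3 names.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("listkeys-schema-sketch").getOrCreate()

# explicit schema for the documented layout: (RING key, main chunk, disk, flag)
listkeys_schema = StructType([
    StructField("ringkey", StringType(), False),    # _c0: RING key
    StructField("mainchunk", StringType(), False),  # _c1: main chunk (required)
    StructField("disk", StringType(), True),        # _c2: disk
    StructField("flag", IntegerType(), False),      # _c3: flag (required)
])

df = (spark.read.format("csv")
      .option("header", "false")
      .option("delimiter", ",")
      .schema(listkeys_schema)                      # replaces inferSchema
      .load("file:///var/tmp/listkeys.csv"))        # hypothetical path

# keep only the fields the later fsck stages rely on
df = df.select("mainchunk", "flag")

Supplying the schema up front avoids the extra pass over the data that inferSchema performs and makes the required fields explicit in code rather than only in comments.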