diff --git a/ansible/Containerfile b/ansible/Containerfile index fc4738f..6c801ec 100644 --- a/ansible/Containerfile +++ b/ansible/Containerfile @@ -1,14 +1,15 @@ +# Execute from parent directory as docker build . -f ansible/Containerfile -t ansible FROM alpine:latest RUN apk --no-cache add ansible openssh-client bash RUN mkdir /ansible/ -COPY roles /ansible/roles -COPY collections /ansible/collections -COPY action_plugins /ansible/action_plugins -COPY ansible.cfg /ansible/ -COPY inventory.sample /ansible/ -COPY run.yml /ansible/ +COPY ansible/roles /ansible/roles +COPY ansible/collections /ansible/collections +COPY ansible/action_plugins /ansible/action_plugins +COPY ansible/ansible.cfg /ansible/ +COPY ansible/inventory.sample /ansible/ +COPY ansible/run.yml /ansible/ RUN chmod -R g+rw,o+rw /ansible/ WORKDIR /ansible/ -COPY entrypoint.sh / +COPY ansible/entrypoint.sh / ENTRYPOINT /entrypoint.sh diff --git a/ansible/Readme.md b/ansible/Readme.md index cd7e54e..4033831 100644 --- a/ansible/Readme.md +++ b/ansible/Readme.md @@ -104,13 +104,19 @@ ansible-playbook -i inventory --tags deploy -e "REGISTRY_USER=User_Name" -e "REG ### Using the Ansible playbook -Requirements: +Instead of building the container to create the offline archive the playbook +could be used directly if all requirements are met. + +#### Requirements + * Ansible 2.9 (built and tested against) * Ansible Galaxy community.general collection * A host in the ``[staging]`` group with internet access * SSH Agent/Keys which provide access to the Scality GitHub repository * Defining the registry.scality.com credentials in inventory or command line +#### Steps + When registry_user and registry_password (lowercase) Ansible variables are defined in the inventory file: @@ -145,19 +151,14 @@ Using this method currently requires Podman as the Docker runtime currently does not provide the ``keep-id`` User Namespace required to properly pass along SSH Agent and/or Keys to the container. -1. Pulling the spark-deployment image from registry - ```commandline - [docker|podman] pull registry.scality.com/spark/spark-deployment:latest - ``` - -2. Build the spark-deployment image +1. Build the spark-deployment image ```commandline cd spark/ansible - [docker|podman] build . -f Containerfile -t registry.scality.com/spark/spark-deployment:latest + [docker|podman] build . -f ./ansible/Containerfile -t localhost/spark-deployment:latest ``` -3. Using Podman generate the offline archive +2. Using Podman generate the offline archive ```commandline podman run --privileged \ --rm \ @@ -170,7 +171,7 @@ podman run --privileged \ -e "REGISTRY_USER=User_Name" \ -e "REGISTRY_PASSWORD=" \ -v ~/.ssh:/ansible/.ssh:rw \ - registry.scality.com/spark/spark-deployment:latest \ + localhost/spark-deployment:latest \ stage ``` diff --git a/lint.txt b/lint.txt new file mode 100644 index 0000000..b6ce653 --- /dev/null +++ b/lint.txt @@ -0,0 +1,3 @@ +pylint==1.9.4 +black==18.9b0 +isort==4.3.21 diff --git a/scripts/S3_FSCK/count_p0-dig-keys.sh b/scripts/S3_FSCK/count_p0-dig-keys.sh index d2ab963..ba75835 100644 --- a/scripts/S3_FSCK/count_p0-dig-keys.sh +++ b/scripts/S3_FSCK/count_p0-dig-keys.sh @@ -3,5 +3,5 @@ TOTAL=0 SINGLE=$(awk 'BEGIN {count=0} ; /SINGLE/ {count++} END {print count}' part* ) SPLIT=$(awk 'BEGIN {count=0} ; !/subkey/ && !/SINGLE/ && !seen[$1]++ {count++} END {print count}' part* ) -let TOTAL+=${SINGLE}+${SPLIT} -echo "$TOTAL s3 sproxyd dig keys from p0 output" +(( TOTAL+=SINGLE+SPLIT )) +echo "${TOTAL} s3 sproxyd dig keys from p0 output" diff --git a/scripts/S3_FSCK/count_p1-arc-keys.sh b/scripts/S3_FSCK/count_p1-arc-keys.sh index a8790d8..c7c3d2e 100644 --- a/scripts/S3_FSCK/count_p1-arc-keys.sh +++ b/scripts/S3_FSCK/count_p1-arc-keys.sh @@ -1,7 +1,9 @@ #!/bin/bash TOTAL=0 -NUM_HEADERS=$(ls part* | wc -l) -LINES=$(cat part* | wc -l) -let TOTAL+=${LINES}-${NUM_HEADERS} -echo "$TOTAL arc keys parsed from arc-keys.csv" \ No newline at end of file +FILE_LIST=$(find ./ -iname -type f 'part*') +NUM_HEADERS=$(echo "${FILE_LIST}" | wc -l) +# shellcheck disable=SC2086,SC2312 +LINES=$(cat ${FILE_LIST} | wc -l ) +(( TOTAL+=LINES+NUM_HEADERS )) +echo "${TOTAL} arc keys parsed from arc-keys.csv" \ No newline at end of file diff --git a/scripts/S3_FSCK/export_s3_keys.sh b/scripts/S3_FSCK/export_s3_keys.sh index 59d00fb..04208a0 100644 --- a/scripts/S3_FSCK/export_s3_keys.sh +++ b/scripts/S3_FSCK/export_s3_keys.sh @@ -1,14 +1,21 @@ #!/bin/bash RID=$1 -export WORKDIR=/var/tmp/bucketSproxydKeys -if ! [ -d "${WORKDIR}" ] +if [[ -z "${RID}" ]] then - mkdir -pv ${WORKDIR} + echo "Usage: $0 " + exit 1 fi -for bucket in $(curl --silent http://localhost:9000/_/raft_sessions/${RID}/bucket | jq -r '.[] | select (. | contains("mpuShadowBucket") | not) | select (. | contains("users..bucket") | not)') +export WORKDIR=/var/tmp/bucketSproxydKeys + +if ! [[ -d "${WORKDIR}" ]] +then + mkdir -pv "${WORKDIR}" +fi +curl --silent -o bucket_list.txt http://localhost:9000/_/raft_sessions/"${RID}"/bucket +for bucket in $(jq -r '.[] | select (. | contains("mpuShadowBucket") | not) | select (. | contains("users..bucket") | not)' bucket_list.txt) do echo "--- Starting on ${bucket} ---" @@ -23,10 +30,10 @@ do -e 'VERBOSE=1' \ registry.scality.com/s3utils/s3utils:1.12.5 \ verifyBucketSproxydKeys.js \ - > ${WORKDIR}/raw_${bucket}_keys.txt + > "${WORKDIR}/raw_${bucket}"_keys.txt echo "--- Processing output... ---" - jq -r '. | select(.message | contains("sproxyd key")) + {"bucket": .objectUrl } | .bucket |= sub("s3://(?.*)/.*"; .bname) | .objectUrl |= sub("s3://.*/(?.*)$"; .oname) | [.bucket, .objectUrl, .sproxydKey] | @csv' ${WORKDIR}/raw_${bucket}_keys.txt > ${WORKDIR}/${bucket}_keys.txt - rm -f ${WORKDIR}/raw_${bucket}_keys.txt + jq -r '. | select(.message | contains("sproxyd key")) + {"bucket": .objectUrl } | .bucket |= sub("s3://(?.*)/.*"; .bname) | .objectUrl |= sub("s3://.*/(?.*)$"; .oname) | [.bucket, .objectUrl, .sproxydKey] | @csv' "${WORKDIR}/raw_${bucket}"_keys.txt > "${WORKDIR}/${bucket}"_keys.txt + rm -f "${WORKDIR}/raw_${bucket}"_keys.txt echo done diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py index 432fc6c..1283842 100644 --- a/scripts/S3_FSCK/s3_fsck_p0.py +++ b/scripts/S3_FSCK/s3_fsck_p0.py @@ -1,101 +1,127 @@ -from pyspark.sql import SparkSession, Row, SQLContext -import pyspark.sql.functions as F -from pyspark import SparkContext +"""Translate the S3 ARC keys to md5 keys""" +import binascii +import hashlib import os import sys import re -import requests -import binascii -import hashlib -import base64 import yaml +from pyspark.sql import SparkSession +import requests -config_path = "%s/%s" % ( sys.path[0] ,"../config/config.yml") -with open(config_path, 'r') as ymlfile: - cfg = yaml.load(ymlfile) - -if len(sys.argv) >1: - RING = sys.argv[1] -else: - RING = cfg["ring"] - -PATH = cfg["path"] - -SREBUILDD_IP = cfg["srebuildd_ip"] -SREBUILDD_ARC_PATH = cfg["srebuildd_arc_path"] -PROTOCOL = cfg["protocol"] -ACCESS_KEY = cfg["s3"]["access_key"] -SECRET_KEY = cfg["s3"]["secret_key"] -ENDPOINT_URL = cfg["s3"]["endpoint"] -ARC = cfg["arc_protection"] -COS = cfg["cos_protection"] -PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"]) - -arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"} - -os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' - -spark = SparkSession.builder \ - .appName("s3_fsck_p0.py:Translate the S3 ARC keys :" + RING) \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ - .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) \ - .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) \ - .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) \ - .config("spark.executor.instances", cfg["spark.executor.instances"]) \ - .config("spark.executor.memory", cfg["spark.executor.memory"]) \ - .config("spark.executor.cores", cfg["spark.executor.cores"]) \ - .config("spark.driver.memory", cfg["spark.driver.memory"]) \ - .config("spark.memory.offHeap.enabled", cfg["spark.memory.offHeap.enabled"]) \ - .config("spark.memory.offHeap.size", cfg["spark.memory.offHeap.size"]) \ - .config("spark.local.dir", cfg["path"]) \ - .getOrCreate() - +CONFIG_PATH = f"{sys.path[0]}/../config/config.yml" +with open(CONFIG_PATH, "r", encoding="utf-8") as ymlfile: + CFG = yaml.load(ymlfile, Loader=yaml.SafeLoader) + +RING = sys.argv[1] if len(sys.argv) > 1 else CFG["ring"] +PATH = CFG["path"] + +SREBUILDD_IP = CFG["srebuildd_ip"] +SREBUILDD_ARC_PATH = CFG["srebuildd_arc_path"] +PROTOCOL = CFG["protocol"] +ACCESS_KEY = CFG["s3"]["access_key"] +SECRET_KEY = CFG["s3"]["secret_key"] +ENDPOINT_URL = CFG["s3"]["endpoint"] +ARC = CFG["arc_protection"] +COS = CFG["cos_protection"] +PARTITIONS = int(CFG["spark.executor.instances"]) * int(CFG["spark.executor.cores"]) + +ARCINDEX = { + "4+2": "102060", + "8+4": "2040C0", + "9+3": "2430C0", + "7+5": "1C50C0", + "5+7": "1470C0", +} + +os.environ[ + "PYSPARK_SUBMIT_ARGS" +] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' + +SPARK = ( + SparkSession.builder.appName( + f"s3_fsck_p0.py:Translate the S3 ARC keys :{RING}" + ) + .config( + "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem" + ) + .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) + .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) + .config("spark.executor.instances", CFG["spark.executor.instances"]) + .config("spark.executor.memory", CFG["spark.executor.memory"]) + .config("spark.executor.cores", CFG["spark.executor.cores"]) + .config("spark.driver.memory", CFG["spark.driver.memory"]) + .config( + "spark.memory.offHeap.enabled", CFG["spark.memory.offHeap.enabled"] + ) + .config("spark.memory.offHeap.size", CFG["spark.memory.offHeap.size"]) + .config("spark.local.dir", CFG["path"]) + .getOrCreate() +) def pad2(n): - x = '%s' % (n,) - return ('0' * (len(x) % 2)) + x + """Pad a string with 0s to make it even length""" + x = f"{n}" + return ("0" * (len(x) % 2)) + x + def to_bytes(h): + """Convert a hex string to bytes""" return binascii.unhexlify(h) + def get_digest(name): + """Get the md5 digest of a string""" m = hashlib.md5() m.update(name) digest = bytearray(m.digest()) return digest + def get_dig_key(name): + """Get the md5 digest of a string""" digest = get_digest(name) - hash_str = digest[0] << 16 | digest[1] << 8 | digest[2] - oid = digest[3] << 56 | digest[4] << 48 | \ - digest[5] << 40 | digest[6] << 32 | \ - digest[7] << 24 | digest[8] << 16 | digest[9] << 8 | digest[10] + hash_str = digest[0] << 16 | digest[1] << 8 | digest[2] + oid = ( + digest[3] << 56 + | digest[4] << 48 + | digest[5] << 40 + | digest[6] << 32 + | digest[7] << 24 + | digest[8] << 16 + | digest[9] << 8 + | digest[10] + ) hash_str = "{0:x}".format(hash_str) oid = "{0:x}".format(oid) oid = oid.zfill(16) volid = "00000000" svcid = "51" - specific = arcindex[ARC] #Make sure to set arc_protection in config when ARC schema changes + # Make sure to set arc_protection in config when ARC schema changes + specific = ARCINDEX[ARC] cls = "70" key = hash_str.upper() + oid.upper() + volid + svcid + specific + cls return key.zfill(40) + def gen_md5_from_id(key): + """Generate a md5 key from an id""" key = key.lstrip("0") key = pad2(key) int_b = to_bytes(key) return get_dig_key(int_b) -def sparse(f): - lst = [] - m = re.findall(r'(200000000000014|20100000014)([0-9-a-f]{40})', f) - n = re.findall(r'(200000000000013|20100000013)([0-9-a-f]{38})', f) - o = re.findall(r'(200000000000012|20100000012)([0-9-a-f]{36})', f) - marc = re.findall(r'(51d68800000014)([0-9-a-f]{40})', f) - narc = re.findall(r'(51d68800000013)([0-9-a-f]{38})', f) - oarc = re.findall(r'(51d68800000012)([0-9-a-f]{36})', f) +def sparse(chunk_hex): + """Get the sparse keys from a file""" + lst = [] + m = re.findall(r"(200000000000014|20100000014)([0-9-a-f]{40})", chunk_hex) + n = re.findall(r"(200000000000013|20100000013)([0-9-a-f]{38})", chunk_hex) + o = re.findall(r"(200000000000012|20100000012)([0-9-a-f]{36})", chunk_hex) + marc = re.findall(r"(51d68800000014)([0-9-a-f]{40})", chunk_hex) + narc = re.findall(r"(51d68800000013)([0-9-a-f]{38})", chunk_hex) + oarc = re.findall(r"(51d68800000012)([0-9-a-f]{36})", chunk_hex) for mm in m: key = mm[1] lst.append(key.upper()) @@ -118,51 +144,60 @@ def sparse(f): def check_split(key): - url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40))) - r = requests.head(url) - if r.status_code == 200: - split = r.headers.get("X-Scal-Attr-Is-Split", False) - return split - else: - return ("HTTP_NOK") + """Check if a key is split""" + url = f"http://{SREBUILDD_IP}:81/{SREBUILDD_ARC_PATH}/{str(key.zfill(40))}" + response = requests.head(url, timeout=3) + if response.status_code == 200: + return response.headers.get("X-Scal-Attr-Is-Split", False) + return "HTTP_NOK" + def blob(row): + """Get the blob from a key""" + # pylint: disable=protected-access key = row._c2 split = check_split(key) if split: try: header = {} - header['x-scal-split-policy'] = "raw" - url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40))) - r = requests.get(url, headers=header, stream=True) - if r.status_code == 200: + header["x-scal-split-policy"] = "raw" + url = f"http://{SREBUILDD_IP}:81/{SREBUILDD_ARC_PATH}/{str(key.zfill(40))}" + response = requests.get(url, headers=header, stream=True, timeout=3) + if response.status_code == 200: chunks = "" - for chunk in r.iter_content(chunk_size=1024000000): + for chunk in response.iter_content(chunk_size=1_024_000_000): if chunk: - chunks = chunk+chunk + chunks = chunk + chunk - chunkshex = chunks.encode('hex') + chunkshex = chunks.encode("hex") rtlst = [] - for k in list(set(sparse(chunkshex))): - rtlst.append({"key":key, "subkey":k, "digkey":gen_md5_from_id(k)[:26]}) + for subkey in list(set(sparse(chunkshex))): + rtlst.append( + {"key": key, "subkey": subkey, "digkey": gen_md5_from_id(subkey)[:26]} + ) return rtlst - else: - return [{"key":key, "subkey":"NOK", "digkey":"NOK"}] + return [{"key": key, "subkey": "NOK", "digkey": "NOK"}] - except requests.exceptions.ConnectionError as e: - return [{"key":key, "subkey":"NOK_HTTP", "digkey":"NOK_HTTP"}] - elif split == False: - return [{"key":key, "subkey":"SINGLE", "digkey":gen_md5_from_id(key)[:26]}] + except requests.exceptions.ConnectionError: + return [{"key": key, "subkey": "NOK_HTTP", "digkey": "NOK_HTTP"}] + elif split is False: + return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}] -new_path = os.path.join(PATH, RING, "s3-bucketd") -files = "%s://%s" % (PROTOCOL, new_path) +NEW_PATH = os.path.join(PATH, RING, "s3-bucketd") +FILES = f"{PROTOCOL}://{NEW_PATH}" -df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files) +DATA_FRAME = ( + SPARK.read.format("csv") + .option("header", "false") + .option("inferSchema", "true") + .option("delimiter", ",") + .load(FILES) +) -df = df.repartition(PARTITIONS) -rdd = df.rdd.map(lambda x : blob(x)) -dfnew = rdd.flatMap(lambda x: x).toDF() +DATA_FRAME = DATA_FRAME.repartition(PARTITIONS) +RD_DATASET = DATA_FRAME.rdd.map(lambda x: blob(x)) +DATA_FRAME_NEW = RD_DATASET.flatMap(lambda x: x).toDF() -single = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING) -dfnew.write.format("csv").mode("overwrite").options(header="true").save(single) +SINGLE = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/s3-dig-keys.csv" +DATA_FRAME_NEW.write.format("csv").mode("overwrite").options(header="true").save(SINGLE) diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py index 8d7b5ac..1b20c09 100644 --- a/scripts/S3_FSCK/s3_fsck_p1.py +++ b/scripts/S3_FSCK/s3_fsck_p1.py @@ -1,75 +1,86 @@ -from pyspark.sql import SparkSession, Row, SQLContext -import pyspark.sql.functions as F -from pyspark import SparkContext +"""Docstring""" + import os import sys import yaml +from pyspark.sql import SparkSession +import pyspark.sql.functions as F -config_path = "%s/%s" % ( sys.path[0] ,"../config/config.yml") -with open(config_path, 'r') as ymlfile: - cfg = yaml.load(ymlfile) - -if len(sys.argv) >1: - RING = sys.argv[1] -else: - RING = cfg["ring"] - -PATH = cfg["path"] -PROTOCOL = cfg["protocol"] -ACCESS_KEY = cfg["s3"]["access_key"] -SECRET_KEY = cfg["s3"]["secret_key"] -ENDPOINT_URL = cfg["s3"]["endpoint"] -COS = cfg["cos_protection"] +CONFIG_PATH = f"{sys.path[0]}/../config/config.yml" +with open(CONFIG_PATH, "r", encoding="utf-8") as ymlfile: + CFG = yaml.load(ymlfile, Loader=yaml.SafeLoader) -os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' -spark = SparkSession.builder \ - .appName("s3_fsck_p1.py:Build RING keys :" + RING) \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\ - .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)\ - .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)\ - .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) \ - .config("spark.executor.instances", cfg["spark.executor.instances"]) \ - .config("spark.executor.memory", cfg["spark.executor.memory"]) \ - .config("spark.executor.cores", cfg["spark.executor.cores"]) \ - .config("spark.driver.memory", cfg["spark.driver.memory"]) \ - .config("spark.memory.offHeap.enabled", cfg["spark.memory.offHeap.enabled"]) \ - .config("spark.memory.offHeap.size", cfg["spark.memory.offHeap.size"]) \ - .config("spark.local.dir", cfg["path"]) \ - .getOrCreate() +RING = sys.argv[1] if len(sys.argv) > 1 else CFG["ring"] +PATH = CFG["path"] +PROTOCOL = CFG["protocol"] +ACCESS_KEY = CFG["s3"]["access_key"] +SECRET_KEY = CFG["s3"]["secret_key"] +ENDPOINT_URL = CFG["s3"]["endpoint"] +COS = CFG["cos_protection"] +os.environ[ + "PYSPARK_SUBMIT_ARGS" +] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' +SPARK = ( + SparkSession.builder.appName(f"s3_fsck_p1.py:Build RING keys :{RING}") + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) + .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) + .config("spark.executor.instances", CFG["spark.executor.instances"]) + .config("spark.executor.memory", CFG["spark.executor.memory"]) + .config("spark.executor.cores", CFG["spark.executor.cores"]) + .config("spark.driver.memory", CFG["spark.driver.memory"]) + .config("spark.memory.offHeap.enabled", CFG["spark.memory.offHeap.enabled"]) + .config("spark.memory.offHeap.size", CFG["spark.memory.offHeap.size"]) + .config("spark.local.dir", CFG["path"]) + .getOrCreate() +) -files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING) -df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files) +FILES = f"{PROTOCOL}://{PATH}/{RING}/listkeys.csv" +DATA_FRAME = ( + SPARK.read.format("csv") + .option("header", "false") + .option("inferSchema", "true") + .option("delimiter", ",") + .load(FILES) +) -#list the ARC SPLIT main chunks -df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1") +# list the ARC SPLIT main chunks +DF_SPLIT = DATA_FRAME.filter( + DATA_FRAME["_c1"].rlike(r".*000000..50........$") & DATA_FRAME["_c3"].rlike("0") +).select("_c1") -dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$")) -dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3") -dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"]) +DF_ARC_SINGLE = DF_SPLIT.filter(DATA_FRAME["_c1"].rlike(r".*70$")) +DF_ARC_SINGLE = DF_ARC_SINGLE.groupBy("_c1").count().filter("count > 3") +DF_ARC_SINGLE = DF_ARC_SINGLE.withColumn("ringkey", DF_ARC_SINGLE["_c1"]) -dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$")) -dfCOSsingle = dfCOSsingle.groupBy("_c1").count() -dfCOSsingle = dfCOSsingle.withColumn("ringkey",dfCOSsingle["_c1"]) -dfCOSsingle = dfCOSsingle.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)")) +DF_COS_SINGLE = DF_SPLIT.filter(DATA_FRAME["_c1"].rlike(f".*{str(COS)}0$")) +DF_COS_SINGLE = DF_COS_SINGLE.groupBy("_c1").count() +DF_COS_SINGLE = DF_COS_SINGLE.withColumn("ringkey", DF_COS_SINGLE["_c1"]) +DF_COS_SINGLE = DF_COS_SINGLE.withColumn( + "_c1", F.expr("substring(_c1, 1, length(_c1)-14)") +) -dfARCsingle = dfARCsingle.union(dfCOSsingle) +DF_ARC_SINGLE = DF_ARC_SINGLE.union(DF_COS_SINGLE) -#list the ARC KEYS -df_sync = df.filter(df["_c1"].rlike(r".*000000..51........$")).select("_c1") +# list the ARC KEYS +DF_SYNC = DATA_FRAME.filter(DATA_FRAME["_c1"].rlike(r".*000000..51........$")).select( + "_c1" +) -dfARCSYNC = df_sync.filter(df["_c1"].rlike(r".*70$")) -dfARCSYNC = dfARCSYNC.groupBy("_c1").count().filter("count > 3") -dfARCSYNC = dfARCSYNC.withColumn("ringkey",dfARCSYNC["_c1"]) -dfARCSYNC = dfARCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)")) +DF_ARC_SYNC = DF_SYNC.filter(DATA_FRAME["_c1"].rlike(r".*70$")) +DF_ARC_SYNC = DF_ARC_SYNC.groupBy("_c1").count().filter("count > 3") +DF_ARC_SYNC = DF_ARC_SYNC.withColumn("ringkey", DF_ARC_SYNC["_c1"]) +DF_ARC_SYNC = DF_ARC_SYNC.withColumn("_c1", F.expr("substring(_c1, 1, length(_c1)-14)")) -dfCOCSYNC = df_sync.filter(df["_c1"].rlike(r".*" + str(COS) + "0$")) -dfCOCSYNC = dfCOCSYNC.groupBy("_c1").count() -dfCOCSYNC = dfCOCSYNC.withColumn("ringkey",dfCOCSYNC["_c1"]) -dfCOCSYNC = dfCOCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)")) +DF_COS_SYNC = DF_SYNC.filter(DATA_FRAME["_c1"].rlike(f".*{str(COS)}0$")) +DF_COS_SYNC = DF_COS_SYNC.groupBy("_c1").count() +DF_COS_SYNC = DF_COS_SYNC.withColumn("ringkey", DF_COS_SYNC["_c1"]) +DF_COS_SYNC = DF_COS_SYNC.withColumn("_c1", F.expr("substring(_c1, 1, length(_c1)-14)")) -dfARCSYNC = dfARCSYNC.union(dfCOCSYNC) +DF_ARC_SYNC = DF_ARC_SYNC.union(DF_COS_SYNC) -dftotal = dfARCSYNC.union(dfARCsingle) -total = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING) -dftotal.write.format("csv").mode("overwrite").options(header="true").save(total) +DF_TOTAL = DF_ARC_SYNC.union(DF_ARC_SINGLE) +TOTAL = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/arc-keys.csv" +DF_TOTAL.write.format("csv").mode("overwrite").options(header="true").save(TOTAL) diff --git a/scripts/S3_FSCK/s3_fsck_p2.py b/scripts/S3_FSCK/s3_fsck_p2.py index 22eeb39..a6beded 100644 --- a/scripts/S3_FSCK/s3_fsck_p2.py +++ b/scripts/S3_FSCK/s3_fsck_p2.py @@ -1,54 +1,67 @@ -from pyspark.sql import SparkSession, Row, SQLContext -import pyspark.sql.functions as F -from pyspark import SparkContext +"""Docstring""" import os import sys -import requests - import yaml +from pyspark.sql import SparkSession +import pyspark.sql.functions as F -config_path = "%s/%s" % ( sys.path[0], "../config/config.yml") -with open(config_path, "r") as ymlfile: - cfg = yaml.load(ymlfile) - -if len(sys.argv) >1: - RING = sys.argv[1] -else: - RING = cfg["ring"] - -PATH = cfg["path"] -PROTOCOL = cfg["protocol"] -ACCESS_KEY = cfg["s3"]["access_key"] -SECRET_KEY = cfg["s3"]["secret_key"] -ENDPOINT_URL = cfg["s3"]["endpoint"] - -os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' -spark = SparkSession.builder \ - .appName("s3_fsck_p2.py:Union the S3 keys and the RING keys :" + RING) \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\ - .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)\ - .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)\ - .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) \ - .config("spark.executor.instances", cfg["spark.executor.instances"]) \ - .config("spark.executor.memory", cfg["spark.executor.memory"]) \ - .config("spark.executor.cores", cfg["spark.executor.cores"]) \ - .config("spark.driver.memory", cfg["spark.driver.memory"]) \ - .config("spark.memory.offHeap.enabled", cfg["spark.memory.offHeap.enabled"]) \ - .config("spark.memory.offHeap.size", cfg["spark.memory.offHeap.size"]) \ - .config("spark.local.dir", cfg["path"]) \ - .getOrCreate() +CONFIG_PATH = f"{sys.path[0]}/../config/config.yml" +with open(CONFIG_PATH, "r", encoding="utf-8") as ymlfile: + CFG = yaml.load(ymlfile, Loader=yaml.SafeLoader) +RING = sys.argv[1] if len(sys.argv) > 1 else CFG["ring"] +PATH = CFG["path"] +PROTOCOL = CFG["protocol"] +ACCESS_KEY = CFG["s3"]["access_key"] +SECRET_KEY = CFG["s3"]["secret_key"] +ENDPOINT_URL = CFG["s3"]["endpoint"] -s3keys = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING) -ringkeys = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING) +os.environ[ + "PYSPARK_SUBMIT_ARGS" +] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' +SPARK = ( + SparkSession.builder.appName( + f"s3_fsck_p2.py:Union the S3 keys and the RING keys :{RING}" + ) + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) + .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) + .config("spark.executor.instances", CFG["spark.executor.instances"]) + .config("spark.executor.memory", CFG["spark.executor.memory"]) + .config("spark.executor.cores", CFG["spark.executor.cores"]) + .config("spark.driver.memory", CFG["spark.driver.memory"]) + .config("spark.memory.offHeap.enabled", CFG["spark.memory.offHeap.enabled"]) + .config("spark.memory.offHeap.size", CFG["spark.memory.offHeap.size"]) + .config("spark.local.dir", CFG["path"]) + .getOrCreate() +) -dfs3keys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(s3keys) -dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys) +S3_KEYS = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/s3-dig-keys.csv" +RING_KEYS = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/arc-keys.csv" -dfringkeys = dfringkeys.withColumnRenamed("_c1","digkey") +DATA_FRAME_S3_KEYS = ( + SPARK.read.format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load(S3_KEYS) +) +DATA_FRAME_RING_KEYS = ( + SPARK.read.format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load(RING_KEYS) +) -inner_join_false = dfringkeys.join(dfs3keys,["digkey"], "leftanti").withColumn("is_present", F.lit(int(0))).select("ringkey", "is_present", "digkey") -df_final = inner_join_false.select("ringkey") -allmissing = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING) -df_final.write.format("csv").mode("overwrite").options(header="false").save(allmissing) +DATA_FRAME_RING_KEYS = DATA_FRAME_RING_KEYS.withColumnRenamed("_c1", "digkey") +INNER_JOIN_FALSE = ( + DATA_FRAME_RING_KEYS.join(DATA_FRAME_S3_KEYS, ["digkey"], "leftanti") + .withColumn("is_present", F.lit(0)) + .select("ringkey", "is_present", "digkey") +) +DATA_FRAME_FINAL = INNER_JOIN_FALSE.select("ringkey") +ALL_MISSING = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/s3objects-missing.csv" +DATA_FRAME_FINAL.write.format("csv").mode("overwrite").options(header="false").save( + ALL_MISSING +) diff --git a/scripts/S3_FSCK/s3_fsck_p3.py b/scripts/S3_FSCK/s3_fsck_p3.py index 6243e85..a729199 100644 --- a/scripts/S3_FSCK/s3_fsck_p3.py +++ b/scripts/S3_FSCK/s3_fsck_p3.py @@ -1,82 +1,95 @@ +"""Compute the total sizes to be deleted""" import os -import time -import requests -import yaml import re import sys -from pyspark.sql import SparkSession, SQLContext -from pyspark import SparkContext -import pyspark.sql.functions as F - -config_path = "%s/%s" % ( sys.path[0] ,"../config/config.yml") -with open(config_path, 'r') as ymlfile: - cfg = yaml.load(ymlfile) - - -if len(sys.argv) >1: - RING = sys.argv[1] -else: - RING = cfg["ring"] +import yaml +import requests +from pyspark.sql import SparkSession -PATH = cfg["path"] -PROTOCOL = cfg["protocol"] -ACCESS_KEY = cfg["s3"]["access_key"] -SECRET_KEY = cfg["s3"]["secret_key"] -ENDPOINT_URL = cfg["s3"]["endpoint"] -SREBUILDD_IP = cfg["srebuildd_ip"] -SREBUILDD_ARCDATA_PATH = cfg["srebuildd_arcdata_path"] -SREBUILDD_URL = "http://%s:81/%s" % (SREBUILDD_IP, SREBUILDD_ARCDATA_PATH) -ARC = cfg["arc_protection"] -COS = cfg["cos_protection"] +CONFIG_PATH = f"{sys.path[0]}/../config/config.yml" +with open(CONFIG_PATH, "r", encoding=yaml.SafeLoader) as ymlfile: + CFG = yaml.load(ymlfile, Loader=yaml.SafeLoader) -os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' -spark = SparkSession.builder \ - .appName("s3_fsck_p3.py:Compute the total sizes to be deleted :" + RING) \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\ - .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)\ - .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)\ - .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) \ - .config("spark.executor.instances", cfg["spark.executor.instances"]) \ - .config("spark.executor.memory", cfg["spark.executor.memory"]) \ - .config("spark.executor.cores", cfg["spark.executor.cores"]) \ - .config("spark.driver.memory", cfg["spark.driver.memory"]) \ - .config("spark.memory.offHeap.enabled", cfg["spark.memory.offHeap.enabled"]) \ - .config("spark.memory.offHeap.size", cfg["spark.memory.offHeap.size"]) \ - .config("spark.local.dir", PATH) \ - .getOrCreate() +RING = sys.argv[1] if len(sys.argv) > 1 else CFG["ring"] +PATH = CFG["path"] +PROTOCOL = CFG["protocol"] +ACCESS_KEY = CFG["s3"]["access_key"] +SECRET_KEY = CFG["s3"]["secret_key"] +ENDPOINT_URL = CFG["s3"]["endpoint"] +SREBUILDD_IP = CFG["srebuildd_ip"] +SREBUILDD_ARCDATA_PATH = CFG["srebuildd_arcdata_path"] +SREBUILDD_URL = f"http://{SREBUILDD_IP}:81/{SREBUILDD_ARCDATA_PATH}" +ARC = CFG["arc_protection"] +COS = CFG["cos_protection"] +ARC_INDEX = { + "4+2": "102060", + "8+4": "2040C0", + "9+3": "2430C0", + "7+5": "1C50C0", + "5+7": "1470C0", +} +ARC_KEY_PATTERN = re.compile(r"[0-9a-fA-F]{38}70") -arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"} -arcdatakeypattern = re.compile(r'[0-9a-fA-F]{38}70') +os.environ[ + "PYSPARK_SUBMIT_ARGS" +] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' +SPARK = ( + SparkSession.builder.appName( + f"s3_fsck_p3.py:Compute the total sizes to be deleted :{RING}" + ) + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) + .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) + .config("spark.executor.instances", CFG["spark.executor.instances"]) + .config("spark.executor.memory", CFG["spark.executor.memory"]) + .config("spark.executor.cores", CFG["spark.executor.cores"]) + .config("spark.driver.memory", CFG["spark.driver.memory"]) + .config("spark.memory.offHeap.enabled", CFG["spark.memory.offHeap.enabled"]) + .config("spark.memory.offHeap.size", CFG["spark.memory.offHeap.size"]) + .config("spark.local.dir", PATH) + .getOrCreate() +) def statkey(row): + """Get the size of the key""" + # pylint: disable=protected-access key = row._c0 try: - url = "%s/%s" % (SREBUILDD_URL, str(key.zfill(40))) - r = requests.head(url) - if r.status_code == 200: - if re.search(arcdatakeypattern, key): - size = int(r.headers.get("X-Scal-Size", False))*12 + url = f"{SREBUILDD_URL}/{str(key.zfill(40))}" + response = requests.head(url) + if response.status_code == 200: + if re.search(ARC_KEY_PATTERN, key): + size = int(response.headers.get("X-Scal-Size", False)) * 12 else: - size = int(r.headers.get("X-Scal-Size",False)) + int(r.headers.get("X-Scal-Size",False))*int(COS) - return ( key, r.status_code, size) - else: - return ( key, r.status_code, 0) - except requests.exceptions.ConnectionError as e: - return ( key, "HTTP_ERROR", 0) - + size = int(response.headers.get("X-Scal-Size", False)) + int( + response.headers.get("X-Scal-Size", False) + ) * int(COS) + return (key, response.status_code, size) + return (key, response.status_code, 0) + except requests.exceptions.ConnectionError: + return (key, "HTTP_ERROR", 0) -files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING) -df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files) -rdd = df.rdd.map(statkey) -#rdd1 = rdd.toDF() +FILES = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/s3objects-missing.csv" +DATA_FRAME = ( + SPARK.read.format("csv") + .option("header", "false") + .option("inferSchema", "true") + .load(FILES) +) +RD_DATASET = DATA_FRAME.rdd.map(statkey) -size_computed= rdd.map(lambda x: (2,int(x[2]))).reduceByKey(lambda x,y: x + y).collect()[0][1] -string = "The total computed size of the not indexed keys is: %d bytes" % size_computed -banner = '\n' + '-' * len(string) + '\n' -print(banner + string + banner) +SIZE_COMPUTED = ( + RD_DATASET.map(lambda x: (2, int(x[2]))) + .reduceByKey(lambda x, y: x + y) + .collect()[0][1] +) +STRING = f"The total computed size of the not indexed keys is: {SIZE_COMPUTED} bytes" +BANNER = "\n" + "-" * len(STRING) + "\n" +print(BANNER + STRING + BANNER) -#totalsize = "file:///%s/output/s3fsck/output-size-computed-%s.csv" % (PATH, RING) -#rdd1.write.format("csv").mode("overwrite").options(header="false").save(totalsize) +# totalsize = "file:///%s/output/s3fsck/output-size-computed-%s.csv" % (PATH, RING) +# rdd1.write.format("csv").mode("overwrite").options(header="false").save(totalsize) diff --git a/scripts/S3_FSCK/s3_fsck_p4.py b/scripts/S3_FSCK/s3_fsck_p4.py index df61463..70585b7 100644 --- a/scripts/S3_FSCK/s3_fsck_p4.py +++ b/scripts/S3_FSCK/s3_fsck_p4.py @@ -1,76 +1,84 @@ +"""Delete the extra keys from the ring""" import os -import time -import requests -import yaml import re import sys -import struct -import base64 -from pyspark.sql import SparkSession, SQLContext -from pyspark import SparkContext - - -config_path = "%s/%s" % ( sys.path[0], "../config/config.yml") -with open(config_path, "r") as ymlfile: - cfg = yaml.load(ymlfile) - +import requests +import yaml +from pyspark.sql import SparkSession -if len(sys.argv) >1: - RING = sys.argv[1] -else: - RING = cfg["ring"] +CONFIG_PATH = f"{sys.path[0]}/../config/config.yml" +with open(CONFIG_PATH, "r") as ymlfile: + CFG = yaml.load(ymlfile, Loader=yaml.SafeLoader) -USER = cfg["sup"]["login"] -PASSWORD = cfg["sup"]["password"] -URL = cfg["sup"]["url"] -PATH = cfg["path"] -SREBUILDD_IP = cfg["srebuildd_ip"] -SREBUILDD_PATH = cfg["srebuildd_chord_path"] -SREBUILDD_URL = "http://%s:81/%s" % (SREBUILDD_IP, SREBUILDD_PATH) -PROTOCOL = cfg["protocol"] -ACCESS_KEY = cfg["s3"]["access_key"] -SECRET_KEY = cfg["s3"]["secret_key"] -ENDPOINT_URL = cfg["s3"]["endpoint"] -PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"]) -ARC = cfg["arc_protection"] +RING = sys.argv[1] if len(sys.argv) > 1 else CFG["ring"] +USER = CFG["sup"]["login"] +PASSWORD = CFG["sup"]["password"] +URL = CFG["sup"]["url"] +PATH = CFG["path"] +SREBUILDD_IP = CFG["srebuildd_ip"] +SREBUILDD_PATH = CFG["srebuildd_chord_path"] +SREBUILDD_URL = f"http://{SREBUILDD_IP}:81/{SREBUILDD_PATH}" +PROTOCOL = CFG["protocol"] +ACCESS_KEY = CFG["s3"]["access_key"] +SECRET_KEY = CFG["s3"]["secret_key"] +ENDPOINT_URL = CFG["s3"]["endpoint"] +PARTITIONS = int(CFG["spark.executor.instances"]) * int(CFG["spark.executor.cores"]) +ARC = CFG["arc_protection"] -arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"} -arcdatakeypattern = re.compile(r'[0-9a-fA-F]{38}70') +ARC_INDEX = { + "4+2": "102060", + "8+4": "2040C0", + "9+3": "2430C0", + "7+5": "1C50C0", + "5+7": "1470C0", +} +ARC_KEY_PATTERN = re.compile(r"[0-9a-fA-F]{38}70") -os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' -spark = SparkSession.builder \ - .appName("s3_fsck_p4.py:Clean the extra keys :" + RING) \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\ - .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)\ - .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)\ - .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) \ - .config("spark.executor.instances", cfg["spark.executor.instances"]) \ - .config("spark.executor.memory", cfg["spark.executor.memory"]) \ - .config("spark.executor.cores", cfg["spark.executor.cores"]) \ - .config("spark.driver.memory", cfg["spark.driver.memory"]) \ - .config("spark.memory.offHeap.enabled", cfg["spark.memory.offHeap.enabled"]) \ - .config("spark.memory.offHeap.size", cfg["spark.memory.offHeap.size"]) \ - .config("spark.local.dir", PATH) \ - .getOrCreate() +os.environ[ + "PYSPARK_SUBMIT_ARGS" +] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell' +SPARK = ( + SparkSession.builder.appName(f"s3_fsck_p4.py:Clean the extra keys :{RING}") + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) + .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT_URL) + .config("spark.executor.instances", CFG["spark.executor.instances"]) + .config("spark.executor.memory", CFG["spark.executor.memory"]) + .config("spark.executor.cores", CFG["spark.executor.cores"]) + .config("spark.driver.memory", CFG["spark.driver.memory"]) + .config("spark.memory.offHeap.enabled", CFG["spark.memory.offHeap.enabled"]) + .config("spark.memory.offHeap.size", CFG["spark.memory.offHeap.size"]) + .config("spark.local.dir", PATH) + .getOrCreate() +) def deletekey(row): - key = row.ringkey - try: - url = "%s/%s" % ( SREBUILDD_URL, str(key.zfill(40)) ) - print(url) - r = requests.delete(url) - status_code = r.status_code - #status_code = "OK" - return ( key, status_code, url) - except requests.exceptions.ConnectionError as e: - return (key,"ERROR_HTTP", url) + """Delete the key from the ring""" + key = row.ringkey + try: + url = f"{SREBUILDD_URL}/{str(key.zfill(40))}" + print(url) + response = requests.delete(url) + status_code = response.status_code + # status_code = "OK" + return (key, status_code, url) + except requests.exceptions.ConnectionError: + return (key, "ERROR_HTTP", url) -files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING) -df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files) -df = df.withColumnRenamed("_c0","ringkey") -df = df.repartition(PARTITIONS) -rdd = df.rdd.map(deletekey).toDF() -deletedorphans = "%s://%s/%s/s3fsck/deleted-s3-orphans.csv" % (PROTOCOL, PATH, RING) -rdd.write.format("csv").mode("overwrite").options(header="false").save(deletedorphans) +FILES = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/s3objects-missing.csv" +DATA_FRAME = ( + SPARK.read.format("csv") + .option("header", "false") + .option("inferSchema", "true") + .load(FILES) +) +DATA_FRAME = DATA_FRAME.withColumnRenamed("_c0", "ringkey") +DATA_FRAME = DATA_FRAME.repartition(PARTITIONS) +RD_DATASET = DATA_FRAME.rdd.map(deletekey).toDF() +DELETED_ORPHANS = f"{PROTOCOL}://{PATH}/{RING}/s3fsck/deleted-s3-orphans.csv" +RD_DATASET.write.format("csv").mode("overwrite").options(header="false").save( + DELETED_ORPHANS +) diff --git a/scripts/count_ring_keys.sh b/scripts/count_ring_keys.sh index 5a31e44..dd0b079 100644 --- a/scripts/count_ring_keys.sh +++ b/scripts/count_ring_keys.sh @@ -1,7 +1,7 @@ #!/bin/bash TOTAL=0 -ARC_REPLICA=$(awk -F',' 'BEGIN {count=0} ; !seen[$2]++ && $2 ~ "[A-F0-9]{30}50[A-F0-9]{6}30" && $4 == "0" {count++} END {print count}' *) -ARC_STRIPE=$(awk -F',' 'BEGIN {count=0} ; !seen[$2]++ && $2 ~ "[A-F0-9]{30}51[A-F0-9]{6}70" {count++} END {print count}' *) -let TOTAL+=${ARC_REPLICA}+${ARC_STRIPE} -echo "$TOTAL ring keys dumped from the listkeys.csv" \ No newline at end of file +ARC_REPLICA=$(awk -F',' 'BEGIN {count=0} ; !seen[$2]++ && $2 ~ "[A-F0-9]{30}50[A-F0-9]{6}[012345]0" && $4 == "0" {count++} END {print count}' ./*) +ARC_STRIPE=$(awk -F',' 'BEGIN {count=0} ; !seen[$2]++ && $2 ~ "[A-F0-9]{30}5[01][A-F0-9]{6}70" {count++} END {print count}' ./*) +(( TOTAL+=ARC_REPLICA+ARC_STRIPE )) +echo "${TOTAL} ring keys dumped from the listkeys.csv (${ARC_REPLICA}, ${ARC_STRIPE})" diff --git a/scripts/export_ring_keys.sh b/scripts/export_ring_keys.sh index dac7cfc..3a355e8 100644 --- a/scripts/export_ring_keys.sh +++ b/scripts/export_ring_keys.sh @@ -1,44 +1,60 @@ #!/bin/bash + +if [[ $# -ne 2 ]] +then + echo "Usage: $0 " + echo "Example: $0 DATA scality-spark-cleanup" + exit 1 +fi RING=$1 BUCKET=$2 h=$(hostname) for NODE in ${RING}-${h}-n{1..6} do - echo "* Dumping all keys from ${NODE} into listkeys-${NODE}.csv file" - ringsh -r ${RING} -u ${NODE} node listKeys | egrep -v '^Use' | awk -F, 'BEGIN{OFS=","} {print $1,$2,$3,$4}' > listkeys-${NODE}.csv + file_name="listkeys-${NODE}.csv" + echo "* Dumping all keys from ${NODE} into ${file_name} file" + # shellcheck disable=SC2312 + ringsh -r "${RING}" -u "${NODE}" node listKeys | awk -F, 'BEGIN{OFS=","}; !/^Use/ {print $1,$2,$3,$4}' > "${file_name}" done PS3="Do you want the files automatically uploaded: " select yn in "Yes" "No" do - case $yn in - Yes) - echo "* Uploading files to s3://${BUCKET}/${RING}/listkeys.csv" - for NODE in ${RING}-${h}-n{1..6} - do - echo "** Uploading listkeys-${NODE}.csv" - aws --endpoint http://127.0.0.1:8000 s3 cp listkeys-${NODE}.csv s3://${BUCKET}/${RING}/listkeys.csv/ - done - break;; + case "${yn}" in + Yes) + echo "* Uploading files to s3://${BUCKET}/${RING}/listkeys.csv" + for NODE in "${RING}-${h}"-n{1..6} + do + file_name="listkeys-${NODE}.csv" + echo "** Uploading ${file_name}" + aws --endpoint http://127.0.0.1:8000 s3 cp "${file_name}" s3://"${BUCKET}/${RING}"/listkeys.csv/ + done + break;; No) - echo "WARNING: Add a reminder task to cleanup the files later to avoid prematurely filling the OS disk" - break;; + echo "WARNING: Add a reminder task to cleanup the files later to avoid prematurely filling the OS disk" + break;; + *) + echo "Invalid option" + exit 1 esac done PS3="Do you want the files automatically removed: " - select yn in "Yes" "No" - do - case $yn in +select yn in "Yes" "No" +do + case "${yn}" in Yes) - for NODE in ${RING}-${h}-n{1..6} - do - rm -f listkeys-${NODE}.csv - done - break;; - No) - echo "WARNING: Add a reminder task to cleanup the files later to avoid prematurely filling the OS disk" - break;; - esac - done + for NODE in "${RING}-${h}"-n{1..6} + do + rm -f listkeys-"${NODE}".csv + done + break;; + No) + echo "WARNING: Add a reminder task to cleanup the files later to avoid prematurely filling the OS disk" + break;; + *) + echo "Invalid option" + exit 1 + esac +done diff --git a/scripts/submit.py b/scripts/submit.py index 7b26087..6b3f05b 100755 --- a/scripts/submit.py +++ b/scripts/submit.py @@ -1,29 +1,29 @@ +"""Submit a spark job to the cluster.""" +import os import sys +# pylint: disable=deprecated-module +from optparse import OptionParser import yaml -import os -from optparse import OptionParser -parser = OptionParser() -parser.add_option("-s", "--script", dest="script", - default='', help="Script to submit") -parser.add_option("-r", "--ring", dest="ring", - default='IT', help="RING name") -parser.add_option("-x", "--extra", dest="extra", - default='', help="Extra parameter") -(options, args) = parser.parse_args() +PARSER = OptionParser() +PARSER.add_option("-s", "--script", dest="script", default="", help="Script to submit") +PARSER.add_option("-r", "--ring", dest="ring", default="IT", help="RING name") +PARSER.add_option("-x", "--extra", dest="extra", default="", help="Extra parameter") +(OPTIONS, _) = PARSER.parse_args() -config_path = "%s/%s" % ( sys.path[0] ,"config/config.yml") -with open(config_path, 'r') as ymlfile: - cfg = yaml.load(ymlfile) +CONFIG_PATH = f"{sys.path[0]}/config/config.yml" +with open(CONFIG_PATH, "r", encoding="utf-8") as ymlfile: + CFG = yaml.load(ymlfile, Loader=yaml.SafeLoader) -script=options.script -opt=options.ring -opt2=options.extra -localdir = "%s/tmp/" % "/var" -total_cores = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"]) +SCRIPT = OPTIONS.script +RING = OPTIONS.ring +EXTRA_OPTS = OPTIONS.extra +LOCALDIR = "/var/tmp/" +TOTAL_CORES = int(CFG["spark.executor.instances"]) * int(CFG["spark.executor.cores"]) -cmd = "./spark-2.4.3-bin-hadoop2.7/bin/spark-submit --master %s \ +CMD = ( + "./spark-2.4.3-bin-hadoop2.7/bin/spark-submit --master %s \ --driver-memory=10g \ --executor-memory=10g \ --total-executor-cores=%s \ @@ -37,6 +37,18 @@ --conf spark.local.dir=%s \ --jars file:/root/spark/aws-java-sdk-1.7.4.jar,file:/root/spark/hadoop-aws-2.7.3.jar \ --driver-class-path=/root/spark/aws-java-sdk-1.7.4.jar:/root/spark/hadoop-aws-2.7.3.jar \ - ./%s %s %s" % ( cfg["master"], total_cores, cfg["s3"]["access_key"] , cfg["s3"]["secret_key"] , cfg["s3"]["endpoint"] , localdir, script , opt, opt2 ) + ./%s %s %s" + % ( + CFG["master"], + TOTAL_CORES, + CFG["s3"]["access_key"], + CFG["s3"]["secret_key"], + CFG["s3"]["endpoint"], + LOCALDIR, + SCRIPT, + RING, + EXTRA_OPTS, + ) +) -os.system(cmd) +os.system(CMD)