From 1a48b52d3501ba030fe5e196f971130789790e74 Mon Sep 17 00:00:00 2001 From: Dag7 Date: Wed, 18 Dec 2024 09:51:03 +0100 Subject: [PATCH] Fix Demozoo importer (#257) --- scrapers/py_importers/demozoo.py | 35 ++-- scrapers/py_importers/py_common/utils.py | 227 ++++++++++------------- scrapers/py_importers/requirements.txt | 1 + 3 files changed, 119 insertions(+), 144 deletions(-) diff --git a/scrapers/py_importers/demozoo.py b/scrapers/py_importers/demozoo.py index 7d0db19e..8912801f 100644 --- a/scrapers/py_importers/demozoo.py +++ b/scrapers/py_importers/demozoo.py @@ -3,17 +3,8 @@ # URL is structured in this way: # https://demozoo.org/productions/?platform={internal_no_platform}&production_type={internal_prodtype_number} -import sys -import re -import os -import json -import shutil -import zipfile -import fnmatch -import urllib3 + import requests -import unicodedata -from urllib.request import urlopen from bs4 import BeautifulSoup from py_common.Logger import Logger @@ -71,13 +62,15 @@ def scrape(platform): page = requests.get(baseurl + "/productions/?platform=" + str(PLATFORMS[platform][0]) + "&page=1", timeout=None) soup = BeautifulSoup(page.content, 'html.parser') - # get total number of pages - span_pages = soup.find("span", {"class":"current"}) - numberofpages = int(str.strip(span_pages.text).split(" ")[-1].split(".")[0]) - logger.write("[INFO]", "Total number of pages: " + str(numberofpages) ) - # parsing every page - for i in range(0, numberofpages): + enough_page = True + i = 0 + while enough_page: + if soup.find('a', {"title": "Next_page"}): + enough_page = True + else: + enough_page = False + logger.write("[INFO]", "Parsing page: " + str(i+1) ) #TODO: dont call twice this page, as it is called before @@ -107,7 +100,7 @@ def scrape(platform): # check if it could be added to database or not # building files - ret = utils.build(prod, entrypath, ["GB", "GBC"]) # TODO: GBA, add GBA to this list + ret = utils.build(prod, entrypath, ["gb", "gbc"]) # TODO: GBA, add GBA to this list # make required JSON file if ret != 1: @@ -165,7 +158,7 @@ def scrape_page(slug, url, platform): # fetching screenshot screen_obj = soup.find('a', {"class": "screenshot"}) - if screen_obj != None: + if screen_obj is not None: screenshot = screen_obj.get("href") else: screenshot = "None" @@ -178,7 +171,7 @@ def scrape_page(slug, url, platform): # fetching url (if present) url = soup.find('ul', {"class": "download_links"}) - if url != None: + if url is not None: url = url.findChildren("a") else: # it doesn't make any sense to have a prod without DL link @@ -196,11 +189,15 @@ def scrape_page(slug, url, platform): elif len(url) >= 2: # because almost always the prod will have the secondary mirror as scene.org or smth like that url = url[1].get("href") + if "scene.org" in url and "view" in url: + url = url.replace("view", "get") # fetching video video = soup.find(lambda tag: tag.name == "a" and "youtube" in tag.text.lower()) video = video.get("href") if video else "" + files = [f"{slug}.{platform.lower()}"] + return Production(title, slug, developer, platform, typetag, screenshots, files, video, repository=source, url=url) def main(): diff --git a/scrapers/py_importers/py_common/utils.py b/scrapers/py_importers/py_common/utils.py index b9d4ae3d..ab6c610f 100644 --- a/scrapers/py_importers/py_common/utils.py +++ b/scrapers/py_importers/py_common/utils.py @@ -1,28 +1,20 @@ -import sys -import py_common.utils import re import json import shutil import zipfile import fnmatch -import urllib3 import requests -import unicodedata import contextlib import urllib -from urllib.request import urlopen -import imghdr from PIL import Image import os from os import listdir -from os.path import isfile, join -from bs4 import BeautifulSoup from unidecode import unidecode from py_common.Logger import Logger -from py_common.Production import Production +import py7zr ########################### ### GLOBAL VAR AND CONS ### @@ -115,139 +107,124 @@ def fetch_prod_name(prod, suffix, filepath): return path -def build(prod: Production, entrypath: str, desired_extentions: list): + +def build(prod, entrypath: str, desired_extensions: list): ''' - given a prod "Production" object containing - all production's data, create a proper named folder, fetches all files (screenshot + rom) - and properly organize everything + Given a prod "Production" object containing + all production's data, create a properly named folder, fetch all files (screenshot + ROM), + and organize everything. ''' - if not os.path.exists(entrypath + prod.slug): - ############# - # PROD FILE # - ############# - # make its own folder - os.mkdir(entrypath + prod.slug, 0o777) - - # figuring out the suffix - suffix = str.lower(prod.url.split(".")[-1]) - if suffix not in desired_extentions: - suffix = "gb" - - # building the filepath - filepath = entrypath + prod.slug + "/" - - # download the file - # in case of http - if prod.url.startswith("http"): - try: - r = requests.get(prod.url, allow_redirects=True, - timeout=None, verify=False, headers=headers) - if r.status_code != 200: - logger.write("[ERR]:", str(r.status_code) + - ": " + prod.slug + " - " + prod.url) - - # cleaning in case of error - shutil.rmtree(entrypath + prod.slug) - return 1 - except ConnectionError as e: - logger.write("[ERR]:", str(r.status_code) + - ": " + prod.slug + " - " + prod.url) - logger.write("[ERR]:", "REASON: " + e) - - # cleaning in case of error - shutil.rmtree(entrypath + prod.slug) - return 1 - open(filepath + prod.slug + "." + suffix, 'wb').write(r.content) - else: - with contextlib.closing(urllib.request.urlopen(prod.url)) as r: - with open(filepath + prod.slug + "." + suffix, 'wb') as f: - shutil.copyfileobj(r, f) - - # unzip in case of zip - if prod.url.endswith(".zip") or prod.url.endswith(".ZIP"): - # download and unzip - try: - with zipfile.ZipFile(filepath + prod.slug + "." + suffix, "r") as zip_ref: - zip_ref.extractall(filepath + "unzippedfolder") + # Create folder if not already present + target_folder = os.path.join(entrypath, prod.slug) + if not os.path.exists(target_folder): + os.mkdir(target_folder, 0o777) - # manage all extensions, and it doesn't matter if they have uppercase or lowercase - path = [] # eventually the file + # Extract file extension + suffix = prod.url.split(".")[-1].lower() + + if suffix not in desired_extensions and suffix not in ["zip", "7z", "mp4"]: + print(f"ERROR: {prod.slug} extension is not in {desired_extensions}") + suffix = "gb" # Fallback extension - extentions = fix_extentions(desired_extentions) - for extension in extentions: - path = fetch_prod_name(prod, extension, filepath) - if path != []: - break + # Build the file path + filepath = os.path.join(target_folder, f"{prod.slug}.{suffix}") - # proper renaming and moving the file - if path != []: - os.rename(path[0], filepath + prod.slug + - "." + extension.lower()) + # Download the file + try: + if prod.url.startswith("http"): + r = requests.get(prod.url, allow_redirects=True, timeout=None, verify=False) + if r.status_code != 200: + raise Exception(f"HTTP Error {r.status_code}") + with open(filepath, 'wb') as f: + f.write(r.content) + else: + with contextlib.closing(urllib.request.urlopen(prod.url)) as r: + with open(filepath, 'wb') as f: + shutil.copyfileobj(r, f) + except Exception as e: + logger.write("[ERR]:", f"Error downloading {prod.slug}: {e}") + shutil.rmtree(target_folder) + return 1 + + # Unzip and handle files + if suffix in ["zip", "7z"]: + unzipped_path = os.path.join(target_folder, "unzippedfolder") + os.makedirs(unzipped_path, exist_ok=True) - # update production object file - prod.files.append(prod.slug + "." + extension.lower()) - else: - logger.write( - "[WARN]", prod.title + " extension is not a " + prod.platform + " file.") - shutil.rmtree(entrypath + prod.slug) - return 1 - - # cleaning up unneeded files - shutil.rmtree(filepath + "unzippedfolder") - if CLEANZIP: - os.remove(filepath + prod.slug + "." + "zip") - except zipfile.BadZipFile as e: - logger.write("[ERR] ", str(e) + " bad zip file") - shutil.rmtree(entrypath + prod.slug) + try: + if suffix == "zip": + with zipfile.ZipFile(filepath, "r") as zip_ref: + zip_ref.extractall(unzipped_path) + elif suffix == "7z": + with py7zr.SevenZipFile(filepath, mode='r') as z: + z.extractall(unzipped_path) + except Exception as e: + logger.write("[ERR]:", f"Failed to extract {suffix} file: {e}") + shutil.rmtree(target_folder) return 1 - else: - # it is a proper gb file -> just write the filename in its own structure field - pass - - # download the screenshot - if prod.screenshots != None and prod.screenshots != [] and prod.screenshots[0] != "None": - r = requests.get( - prod.screenshots[0], allow_redirects=True, timeout=None) - - # figuring out what kind of screenshots I am dealing with - screen_file_path = filepath + prod.slug + "." - - # screenshot fileext - screen_ext = prod.screenshots[0].split(".")[-1] - logger.write("[INFO]", " The screenshot is in " + - screen_ext + " format") - if screen_ext.lower() == "png": - screen_file_path += "png" - else: - screen_file_path += screen_ext - - open(screen_file_path, 'wb').write(r.content) + # Search for desired extensions in the extracted folder + valid_file_found = False + + # Recursively search all files under the unzipped path + for root, _, files in os.walk(unzipped_path): + for file in files: + ext = file.split(".")[-1].lower() + if ext in desired_extensions: + extracted_file = os.path.join(root, file) + final_file = os.path.join(target_folder, f"{prod.slug}.{ext}") + + # Move the valid file to the target folder + shutil.move(extracted_file, final_file) + prod.files.append(f"{prod.slug}.{ext}") + + valid_file_found = True + break + + if valid_file_found: + break - if screen_ext != "png": - im = Image.open(screen_file_path).convert("RGB") - im.save(filepath + prod.slug + ".png", "png") + if not valid_file_found: + logger.write("[WARN]:", f"No valid files with extensions {desired_extensions} found.") + shutil.rmtree(target_folder) + return 1 - logger.write( - "[INFO]", " Screenshot has been converted into a PNG file.") - logger.write("[INFO]", " Removing screenshot " + - screen_ext + " file...") + # Clean up unzipped files and original archive + shutil.rmtree(unzipped_path) + if CLEANZIP: + os.remove(filepath) + else: + prod.files.append(f"{prod.slug}.{suffix}") - os.remove(screen_file_path) + # Handle screenshots + if prod.screenshots and prod.screenshots[0] != "None": + try: + r = requests.get(prod.screenshots[0], allow_redirects=True, timeout=None) + screen_ext = prod.screenshots[0].split(".")[-1].lower() + screen_file = os.path.join(target_folder, f"{prod.slug}.{screen_ext}") + with open(screen_file, 'wb') as f: + f.write(r.content) + + # Convert to PNG if necessary + if screen_ext != "png": + img = Image.open(screen_file).convert("RGB") + png_file = os.path.join(target_folder, f"{prod.slug}.png") + img.save(png_file, "PNG") + os.remove(screen_file) + prod.screenshots[0] = f"{prod.slug}.png" + else: + prod.screenshots[0] = f"{prod.slug}.png" + except Exception as e: + logger.write("[ERR]:", f"Failed to download screenshot for {prod.slug}: {e}") + prod.screenshots = [] - open(filepath + prod.slug + "." + "png", 'wb').write(r.content) - prod.screenshots[0] = prod.slug + "." + "png" - else: - prod.screenshots = [] - logger.write( - "[INFO]", "Screenshot not present for this production") else: - logger.write( - "[WARN]", "directory already present. Skipping " + prod.slug + "...") + logger.write("[WARN]:", f"Directory already exists for {prod.slug}. Skipping...") return 1 return 0 + def fix_extentions(desired_extentions): ''' given a theorical list of extensions, it returns a list containing additional correct extensions (like CGB, AGB) diff --git a/scrapers/py_importers/requirements.txt b/scrapers/py_importers/requirements.txt index 3f0d398a..97dc0f23 100644 --- a/scrapers/py_importers/requirements.txt +++ b/scrapers/py_importers/requirements.txt @@ -12,3 +12,4 @@ webencodings==0.5.1 wget==3.2 webptools==0.0.5 pillow==8.3.2 +py7zr==0.22.0 \ No newline at end of file