Skip to content

Commit

Permalink
feat(runtimes/python3.11): Remove unused LibertAI parts (ASGI apps, o…
Browse files Browse the repository at this point in the history
…ther encodings...)
  • Loading branch information
RezaRahemtola committed Nov 28, 2024
1 parent ac2b8a3 commit 1f6ca13
Showing 1 changed file with 25 additions and 145 deletions.
170 changes: 25 additions & 145 deletions runtimes/python3.11-debian12/init1.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import sys
import traceback
from collections.abc import AsyncIterable
from contextlib import redirect_stdout
from dataclasses import dataclass, field
from enum import Enum
from io import StringIO
from os import system
from shutil import make_archive
from typing import Any, Literal, NewType, Optional, Union, cast
from typing import Any, Literal, NewType, Optional, Union

import aiohttp
import msgpack
Expand All @@ -37,13 +34,10 @@


class Encoding(str, Enum):
plain = "plain"
zip = "zip"
squashfs = "squashfs"


class Interface(str, Enum):
asgi = "asgi"
executable = "executable"


Expand Down Expand Up @@ -115,11 +109,11 @@ def setup_variables(variables: Optional[dict[str, str]]):


def setup_network(
ipv4: Optional[str],
ipv6: Optional[str],
ipv4_gateway: Optional[str],
ipv6_gateway: Optional[str],
dns_servers: Optional[list[str]] = None,
ipv4: Optional[str],
ipv6: Optional[str],
ipv4_gateway: Optional[str],
ipv6_gateway: Optional[str],
dns_servers: Optional[list[str]] = None,
):
"""Setup the system with info from the host."""
dns_servers = dns_servers or []
Expand Down Expand Up @@ -189,7 +183,7 @@ def setup_volumes(volumes: list[Volume]):


async def wait_for_lifespan_event_completion(
application: ASGIApplication, event: Union[Literal["startup", "shutdown"]]
application: ASGIApplication, event: Union[Literal["startup", "shutdown"]]
):
"""
Send the startup lifespan signal to the ASGI app.
Expand All @@ -207,7 +201,6 @@ async def send(response: dict):
response_type = response.get("type")
if response_type == f"lifespan.{event}.complete":
lifespan_completion.set()
return
else:
logger.warning(f"Unexpected response to {event}: {response_type}")

Expand All @@ -221,68 +214,14 @@ async def send(response: dict):
)


async def setup_code_asgi(code: bytes, encoding: Encoding, entrypoint: str) -> ASGIApplication:
# Allow importing packages from /opt/packages, give it priority
sys.path.insert(0, "/opt/packages")

logger.debug("Extracting code")
app: ASGIApplication
if encoding == Encoding.squashfs:
sys.path.insert(0, "/opt/code")
module_name, app_name = entrypoint.split(":", 1)
logger.debug("import module")
module = __import__(module_name)
for level in module_name.split(".")[1:]:
module = getattr(module, level)
app = getattr(module, app_name)
elif encoding == Encoding.zip:
# Unzip in /opt and import the entrypoint from there
if not os.path.exists("/opt/archive.zip"):
open("/opt/archive.zip", "wb").write(code)
logger.debug("Run unzip")
os.system("unzip -q /opt/archive.zip -d /opt")
sys.path.insert(0, "/opt")
module_name, app_name = entrypoint.split(":", 1)
logger.debug("import module")
module = __import__(module_name)
for level in module_name.split(".")[1:]:
module = getattr(module, level)
logger.debug("import done")
app = getattr(module, app_name)
elif encoding == Encoding.plain:
# Execute the code and extract the entrypoint
locals: dict[str, Any] = {}
exec(code, globals(), locals)
app = locals[entrypoint]
else:
raise ValueError(f"Unknown encoding '{encoding}'")
await wait_for_lifespan_event_completion(application=app, event="startup")
return ASGIApplication(app)


def setup_code_executable(code: bytes, encoding: Encoding, entrypoint: str) -> subprocess.Popen:
def setup_code_executable(encoding: Encoding, entrypoint: str) -> subprocess.Popen:
logger.debug("Extracting code")
if encoding == Encoding.squashfs:
path = f"/opt/code/{entrypoint}"
if not os.path.isfile(path):
os.system("find /opt/code/")
raise FileNotFoundError(f"No such file: {path}")
os.system(f"chmod +x {path}")
elif encoding == Encoding.zip:
open("/opt/archive.zip", "wb").write(code)
logger.debug("Run unzip")
os.makedirs("/opt/code", exist_ok=True)
os.system("unzip /opt/archive.zip -d /opt/code")
path = f"/opt/code/{entrypoint}"
if not os.path.isfile(path):
os.system("find /opt/code")
raise FileNotFoundError(f"No such file: {path}")
os.system(f"chmod +x {path}")
elif encoding == Encoding.plain:
os.makedirs("/opt/code", exist_ok=True)
path = f"/opt/code/executable {entrypoint}"
open(path, "wb").write(code)
os.system(f"chmod +x {path}")
else:
raise ValueError(f"Unknown encoding '{encoding}'. This should never happen.")

Expand All @@ -291,77 +230,23 @@ def setup_code_executable(code: bytes, encoding: Encoding, entrypoint: str) -> s


async def setup_code(
code: bytes,
encoding: Encoding,
entrypoint: str,
interface: Interface,
encoding: Encoding,
entrypoint: str,
interface: Interface,
) -> Union[ASGIApplication, subprocess.Popen]:
if interface == Interface.asgi:
return await setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint)
elif interface == Interface.executable:
return setup_code_executable(code=code, encoding=encoding, entrypoint=entrypoint)
if interface == Interface.executable:
return setup_code_executable(encoding=encoding, entrypoint=entrypoint)
else:
raise ValueError("Invalid interface. This should never happen.")


async def run_python_code_http(application: ASGIApplication, scope: dict) -> tuple[dict, dict, str, Optional[bytes]]:
logger.debug("Running code")
with StringIO() as buf, redirect_stdout(buf):
# Execute in the same process, saves ~20ms than a subprocess

# The body should not be part of the ASGI scope itself
scope_body: bytes = scope.pop("body")

async def receive():
type_ = "http.request" if scope["type"] in ("http", "websocket") else "aleph.message"
return {"type": type_, "body": scope_body, "more_body": False}

send_queue: asyncio.Queue = asyncio.Queue()

async def send(dico):
await send_queue.put(dico)

# TODO: Better error handling
logger.debug("Awaiting application...")
await application(scope, receive, send)

logger.debug("Waiting for headers")
headers: dict
if scope["type"] == "http":
headers = await send_queue.get()
else:
headers = {}

logger.debug("Waiting for body")
response_body: dict = await send_queue.get()

logger.debug("Waiting for buffer")
output = buf.getvalue()

logger.debug(f"Headers {headers}")
logger.debug(f"Body {response_body}")
logger.debug(f"Output {output}")

logger.debug("Getting output data")
output_data: bytes
if os.path.isdir("/data") and os.listdir("/data"):
make_archive("/opt/output", "zip", "/data")
with open("/opt/output.zip", "rb") as output_zipfile:
output_data = output_zipfile.read()
else:
output_data = b""

logger.debug("Returning result")
return headers, response_body, output, output_data


async def make_request(session, scope):
async with session.request(
scope["method"],
url="http://localhost:8080{}".format(scope["path"]),
params=scope["query_string"],
headers=[(a.decode("utf-8"), b.decode("utf-8")) for a, b in scope["headers"]],
data=scope.get("body", None),
scope["method"],
url="http://localhost:8080{}".format(scope["path"]),
params=scope["query_string"],
headers=[(a.decode("utf-8"), b.decode("utf-8")) for a, b in scope["headers"]],
data=scope.get("body", None),
) as resp:
headers = {
"headers": [(a.encode("utf-8"), b.encode("utf-8")) for a, b in resp.headers.items()],
Expand Down Expand Up @@ -410,9 +295,9 @@ async def run_executable_http(scope: dict) -> tuple[dict, dict, str, Optional[by


async def process_instruction(
instruction: bytes,
interface: Interface,
application: Union[ASGIApplication, subprocess.Popen],
instruction: bytes,
interface: Interface,
application: Union[ASGIApplication, subprocess.Popen],
) -> AsyncIterable[bytes]:
if instruction == b"halt":
logger.info("Received halt command")
Expand Down Expand Up @@ -448,12 +333,7 @@ async def process_instruction(
body: dict
output_data: Optional[bytes]

if interface == Interface.asgi:
application = cast(ASGIApplication, application)
headers, body, output, output_data = await run_python_code_http(
application=application, scope=payload.scope
)
elif interface == Interface.executable:
if interface == Interface.executable:
headers, body, output, output_data = await run_executable_http(scope=payload.scope)
else:
raise ValueError("Unknown interface. This should never happen")
Expand Down Expand Up @@ -524,7 +404,7 @@ def setup_system(config: ConfigurationPayload):


def umount_volumes(volumes: list[Volume]):
"Umount user related filesystems"
"""Umount user related filesystems"""
system("sync")
for volume in volumes:
logger.debug(f"Umounting /dev/{volume.device} on {volume.mount}")
Expand All @@ -540,7 +420,7 @@ async def main() -> None:

try:
app: Union[ASGIApplication, subprocess.Popen] = await setup_code(
config.code, config.encoding, config.entrypoint, config.interface
config.encoding, config.entrypoint, config.interface
)
client.send(msgpack.dumps({"success": True}))
except Exception as error:
Expand All @@ -557,7 +437,7 @@ async def main() -> None:
raise

class ServerReference:
"Reference used to close the server from within `handle_instruction"
"""Reference used to close the server from within `handle_instruction`"""
server: asyncio.AbstractServer

server_reference = ServerReference()
Expand Down

0 comments on commit 1f6ca13

Please sign in to comment.