diff --git a/idb/cli/__init__.py b/idb/cli/__init__.py index e509a8eae..250115001 100644 --- a/idb/cli/__init__.py +++ b/idb/cli/__init__.py @@ -11,7 +11,8 @@ import os from abc import ABCMeta, abstractmethod from argparse import ArgumentParser, Namespace -from typing import AsyncGenerator, Optional +from collections.abc import AsyncGenerator +from typing import Optional from idb.common import plugin from idb.common.command import Command @@ -123,7 +124,7 @@ def add_parser_arguments(self, parser: ArgumentParser) -> None: super().add_parser_arguments(parser) async def _run_impl(self, args: Namespace) -> None: - address: Optional[Address] = None + address: Address | None = None try: async with _get_client(args=args, logger=self.logger) as client: address = client.address diff --git a/idb/cli/commands/app.py b/idb/cli/commands/app.py index bf2ef4515..72e43bc9e 100644 --- a/idb/cli/commands/app.py +++ b/idb/cli/commands/app.py @@ -51,7 +51,7 @@ def add_parser_arguments(self, parser: ArgumentParser) -> None: super().add_parser_arguments(parser) async def run_with_client(self, args: Namespace, client: Client) -> None: - artifact: Optional[InstalledArtifact] = None + artifact: InstalledArtifact | None = None compression = ( Compression[args.compression] if args.compression is not None else None ) diff --git a/idb/cli/commands/approve.py b/idb/cli/commands/approve.py index 582c80874..e4f31e87e 100644 --- a/idb/cli/commands/approve.py +++ b/idb/cli/commands/approve.py @@ -13,7 +13,7 @@ from idb.common.types import Client, Permission -_ARG_TO_ENUM: Dict[str, Permission] = { +_ARG_TO_ENUM: dict[str, Permission] = { key.lower(): value for (key, value) in Permission.__members__.items() } diff --git a/idb/cli/commands/daemon.py b/idb/cli/commands/daemon.py index fec96a18e..317527be5 100644 --- a/idb/cli/commands/daemon.py +++ b/idb/cli/commands/daemon.py @@ -89,7 +89,7 @@ async def _run_impl(self, args: Namespace) -> None: self.logger.info("Exiting") def _reply_with_port( - self, reply_fd: Optional[int], prefer_ipv6: bool, ports: Dict[str, int] + self, reply_fd: int | None, prefer_ipv6: bool, ports: dict[str, int] ) -> None: if not reply_fd: return diff --git a/idb/cli/commands/file.py b/idb/cli/commands/file.py index 3804957fe..48697ef7a 100644 --- a/idb/cli/commands/file.py +++ b/idb/cli/commands/file.py @@ -21,7 +21,7 @@ def _add_container_types_to_group( - group: _MutuallyExclusiveGroup, containers: List[Tuple[FileContainerType, str]] + group: _MutuallyExclusiveGroup, containers: list[tuple[FileContainerType, str]] ) -> None: for container_type, help_text in containers: argument_name = container_type.value.replace("_", "-") @@ -121,7 +121,7 @@ def name(self) -> str: return "list" @property - def aliases(self) -> List[str]: + def aliases(self) -> list[str]: return ["ls"] def add_parser_arguments(self, parser: ArgumentParser) -> None: @@ -200,7 +200,7 @@ def name(self) -> str: return "move" @property - def aliases(self) -> List[str]: + def aliases(self) -> list[str]: return ["mv"] def add_parser_arguments(self, parser: ArgumentParser) -> None: @@ -233,7 +233,7 @@ def name(self) -> str: return "remove" @property - def aliases(self) -> List[str]: + def aliases(self) -> list[str]: return ["rm"] def add_parser_arguments(self, parser: ArgumentParser) -> None: @@ -324,7 +324,7 @@ def name(self) -> str: return "read" @property - def aliases(self) -> List[str]: + def aliases(self) -> list[str]: return ["show"] def add_parser_arguments(self, parser: ArgumentParser) -> None: diff --git a/idb/cli/commands/log.py b/idb/cli/commands/log.py index 24b6d6bf1..31e2e0016 100644 --- a/idb/cli/commands/log.py +++ b/idb/cli/commands/log.py @@ -57,8 +57,8 @@ async def run_with_client(self, args: Namespace, client: Client) -> None: print("") def normalise_log_arguments( - self, log_arguments: Optional[List[str]] - ) -> Optional[List[str]]: + self, log_arguments: list[str] | None + ) -> list[str] | None: if log_arguments is None: return None diff --git a/idb/cli/commands/revoke.py b/idb/cli/commands/revoke.py index cbe03b730..31842231d 100644 --- a/idb/cli/commands/revoke.py +++ b/idb/cli/commands/revoke.py @@ -13,7 +13,7 @@ from idb.common.types import Client, Permission -_ARG_TO_ENUM: Dict[str, Permission] = { +_ARG_TO_ENUM: dict[str, Permission] = { key.lower(): value for (key, value) in Permission.__members__.items() } diff --git a/idb/cli/commands/screenshot.py b/idb/cli/commands/screenshot.py index 0495f2451..713bbbdd8 100644 --- a/idb/cli/commands/screenshot.py +++ b/idb/cli/commands/screenshot.py @@ -8,8 +8,9 @@ import sys from argparse import ArgumentParser, Namespace +from collections.abc import Iterator from contextlib import contextmanager -from typing import IO, Iterator +from typing import IO from idb.cli import ClientCommand from idb.common.types import Client diff --git a/idb/cli/commands/shell.py b/idb/cli/commands/shell.py index 0b7705b07..02ac3be20 100644 --- a/idb/cli/commands/shell.py +++ b/idb/cli/commands/shell.py @@ -21,7 +21,7 @@ class ShellCommand(ClientCommand): def __init__(self, parser: ArgumentParser) -> None: super().__init__() self.parser = parser - self.root_command: Optional[CommandGroup] = None + self.root_command: CommandGroup | None = None @property def description(self) -> str: diff --git a/idb/cli/commands/target.py b/idb/cli/commands/target.py index 85b4bdc14..ed47155e6 100644 --- a/idb/cli/commands/target.py +++ b/idb/cli/commands/target.py @@ -8,7 +8,8 @@ import json from argparse import ArgumentParser, Namespace, SUPPRESS -from typing import Mapping, Union +from collections.abc import Mapping +from typing import Union import idb.common.plugin as plugin from idb.cli import ClientCommand, CompanionCommand, ManagementCommand @@ -37,7 +38,7 @@ class DisconnectCommandException(Exception): pass -def get_destination(args: Namespace) -> Union[TCPAddress, str]: +def get_destination(args: Namespace) -> TCPAddress | str: if is_udid(args.companion): return args.companion elif args.port and args.companion: diff --git a/idb/cli/commands/video.py b/idb/cli/commands/video.py index 894ad6b23..970932d9d 100644 --- a/idb/cli/commands/video.py +++ b/idb/cli/commands/video.py @@ -15,7 +15,7 @@ from idb.common.types import Client, VideoFormat -_FORMAT_CHOICE_MAP: Dict[str, VideoFormat] = { +_FORMAT_CHOICE_MAP: dict[str, VideoFormat] = { str(format.value.lower()): format for format in VideoFormat } @@ -30,7 +30,7 @@ def name(self) -> str: return "video" @property - def aliases(self) -> List[str]: + def aliases(self) -> list[str]: return ["record-video"] def add_parser_arguments(self, parser: ArgumentParser) -> None: diff --git a/idb/cli/commands/xctest.py b/idb/cli/commands/xctest.py index 9032996ac..046b4b772 100644 --- a/idb/cli/commands/xctest.py +++ b/idb/cli/commands/xctest.py @@ -297,7 +297,7 @@ async def install_bundles(self, args: Namespace, client: Client) -> None: async def install_dsym_test_bundle( self, args: Namespace, client: Client, test_bundle_location: str - ) -> Optional[str]: + ) -> str | None: dsym_name = None dsym_path_location = args.install_dsym_test_bundle if args.install_dsym_test_bundle == NO_SPECIFIED_PATH: @@ -320,10 +320,10 @@ async def install_dsym_test_bundle( dsym_name = install_response.name return dsym_name - def get_tests_to_run(self, args: Namespace) -> Optional[Set[str]]: + def get_tests_to_run(self, args: Namespace) -> set[str] | None: return None - def get_tests_to_skip(self, args: Namespace) -> Optional[Set[str]]: + def get_tests_to_skip(self, args: Namespace) -> set[str] | None: return None @@ -366,10 +366,10 @@ async def install_bundles(self, args: Namespace, client: Client) -> None: async for app in client.install(args.app_bundle_id): args.app_bundle_id = app.name - def get_tests_to_run(self, args: Namespace) -> Optional[Set[str]]: + def get_tests_to_run(self, args: Namespace) -> set[str] | None: return set(args.tests_to_run) if args.tests_to_run else None - def get_tests_to_skip(self, args: Namespace) -> Optional[Set[str]]: + def get_tests_to_skip(self, args: Namespace) -> set[str] | None: return set(args.tests_to_skip) if args.tests_to_skip else None @@ -414,7 +414,7 @@ def add_parser_arguments(self, parser: ArgumentParser) -> None: Format: className/methodName", ) - def get_tests_to_run(self, args: Namespace) -> Optional[Set[str]]: + def get_tests_to_run(self, args: Namespace) -> set[str] | None: if args.test_to_run: return set(args.test_to_run) if args.tests_to_run: diff --git a/idb/cli/main.py b/idb/cli/main.py index 60b844284..6290a324d 100644 --- a/idb/cli/main.py +++ b/idb/cli/main.py @@ -117,7 +117,7 @@ logger: logging.Logger = logging.getLogger() -def get_default_companion_path() -> Optional[str]: +def get_default_companion_path() -> str | None: if sys.platform != "darwin": return None return shutil.which("idb_companion") or "/usr/local/bin/idb_companion" @@ -126,7 +126,7 @@ def get_default_companion_path() -> Optional[str]: SysExitArg = Union[int, str, None] -async def gen_main(cmd_input: Optional[List[str]] = None) -> SysExitArg: +async def gen_main(cmd_input: list[str] | None = None) -> SysExitArg: # Make sure all files are created with global rw permissions os.umask(0o000) # Setup parser @@ -180,7 +180,7 @@ async def gen_main(cmd_input: Optional[List[str]] = None) -> SysExitArg: help="If flagged will not modify local state when a companion is known to be unresponsive", ) shell_command = ShellCommand(parser=parser) - commands: List[Command] = [ + commands: list[Command] = [ AppInstallCommand(), AppUninstallCommand(), AppListCommand(), @@ -340,7 +340,7 @@ async def gen_main(cmd_input: Optional[List[str]] = None) -> SysExitArg: await drain_coroutines(pending) -async def drain_coroutines(pending: Set[asyncio.Task]) -> None: +async def drain_coroutines(pending: set[asyncio.Task]) -> None: if not pending: return logger.debug(f"Shutting down {len(pending)} coroutines") @@ -355,7 +355,7 @@ async def drain_coroutines(pending: Set[asyncio.Task]) -> None: pass -def main(cmd_input: Optional[List[str]] = None) -> SysExitArg: +def main(cmd_input: list[str] | None = None) -> SysExitArg: loop = asyncio.get_event_loop() try: return loop.run_until_complete(gen_main(cmd_input)) diff --git a/idb/cli/tests/parser_tests.py b/idb/cli/tests/parser_tests.py index bf3072d55..8006c6314 100644 --- a/idb/cli/tests/parser_tests.py +++ b/idb/cli/tests/parser_tests.py @@ -28,7 +28,7 @@ T = TypeVar("T") -COMPANION_PATH: Optional[str] = ( +COMPANION_PATH: str | None = ( "/usr/local/bin/idb_companion" if sys.platform == "darwin" else None ) @@ -37,7 +37,7 @@ class AsyncGeneratorMock(AsyncMock): def __call__(self) -> None: super() - def __init__(self, iter_list: Tuple[T, ...] = ()) -> None: + def __init__(self, iter_list: tuple[T, ...] = ()) -> None: super().__init__() self.iter_list = iter_list self.iter_pos: int = -1 diff --git a/idb/common/args.py b/idb/common/args.py index 751415055..de84d7869 100644 --- a/idb/common/args.py +++ b/idb/common/args.py @@ -21,8 +21,8 @@ def __call__( self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, - values: List[str], - option_string: Optional[str] = None, + values: list[str], + option_string: str | None = None, ) -> None: assert len(values) == 1 try: @@ -36,13 +36,11 @@ def __call__( setattr(namespace, self.dest, d) -def have_file_with_extension(file_prefix: str, extensions: List[str]) -> bool: - return any( - (os.path.exists(f"{file_prefix}.{extension}") for extension in extensions) - ) +def have_file_with_extension(file_prefix: str, extensions: list[str]) -> bool: + return any(os.path.exists(f"{file_prefix}.{extension}") for extension in extensions) -def find_next_file_prefix(basename: str, extensions: Optional[List[str]] = None) -> str: +def find_next_file_prefix(basename: str, extensions: list[str] | None = None) -> str: extensions = extensions or [] number = 1 while True: diff --git a/idb/common/command.py b/idb/common/command.py index cd92de424..501bbb468 100644 --- a/idb/common/command.py +++ b/idb/common/command.py @@ -13,7 +13,7 @@ class Command(metaclass=ABCMeta): @property - def aliases(self) -> List[str]: + def aliases(self) -> list[str]: return [] @property @@ -40,16 +40,16 @@ def resolve_command_from_args(self, args: Namespace) -> "Command": class CompositeCommand(Command, metaclass=ABCMeta): def __init__(self) -> None: - self.parser: Optional[ArgumentParser] = None - self._subcommands_by_name: Dict[str, Command] = {} + self.parser: ArgumentParser | None = None + self._subcommands_by_name: dict[str, Command] = {} @property @abstractmethod - def subcommands(self) -> List[Command]: + def subcommands(self) -> list[Command]: pass @property - def subcommands_by_name(self) -> Dict[str, Command]: + def subcommands_by_name(self) -> dict[str, Command]: def add_unique_cmd(key: str, value: Command) -> None: assert ( key not in self._subcommands_by_name @@ -89,7 +89,7 @@ async def run(self, args: Namespace) -> None: class CommandGroup(CompositeCommand): - def __init__(self, name: str, description: str, commands: List[Command]) -> None: + def __init__(self, name: str, description: str, commands: list[Command]) -> None: super().__init__() self.commands = commands self._name = name @@ -104,5 +104,5 @@ def description(self) -> str: return self._description @property - def subcommands(self) -> List[Command]: + def subcommands(self) -> list[Command]: return self.commands diff --git a/idb/common/companion.py b/idb/common/companion.py index de2454753..bbb084377 100644 --- a/idb/common/companion.py +++ b/idb/common/companion.py @@ -11,10 +11,11 @@ import logging import os import subprocess +from collections.abc import AsyncGenerator, Sequence from dataclasses import dataclass from datetime import timedelta from logging import DEBUG as LOG_LEVEL_DEBUG, Logger -from typing import AsyncGenerator, Dict, List, Optional, Sequence, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union from idb.common.constants import IDB_LOGS_PATH from idb.common.file import get_last_n_lines @@ -41,7 +42,7 @@ DEFAULT_COMPANION_TEARDOWN_TIMEOUT = timedelta(seconds=30) -CompanionReport = Dict[str, Union[int, str]] +CompanionReport = dict[str, Union[int, str]] class IdbJsonException(Exception): @@ -71,7 +72,7 @@ async def _terminate_process( process.kill() -def _only_arg_from_filter(only: Optional[OnlyFilter]) -> List[str]: +def _only_arg_from_filter(only: OnlyFilter | None) -> list[str]: if isinstance(only, TargetType): if only == TargetType.MAC: return [] @@ -81,7 +82,7 @@ def _only_arg_from_filter(only: Optional[OnlyFilter]) -> List[str]: return [] -def parse_json_line(line: bytes) -> Dict[str, Union[int, str]]: +def parse_json_line(line: bytes) -> dict[str, int | str]: decoded_line = line.decode() try: return json.loads(decoded_line) @@ -111,7 +112,7 @@ async def _verify_port_from_spawned_companion( report: CompanionReport, port_name: str, log_file_path: str, - expected_port: Optional[int], + expected_port: int | None, ) -> int: try: extracted_port = int(report[port_name]) @@ -151,9 +152,9 @@ async def _extract_domain_sock_from_spawned_companion( class CompanionServerConfig: udid: str only: OnlyFilter - log_file_path: Optional[str] - cwd: Optional[str] - tmp_path: Optional[str] + log_file_path: str | None + cwd: str | None + tmp_path: str | None reparent: bool @@ -161,10 +162,10 @@ class Companion(CompanionBase): def __init__( self, companion_path: str, - device_set_path: Optional[str], + device_set_path: str | None, logger: Logger, architecture: Architecture = Architecture.ANY, - only: Optional[OnlyFilter] = None, + only: OnlyFilter | None = None, ) -> None: self._companion_path = companion_path self._device_set_path = device_set_path @@ -174,9 +175,9 @@ def __init__( @asynccontextmanager async def _start_companion_command( - self, arguments: List[str] + self, arguments: list[str] ) -> AsyncGenerator[asyncio.subprocess.Process, None]: - cmd: List[str] = [] + cmd: list[str] = [] if self._architecture != Architecture.ANY: cmd = ["arch", "-" + self._architecture.value] cmd += [self._companion_path] @@ -207,7 +208,7 @@ async def _start_companion_command( ) async def _run_companion_command( - self, arguments: List[str], timeout: Optional[timedelta] + self, arguments: list[str], timeout: timedelta | None ) -> str: timeout = timeout if timeout is not None else DEFAULT_COMPANION_COMMAND_TIMEOUT async with self._start_companion_command(arguments=arguments) as process: @@ -230,8 +231,8 @@ async def _run_udid_command( self, udid: str, command: str, - timeout: Optional[timedelta], - extra_arguments: Optional[Sequence[str]] = None, + timeout: timedelta | None, + extra_arguments: Sequence[str] | None = None, ) -> str: arguments = [f"--{command}", udid] if extra_arguments is not None: @@ -247,16 +248,16 @@ def _log_file_path(self, target_udid: str) -> str: async def _spawn_server( self, config: CompanionServerConfig, - port_env_variables: Dict[str, str], - bind_arguments: List[str], - ) -> Tuple[asyncio.subprocess.Process, str]: + port_env_variables: dict[str, str], + bind_arguments: list[str], + ) -> tuple[asyncio.subprocess.Process, str]: if os.getuid() == 0: logging.warning( "idb should not be run as root. " "Listing available targets on this host and spawning " "companions will not work" ) - arguments: List[str] = [] + arguments: list[str] = [] if self._architecture != Architecture.ANY: arguments = ["arch", "-" + self._architecture.value] arguments += ( @@ -297,11 +298,11 @@ async def _spawn_server( async def spawn_tcp_server( self, config: CompanionServerConfig, - port: Optional[int], - swift_port: Optional[int] = None, - tls_cert_path: Optional[str] = None, - ) -> Tuple[asyncio.subprocess.Process, int, Optional[int]]: - port_env_variables: Dict[str, str] = {} + port: int | None, + swift_port: int | None = None, + tls_cert_path: str | None = None, + ) -> tuple[asyncio.subprocess.Process, int, int | None]: + port_env_variables: dict[str, str] = {} if swift_port is not None: port_env_variables["IDB_SWIFT_COMPANION_PORT"] = str(swift_port) @@ -321,7 +322,7 @@ async def spawn_tcp_server( companion_report, "grpc_port", log_file_path, port ) - extracted_swift_port: Optional[int] = None + extracted_swift_port: int | None = None if swift_port: try: extracted_swift_port = await _verify_port_from_spawned_companion( @@ -370,7 +371,7 @@ async def spawn_domain_sock_server( @log_call() async def create( - self, device_type: str, os_version: str, timeout: Optional[timedelta] = None + self, device_type: str, os_version: str, timeout: timedelta | None = None ) -> TargetDescription: output = await self._run_companion_command( arguments=["--create", f"{device_type},{os_version}"], timeout=timeout @@ -379,7 +380,7 @@ async def create( @log_call() async def boot( - self, udid: str, verify: bool = True, timeout: Optional[timedelta] = None + self, udid: str, verify: bool = True, timeout: timedelta | None = None ) -> None: await self._run_udid_command( udid=udid, @@ -390,7 +391,7 @@ async def boot( @asynccontextmanager async def boot_headless( - self, udid: str, verify: bool = True, timeout: Optional[timedelta] = None + self, udid: str, verify: bool = True, timeout: timedelta | None = None ) -> AsyncGenerator[None, None]: async with self._start_companion_command( [ @@ -414,7 +415,7 @@ async def boot_headless( self._logger.info(f"Done with {target}. Shutting down.") @log_call() - async def shutdown(self, udid: str, timeout: Optional[timedelta] = None) -> None: + async def shutdown(self, udid: str, timeout: timedelta | None = None) -> None: await self._run_udid_command(udid=udid, command="shutdown", timeout=timeout) @log_call() @@ -427,8 +428,8 @@ async def erase( async def clone( self, udid: str, - destination_device_set: Optional[str] = None, - timeout: Optional[timedelta] = None, + destination_device_set: str | None = None, + timeout: timedelta | None = None, ) -> TargetDescription: arguments = ["--clone", udid] if destination_device_set is not None: @@ -437,21 +438,19 @@ async def clone( return target_description_from_json(output.splitlines()[-1]) @log_call() - async def delete( - self, udid: Optional[str], timeout: Optional[timedelta] = None - ) -> None: + async def delete(self, udid: str | None, timeout: timedelta | None = None) -> None: await self._run_udid_command( udid=udid if udid is not None else "all", command="delete", timeout=timeout ) @log_call() - async def clean(self, udid: str, timeout: Optional[timedelta] = None) -> None: + async def clean(self, udid: str, timeout: timedelta | None = None) -> None: await self._run_udid_command(udid=udid, command="clean", timeout=timeout) @log_call() async def list_targets( - self, only: Optional[OnlyFilter] = None, timeout: Optional[timedelta] = None - ) -> List[TargetDescription]: + self, only: OnlyFilter | None = None, timeout: timedelta | None = None + ) -> list[TargetDescription]: arguments = ["--list", "1"] + _only_arg_from_filter(only=only) output = await self._run_companion_command(arguments=arguments, timeout=timeout) return [ @@ -461,8 +460,8 @@ async def list_targets( ] async def tail_targets( - self, only: Optional[OnlyFilter] = None - ) -> AsyncGenerator[List[TargetDescription], None]: + self, only: OnlyFilter | None = None + ) -> AsyncGenerator[list[TargetDescription], None]: arguments = ["--notify", "stdout"] + _only_arg_from_filter(only=only) async with self._start_companion_command(arguments=arguments) as process: async for line in none_throws(process.stdout): @@ -471,9 +470,9 @@ async def tail_targets( @log_call() async def target_description( self, - udid: Optional[str] = None, - only: Optional[OnlyFilter] = None, - timeout: Optional[timedelta] = None, + udid: str | None = None, + only: OnlyFilter | None = None, + timeout: timedelta | None = None, ) -> TargetDescription: all_details = await self.list_targets(only=only, timeout=timeout) details = all_details @@ -487,7 +486,7 @@ async def target_description( @asynccontextmanager async def unix_domain_server( - self, udid: str, path: str, only: Optional[OnlyFilter] = None + self, udid: str, path: str, only: OnlyFilter | None = None ) -> AsyncGenerator[str, None]: async with self._start_companion_command( ["--udid", udid, "--grpc-domain-sock", path] diff --git a/idb/common/companion_set.py b/idb/common/companion_set.py index 7ed1a8d84..e9f40e8a2 100644 --- a/idb/common/companion_set.py +++ b/idb/common/companion_set.py @@ -10,10 +10,11 @@ import json import logging import os +from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from datetime import datetime, timedelta from pathlib import Path -from typing import AsyncGenerator, List, Optional +from typing import List, Optional from idb.common.constants import IDB_STATE_FILE_PATH from idb.common.format import json_data_companions, json_to_companion_info @@ -56,12 +57,12 @@ def __init__( self.logger = logger @asynccontextmanager - async def _use_stored_companions(self) -> AsyncGenerator[List[CompanionInfo], None]: + async def _use_stored_companions(self) -> AsyncGenerator[list[CompanionInfo], None]: async with _open_lockfile(filename=self.state_file_path): # Create the state file Path(self.state_file_path).touch(exist_ok=True) fresh_state = False - with open(self.state_file_path, "r") as f: + with open(self.state_file_path) as f: try: companion_info_in = json_to_companion_info(json.load(f)) except json.JSONDecodeError: @@ -91,11 +92,11 @@ async def _use_stored_companions(self) -> AsyncGenerator[List[CompanionInfo], No with open(self.state_file_path, "w") as f: json.dump(json_data_companions(companion_info_out), f) - async def get_companions(self) -> List[CompanionInfo]: + async def get_companions(self) -> list[CompanionInfo]: async with self._use_stored_companions() as companions: return companions - async def add_companion(self, companion: CompanionInfo) -> Optional[CompanionInfo]: + async def add_companion(self, companion: CompanionInfo) -> CompanionInfo | None: async with self._use_stored_companions() as companions: udid = companion.udid current = {existing.udid: existing for existing in companions} @@ -111,7 +112,7 @@ async def add_companion(self, companion: CompanionInfo) -> Optional[CompanionInf companions.append(companion) return None - async def clear(self) -> List[CompanionInfo]: + async def clear(self) -> list[CompanionInfo]: async with self._use_stored_companions() as companions: cleared = list(companions) companions.clear() @@ -119,7 +120,7 @@ async def clear(self) -> List[CompanionInfo]: async def remove_companion( self, destination: ConnectionDestination - ) -> List[CompanionInfo]: + ) -> list[CompanionInfo]: async with self._use_stored_companions() as companions: if isinstance(destination, str): to_remove = [ diff --git a/idb/common/constants.py b/idb/common/constants.py index 6fb20bbd6..8709fa686 100644 --- a/idb/common/constants.py +++ b/idb/common/constants.py @@ -22,7 +22,7 @@ STOP_INSTRUMENTS_TIMEOUT: float = timedelta(minutes=10).total_seconds() CRASH_LIST_TIMEOUT: float = timedelta(minutes=5).total_seconds() -JSONDict = Dict[str, Any] +JSONDict = dict[str, Any] BASE_IDB_FILE_PATH: str = "/tmp/idb" IDB_PID_PATH: str = f"{BASE_IDB_FILE_PATH}/pid" diff --git a/idb/common/file.py b/idb/common/file.py index 7caecc5e2..e5ba969ea 100644 --- a/idb/common/file.py +++ b/idb/common/file.py @@ -6,13 +6,13 @@ # pyre-strict -from typing import AsyncIterator +from collections.abc import AsyncIterator import aiofiles def get_last_n_lines(file_path: str, n: int) -> str: - with open(file_path, "r") as f: + with open(file_path) as f: return "\n".join(f.readlines()[-n:]) diff --git a/idb/common/format.py b/idb/common/format.py index 4439d6642..4ecb3e622 100644 --- a/idb/common/format.py +++ b/idb/common/format.py @@ -74,11 +74,11 @@ def human_format_test_info(test: TestRunInfo) -> str: return output -def human_format_activities(activities: List[TestActivity]) -> str: +def human_format_activities(activities: list[TestActivity]) -> str: tree: Tree = Tree() start: float = activities[0].start - def process_activity(activity: TestActivity, parent: Optional[str] = None) -> None: + def process_activity(activity: TestActivity, parent: str | None = None) -> None: tree.create_node( f"{activity.name} ({activity.finish - start:.2f}s)", activity.uuid, @@ -103,7 +103,7 @@ def process_activity(activity: TestActivity, parent: Optional[str] = None) -> No def json_format_test_info(test: TestRunInfo) -> str: - data: Dict[str, Any] = { + data: dict[str, Any] = { "bundleName": test.bundle_name, "className": test.class_name, "methodName": test.method_name, @@ -128,7 +128,7 @@ def json_format_test_info(test: TestRunInfo) -> str: return json.dumps(data) -def json_format_activity(activity: TestActivity) -> Dict[str, Any]: +def json_format_activity(activity: TestActivity) -> dict[str, Any]: return { "title": activity.title, "duration": activity.duration, @@ -175,13 +175,13 @@ def human_format_installed_app_info(app: InstalledAppInfo) -> str: def app_process_id_based_on_state( pid: int, state: AppProcessState, -) -> Optional[str]: +) -> str | None: if state is AppProcessState.RUNNING: return str(pid) return None -def app_process_state_to_string(state: Optional[AppProcessState]) -> str: +def app_process_state_to_string(state: AppProcessState | None) -> str: if state is AppProcessState.RUNNING: return "Running" elif state is AppProcessState.NOT_RUNNING: @@ -227,8 +227,8 @@ def human_format_target_info(target: TargetDescription) -> str: return target_info + f"{address.path}" -def json_data_target_info(target: TargetDescription) -> Dict[str, Any]: - data: Dict[str, Any] = { +def json_data_target_info(target: TargetDescription) -> dict[str, Any]: + data: dict[str, Any] = { "name": target.name, "udid": target.udid, "state": target.state, @@ -254,11 +254,11 @@ def json_data_target_info(target: TargetDescription) -> Dict[str, Any]: def json_data_companions( - companions: List[CompanionInfo], -) -> List[Dict[str, Union[str, Optional[int]]]]: - data: List[Dict[str, Union[str, Optional[int]]]] = [] + companions: list[CompanionInfo], +) -> list[dict[str, str | int | None]]: + data: list[dict[str, str | int | None]] = [] for companion in companions: - item: Dict[str, Union[str, Optional[int]]] = { + item: dict[str, str | int | None] = { "udid": companion.udid, "is_local": companion.is_local, "pid": companion.pid, @@ -273,7 +273,7 @@ def json_data_companions( return data -def json_to_companion_info(data: List[Dict[str, Any]]) -> List[CompanionInfo]: +def json_to_companion_info(data: list[dict[str, Any]]) -> list[CompanionInfo]: return [ CompanionInfo( udid=item["udid"], @@ -293,13 +293,13 @@ def target_description_from_json(data: str) -> TargetDescription: return target_description_from_dictionary(parsed=json.loads(data)) -def target_descriptions_from_json(data: str) -> List[TargetDescription]: +def target_descriptions_from_json(data: str) -> list[TargetDescription]: return [ target_description_from_dictionary(parsed=target) for target in json.loads(data) ] -def target_description_from_dictionary(parsed: Dict[str, Any]) -> TargetDescription: +def target_description_from_dictionary(parsed: dict[str, Any]) -> TargetDescription: return TargetDescription( udid=parsed["udid"], name=parsed["name"], diff --git a/idb/common/gzip.py b/idb/common/gzip.py index 3b9f64953..0430c15e0 100644 --- a/idb/common/gzip.py +++ b/idb/common/gzip.py @@ -9,7 +9,7 @@ import asyncio import sys -from typing import AsyncGenerator, AsyncIterator +from collections.abc import AsyncGenerator, AsyncIterator from idb.utils.contextlib import asynccontextmanager from idb.utils.typing import none_throws diff --git a/idb/common/hid.py b/idb/common/hid.py index 23dd4f72e..3d26cdc8b 100644 --- a/idb/common/hid.py +++ b/idb/common/hid.py @@ -6,7 +6,8 @@ # pyre-strict -from typing import AsyncIterator, Dict, Iterable, List, Optional, Tuple +from collections.abc import AsyncIterator, Iterable +from typing import Dict, List, Optional, Tuple from idb.common.types import ( HIDButton, @@ -23,27 +24,23 @@ ) -def tap_to_events( - x: float, y: float, duration: Optional[float] = None -) -> List[HIDEvent]: +def tap_to_events(x: float, y: float, duration: float | None = None) -> list[HIDEvent]: return _press_with_duration(HIDTouch(point=Point(x=x, y=y)), duration=duration) def button_press_to_events( - button: HIDButtonType, duration: Optional[float] = None -) -> List[HIDEvent]: + button: HIDButtonType, duration: float | None = None +) -> list[HIDEvent]: return _press_with_duration(HIDButton(button=button), duration=duration) -def key_press_to_events( - keycode: int, duration: Optional[float] = None -) -> List[HIDEvent]: +def key_press_to_events(keycode: int, duration: float | None = None) -> list[HIDEvent]: return _press_with_duration(HIDKey(keycode=keycode), duration=duration) def _press_with_duration( - action: HIDPressAction, duration: Optional[float] = None -) -> List[HIDEvent]: + action: HIDPressAction, duration: float | None = None +) -> list[HIDEvent]: events = [] events.append(HIDPress(action=action, direction=HIDDirection.DOWN)) if duration: @@ -53,11 +50,11 @@ def _press_with_duration( def swipe_to_events( - p_start: Tuple[float, float], - p_end: Tuple[float, float], - duration: Optional[float] = None, - delta: Optional[float] = None, -) -> List[HIDEvent]: + p_start: tuple[float, float], + p_end: tuple[float, float], + duration: float | None = None, + delta: float | None = None, +) -> list[HIDEvent]: start = Point(x=p_start[0], y=p_start[1]) end = Point(x=p_end[0], y=p_end[1]) return [HIDSwipe(start=start, end=end, delta=delta, duration=duration)] @@ -71,7 +68,7 @@ def _key_up_event(keycode: int) -> HIDEvent: return HIDPress(action=HIDKey(keycode=keycode), direction=HIDDirection.UP) -def key_press_shifted_to_events(keycode: int) -> List[HIDEvent]: +def key_press_shifted_to_events(keycode: int) -> list[HIDEvent]: return [ _key_down_event(225), _key_down_event(keycode), @@ -80,7 +77,7 @@ def key_press_shifted_to_events(keycode: int) -> List[HIDEvent]: ] -KEY_MAP: Dict[str, List[HIDEvent]] = { +KEY_MAP: dict[str, list[HIDEvent]] = { "a": key_press_to_events(4), "b": key_press_to_events(5), "c": key_press_to_events(6), @@ -180,7 +177,7 @@ def key_press_shifted_to_events(keycode: int) -> List[HIDEvent]: } -def text_to_events(text: str) -> List[HIDEvent]: +def text_to_events(text: str) -> list[HIDEvent]: events = [] for character in text: if character in KEY_MAP: diff --git a/idb/common/logging.py b/idb/common/logging.py index b06fdefaa..ba8b53ab0 100644 --- a/idb/common/logging.py +++ b/idb/common/logging.py @@ -10,9 +10,10 @@ import inspect import logging import time +from collections.abc import Sequence from concurrent.futures import CancelledError from types import TracebackType -from typing import Any, AsyncContextManager, Optional, Sequence, Tuple, Type +from typing import Any, AsyncContextManager, Optional, Tuple, Type from uuid import uuid4 import idb.common.plugin as plugin @@ -24,11 +25,11 @@ def _initial_info( - args: Sequence[object], metadata: Optional[LoggingMetadata] -) -> Tuple[LoggingMetadata, int]: + args: Sequence[object], metadata: LoggingMetadata | None +) -> tuple[LoggingMetadata, int]: _metadata: LoggingMetadata = metadata or {} if len(args): - self_meta: Optional[LoggingMetadata] = getattr(args[0], "metadata", None) + self_meta: LoggingMetadata | None = getattr(args[0], "metadata", None) if self_meta: _metadata.update(self_meta) _metadata["event_uuid"] = str(uuid4()) @@ -38,11 +39,11 @@ def _initial_info( class log_call(AsyncContextManager[None]): def __init__( - self, name: Optional[str] = None, metadata: Optional[LoggingMetadata] = None + self, name: str | None = None, metadata: LoggingMetadata | None = None ) -> None: self.name = name self.metadata: LoggingMetadata = metadata or {} - self.start: Optional[int] = None + self.start: int | None = None async def __aenter__(self) -> None: name = none_throws(self.name) @@ -52,9 +53,9 @@ async def __aenter__(self) -> None: async def __aexit__( self, - exc_type: Optional[Type[BaseException]], - exception: Optional[BaseException], - traceback: Optional[TracebackType], + exc_type: type[BaseException] | None, + exception: BaseException | None, + traceback: TracebackType | None, ) -> bool: name = none_throws(self.name) duration = int((time.time() - none_throws(self.start)) * 1000) diff --git a/idb/common/misc.py b/idb/common/misc.py index 433fa9f2c..0d2bb56dc 100644 --- a/idb/common/misc.py +++ b/idb/common/misc.py @@ -10,7 +10,7 @@ from typing import Dict -def get_env_with_idb_prefix() -> Dict[str, str]: +def get_env_with_idb_prefix() -> dict[str, str]: env = dict(os.environ) env = {key: env[key] for key in env if key.startswith("IDB_")} return {key[len("IDB_") :]: env[key] for key in env} diff --git a/idb/common/networking.py b/idb/common/networking.py index a64490fad..ad4a62b27 100644 --- a/idb/common/networking.py +++ b/idb/common/networking.py @@ -29,9 +29,9 @@ def is_port_open(host: str, port: int) -> bool: async def gen_listening_ports_from_fd( process: asyncio.subprocess.Process, read_fd: int, - timeout: Optional[int] = None, - logger: Optional[logging.Logger] = None, -) -> Tuple[int, Optional[int]]: + timeout: int | None = None, + logger: logging.Logger | None = None, +) -> tuple[int, int | None]: if logger is None: logger = logging.getLogger("reply-fd") wait = asyncio.ensure_future(process.wait()) @@ -50,9 +50,7 @@ async def gen_listening_ports_from_fd( return await ports -async def _read_from_fd( - read_fd: int, logger: logging.Logger -) -> Tuple[int, Optional[int]]: +async def _read_from_fd(read_fd: int, logger: logging.Logger) -> tuple[int, int | None]: async with aiofiles.open(read_fd, "r") as f: logger.info(f"Opened read_fd: {read_fd}") data = await f.readline() @@ -60,6 +58,6 @@ async def _read_from_fd( return _get_ports(data) -def _get_ports(data: str) -> Tuple[int, Optional[int]]: +def _get_ports(data: str) -> tuple[int, int | None]: all_ports = json.loads(data) return all_ports["grpc_port"] diff --git a/idb/common/plugin.py b/idb/common/plugin.py index fbf036b96..295ec4eac 100644 --- a/idb/common/plugin.py +++ b/idb/common/plugin.py @@ -14,10 +14,11 @@ import os import ssl from argparse import ArgumentParser +from collections.abc import Awaitable, Callable from functools import wraps from logging import Logger from types import ModuleType -from typing import Any, Awaitable, Callable, Dict, List, Optional, overload, TypeVar +from typing import Any, Dict, List, Optional, overload, TypeVar from idb.common.command import Command from idb.common.types import LoggingMetadata @@ -32,7 +33,7 @@ def package_exists(package_name: str) -> bool: PLUGIN_PACKAGE_NAMES = ["idb.fb.plugin"] -PLUGINS: List[ModuleType] = [ +PLUGINS: list[ModuleType] = [ importlib.import_module(package.name) for package in [ importlib.util.find_spec(package_name) @@ -163,7 +164,7 @@ def resolve_metadata(logger: Logger) -> LoggingMetadata: def append_companion_metadata( - logger: Logger, metadata: Dict[str, str] + logger: Logger, metadata: dict[str, str] ) -> LoggingMetadata: for plugin in PLUGINS: method = getattr(plugin, "append_companion_metadata", None) @@ -173,7 +174,7 @@ def append_companion_metadata( return metadata -def get_commands() -> List[Command]: +def get_commands() -> list[Command]: commands = [] for plugin in PLUGINS: @@ -185,7 +186,7 @@ def get_commands() -> List[Command]: return commands -def channel_ssl_context() -> Optional[ssl.SSLContext]: +def channel_ssl_context() -> ssl.SSLContext | None: for plugin in PLUGINS: method = getattr(plugin, "channel_ssl_context", None) if not method: diff --git a/idb/common/signal.py b/idb/common/signal.py index 2910e14bb..cabfc8965 100644 --- a/idb/common/signal.py +++ b/idb/common/signal.py @@ -8,9 +8,10 @@ import asyncio import signal +from collections.abc import AsyncGenerator, Sequence from logging import Logger from sys import stderr -from typing import AsyncGenerator, Sequence, TypeVar +from typing import TypeVar _SIGNALS: Sequence[signal.Signals] = [signal.SIGTERM, signal.SIGINT] diff --git a/idb/common/stream.py b/idb/common/stream.py index 9f310bcfb..c75467246 100644 --- a/idb/common/stream.py +++ b/idb/common/stream.py @@ -6,7 +6,8 @@ # pyre-strict -from typing import AsyncIterator, Callable, TypeVar +from collections.abc import AsyncIterator, Callable +from typing import TypeVar _A = TypeVar("_A") diff --git a/idb/common/tar.py b/idb/common/tar.py index e22cbc435..5d4492fdc 100644 --- a/idb/common/tar.py +++ b/idb/common/tar.py @@ -12,7 +12,8 @@ import tempfile import uuid from abc import abstractmethod -from typing import AsyncGenerator, AsyncIterator, List, Optional +from collections.abc import AsyncGenerator, AsyncIterator +from typing import List, Optional from idb.common.types import Compression from idb.utils.contextlib import asynccontextmanager @@ -24,7 +25,7 @@ class TarException(BaseException): def _has_executable(exe: str) -> bool: - return any((os.path.exists(os.path.join(path, exe)) for path in os.get_exec_path())) + return any(os.path.exists(os.path.join(path, exe)) for path in os.get_exec_path()) READ_CHUNK_SIZE: int = 1024 * 1024 * 4 # 4Mb, the default max read for gRPC @@ -43,8 +44,8 @@ async def is_gnu_tar() -> bool: class TarArchiveProcess: def __init__( self, - paths: List[str], - additional_tar_args: Optional[List[str]], + paths: list[str], + additional_tar_args: list[str] | None, place_in_subfolders: bool, verbose: bool, ) -> None: @@ -61,7 +62,7 @@ async def run(self) -> AsyncGenerator[asyncio.subprocess.Process, None]: async with self._run_process(command) as process: yield process - def _apply_additional_args(self, command: List[str], temp_dir: str) -> None: + def _apply_additional_args(self, command: list[str], temp_dir: str) -> None: additional_args = self._additional_tar_args if additional_args: command.extend(additional_args) @@ -79,13 +80,13 @@ def _apply_additional_args(self, command: List[str], temp_dir: str) -> None: @property @abstractmethod - def _tar_command(self) -> List[str]: + def _tar_command(self) -> list[str]: pass @asynccontextmanager @abstractmethod def _run_process( - self, command: List[str] + self, command: list[str] ) -> AsyncGenerator[asyncio.subprocess.Process, None]: pass @@ -96,12 +97,12 @@ class GzipArchive(TarArchiveProcess): ) @property - def _tar_command(self) -> List[str]: + def _tar_command(self) -> list[str]: return ["tar", "vcf" if self._verbose else "cf", "-"] @asynccontextmanager async def _run_process( - self, command: List[str] + self, command: list[str] ) -> AsyncGenerator[asyncio.subprocess.Process, None]: pipe_read, pipe_write = os.pipe() process_tar = await asyncio.create_subprocess_exec( @@ -120,10 +121,10 @@ async def _run_process( class ZstdArchive(TarArchiveProcess): - ZSTD_EXECUTABLES: List[str] = ["pzstd", "zstd"] # in the order of preference + ZSTD_EXECUTABLES: list[str] = ["pzstd", "zstd"] # in the order of preference @property - def _tar_command(self) -> List[str]: + def _tar_command(self) -> list[str]: return [ "tar", "--use-compress-program", @@ -134,7 +135,7 @@ def _tar_command(self) -> List[str]: @asynccontextmanager async def _run_process( - self, command: List[str] + self, command: list[str] ) -> AsyncGenerator[asyncio.subprocess.Process, None]: process = await asyncio.create_subprocess_exec( *command, stderr=sys.stderr, stdout=asyncio.subprocess.PIPE @@ -154,7 +155,7 @@ def _get_zstd_exe(cls) -> str: def _create_untar_command( output_path: str, gnu_tar: bool, verbose: bool = False -) -> List[str]: +) -> list[str]: command = ["tar", "-C", output_path] if not verbose and gnu_tar: command.append("--warning=no-unknown-keyword") @@ -168,8 +169,8 @@ async def _generator_from_data(data: bytes) -> AsyncIterator[bytes]: async def create_tar( - paths: List[str], - additional_tar_args: Optional[List[str]] = None, + paths: list[str], + additional_tar_args: list[str] | None = None, place_in_subfolders: bool = False, verbose: bool = False, ) -> bytes: @@ -189,9 +190,9 @@ async def create_tar( async def generate_tar( - paths: List[str], + paths: list[str], compression: Compression = Compression.GZIP, - additional_tar_args: Optional[List[str]] = None, + additional_tar_args: list[str] | None = None, place_in_subfolders: bool = False, verbose: bool = False, ) -> AsyncIterator[bytes]: diff --git a/idb/common/tests/companion_set_tests.py b/idb/common/tests/companion_set_tests.py index 6398fc9fa..c58f84369 100644 --- a/idb/common/tests/companion_set_tests.py +++ b/idb/common/tests/companion_set_tests.py @@ -7,8 +7,8 @@ # pyre-strict import tempfile +from collections.abc import AsyncGenerator from pathlib import Path -from typing import AsyncGenerator from unittest import mock from idb.common.companion_set import CompanionSet diff --git a/idb/common/tests/companion_tests.py b/idb/common/tests/companion_tests.py index 91af727e0..45948df61 100644 --- a/idb/common/tests/companion_tests.py +++ b/idb/common/tests/companion_tests.py @@ -8,8 +8,8 @@ import json import os +from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from typing import AsyncIterator from unittest import mock from idb.common.companion import ( diff --git a/idb/common/types.py b/idb/common/types.py index f7f3b9ecd..75be8759b 100644 --- a/idb/common/types.py +++ b/idb/common/types.py @@ -10,27 +10,16 @@ import json from abc import ABC, abstractmethod, abstractproperty from asyncio import StreamReader, StreamWriter +from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator, Mapping from contextlib import asynccontextmanager from dataclasses import asdict, dataclass, field from datetime import timedelta from enum import Enum from io import StringIO -from typing import ( - AsyncGenerator, - AsyncIterable, - AsyncIterator, - Dict, - IO, - List, - Mapping, - Optional, - Set, - Tuple, - Union, -) +from typing import Dict, IO, List, Optional, Set, Tuple, Union -LoggingMetadata = Dict[str, Optional[Union[str, List[str], int, float]]] +LoggingMetadata = dict[str, Optional[Union[str, list[str], int, float]]] class IdbException(Exception): @@ -102,7 +91,7 @@ class AppProcessState(Enum): class InstalledAppInfo: bundle_id: str name: str - architectures: Set[str] + architectures: set[str] install_type: str process_state: AppProcessState debuggable: bool @@ -111,10 +100,10 @@ class InstalledAppInfo: @dataclass(frozen=True) class InstrumentsTimings: - launch_error_timeout: Optional[float] = None - launch_retry_timeout: Optional[float] = None - terminate_timeout: Optional[float] = None - operation_duration: Optional[float] = None + launch_error_timeout: float | None = None + launch_retry_timeout: float | None = None + terminate_timeout: float | None = None + operation_duration: float | None = None class HIDButtonType(Enum): @@ -132,7 +121,7 @@ class HIDButtonType(Enum): class CompanionInfo: udid: str is_local: bool - pid: Optional[int] + pid: int | None address: Address metadata: LoggingMetadata = field(default_factory=dict) @@ -141,9 +130,9 @@ class CompanionInfo: class ScreenDimensions: width: int height: int - density: Optional[float] - width_points: Optional[int] - height_points: Optional[int] + density: float | None + width_points: int | None + height_points: int | None DeviceDetails = Mapping[str, Union[int, str]] @@ -154,15 +143,15 @@ class TargetDescription: udid: str name: str target_type: TargetType - state: Optional[str] - os_version: Optional[str] - architecture: Optional[str] - companion_info: Optional[CompanionInfo] - screen_dimensions: Optional[ScreenDimensions] - model: Optional[str] = None - device: Optional[DeviceDetails] = None - extended: Optional[DeviceDetails] = None - diagnostics: Optional[DeviceDetails] = None + state: str | None + os_version: str | None + architecture: str | None + companion_info: CompanionInfo | None + screen_dimensions: ScreenDimensions | None + model: str | None = None + device: DeviceDetails | None = None + extended: DeviceDetails | None = None + diagnostics: DeviceDetails | None = None metadata: LoggingMetadata = field(default_factory=dict) @property @@ -178,7 +167,7 @@ class FileEntryInfo: @dataclass(frozen=True) class FileListing: parent: str - entries: List[FileEntryInfo] + entries: list[FileEntryInfo] @dataclass(frozen=True) @@ -188,27 +177,27 @@ class AccessibilityInfo: @dataclass(frozen=True) class CrashLogInfo: - name: Optional[str] - bundle_id: Optional[str] - process_name: Optional[str] - parent_process_name: Optional[str] - process_identifier: Optional[int] - parent_process_identifier: Optional[int] - timestamp: Optional[int] + name: str | None + bundle_id: str | None + process_name: str | None + parent_process_name: str | None + process_identifier: int | None + parent_process_identifier: int | None + timestamp: int | None @dataclass(frozen=True) class CrashLog: - info: Optional[CrashLogInfo] - contents: Optional[str] + info: CrashLogInfo | None + contents: str | None @dataclass(frozen=True) class CrashLogQuery: - since: Optional[int] = None - before: Optional[int] = None - bundle_id: Optional[str] = None - name: Optional[str] = None + since: int | None = None + before: int | None = None + bundle_id: str | None = None + name: str | None = None @dataclass(frozen=True) @@ -236,8 +225,8 @@ class TestActivity: start: float finish: float name: str - attachments: List[TestAttachment] - sub_activities: List["TestActivity"] + attachments: list[TestAttachment] + sub_activities: list["TestActivity"] @dataclass(frozen=True) @@ -245,11 +234,11 @@ class TestRunInfo: bundle_name: str class_name: str method_name: str - logs: List[str] + logs: list[str] duration: float passed: bool - failure_info: Optional[TestRunFailureInfo] - activityLogs: Optional[List[TestActivity]] + failure_info: TestRunFailureInfo | None + activityLogs: list[TestActivity] | None crashed: bool @property @@ -260,8 +249,8 @@ def crashed_outside_test_case(self) -> bool: @dataclass(frozen=True) class InstalledTestInfo: bundle_id: str - name: Optional[str] - architectures: Optional[Set[str]] + name: str | None + architectures: set[str] | None @dataclass(frozen=True) @@ -308,8 +297,8 @@ class HIDPress: class HIDSwipe: start: Point end: Point - delta: Optional[float] - duration: Optional[float] + delta: float | None + duration: float | None @dataclass(frozen=True) @@ -323,8 +312,8 @@ class HIDDelay: @dataclass(frozen=True) class InstalledArtifact: name: str - uuid: Optional[str] - progress: Optional[float] + uuid: str | None + progress: float | None class FileContainerType(Enum): @@ -362,75 +351,73 @@ class CodeCoverageFormat(Enum): class Companion(ABC): @abstractmethod async def create( - self, device_type: str, os_version: str, timeout: Optional[timedelta] = None + self, device_type: str, os_version: str, timeout: timedelta | None = None ) -> TargetDescription: pass @abstractmethod async def boot( - self, udid: str, verify: bool = True, timeout: Optional[timedelta] = None + self, udid: str, verify: bool = True, timeout: timedelta | None = None ) -> None: pass @abstractmethod @asynccontextmanager async def boot_headless( - self, udid: str, verify: bool = True, timeout: Optional[timedelta] = None + self, udid: str, verify: bool = True, timeout: timedelta | None = None ) -> AsyncGenerator[None, None]: yield @abstractmethod - async def shutdown(self, udid: str, timeout: Optional[timedelta] = None) -> None: + async def shutdown(self, udid: str, timeout: timedelta | None = None) -> None: pass @abstractmethod - async def erase(self, udid: str, timeout: Optional[timedelta] = None) -> None: + async def erase(self, udid: str, timeout: timedelta | None = None) -> None: pass @abstractmethod async def clone( self, udid: str, - destination_device_set: Optional[str] = None, - timeout: Optional[timedelta] = None, + destination_device_set: str | None = None, + timeout: timedelta | None = None, ) -> TargetDescription: pass @abstractmethod - async def delete( - self, udid: Optional[str], timeout: Optional[timedelta] = None - ) -> None: + async def delete(self, udid: str | None, timeout: timedelta | None = None) -> None: pass @abstractmethod - async def clean(self, udid: str, timeout: Optional[timedelta] = None) -> None: + async def clean(self, udid: str, timeout: timedelta | None = None) -> None: pass @abstractmethod async def list_targets( - self, only: Optional[OnlyFilter] = None, timeout: Optional[timedelta] = None - ) -> List[TargetDescription]: + self, only: OnlyFilter | None = None, timeout: timedelta | None = None + ) -> list[TargetDescription]: pass @abstractmethod async def tail_targets( - self, only: Optional[OnlyFilter] = None - ) -> AsyncGenerator[List[TargetDescription], None]: + self, only: OnlyFilter | None = None + ) -> AsyncGenerator[list[TargetDescription], None]: yield @abstractmethod async def target_description( self, - udid: Optional[str] = None, - only: Optional[OnlyFilter] = None, - timeout: Optional[timedelta] = None, + udid: str | None = None, + only: OnlyFilter | None = None, + timeout: timedelta | None = None, ) -> TargetDescription: pass @abstractmethod @asynccontextmanager async def unix_domain_server( - self, udid: str, path: str, only: Optional[OnlyFilter] = None + self, udid: str, path: str, only: OnlyFilter | None = None ) -> AsyncGenerator[str, None]: yield @@ -440,19 +427,19 @@ class Client(ABC): @abstractmethod async def list_apps( self, fetch_process_state: bool = True - ) -> List[InstalledAppInfo]: + ) -> list[InstalledAppInfo]: pass @abstractmethod async def launch( self, bundle_id: str, - env: Optional[Dict[str, str]] = None, - args: Optional[List[str]] = None, + env: dict[str, str] | None = None, + args: list[str] | None = None, foreground_if_running: bool = False, wait_for_debugger: bool = False, - stop: Optional[asyncio.Event] = None, - pid_file: Optional[str] = None, + stop: asyncio.Event | None = None, + pid_file: str | None = None, ) -> None: pass @@ -461,24 +448,24 @@ async def run_xctest( self, test_bundle_id: str, app_bundle_id: str, - test_host_app_bundle_id: Optional[str] = None, + test_host_app_bundle_id: str | None = None, is_ui_test: bool = False, is_logic_test: bool = False, - tests_to_run: Optional[Set[str]] = None, - tests_to_skip: Optional[Set[str]] = None, - env: Optional[Dict[str, str]] = None, - args: Optional[List[str]] = None, - result_bundle_path: Optional[str] = None, - idb_log_buffer: Optional[StringIO] = None, - timeout: Optional[int] = None, + tests_to_run: set[str] | None = None, + tests_to_skip: set[str] | None = None, + env: dict[str, str] | None = None, + args: list[str] | None = None, + result_bundle_path: str | None = None, + idb_log_buffer: StringIO | None = None, + timeout: int | None = None, poll_interval_sec: float = 0.5, report_activities: bool = False, report_attachments: bool = False, - activities_output_path: Optional[str] = None, - coverage_output_path: Optional[str] = None, + activities_output_path: str | None = None, + coverage_output_path: str | None = None, enable_continuous_coverage_collection: bool = False, coverage_format: CodeCoverageFormat = CodeCoverageFormat.EXPORTED, - log_directory_path: Optional[str] = None, + log_directory_path: str | None = None, wait_for_debugger: bool = False, ) -> AsyncIterator[TestRunInfo]: yield @@ -486,38 +473,38 @@ async def run_xctest( @abstractmethod async def install( self, - bundle: Union[str, IO[bytes]], - compression: Optional[Compression] = None, - make_debuggable: Optional[bool] = None, - override_modification_time: Optional[bool] = None, + bundle: str | IO[bytes], + compression: Compression | None = None, + make_debuggable: bool | None = None, + override_modification_time: bool | None = None, ) -> AsyncIterator[InstalledArtifact]: yield @abstractmethod async def install_dylib( - self, dylib: Union[str, IO[bytes]] + self, dylib: str | IO[bytes] ) -> AsyncIterator[InstalledArtifact]: yield @abstractmethod async def install_dsym( self, - dsym: Union[str, IO[bytes]], - bundle_id: Optional[str], - compression: Optional[Compression], - bundle_type: Optional[FileContainerType] = None, + dsym: str | IO[bytes], + bundle_id: str | None, + compression: Compression | None, + bundle_type: FileContainerType | None = None, ) -> AsyncIterator[InstalledArtifact]: yield @abstractmethod async def install_xctest( - self, xctest: Union[str, IO[bytes]], skip_signing_bundles: Optional[bool] = None + self, xctest: str | IO[bytes], skip_signing_bundles: bool | None = None ) -> AsyncIterator[InstalledArtifact]: yield @abstractmethod async def install_framework( - self, framework_path: Union[str, IO[bytes]] + self, framework_path: str | IO[bytes] ) -> AsyncIterator[InstalledArtifact]: yield @@ -526,7 +513,7 @@ async def uninstall(self, bundle_id: str) -> None: pass @abstractmethod - async def list_xctests(self) -> List[InstalledTestInfo]: + async def list_xctests(self) -> list[InstalledTestInfo]: pass @abstractmethod @@ -534,12 +521,12 @@ async def terminate(self, bundle_id: str) -> None: pass @abstractmethod - async def list_test_bundle(self, test_bundle_id: str, app_path: str) -> List[str]: + async def list_test_bundle(self, test_bundle_id: str, app_path: str) -> list[str]: pass @abstractmethod async def tail_logs( - self, stop: asyncio.Event, arguments: Optional[List[str]] = None + self, stop: asyncio.Event, arguments: list[str] | None = None ) -> AsyncIterator[str]: yield @@ -561,7 +548,7 @@ async def set_locale(self, locale_identifier: str) -> None: @abstractmethod async def set_preference( - self, name: str, value: str, value_type: str, domain: Optional[str] + self, name: str, value: str, value_type: str, domain: str | None ) -> None: pass @@ -570,11 +557,11 @@ async def get_locale(self) -> str: pass @abstractmethod - async def get_preference(self, name: str, domain: Optional[str]) -> str: + async def get_preference(self, name: str, domain: str | None) -> str: pass @abstractmethod - async def list_locale_identifiers(self) -> List[str]: + async def list_locale_identifiers(self) -> list[str]: pass @abstractmethod @@ -595,13 +582,13 @@ async def send_notification(self, bundle_id: str, json_payload: str) -> None: @abstractmethod async def approve( - self, bundle_id: str, permissions: Set[Permission], scheme: Optional[str] = None + self, bundle_id: str, permissions: set[Permission], scheme: str | None = None ) -> None: pass @abstractmethod async def revoke( - self, bundle_id: str, permissions: Set[Permission], scheme: Optional[str] = None + self, bundle_id: str, permissions: set[Permission], scheme: str | None = None ) -> None: pass @@ -612,8 +599,8 @@ async def record_video(self, stop: asyncio.Event, output_file: str) -> None: @abstractmethod async def stream_video( self, - output_file: Optional[str], - fps: Optional[int], + output_file: str | None, + fps: int | None, format: VideoFormat, compression_quality: float, scale_factor: float = 1, @@ -625,30 +612,30 @@ async def screenshot(self) -> bytes: pass @abstractmethod - async def tap(self, x: float, y: float, duration: Optional[float] = None) -> None: + async def tap(self, x: float, y: float, duration: float | None = None) -> None: pass @abstractmethod async def button( - self, button_type: HIDButtonType, duration: Optional[float] = None + self, button_type: HIDButtonType, duration: float | None = None ) -> None: pass @abstractmethod - async def key(self, keycode: int, duration: Optional[float] = None) -> None: + async def key(self, keycode: int, duration: float | None = None) -> None: return @abstractmethod - async def key_sequence(self, key_sequence: List[int]) -> None: + async def key_sequence(self, key_sequence: list[int]) -> None: pass @abstractmethod async def swipe( self, - p_start: Tuple[int, int], - p_end: Tuple[int, int], - duration: Optional[float] = None, - delta: Optional[int] = None, + p_start: tuple[int, int], + p_end: tuple[int, int], + duration: float | None = None, + delta: int | None = None, ) -> None: pass @@ -666,7 +653,7 @@ async def describe(self, fetch_diagnostics: bool = False) -> TargetDescription: @abstractmethod async def accessibility_info( - self, point: Optional[Tuple[int, int]], nested: bool + self, point: tuple[int, int] | None, nested: bool ) -> AccessibilityInfo: pass @@ -677,13 +664,13 @@ async def run_instruments( trace_basename: str, template_name: str, app_bundle_id: str, - app_environment: Optional[Dict[str, str]] = None, - app_arguments: Optional[List[str]] = None, - tool_arguments: Optional[List[str]] = None, - started: Optional[asyncio.Event] = None, - timings: Optional[InstrumentsTimings] = None, - post_process_arguments: Optional[List[str]] = None, - ) -> List[str]: + app_environment: dict[str, str] | None = None, + app_arguments: list[str] | None = None, + tool_arguments: list[str] | None = None, + started: asyncio.Event | None = None, + timings: InstrumentsTimings | None = None, + post_process_arguments: list[str] | None = None, + ) -> list[str]: pass @abstractmethod @@ -693,30 +680,30 @@ async def xctrace_record( output: str, template_name: str, all_processes: bool = False, - time_limit: Optional[float] = None, - package: Optional[str] = None, - process_to_attach: Optional[str] = None, - process_to_launch: Optional[str] = None, - process_env: Optional[Dict[str, str]] = None, - launch_args: Optional[List[str]] = None, - target_stdin: Optional[str] = None, - target_stdout: Optional[str] = None, - post_args: Optional[List[str]] = None, - stop_timeout: Optional[float] = None, - started: Optional[asyncio.Event] = None, - ) -> List[str]: + time_limit: float | None = None, + package: str | None = None, + process_to_attach: str | None = None, + process_to_launch: str | None = None, + process_env: dict[str, str] | None = None, + launch_args: list[str] | None = None, + target_stdin: str | None = None, + target_stdout: str | None = None, + post_args: list[str] | None = None, + stop_timeout: float | None = None, + started: asyncio.Event | None = None, + ) -> list[str]: pass @abstractmethod - async def crash_list(self, query: CrashLogQuery) -> List[CrashLogInfo]: + async def crash_list(self, query: CrashLogQuery) -> list[CrashLogInfo]: pass @abstractmethod - async def crash_delete(self, query: CrashLogQuery) -> List[CrashLogInfo]: + async def crash_delete(self, query: CrashLogQuery) -> list[CrashLogInfo]: pass @abstractmethod - async def add_media(self, file_paths: List[str]) -> None: + async def add_media(self, file_paths: list[str]) -> None: pass @abstractmethod @@ -729,12 +716,12 @@ async def dap( input_stream: StreamReader, output_stream: StreamWriter, stop: asyncio.Event, - compression: Optional[Compression], + compression: Compression | None, ) -> None: raise NotImplementedError("Dap command not implemented") @abstractmethod - async def debugserver_start(self, bundle_id: str) -> List[str]: + async def debugserver_start(self, bundle_id: str) -> list[str]: pass @abstractmethod @@ -742,7 +729,7 @@ async def debugserver_stop(self) -> None: pass @abstractmethod - async def debugserver_status(self) -> Optional[List[str]]: + async def debugserver_status(self) -> list[str] | None: pass @abstractmethod @@ -756,21 +743,21 @@ async def hid(self, event_iterator: AsyncIterable[HIDEvent]) -> None: @abstractmethod async def ls_single( self, container: FileContainer, path: str - ) -> List[FileEntryInfo]: + ) -> list[FileEntryInfo]: pass @abstractmethod - async def ls(self, container: FileContainer, paths: List[str]) -> List[FileListing]: + async def ls(self, container: FileContainer, paths: list[str]) -> list[FileListing]: pass @abstractmethod async def mv( - self, container: FileContainer, src_paths: List[str], dest_path: str + self, container: FileContainer, src_paths: list[str], dest_path: str ) -> None: pass @abstractmethod - async def rm(self, container: FileContainer, paths: List[str]) -> None: + async def rm(self, container: FileContainer, paths: list[str]) -> None: pass @abstractmethod @@ -786,10 +773,10 @@ async def pull( @abstractmethod async def push( self, - src_paths: List[str], + src_paths: list[str], container: FileContainer, dest_path: str, - compression: Optional[Compression], + compression: Compression | None, ) -> None: pass @@ -805,18 +792,18 @@ class ClientManager: async def connect( self, destination: ConnectionDestination, - metadata: Optional[Dict[str, str]] = None, + metadata: dict[str, str] | None = None, ) -> CompanionInfo: pass @abstractmethod - async def disconnect(self, destination: Union[Address, str]) -> None: + async def disconnect(self, destination: Address | str) -> None: pass @abstractmethod async def list_targets( - self, only: Optional[OnlyFilter] = None - ) -> List[TargetDescription]: + self, only: OnlyFilter | None = None + ) -> list[TargetDescription]: pass @abstractmethod @@ -834,5 +821,5 @@ async def wait_closed(self) -> None: pass @abstractproperty - def ports(self) -> Dict[str, str]: + def ports(self) -> dict[str, str]: pass diff --git a/idb/grpc/client.py b/idb/grpc/client.py index 3ae701a9f..40c1841c4 100644 --- a/idb/grpc/client.py +++ b/idb/grpc/client.py @@ -16,20 +16,10 @@ import tempfile import urllib.parse from asyncio import StreamReader, StreamWriter +from collections.abc import AsyncGenerator, AsyncIterable, AsyncIterator, Iterable from io import StringIO from pathlib import Path -from typing import ( - Any, - AsyncGenerator, - AsyncIterable, - AsyncIterator, - Dict, - Iterable, - List, - Optional, - Set, - Tuple, -) +from typing import Any, Dict, List, Optional, Set, Tuple import idb.common.plugin as plugin from grpclib.client import Channel @@ -170,7 +160,7 @@ from idb.utils.contextlib import asynccontextmanager -APPROVE_MAP: Dict[Permission, "ApproveRequest.Permission"] = { +APPROVE_MAP: dict[Permission, "ApproveRequest.Permission"] = { Permission.PHOTOS: ApproveRequest.PHOTOS, Permission.CAMERA: ApproveRequest.CAMERA, Permission.CONTACTS: ApproveRequest.CONTACTS, @@ -180,7 +170,7 @@ Permission.MICROPHONE: ApproveRequest.MICROPHONE, } -REVOKE_MAP: Dict[Permission, "RevokeRequest.Permission"] = { +REVOKE_MAP: dict[Permission, "RevokeRequest.Permission"] = { Permission.PHOTOS: RevokeRequest.PHOTOS, Permission.CAMERA: RevokeRequest.CAMERA, Permission.CONTACTS: RevokeRequest.CONTACTS, @@ -190,14 +180,14 @@ Permission.MICROPHONE: RevokeRequest.MICROPHONE, } -VIDEO_FORMAT_MAP: Dict[VideoFormat, "VideoStreamRequest.Format"] = { +VIDEO_FORMAT_MAP: dict[VideoFormat, "VideoStreamRequest.Format"] = { VideoFormat.H264: VideoStreamRequest.H264, VideoFormat.RBGA: VideoStreamRequest.RBGA, VideoFormat.MJPEG: VideoStreamRequest.MJPEG, VideoFormat.MINICAP: VideoStreamRequest.MINICAP, } -COMPRESSION_MAP: Dict[Compression, "Payload.Compression"] = { +COMPRESSION_MAP: dict[Compression, "Payload.Compression"] = { Compression.GZIP: Payload.GZIP, Compression.ZSTD: Payload.ZSTD, } @@ -268,7 +258,7 @@ async def build( address: Address, logger: logging.Logger, exchange_metadata: bool = True, - extra_metadata: Optional[Dict[str, str]] = None, + extra_metadata: dict[str, str] | None = None, use_tls: bool = False, ) -> AsyncGenerator["Client", None]: metadata_to_companion = ( @@ -330,7 +320,7 @@ async def for_companion( companion: Companion, udid: str, logger: logging.Logger, - only: Optional[OnlyFilter] = None, + only: OnlyFilter | None = None, ) -> AsyncGenerator["Client", None]: with tempfile.NamedTemporaryFile() as temp: # Remove the tempfile so we can bind to it first. @@ -347,7 +337,7 @@ async def _tail_specific_logs( self, source: LogRequest.Source, stop: asyncio.Event, - arguments: Optional[List[str]], + arguments: list[str] | None, ) -> AsyncIterator[str]: async with self.stub.log.open() as stream: await stream.send_message( @@ -360,12 +350,12 @@ async def _install_to_destination( self, bundle: Bundle, destination: Destination, - compression: Optional[Compression], - make_debuggable: Optional[bool], - bundle_id: Optional[str], - bundle_type: Optional[FileContainerType], - override_modification_time: Optional[bool] = None, - skip_signing_bundles: Optional[bool] = None, + compression: Compression | None, + make_debuggable: bool | None, + bundle_id: str | None, + bundle_type: FileContainerType | None, + override_modification_time: bool | None = None, + skip_signing_bundles: bool | None = None, ) -> AsyncIterator[InstalledArtifact]: async with self.stub.install.open() as stream: generator = None @@ -458,7 +448,7 @@ def _log_from_companion(self, data: str) -> None: @log_and_handle_exceptions("list_apps") async def list_apps( self, fetch_process_state: bool = True - ) -> List[InstalledAppInfo]: + ) -> list[InstalledAppInfo]: response = await self.stub.list_apps( ListAppsRequest(suppress_process_state=fetch_process_state is False) ) @@ -477,7 +467,7 @@ async def list_apps( @log_and_handle_exceptions("accessibility_info") async def accessibility_info( - self, point: Optional[Tuple[int, int]], nested: bool + self, point: tuple[int, int] | None, nested: bool ) -> AccessibilityInfo: grpc_point = Point(x=point[0], y=point[1]) if point is not None else None response = await self.stub.accessibility_info( @@ -493,7 +483,7 @@ async def accessibility_info( return AccessibilityInfo(json=response.json) @log_and_handle_exceptions("add_media") - async def add_media(self, file_paths: List[str]) -> None: + async def add_media(self, file_paths: list[str]) -> None: async with self.stub.add_media.open() as stream: if self.is_local: for file_path in file_paths: @@ -520,8 +510,8 @@ async def add_media(self, file_paths: List[str]) -> None: async def approve( self, bundle_id: str, - permissions: Set[Permission], - scheme: Optional[str] = None, + permissions: set[Permission], + scheme: str | None = None, ) -> None: await self.stub.approve( ApproveRequest( @@ -536,8 +526,8 @@ async def approve( async def revoke( self, bundle_id: str, - permissions: Set[Permission], - scheme: Optional[str] = None, + permissions: set[Permission], + scheme: str | None = None, ) -> None: await self.stub.revoke( RevokeRequest( @@ -621,14 +611,14 @@ async def uninstall(self, bundle_id: str) -> None: await self.stub.uninstall(UninstallRequest(bundle_id=bundle_id)) @log_and_handle_exceptions("rm") - async def rm(self, container: FileContainer, paths: List[str]) -> None: + async def rm(self, container: FileContainer, paths: list[str]) -> None: await self.stub.rm( RmRequest(paths=paths, container=file_container_to_grpc(container)) ) @log_and_handle_exceptions("mv") async def mv( - self, container: FileContainer, src_paths: List[str], dest_path: str + self, container: FileContainer, src_paths: list[str], dest_path: str ) -> None: await self.stub.mv( MvRequest( @@ -641,14 +631,14 @@ async def mv( @log_and_handle_exceptions("ls") async def ls_single( self, container: FileContainer, path: str - ) -> List[FileEntryInfo]: + ) -> list[FileEntryInfo]: response = await self.stub.ls( LsRequest(path=path, container=file_container_to_grpc(container)) ) return [FileEntryInfo(path=file.path) for file in response.files] @log_and_handle_exceptions("ls") - async def ls(self, container: FileContainer, paths: List[str]) -> List[FileListing]: + async def ls(self, container: FileContainer, paths: list[str]) -> list[FileListing]: response = await self.stub.ls( LsRequest(paths=paths, container=file_container_to_grpc(container)) ) @@ -667,12 +657,12 @@ async def mkdir(self, container: FileContainer, path: str) -> None: ) @log_and_handle_exceptions("crash_delete") - async def crash_delete(self, query: CrashLogQuery) -> List[CrashLogInfo]: + async def crash_delete(self, query: CrashLogQuery) -> list[CrashLogInfo]: response = await self.stub.crash_delete(_to_crash_log_query_proto(query)) return _to_crash_log_info_list(response) @log_and_handle_exceptions("crash_list") - async def crash_list(self, query: CrashLogQuery) -> List[CrashLogInfo]: + async def crash_list(self, query: CrashLogQuery) -> list[CrashLogInfo]: response = await self.stub.crash_list(_to_crash_log_query_proto(query)) return _to_crash_log_info_list(response) @@ -685,9 +675,9 @@ async def crash_show(self, name: str) -> CrashLog: async def install( self, bundle: Bundle, - compression: Optional[Compression] = None, - make_debuggable: Optional[bool] = None, - override_modification_time: Optional[bool] = None, + compression: Compression | None = None, + make_debuggable: bool | None = None, + override_modification_time: bool | None = None, ) -> AsyncIterator[InstalledArtifact]: async for response in self._install_to_destination( bundle=bundle, @@ -704,7 +694,7 @@ async def install( async def install_xctest( self, xctest: Bundle, - skip_signing_bundles: Optional[bool] = None, + skip_signing_bundles: bool | None = None, ) -> AsyncIterator[InstalledArtifact]: async for response in self._install_to_destination( bundle=xctest, @@ -733,9 +723,9 @@ async def install_dylib(self, dylib: Bundle) -> AsyncIterator[InstalledArtifact] async def install_dsym( self, dsym: Bundle, - bundle_id: Optional[str], - compression: Optional[Compression], - bundle_type: Optional[FileContainerType] = None, + bundle_id: str | None, + compression: Compression | None, + bundle_type: FileContainerType | None = None, ) -> AsyncIterator[InstalledArtifact]: async for response in self._install_to_destination( bundle=dsym, @@ -764,10 +754,10 @@ async def install_framework( @log_and_handle_exceptions("push") async def push( self, - src_paths: List[str], + src_paths: list[str], container: FileContainer, dest_path: str, - compression: Optional[Compression], + compression: Compression | None, ) -> None: async with self.stub.push.open() as stream: await stream.send_message( @@ -843,14 +833,14 @@ async def tail( await stream.send_message(TailRequest(stop=TailRequest.Stop())) @log_and_handle_exceptions("xctest_list_tests") - async def list_test_bundle(self, test_bundle_id: str, app_path: str) -> List[str]: + async def list_test_bundle(self, test_bundle_id: str, app_path: str) -> list[str]: response = await self.stub.xctest_list_tests( XctestListTestsRequest(bundle_name=test_bundle_id, app_path=app_path) ) return list(response.names) @log_and_handle_exceptions("xctest_list_bundles") - async def list_xctests(self) -> List[InstalledTestInfo]: + async def list_xctests(self) -> list[InstalledTestInfo]: response = await self.stub.xctest_list_bundles(XctestListBundlesRequest()) return [ InstalledTestInfo( @@ -866,17 +856,17 @@ async def send_events(self, events: Iterable[HIDEvent]) -> None: await self.hid(iterator_to_async_iterator(events)) @log_and_handle_exceptions("hid") - async def tap(self, x: float, y: float, duration: Optional[float] = None) -> None: + async def tap(self, x: float, y: float, duration: float | None = None) -> None: await self.send_events(tap_to_events(x, y, duration)) @log_and_handle_exceptions("hid") async def button( - self, button_type: HIDButtonType, duration: Optional[float] = None + self, button_type: HIDButtonType, duration: float | None = None ) -> None: await self.send_events(button_press_to_events(button_type, duration)) @log_and_handle_exceptions("hid") - async def key(self, keycode: int, duration: Optional[float] = None) -> None: + async def key(self, keycode: int, duration: float | None = None) -> None: await self.send_events(key_press_to_events(keycode, duration)) @log_and_handle_exceptions("hid") @@ -886,16 +876,16 @@ async def text(self, text: str) -> None: @log_and_handle_exceptions("hid") async def swipe( self, - p_start: Tuple[int, int], - p_end: Tuple[int, int], - duration: Optional[float] = None, - delta: Optional[int] = None, + p_start: tuple[int, int], + p_end: tuple[int, int], + duration: float | None = None, + delta: int | None = None, ) -> None: await self.send_events(swipe_to_events(p_start, p_end, duration, delta)) @log_and_handle_exceptions("hid") - async def key_sequence(self, key_sequence: List[int]) -> None: - events: List[HIDEvent] = [] + async def key_sequence(self, key_sequence: list[int]) -> None: + events: list[HIDEvent] = [] for key in key_sequence: events.extend(key_press_to_events(key)) await self.send_events(events) @@ -921,7 +911,7 @@ async def debug_server(self, request: DebugServerRequest) -> DebugServerResponse return await stream.recv_message() @log_and_handle_exceptions("debugserver") - async def debugserver_start(self, bundle_id: str) -> List[str]: + async def debugserver_start(self, bundle_id: str) -> list[str]: response = await self.debug_server( request=DebugServerRequest( start=DebugServerRequest.Start(bundle_id=bundle_id) @@ -936,7 +926,7 @@ async def debugserver_stop(self) -> None: ) @log_and_handle_exceptions("debugserver") - async def debugserver_status(self) -> Optional[List[str]]: + async def debugserver_status(self) -> list[str] | None: response = await self.debug_server( request=DebugServerRequest(status=DebugServerRequest.Status()) ) @@ -950,13 +940,13 @@ async def run_instruments( trace_basename: str, template_name: str, app_bundle_id: str, - app_environment: Optional[Dict[str, str]] = None, - app_arguments: Optional[List[str]] = None, - tool_arguments: Optional[List[str]] = None, - started: Optional[asyncio.Event] = None, - timings: Optional[InstrumentsTimings] = None, - post_process_arguments: Optional[List[str]] = None, - ) -> List[str]: + app_environment: dict[str, str] | None = None, + app_arguments: list[str] | None = None, + tool_arguments: list[str] | None = None, + started: asyncio.Event | None = None, + timings: InstrumentsTimings | None = None, + post_process_arguments: list[str] | None = None, + ) -> list[str]: self.logger.info("Starting instruments connection") async with self.stub.instruments_run.open() as stream: self.logger.info("Sending instruments request") @@ -1028,12 +1018,12 @@ async def run_instruments( async def launch( self, bundle_id: str, - args: Optional[List[str]] = None, - env: Optional[Dict[str, str]] = None, + args: list[str] | None = None, + env: dict[str, str] | None = None, foreground_if_running: bool = False, wait_for_debugger: bool = False, - stop: Optional[asyncio.Event] = None, - pid_file: Optional[str] = None, + stop: asyncio.Event | None = None, + pid_file: str | None = None, ) -> None: async with self.stub.launch.open() as stream: request = LaunchRequest( @@ -1090,8 +1080,8 @@ async def record_video(self, stop: asyncio.Event, output_file: str) -> None: @log_and_handle_exceptions("video_stream") async def stream_video( self, - output_file: Optional[str], - fps: Optional[int], + output_file: str | None, + fps: int | None, format: VideoFormat, compression_quality: float, scale_factor: float = 1, @@ -1147,7 +1137,7 @@ async def stream_video( async def _handle_code_coverage_in_response( self, response: XctestRunResponse, - coverage_output_path: Optional[str], + coverage_output_path: str | None, coverage_format: CodeCoverageFormat, ) -> None: if ( @@ -1181,24 +1171,24 @@ async def run_xctest( self, test_bundle_id: str, app_bundle_id: str, - test_host_app_bundle_id: Optional[str] = None, + test_host_app_bundle_id: str | None = None, is_ui_test: bool = False, is_logic_test: bool = False, - tests_to_run: Optional[Set[str]] = None, - tests_to_skip: Optional[Set[str]] = None, - env: Optional[Dict[str, str]] = None, - args: Optional[List[str]] = None, - result_bundle_path: Optional[str] = None, - idb_log_buffer: Optional[StringIO] = None, - timeout: Optional[int] = None, + tests_to_run: set[str] | None = None, + tests_to_skip: set[str] | None = None, + env: dict[str, str] | None = None, + args: list[str] | None = None, + result_bundle_path: str | None = None, + idb_log_buffer: StringIO | None = None, + timeout: int | None = None, poll_interval_sec: float = TESTS_POLL_INTERVAL, report_activities: bool = False, report_attachments: bool = False, - activities_output_path: Optional[str] = None, - coverage_output_path: Optional[str] = None, + activities_output_path: str | None = None, + coverage_output_path: str | None = None, enable_continuous_coverage_collection: bool = False, coverage_format: CodeCoverageFormat = CodeCoverageFormat.EXPORTED, - log_directory_path: Optional[str] = None, + log_directory_path: str | None = None, wait_for_debugger: bool = False, ) -> AsyncIterator[TestRunInfo]: async with self.stub.xctest_run.open() as stream: @@ -1279,7 +1269,7 @@ async def run_xctest( @log_and_handle_exceptions("log") async def tail_logs( - self, stop: asyncio.Event, arguments: Optional[List[str]] = None + self, stop: asyncio.Event, arguments: list[str] | None = None ) -> AsyncIterator[str]: async for message in self._tail_specific_logs( source=LogRequest.TARGET, stop=stop, arguments=arguments @@ -1313,7 +1303,7 @@ async def set_locale(self, locale_identifier: str) -> None: @log_and_handle_exceptions("setting") async def set_preference( - self, name: str, value: str, value_type: str, domain: Optional[str] + self, name: str, value: str, value_type: str, domain: str | None ) -> None: await self.stub.setting( SettingRequest( @@ -1334,7 +1324,7 @@ async def get_locale(self) -> str: return response.value @log_and_handle_exceptions("get_setting") - async def get_preference(self, name: str, domain: Optional[str]) -> str: + async def get_preference(self, name: str, domain: str | None) -> str: response = await self.stub.get_setting( # pyre-ignore GetSettingRequest(setting=AnySetting, name=name, domain=domain) @@ -1342,7 +1332,7 @@ async def get_preference(self, name: str, domain: Optional[str]) -> str: return response.value @log_and_handle_exceptions("list_settings") - async def list_locale_identifiers(self) -> List[str]: + async def list_locale_identifiers(self) -> list[str]: response = await self.stub.list_settings( ListSettingRequest( setting=LocaleSetting, @@ -1358,20 +1348,20 @@ async def xctrace_record( output: str, template_name: str, all_processes: bool = False, - time_limit: Optional[float] = None, - package: Optional[str] = None, - process_to_attach: Optional[str] = None, - process_to_launch: Optional[str] = None, - process_env: Optional[Dict[str, str]] = None, - launch_args: Optional[List[str]] = None, - target_stdin: Optional[str] = None, - target_stdout: Optional[str] = None, + time_limit: float | None = None, + package: str | None = None, + process_to_attach: str | None = None, + process_to_launch: str | None = None, + process_env: dict[str, str] | None = None, + launch_args: list[str] | None = None, + target_stdin: str | None = None, + target_stdout: str | None = None, # FB options - post_args: Optional[List[str]] = None, - stop_timeout: Optional[float] = None, + post_args: list[str] | None = None, + stop_timeout: float | None = None, # control events - started: Optional[asyncio.Event] = None, - ) -> List[str]: + started: asyncio.Event | None = None, + ) -> list[str]: self.logger.info("Starting xctrace connection") async with self.stub.xctrace_record.open() as stream: self.logger.info("Sending xctrace record request") @@ -1464,7 +1454,7 @@ async def dap( input_stream: StreamReader, output_stream: StreamWriter, stop: asyncio.Event, - compression: Optional[Compression], + compression: Compression | None, ) -> None: path = Path(dap_path) pkg_id = path.stem diff --git a/idb/grpc/crash.py b/idb/grpc/crash.py index ab96e31e0..389fd54fc 100644 --- a/idb/grpc/crash.py +++ b/idb/grpc/crash.py @@ -17,7 +17,7 @@ ) -def _to_crash_log_info_list(response: CrashLogResponse) -> List[CrashLogInfo]: +def _to_crash_log_info_list(response: CrashLogResponse) -> list[CrashLogInfo]: return [_to_crash_log_info(proto) for proto in response.list] diff --git a/idb/grpc/dap.py b/idb/grpc/dap.py index 0d2957101..1b896be2f 100644 --- a/idb/grpc/dap.py +++ b/idb/grpc/dap.py @@ -9,7 +9,8 @@ import asyncio import logging from asyncio import StreamReader, StreamWriter -from typing import AsyncGenerator, Optional +from collections.abc import AsyncGenerator +from typing import Optional from idb.common.types import IdbException from idb.grpc.idb_grpc import CompanionServiceStub @@ -73,8 +74,8 @@ async def pipe( """ Pipe stdin and stdout to remote dap server """ - read_future: Optional[asyncio.Future[StreamReader]] = None - write_future: Optional[asyncio.Future[StreamWriter]] = None + read_future: asyncio.Future[StreamReader] | None = None + write_future: asyncio.Future[StreamWriter] | None = None stop_future = asyncio.ensure_future(stop.wait()) while True: if read_future is None: diff --git a/idb/grpc/hid.py b/idb/grpc/hid.py index 168a41adc..bd06e431c 100644 --- a/idb/grpc/hid.py +++ b/idb/grpc/hid.py @@ -51,7 +51,7 @@ ] -def _tanslation_from_pairs(pairs: List[Tuple[_A, _B]], item: _A) -> _B: +def _tanslation_from_pairs(pairs: list[tuple[_A, _B]], item: _A) -> _B: pair_map = {py: grpc for (py, grpc) in pairs} return pair_map[item] diff --git a/idb/grpc/install.py b/idb/grpc/install.py index 73149b787..3968c1a02 100644 --- a/idb/grpc/install.py +++ b/idb/grpc/install.py @@ -7,8 +7,9 @@ # pyre-strict import os +from collections.abc import AsyncIterator from logging import Logger -from typing import AsyncIterator, IO, List, Optional, Union +from typing import IO, List, Optional, Union import aiofiles import idb.common.gzip as gzip @@ -85,7 +86,7 @@ async def _generate_framework_chunks( async def generate_requests( - requests: List[InstallRequest], + requests: list[InstallRequest], ) -> AsyncIterator[InstallRequest]: for request in requests: yield request @@ -107,7 +108,7 @@ async def generate_io_chunks( def generate_binary_chunks( path: str, destination: Destination, - compression: Optional[Compression], + compression: Compression | None, logger: Logger, ) -> AsyncIterator[InstallRequest]: if destination == InstallRequest.APP: diff --git a/idb/grpc/instruments.py b/idb/grpc/instruments.py index 122544f7d..3bc6c32ea 100644 --- a/idb/grpc/instruments.py +++ b/idb/grpc/instruments.py @@ -8,8 +8,9 @@ import asyncio +from collections.abc import AsyncIterator from logging import Logger -from typing import AsyncIterator, Optional +from typing import Optional from idb.common.types import InstrumentsTimings from idb.grpc.idb_pb2 import InstrumentsRunRequest, InstrumentsRunResponse @@ -67,8 +68,8 @@ async def instruments_drain_until_stop( def translate_instruments_timings( - timings: Optional[InstrumentsTimings], -) -> Optional[InstrumentsRunRequest.InstrumentsTimings]: + timings: InstrumentsTimings | None, +) -> InstrumentsRunRequest.InstrumentsTimings | None: return ( InstrumentsRunRequest.InstrumentsTimings( # pyre-ignore diff --git a/idb/grpc/launch.py b/idb/grpc/launch.py index 8d9b7ffe2..30ec3b493 100644 --- a/idb/grpc/launch.py +++ b/idb/grpc/launch.py @@ -17,7 +17,7 @@ async def drain_launch_stream( - stream: Stream[LaunchRequest, LaunchResponse], pid_file: Optional[str] + stream: Stream[LaunchRequest, LaunchResponse], pid_file: str | None ) -> None: async for message in stream: output = message.output diff --git a/idb/grpc/management.py b/idb/grpc/management.py index 92453b600..4ba48dc03 100644 --- a/idb/grpc/management.py +++ b/idb/grpc/management.py @@ -10,7 +10,8 @@ import logging import os import signal -from typing import AsyncGenerator, Dict, List, Optional +from collections.abc import AsyncGenerator +from typing import Dict, List, Optional from idb.common.companion import Companion, CompanionServerConfig from idb.common.companion_set import CompanionSet @@ -50,12 +51,12 @@ async def _realize_companions( companion_set: CompanionSet, prune_dead_companion: bool, logger: logging.Logger, -) -> List[TargetDescription]: - to_prune: List[Companion] +) -> list[TargetDescription]: + to_prune: list[Companion] async def _companion_to_target( companion: CompanionInfo, - ) -> Optional[TargetDescription]: + ) -> TargetDescription | None: try: async with Client.build(address=companion.address, logger=logger) as client: return await client.describe() @@ -94,17 +95,17 @@ async def _check_domain_socket_is_bound(path: str) -> bool: class ClientManager(ClientManagerBase): def __init__( self, - companion_path: Optional[str] = None, - device_set_path: Optional[str] = None, + companion_path: str | None = None, + device_set_path: str | None = None, prune_dead_companion: bool = True, - logger: Optional[logging.Logger] = None, + logger: logging.Logger | None = None, ) -> None: os.makedirs(BASE_IDB_FILE_PATH, exist_ok=True) self._logger: logging.Logger = ( logger if logger else logging.getLogger("idb_grpc_client") ) self._companion_set = CompanionSet(logger=self._logger) - self._companion: Optional[Companion] = ( + self._companion: Companion | None = ( Companion( companion_path=companion_path, device_set_path=device_set_path, @@ -155,7 +156,7 @@ async def _spawn_companion_server(self, udid: str) -> CompanionInfo: return companion_info @asynccontextmanager - async def from_udid(self, udid: Optional[str]) -> AsyncGenerator[Client, None]: + async def from_udid(self, udid: str | None) -> AsyncGenerator[Client, None]: companions = { companion.udid: companion for companion in await self._companion_set.get_companions() @@ -188,9 +189,9 @@ async def from_udid(self, udid: Optional[str]) -> AsyncGenerator[Client, None]: @log_call() async def list_targets( - self, only: Optional[OnlyFilter] = None - ) -> List[TargetDescription]: - async def _list_local_targets() -> List[TargetDescription]: + self, only: OnlyFilter | None = None + ) -> list[TargetDescription]: + async def _list_local_targets() -> list[TargetDescription]: companion = self._companion if companion is None: return [] @@ -212,7 +213,7 @@ async def _list_local_targets() -> List[TargetDescription]: async def connect( self, destination: ConnectionDestination, - metadata: Optional[Dict[str, str]] = None, + metadata: dict[str, str] | None = None, ) -> CompanionInfo: self._logger.debug(f"Connecting directly to {destination} with meta {metadata}") if isinstance(destination, TCPAddress) or isinstance( diff --git a/idb/grpc/stream.py b/idb/grpc/stream.py index 2ce8097e7..c7a9df818 100644 --- a/idb/grpc/stream.py +++ b/idb/grpc/stream.py @@ -7,8 +7,9 @@ # pyre-strict import asyncio +from collections.abc import AsyncIterator from logging import Logger -from typing import AsyncIterator, Dict, Generic, Optional, TypeVar +from typing import Dict, Generic, Optional, TypeVar from idb.utils.typing import none_throws @@ -18,9 +19,9 @@ class Stream(Generic[_TSend, _TRecv], AsyncIterator[_TRecv]): - metadata: Dict[str, str] = {} + metadata: dict[str, str] = {} - async def recv_message(self) -> Optional[_TRecv]: ... + async def recv_message(self) -> _TRecv | None: ... async def send_message(self, message: _TSend) -> None: ... @@ -43,7 +44,7 @@ async def drain_to_stream( async def generate_bytes( - stream: AsyncIterator[object], logger: Optional[Logger] = None + stream: AsyncIterator[object], logger: Logger | None = None ) -> AsyncIterator[bytes]: async for item in stream: log_output = getattr(item, "log_output", None) diff --git a/idb/grpc/target.py b/idb/grpc/target.py index c16901f7d..d01a7db26 100644 --- a/idb/grpc/target.py +++ b/idb/grpc/target.py @@ -7,7 +7,8 @@ # pyre-strict import json -from typing import List, Sequence +from collections.abc import Sequence +from typing import List from idb.common.format import target_type_from_string from idb.common.types import Address, CompanionInfo, ScreenDimensions, TargetDescription @@ -79,7 +80,7 @@ def screen_dimensions_to_py(dimensions: GrpcScreenDimensions) -> ScreenDimension def merge_connected_targets( local_targets: Sequence[TargetDescription], connected_targets: Sequence[TargetDescription], -) -> List[TargetDescription]: +) -> list[TargetDescription]: connected_mapping = {target.udid: target for target in connected_targets} targets = {} # First, add all local targets, updating companion info where available diff --git a/idb/grpc/video.py b/idb/grpc/video.py index 5d500d516..89dc809a4 100644 --- a/idb/grpc/video.py +++ b/idb/grpc/video.py @@ -6,7 +6,7 @@ # pyre-strict -from typing import AsyncIterator +from collections.abc import AsyncIterator from idb.grpc.idb_pb2 import RecordResponse diff --git a/idb/grpc/xctest.py b/idb/grpc/xctest.py index 8bbabd0e4..b31b5c757 100644 --- a/idb/grpc/xctest.py +++ b/idb/grpc/xctest.py @@ -29,7 +29,7 @@ Application = XctestRunRequest.Application UI = XctestRunRequest.UI -CODE_COVERAGE_FORMAT_MAP: Dict[ +CODE_COVERAGE_FORMAT_MAP: dict[ CodeCoverageFormat, "XctestRunRequest.CodeCoverage.Format" ] = { CodeCoverageFormat.EXPORTED: XctestRunRequest.CodeCoverage.EXPORTED, @@ -54,9 +54,7 @@ def _get_xctest_type(path: str) -> _XCTestType: raise XCTestException(f"{path} is not a valid xctest target") -def extract_paths_from_xctestrun( - path: str, logger: Optional[Logger] = None -) -> List[str]: +def extract_paths_from_xctestrun(path: str, logger: Logger | None = None) -> list[str]: """ When using xctestrun we need to copy: - the xctestrun file @@ -67,7 +65,7 @@ def extract_paths_from_xctestrun( result = [path] test_root = os.path.dirname(path) with open(path, "rb") as f: - xctestrun_dict: Dict[str, Any] = plistlib.load(f) + xctestrun_dict: dict[str, Any] = plistlib.load(f) for _test_id, test_dict in xctestrun_dict.items(): if _test_id == "__xctestrun_metadata__": continue @@ -82,12 +80,12 @@ def extract_paths_from_xctestrun( return result -def xctest_paths_to_tar(bundle_path: str, logger: Optional[Logger] = None) -> List[str]: +def xctest_paths_to_tar(bundle_path: str, logger: Logger | None = None) -> list[str]: test_type = _get_xctest_type(bundle_path) if test_type is _XCTestType.XCTest: return [bundle_path] with open(bundle_path, "rb") as f: - plist: Dict[str, Any] = plistlib.load(f) + plist: dict[str, Any] = plistlib.load(f) use_artifacts = ( v.get("UseDestinationArtifacts", False) is True for v in plist.values() ) @@ -101,15 +99,15 @@ def xctest_paths_to_tar(bundle_path: str, logger: Optional[Logger] = None) -> Li def make_request( test_bundle_id: str, app_bundle_id: str, - test_host_app_bundle_id: Optional[str], + test_host_app_bundle_id: str | None, is_ui_test: bool, is_logic_test: bool, - tests_to_run: Optional[Set[str]], - tests_to_skip: Optional[Set[str]], - env: Optional[Dict[str, str]], - args: Optional[List[str]], - result_bundle_path: Optional[str], - timeout: Optional[int], + tests_to_run: set[str] | None, + tests_to_skip: set[str] | None, + env: dict[str, str] | None, + args: list[str] | None, + result_bundle_path: str | None, + timeout: int | None, report_activities: bool, report_attachments: bool, collect_coverage: bool, @@ -174,7 +172,7 @@ async def untar_into_path( def make_results( response: XctestRunResponse, log_parser: XCTestLogParser -) -> List[TestRunInfo]: +) -> list[TestRunInfo]: return [ TestRunInfo( bundle_name=result.bundle_name, diff --git a/idb/grpc/xctest_log_parser.py b/idb/grpc/xctest_log_parser.py index 4f74191ba..13cdf53c4 100644 --- a/idb/grpc/xctest_log_parser.py +++ b/idb/grpc/xctest_log_parser.py @@ -21,10 +21,10 @@ class Event(NamedTuple): methodName: str -XCTestLogParserData = Dict[XCTestLogParserKey, List[str]] +XCTestLogParserData = dict[XCTestLogParserKey, list[str]] -def _try_parse_event(log_line: str) -> Optional[Event]: +def _try_parse_event(log_line: str) -> Event | None: event = None parsed_json = None if len(log_line) < 10_000: # For performance reasons, don't parse long lines @@ -49,7 +49,7 @@ def _try_parse_event(log_line: str) -> Optional[Event]: class XCTestLogParser: _logs: XCTestLogParserData - _current_test: Optional[XCTestLogParserKey] + _current_test: XCTestLogParserKey | None def __init__(self) -> None: self._logs = defaultdict(list) @@ -67,7 +67,7 @@ def parse_streaming_log(self, line: str) -> None: elif event.event == "end-test": self._current_test = None - def get_logs_for_test(self, class_name: str, method_name: str) -> List[str]: + def get_logs_for_test(self, class_name: str, method_name: str) -> list[str]: key = XCTestLogParserKey(className=class_name, methodName=method_name) return self._logs[key] diff --git a/idb/grpc/xctrace.py b/idb/grpc/xctrace.py index 64e2419e7..b3dd73642 100644 --- a/idb/grpc/xctrace.py +++ b/idb/grpc/xctrace.py @@ -9,8 +9,9 @@ import asyncio import re +from collections.abc import AsyncIterator from logging import Logger -from typing import AsyncIterator, Optional +from typing import Optional from idb.grpc.idb_pb2 import XctraceRecordRequest, XctraceRecordResponse from idb.grpc.stream import Stream @@ -65,7 +66,7 @@ async def xctrace_drain_until_stop( logger.info(output.decode()) -def formatted_time_to_seconds(formatted_time: Optional[str]) -> Optional[float]: +def formatted_time_to_seconds(formatted_time: str | None) -> float | None: if not formatted_time: return None pattern = r"^([1-9]\d*)(ms|s|m|h)$" diff --git a/idb/utils/contextlib.py b/idb/utils/contextlib.py index a44ffe038..2177220c2 100644 --- a/idb/utils/contextlib.py +++ b/idb/utils/contextlib.py @@ -9,8 +9,9 @@ import abc import sys +from collections.abc import AsyncIterator, Callable from functools import wraps -from typing import AsyncContextManager, AsyncIterator, Callable, TypeVar +from typing import AsyncContextManager, TypeVar # @asynccontextmanager is available in Python 3.7 @@ -257,8 +258,5 @@ def helper(*args, **kwds): return helper -if sys.version_info >= (3, 7): - # Use the offical python one if available - from contextlib import asynccontextmanager -else: - asynccontextmanager = _asynccontextmanager +# Use the offical python one if available +from contextlib import asynccontextmanager diff --git a/idb/utils/testing.py b/idb/utils/testing.py index 55631bfdc..c18465485 100644 --- a/idb/utils/testing.py +++ b/idb/utils/testing.py @@ -13,7 +13,8 @@ import unittest import unittest.mock as _mock import warnings -from typing import Awaitable, Callable, cast, TypeVar +from collections.abc import Awaitable, Callable +from typing import cast, TypeVar # pyre-ignore from unittest.case import _Outcome @@ -230,7 +231,7 @@ class AsyncMock(_mock.Mock): """ def __call__(self, *args, **kwargs): - sup = super(AsyncMock, self) + sup = super() async def coro(): return sup.__call__(*args, **kwargs) diff --git a/idb/utils/typing.py b/idb/utils/typing.py index f8fef3675..b58ef7dc8 100644 --- a/idb/utils/typing.py +++ b/idb/utils/typing.py @@ -12,6 +12,6 @@ T = TypeVar("T") -def none_throws(optional: Optional[T]) -> T: +def none_throws(optional: T | None) -> T: assert optional is not None, "Unexpected None" return optional