From f3cd8f8ed0ef2ac5ec75adb8fea60e568c68b8e3 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Mon, 27 Nov 2023 16:10:01 +0800 Subject: [PATCH] Typing helper --- bumble/hci.py | 4 ++ bumble/helpers.py | 109 ++++++++++++++++++++++++++++------------------ bumble/l2cap.py | 8 ++++ 3 files changed, 79 insertions(+), 42 deletions(-) diff --git a/bumble/hci.py b/bumble/hci.py index 8897624e..67fe4571 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -5296,6 +5296,10 @@ class HCI_Disconnection_Complete_Event(HCI_Event): See Bluetooth spec @ 7.7.5 Disconnection Complete Event ''' + status: int + connection_handle: int + reason: int + # ----------------------------------------------------------------------------- @HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)]) diff --git a/bumble/helpers.py b/bumble/helpers.py index 83c7c6df..61748510 100644 --- a/bumble/helpers.py +++ b/bumble/helpers.py @@ -15,30 +15,39 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + +from collections.abc import Callable, MutableMapping +from typing import cast, Any import logging -from .colors import color -from .att import ATT_CID, ATT_PDU -from .smp import SMP_CID, SMP_Command -from .core import name_or_number -from .l2cap import ( +from bumble import avdtp +from bumble.colors import color +from bumble.att import ATT_CID, ATT_PDU +from bumble.smp import SMP_CID, SMP_Command +from bumble.core import name_or_number +from bumble.l2cap import ( L2CAP_PDU, L2CAP_CONNECTION_REQUEST, L2CAP_CONNECTION_RESPONSE, L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID, L2CAP_Control_Frame, + L2CAP_Connection_Request, L2CAP_Connection_Response, ) -from .hci import ( +from bumble.hci import ( HCI_EVENT_PACKET, HCI_ACL_DATA_PACKET, HCI_DISCONNECTION_COMPLETE_EVENT, HCI_AclDataPacketAssembler, + HCI_Packet, + HCI_Event, + HCI_AclDataPacket, + HCI_Disconnection_Complete_Event, ) -from .rfcomm import RFCOMM_Frame, RFCOMM_PSM -from .sdp import SDP_PDU, SDP_PSM -from .avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM +from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM +from bumble.sdp import SDP_PDU, SDP_PSM # ----------------------------------------------------------------------------- # Logging @@ -50,23 +59,25 @@ PSM_NAMES = { RFCOMM_PSM: 'RFCOMM', SDP_PSM: 'SDP', - AVDTP_PSM: 'AVDTP' - # TODO: add more PSM values + avdtp.AVDTP_PSM: 'AVDTP', } # ----------------------------------------------------------------------------- class PacketTracer: class AclStream: - def __init__(self, analyzer): + psms: MutableMapping[int, int] + peer: PacketTracer.AclStream + avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler] + + def __init__(self, analyzer: PacketTracer.Analyzer) -> None: self.analyzer = analyzer self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid self.psms = {} # PSM, by source_cid - self.peer = None # ACL stream in the other direction # pylint: disable=too-many-nested-blocks - def on_acl_pdu(self, pdu): + def on_acl_pdu(self, pdu: bytes) -> None: l2cap_pdu = L2CAP_PDU.from_bytes(pdu) if l2cap_pdu.cid == ATT_CID: @@ -81,26 +92,30 @@ def on_acl_pdu(self, pdu): # Check if this signals a new channel if control_frame.code == L2CAP_CONNECTION_REQUEST: - self.psms[control_frame.source_cid] = control_frame.psm + connection_request = cast(L2CAP_Connection_Request, control_frame) + self.psms[connection_request.source_cid] = connection_request.psm elif control_frame.code == L2CAP_CONNECTION_RESPONSE: + connection_response = cast(L2CAP_Connection_Response, control_frame) if ( - control_frame.result + connection_response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL ): if self.peer: - if psm := self.peer.psms.get(control_frame.source_cid): + if psm := self.peer.psms.get( + connection_response.source_cid + ): # Found a pending connection - self.psms[control_frame.destination_cid] = psm + self.psms[connection_response.destination_cid] = psm # For AVDTP connections, create a packet assembler for # each direction - if psm == AVDTP_PSM: + if psm == avdtp.AVDTP_PSM: self.avdtp_assemblers[ - control_frame.source_cid - ] = AVDTP_MessageAssembler(self.on_avdtp_message) + connection_response.source_cid + ] = avdtp.MessageAssembler(self.on_avdtp_message) self.peer.avdtp_assemblers[ - control_frame.destination_cid - ] = AVDTP_MessageAssembler( + connection_response.destination_cid + ] = avdtp.MessageAssembler( self.peer.on_avdtp_message ) @@ -113,7 +128,7 @@ def on_acl_pdu(self, pdu): elif psm == RFCOMM_PSM: rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload) self.analyzer.emit(rfcomm_frame) - elif psm == AVDTP_PSM: + elif psm == avdtp.AVDTP_PSM: self.analyzer.emit( f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, ' f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}' @@ -130,22 +145,26 @@ def on_acl_pdu(self, pdu): else: self.analyzer.emit(l2cap_pdu) - def on_avdtp_message(self, transaction_label, message): + def on_avdtp_message( + self, transaction_label: int, message: avdtp.Message + ) -> None: self.analyzer.emit( f'{color("AVDTP", "green")} [{transaction_label}] {message}' ) - def feed_packet(self, packet): + def feed_packet(self, packet: HCI_AclDataPacket) -> None: self.packet_assembler.feed_packet(packet) class Analyzer: - def __init__(self, label, emit_message): + acl_streams: MutableMapping[int, PacketTracer.AclStream] + peer: PacketTracer.Analyzer + + def __init__(self, label: str, emit_message: Callable[..., None]) -> None: self.label = label self.emit_message = emit_message self.acl_streams = {} # ACL streams, by connection handle - self.peer = None # Analyzer in the other direction - def start_acl_stream(self, connection_handle): + def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream: logger.info( f'[{self.label}] +++ Creating ACL stream for connection ' f'0x{connection_handle:04X}' @@ -160,7 +179,7 @@ def start_acl_stream(self, connection_handle): return stream - def end_acl_stream(self, connection_handle): + def end_acl_stream(self, connection_handle: int) -> None: if connection_handle in self.acl_streams: logger.info( f'[{self.label}] --- Removing ACL stream for connection ' @@ -171,23 +190,29 @@ def end_acl_stream(self, connection_handle): # Let the other forwarder know so it can cleanup its stream as well self.peer.end_acl_stream(connection_handle) - def on_packet(self, packet): + def on_packet(self, packet: HCI_Packet) -> None: self.emit(packet) if packet.hci_packet_type == HCI_ACL_DATA_PACKET: + acl_packet = cast(HCI_AclDataPacket, packet) # Look for an existing stream for this handle, create one if it is the # first ACL packet for that connection handle - if (stream := self.acl_streams.get(packet.connection_handle)) is None: - stream = self.start_acl_stream(packet.connection_handle) - stream.feed_packet(packet) + if ( + stream := self.acl_streams.get(acl_packet.connection_handle) + ) is None: + stream = self.start_acl_stream(acl_packet.connection_handle) + stream.feed_packet(acl_packet) elif packet.hci_packet_type == HCI_EVENT_PACKET: - if packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT: - self.end_acl_stream(packet.connection_handle) + event_packet = cast(HCI_Event, packet) + if event_packet.event_code == HCI_DISCONNECTION_COMPLETE_EVENT: + self.end_acl_stream( + cast(HCI_Disconnection_Complete_Event, packet).connection_handle + ) - def emit(self, message): + def emit(self, message: Any) -> None: self.emit_message(f'[{self.label}] {message}') - def trace(self, packet, direction=0): + def trace(self, packet: HCI_Packet, direction: int = 0) -> None: if direction == 0: self.host_to_controller_analyzer.on_packet(packet) else: @@ -195,10 +220,10 @@ def trace(self, packet, direction=0): def __init__( self, - host_to_controller_label=color('HOST->CONTROLLER', 'blue'), - controller_to_host_label=color('CONTROLLER->HOST', 'cyan'), - emit_message=logger.info, - ): + host_to_controller_label: str = color('HOST->CONTROLLER', 'blue'), + controller_to_host_label: str = color('CONTROLLER->HOST', 'cyan'), + emit_message: Callable[..., None] = logger.info, + ) -> None: self.host_to_controller_analyzer = PacketTracer.Analyzer( host_to_controller_label, emit_message ) diff --git a/bumble/l2cap.py b/bumble/l2cap.py index 7a2f0ede..4ccdeabd 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -391,6 +391,9 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame): See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST ''' + psm: int + source_cid: int + @staticmethod def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]: psm_length = 2 @@ -432,6 +435,11 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame): See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE ''' + source_cid: int + destination_cid: int + status: int + result: int + CONNECTION_SUCCESSFUL = 0x0000 CONNECTION_PENDING = 0x0001 CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002