From a8ec1b09493616dde52b658317e3c87551016176 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Fri, 3 Feb 2023 18:45:45 -0800 Subject: [PATCH 1/2] minor cleanup of the internals of the usb transport implementation --- bumble/transport/usb.py | 111 +++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 58 deletions(-) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index ccc82c19..baeb27ff 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -113,7 +113,7 @@ class UsbPacketSink: def __init__(self, device, acl_out): self.device = device self.acl_out = acl_out - self.transfer = device.getTransfer() + self.acl_out_transfer = device.getTransfer() self.packets = collections.deque() # Queue of packets waiting to be sent self.loop = asyncio.get_running_loop() self.cancel_done = self.loop.create_future() @@ -137,21 +137,20 @@ def on_packet(self, packet): # The queue was previously empty, re-prime the pump self.process_queue() - def on_packet_sent(self, transfer): + def transfer_callback(self, transfer): status = transfer.getStatus() - # logger.debug(f'<<< USB out transfer callback: status={status}') # pylint: disable=no-member if status == usb1.TRANSFER_COMPLETED: - self.loop.call_soon_threadsafe(self.on_packet_sent_) + self.loop.call_soon_threadsafe(self.on_packet_sent) elif status == usb1.TRANSFER_CANCELLED: self.loop.call_soon_threadsafe(self.cancel_done.set_result, None) else: logger.warning( - color(f'!!! out transfer not completed: status={status}', 'red') + color(f'!!! OUT transfer not completed: status={status}', 'red') ) - def on_packet_sent_(self): + def on_packet_sent(self): if self.packets: self.packets.popleft() self.process_queue() @@ -163,22 +162,20 @@ def process_queue(self): packet = self.packets[0] packet_type = packet[0] if packet_type == hci.HCI_ACL_DATA_PACKET: - self.transfer.setBulk( - self.acl_out, packet[1:], callback=self.on_packet_sent + self.acl_out_transfer.setBulk( + self.acl_out, packet[1:], callback=self.transfer_callback ) - logger.debug('submit ACL') - self.transfer.submit() + self.acl_out_transfer.submit() elif packet_type == hci.HCI_COMMAND_PACKET: - self.transfer.setControl( + self.acl_out_transfer.setControl( USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, 0, 0, 0, packet[1:], - callback=self.on_packet_sent, + callback=self.transfer_callback, ) - logger.debug('submit COMMAND') - self.transfer.submit() + self.acl_out_transfer.submit() else: logger.warning(color(f'unsupported packet type {packet_type}', 'red')) @@ -193,11 +190,11 @@ async def terminate(self): self.packets.clear() # If we have a transfer in flight, cancel it - if self.transfer.isSubmitted(): + if self.acl_out_transfer.isSubmitted(): # Try to cancel the transfer, but that may fail because it may have # already completed try: - self.transfer.cancel() + self.acl_out_transfer.cancel() logger.debug('waiting for OUT transfer cancellation to be done...') await self.cancel_done @@ -206,27 +203,22 @@ async def terminate(self): logger.debug('OUT transfer likely already completed') class UsbPacketSource(asyncio.Protocol, ParserSource): - def __init__(self, context, device, metadata, acl_in, events_in): + def __init__(self, device, metadata, acl_in, events_in): super().__init__() - self.context = context self.device = device self.metadata = metadata self.acl_in = acl_in + self.acl_in_transfer = None self.events_in = events_in + self.events_in_transfer = None self.loop = asyncio.get_running_loop() self.queue = asyncio.Queue() self.dequeue_task = None - self.closed = False - self.event_loop_done = self.loop.create_future() self.cancel_done = { hci.HCI_EVENT_PACKET: self.loop.create_future(), hci.HCI_ACL_DATA_PACKET: self.loop.create_future(), } - self.events_in_transfer = None - self.acl_in_transfer = None - - # Create a thread to process events - self.event_thread = threading.Thread(target=self.run) + self.closed = False def start(self): # Set up transfer objects for input @@ -234,7 +226,7 @@ def start(self): self.events_in_transfer.setInterrupt( self.events_in, READ_SIZE, - callback=self.on_packet_received, + callback=self.transfer_callback, user_data=hci.HCI_EVENT_PACKET, ) self.events_in_transfer.submit() @@ -243,22 +235,23 @@ def start(self): self.acl_in_transfer.setBulk( self.acl_in, READ_SIZE, - callback=self.on_packet_received, + callback=self.transfer_callback, user_data=hci.HCI_ACL_DATA_PACKET, ) self.acl_in_transfer.submit() self.dequeue_task = self.loop.create_task(self.dequeue()) - self.event_thread.start() - def on_packet_received(self, transfer): + @property + def usb_transfer_submitted(self): + return ( + self.events_in_transfer.isSubmitted() + or self.acl_in_transfer.isSubmitted() + ) + + def transfer_callback(self, transfer): packet_type = transfer.getUserData() status = transfer.getStatus() - # logger.debug( - # f'<<< USB IN transfer callback: status={status} ' - # f'packet_type={packet_type} ' - # f'length={transfer.getActualLength()}' - # ) # pylint: disable=no-member if status == usb1.TRANSFER_COMPLETED: @@ -267,19 +260,18 @@ def on_packet_received(self, transfer): + transfer.getBuffer()[: transfer.getActualLength()] ) self.loop.call_soon_threadsafe(self.queue.put_nowait, packet) + + # Re-submit the transfer so we can receive more data + transfer.submit() elif status == usb1.TRANSFER_CANCELLED: self.loop.call_soon_threadsafe( self.cancel_done[packet_type].set_result, None ) - return else: logger.warning( - color(f'!!! transfer not completed: status={status}', 'red') + color(f'!!! IN transfer not completed: status={status}', 'red') ) - # Re-submit the transfer so we can receive more data - transfer.submit() - async def dequeue(self): while not self.closed: try: @@ -288,21 +280,6 @@ async def dequeue(self): return self.parser.feed_data(packet) - def run(self): - logger.debug('starting USB event loop') - while ( - self.events_in_transfer.isSubmitted() - or self.acl_in_transfer.isSubmitted() - ): - # pylint: disable=no-member - try: - self.context.handleEvents() - except usb1.USBErrorInterrupted: - pass - - logger.debug('USB event loop done') - self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None) - def close(self): self.closed = True @@ -331,15 +308,14 @@ async def terminate(self): f'IN[{packet_type}] transfer likely already completed' ) - # Wait for the thread to terminate - await self.event_loop_done - class UsbTransport(Transport): def __init__(self, context, device, interface, setting, source, sink): super().__init__(source, sink) self.context = context self.device = device self.interface = interface + self.loop = asyncio.get_running_loop() + self.event_loop_done = self.loop.create_future() # Get exclusive access device.claimInterface(interface) @@ -352,6 +328,22 @@ def __init__(self, context, device, interface, setting, source, sink): source.start() sink.start() + # Create a thread to process events + self.event_thread = threading.Thread(target=self.run) + self.event_thread.start() + + def run(self): + logger.debug('starting USB event loop') + while self.source.usb_transfer_submitted: + # pylint: disable=no-member + try: + self.context.handleEvents() + except usb1.USBErrorInterrupted: + pass + + logger.debug('USB event loop done') + self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None) + async def close(self): self.source.close() self.sink.close() @@ -361,6 +353,9 @@ async def close(self): self.device.close() self.context.close() + # Wait for the thread to terminate + await self.event_loop_done + # Find the device according to the spec moniker load_libusb() context = usb1.USBContext() @@ -540,7 +535,7 @@ def find_endpoints(device): except usb1.USBError: logger.warning('failed to set configuration') - source = UsbPacketSource(context, device, device_metadata, acl_in, events_in) + source = UsbPacketSource(device, device_metadata, acl_in, events_in) sink = UsbPacketSink(device, acl_out) return UsbTransport(context, device, interface, setting, source, sink) except usb1.USBError as error: From 9c7089c8fff039ba41ed14ee53ac3bf4cfb595bd Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sun, 19 Nov 2023 11:36:38 -0800 Subject: [PATCH 2/2] terminate when unplugged --- bumble/transport/usb.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index baeb27ff..d48b2392 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -24,9 +24,10 @@ import usb1 -from .common import Transport, ParserSource -from .. import hci -from ..colors import color +from bumble.transport.common import Transport, ParserSource +from bumble import hci +from bumble.colors import color +from bumble.utils import AsyncRunner # ----------------------------------------------------------------------------- @@ -271,6 +272,7 @@ def transfer_callback(self, transfer): logger.warning( color(f'!!! IN transfer not completed: status={status}', 'red') ) + self.loop.call_soon_threadsafe(self.on_transport_lost) async def dequeue(self): while not self.closed: