diff --git a/bumble/device.py b/bumble/device.py index 07f44c08..713b7926 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -51,129 +51,9 @@ from pyee import EventEmitter -from bumble import hci from .colors import color from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU from .gatt import Characteristic, Descriptor, Service -from .hci import ( - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, - HCI_CENTRAL_ROLE, - HCI_PERIPHERAL_ROLE, - HCI_COMMAND_STATUS_PENDING, - HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, - HCI_DISPLAY_YES_NO_IO_CAPABILITY, - HCI_DISPLAY_ONLY_IO_CAPABILITY, - HCI_EXTENDED_INQUIRY_MODE, - HCI_GENERAL_INQUIRY_LAP, - HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, - HCI_KEYBOARD_ONLY_IO_CAPABILITY, - HCI_LE_1M_PHY, - HCI_LE_1M_PHY_BIT, - HCI_LE_2M_PHY, - HCI_LE_CODED_PHY, - HCI_LE_CODED_PHY_BIT, - HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, - HCI_LE_RAND_COMMAND, - HCI_LE_READ_PHY_COMMAND, - HCI_LE_SET_PHY_COMMAND, - HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, - HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, - HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, - HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, - HCI_OPERATION_CANCELLED_BY_HOST_ERROR, - HCI_R2_PAGE_SCAN_REPETITION_MODE, - HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, - HCI_SUCCESS, - HCI_WRITE_LE_HOST_SUPPORT_COMMAND, - HCI_Accept_Connection_Request_Command, - HCI_Authentication_Requested_Command, - HCI_Command_Status_Event, - HCI_Constant, - HCI_Create_Connection_Cancel_Command, - HCI_Create_Connection_Command, - HCI_Connection_Complete_Event, - HCI_Disconnect_Command, - HCI_Encryption_Change_Event, - HCI_Error, - HCI_IO_Capability_Request_Reply_Command, - HCI_Inquiry_Cancel_Command, - HCI_Inquiry_Command, - HCI_IsoDataPacket, - HCI_LE_Accept_CIS_Request_Command, - HCI_LE_Add_Device_To_Resolving_List_Command, - HCI_LE_Advertising_Report_Event, - HCI_LE_BIGInfo_Advertising_Report_Event, - HCI_LE_Clear_Resolving_List_Command, - HCI_LE_Connection_Update_Command, - HCI_LE_Create_Connection_Cancel_Command, - HCI_LE_Create_Connection_Command, - HCI_LE_Create_CIS_Command, - HCI_LE_Periodic_Advertising_Create_Sync_Command, - HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command, - HCI_LE_Periodic_Advertising_Report_Event, - HCI_LE_Periodic_Advertising_Sync_Transfer_Command, - HCI_LE_Periodic_Advertising_Terminate_Sync_Command, - HCI_LE_Enable_Encryption_Command, - HCI_LE_Extended_Advertising_Report_Event, - HCI_LE_Extended_Create_Connection_Command, - HCI_LE_Rand_Command, - HCI_LE_Read_PHY_Command, - HCI_LE_Read_Remote_Features_Command, - HCI_LE_Reject_CIS_Request_Command, - HCI_LE_Remove_Advertising_Set_Command, - HCI_LE_Set_Address_Resolution_Enable_Command, - HCI_LE_Set_Advertising_Data_Command, - HCI_LE_Set_Advertising_Enable_Command, - HCI_LE_Set_Advertising_Parameters_Command, - HCI_LE_Set_Advertising_Set_Random_Address_Command, - HCI_LE_Set_CIG_Parameters_Command, - HCI_LE_Set_Data_Length_Command, - HCI_LE_Set_Default_PHY_Command, - HCI_LE_Set_Extended_Scan_Enable_Command, - HCI_LE_Set_Extended_Scan_Parameters_Command, - HCI_LE_Set_Extended_Scan_Response_Data_Command, - HCI_LE_Set_Extended_Advertising_Data_Command, - HCI_LE_Set_Extended_Advertising_Enable_Command, - HCI_LE_Set_Extended_Advertising_Parameters_Command, - HCI_LE_Set_Host_Feature_Command, - HCI_LE_Set_Periodic_Advertising_Enable_Command, - HCI_LE_Set_PHY_Command, - HCI_LE_Set_Random_Address_Command, - HCI_LE_Set_Scan_Enable_Command, - HCI_LE_Set_Scan_Parameters_Command, - HCI_LE_Set_Scan_Response_Data_Command, - HCI_PIN_Code_Request_Reply_Command, - HCI_PIN_Code_Request_Negative_Reply_Command, - HCI_Read_BD_ADDR_Command, - HCI_Read_RSSI_Command, - HCI_Reject_Connection_Request_Command, - HCI_Remote_Name_Request_Command, - HCI_Switch_Role_Command, - HCI_Set_Connection_Encryption_Command, - HCI_StatusError, - HCI_SynchronousDataPacket, - HCI_User_Confirmation_Request_Negative_Reply_Command, - HCI_User_Confirmation_Request_Reply_Command, - HCI_User_Passkey_Request_Negative_Reply_Command, - HCI_User_Passkey_Request_Reply_Command, - HCI_Write_Class_Of_Device_Command, - HCI_Write_Extended_Inquiry_Response_Command, - HCI_Write_Inquiry_Mode_Command, - HCI_Write_LE_Host_Support_Command, - HCI_Write_Local_Name_Command, - HCI_Write_Scan_Enable_Command, - HCI_Write_Secure_Connections_Host_Support_Command, - HCI_Write_Simple_Pairing_Mode_Command, - Address, - OwnAddressType, - LeFeature, - LeFeatureMask, - LmpFeatureMask, - Phy, - phy_list_to_bits, -) from .host import Host from .profiles.gap import GenericAccessService from .core import ( @@ -207,6 +87,7 @@ KeyStore, PairingKeys, ) +from bumble import hci from bumble import pairing from bumble import gatt_client from bumble import gatt_server @@ -262,7 +143,7 @@ DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE + hci.HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE ) DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0 DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0 @@ -286,8 +167,8 @@ class ObjectLookupError(BaseBumbleError): @dataclass class Advertisement: # Attributes - address: Address - rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + address: hci.Address + rssi: int = hci.HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE is_legacy: bool = False is_anonymous: bool = False is_connectable: bool = False @@ -299,17 +180,17 @@ class Advertisement: primary_phy: int = 0 secondary_phy: int = 0 tx_power: int = ( - HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + hci.HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE ) sid: int = 0 data_bytes: bytes = b'' # Constants TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( - HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + hci.HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE ) RSSI_NOT_AVAILABLE: ClassVar[int] = ( - HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + hci.HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE ) def __post_init__(self) -> None: @@ -317,10 +198,10 @@ def __post_init__(self) -> None: @classmethod def from_advertising_report(cls, report) -> Optional[Advertisement]: - if isinstance(report, HCI_LE_Advertising_Report_Event.Report): + if isinstance(report, hci.HCI_LE_Advertising_Report_Event.Report): return LegacyAdvertisement.from_advertising_report(report) - if isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report): + if isinstance(report, hci.HCI_LE_Extended_Advertising_Report_Event.Report): return ExtendedAdvertisement.from_advertising_report(report) return None @@ -336,18 +217,18 @@ def from_advertising_report(cls, report): is_legacy=True, is_connectable=report.event_type in ( - HCI_LE_Advertising_Report_Event.ADV_IND, - HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, + hci.HCI_LE_Advertising_Report_Event.ADV_IND, + hci.HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, ), is_directed=report.event_type - == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, + == hci.HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, is_scannable=report.event_type in ( - HCI_LE_Advertising_Report_Event.ADV_IND, - HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, + hci.HCI_LE_Advertising_Report_Event.ADV_IND, + hci.HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, ), is_scan_response=report.event_type - == HCI_LE_Advertising_Report_Event.SCAN_RSP, + == hci.HCI_LE_Advertising_Report_Event.SCAN_RSP, data_bytes=report.data, ) @@ -361,14 +242,14 @@ def from_advertising_report(cls, report): return cls( address = report.address, rssi = report.rssi, - is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, - is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, - is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, - is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, - is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, - is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, - is_complete = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, - is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, + is_legacy = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, + is_anonymous = report.address.address_type == hci.HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, + is_connectable = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, + is_directed = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, + is_scannable = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, + is_scan_response = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, + is_complete = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, + is_truncated = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, primary_phy = report.primary_phy, secondary_phy = report.secondary_phy, tx_power = report.tx_power, @@ -473,15 +354,15 @@ def is_high_duty_cycle_directed_connectable(self): class LegacyAdvertiser: device: Device advertising_type: AdvertisingType - own_address_type: OwnAddressType - peer_address: Address + own_address_type: hci.OwnAddressType + peer_address: hci.Address auto_restart: bool async def start(self) -> None: # Set/update the advertising data if the advertising type allows it if self.advertising_type.has_data: await self.device.send_command( - HCI_LE_Set_Advertising_Data_Command( + hci.HCI_LE_Set_Advertising_Data_Command( advertising_data=self.device.advertising_data ), check_result=True, @@ -490,7 +371,7 @@ async def start(self) -> None: # Set/update the scan response data if the advertising is scannable if self.advertising_type.is_scannable: await self.device.send_command( - HCI_LE_Set_Scan_Response_Data_Command( + hci.HCI_LE_Set_Scan_Response_Data_Command( scan_response_data=self.device.scan_response_data ), check_result=True, @@ -498,7 +379,7 @@ async def start(self) -> None: # Set the advertising parameters await self.device.send_command( - HCI_LE_Set_Advertising_Parameters_Command( + hci.HCI_LE_Set_Advertising_Parameters_Command( advertising_interval_min=self.device.advertising_interval_min, advertising_interval_max=self.device.advertising_interval_max, advertising_type=int(self.advertising_type), @@ -513,14 +394,14 @@ async def start(self) -> None: # Enable advertising await self.device.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), + hci.HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), check_result=True, ) async def stop(self) -> None: # Disable advertising await self.device.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), + hci.HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), check_result=True, ) @@ -537,8 +418,8 @@ class AdvertisingEventProperties: include_tx_power: bool = False def __int__(self) -> int: - properties = ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0) + properties = hci.HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( + 0 ) if self.is_connectable: properties |= properties.CONNECTABLE_ADVERTISING @@ -576,21 +457,21 @@ def from_advertising_type( # ----------------------------------------------------------------------------- @dataclass class PeriodicAdvertisement: - address: Address + address: hci.Address sid: int tx_power: int = ( - HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + hci.HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE ) - rssi: int = HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE + rssi: int = hci.HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE is_truncated: bool = False data_bytes: bytes = b'' # Constants TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( - HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + hci.HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE ) RSSI_NOT_AVAILABLE: ClassVar[int] = ( - HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE + hci.HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE ) def __post_init__(self) -> None: @@ -602,7 +483,7 @@ def __post_init__(self) -> None: # ----------------------------------------------------------------------------- @dataclass class BIGInfoAdvertisement: - address: Address + address: hci.Address sid: int num_bis: int nse: int @@ -613,12 +494,12 @@ class BIGInfoAdvertisement: max_pdu: int sdu_interval: int max_sdu: int - phy: Phy + phy: hci.Phy framed: bool encrypted: bool @classmethod - def from_report(cls, address: Address, sid: int, report) -> Self: + def from_report(cls, address: hci.Address, sid: int, report) -> Self: return cls( address, sid, @@ -631,7 +512,7 @@ def from_report(cls, address: Address, sid: int, report) -> Self: report.max_pdu, report.sdu_interval, report.max_sdu, - Phy(report.phy), + hci.Phy(report.phy), report.framing != 0, report.encryption != 0, ) @@ -639,7 +520,9 @@ def from_report(cls, address: Address, sid: int, report) -> Self: # ----------------------------------------------------------------------------- # TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 -AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap +AdvertisingChannelMap = ( + hci.HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap +) # ----------------------------------------------------------------------------- @@ -652,19 +535,19 @@ class AdvertisingParameters: primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL primary_advertising_channel_map: ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap + hci.HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap ) = ( AdvertisingChannelMap.CHANNEL_37 | AdvertisingChannelMap.CHANNEL_38 | AdvertisingChannelMap.CHANNEL_39 ) - own_address_type: OwnAddressType = OwnAddressType.RANDOM - peer_address: Address = Address.ANY + own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM + peer_address: hci.Address = hci.Address.ANY advertising_filter_policy: int = 0 advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER - primary_advertising_phy: Phy = Phy.LE_1M + primary_advertising_phy: hci.Phy = hci.Phy.LE_1M secondary_advertising_max_skip: int = 0 - secondary_advertising_phy: Phy = Phy.LE_1M + secondary_advertising_phy: hci.Phy = hci.Phy.LE_1M advertising_sid: int = 0 enable_scan_request_notifications: bool = False primary_advertising_phy_options: int = 0 @@ -684,7 +567,7 @@ class AdvertisingSet(EventEmitter): device: Device advertising_handle: int auto_restart: bool - random_address: Optional[Address] + random_address: Optional[hci.Address] advertising_parameters: AdvertisingParameters advertising_data: bytes scan_response_data: bytes @@ -711,7 +594,7 @@ async def set_advertising_parameters( ) response = await self.device.send_command( - HCI_LE_Set_Extended_Advertising_Parameters_Command( + hci.HCI_LE_Set_Extended_Advertising_Parameters_Command( advertising_handle=self.advertising_handle, advertising_event_properties=int( advertising_parameters.advertising_event_properties @@ -752,10 +635,10 @@ async def set_advertising_parameters( async def set_advertising_data(self, advertising_data: bytes) -> None: # pylint: disable=line-too-long await self.device.send_command( - HCI_LE_Set_Extended_Advertising_Data_Command( + hci.HCI_LE_Set_Extended_Advertising_Data_Command( advertising_handle=self.advertising_handle, - operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, - fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + operation=hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=hci.HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, advertising_data=advertising_data, ), check_result=True, @@ -775,10 +658,10 @@ async def set_scan_response_data(self, scan_response_data: bytes) -> None: return await self.device.send_command( - HCI_LE_Set_Extended_Scan_Response_Data_Command( + hci.HCI_LE_Set_Extended_Scan_Response_Data_Command( advertising_handle=self.advertising_handle, - operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, - fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + operation=hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=hci.HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, scan_response_data=scan_response_data, ), check_result=True, @@ -788,16 +671,15 @@ async def set_scan_response_data(self, scan_response_data: bytes) -> None: async def set_periodic_advertising_parameters( self, advertising_parameters: PeriodicAdvertisingParameters ) -> None: - # TODO: send command self.periodic_advertising_parameters = advertising_parameters async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: # TODO: send command self.periodic_advertising_data = advertising_data - async def set_random_address(self, random_address: Address) -> None: + async def set_random_address(self, random_address: hci.Address) -> None: await self.device.send_command( - HCI_LE_Set_Advertising_Set_Random_Address_Command( + hci.HCI_LE_Set_Advertising_Set_Random_Address_Command( advertising_handle=self.advertising_handle, random_address=(random_address or self.device.random_address), ), @@ -818,7 +700,7 @@ async def start( (the default) for an unlimited number of advertisements. """ await self.device.send_command( - HCI_LE_Set_Extended_Advertising_Enable_Command( + hci.HCI_LE_Set_Extended_Advertising_Enable_Command( enable=1, advertising_handles=[self.advertising_handle], durations=[round(duration * 100)], @@ -832,7 +714,7 @@ async def start( async def start_periodic(self, include_adi: bool = False) -> None: await self.device.send_command( - HCI_LE_Set_Periodic_Advertising_Enable_Command( + hci.HCI_LE_Set_Periodic_Advertising_Enable_Command( enable=1 | (2 if include_adi else 0), advertising_handles=self.advertising_handle, ), @@ -843,7 +725,7 @@ async def start_periodic(self, include_adi: bool = False) -> None: async def stop(self) -> None: await self.device.send_command( - HCI_LE_Set_Extended_Advertising_Enable_Command( + hci.HCI_LE_Set_Extended_Advertising_Enable_Command( enable=0, advertising_handles=[self.advertising_handle], durations=[0], @@ -857,7 +739,7 @@ async def stop(self) -> None: async def stop_periodic(self) -> None: await self.device.send_command( - HCI_LE_Set_Periodic_Advertising_Enable_Command( + hci.HCI_LE_Set_Periodic_Advertising_Enable_Command( enable=0, advertising_handles=self.advertising_handle, ), @@ -868,7 +750,7 @@ async def stop_periodic(self) -> None: async def remove(self) -> None: await self.device.send_command( - HCI_LE_Remove_Advertising_Set_Command( + hci.HCI_LE_Remove_Advertising_Set_Command( advertising_handle=self.advertising_handle ), check_result=True, @@ -893,7 +775,7 @@ class State(Enum): _state: State sync_handle: Optional[int] - advertiser_address: Address + advertiser_address: hci.Address sid: int skip: int sync_timeout: float # Sync timeout, in seconds @@ -906,7 +788,7 @@ class State(Enum): def __init__( self, device: Device, - advertiser_address: Address, + advertiser_address: hci.Address, sid: int, skip: int, sync_timeout: float, @@ -921,7 +803,7 @@ def __init__( self.skip = skip self.sync_timeout = sync_timeout self.filter_duplicates = filter_duplicates - self.status = HCI_SUCCESS + self.status = hci.HCI_SUCCESS self.advertiser_phy = 0 self.periodic_advertising_interval = 0 self.advertiser_clock_accuracy = 0 @@ -941,14 +823,14 @@ async def establish(self) -> None: if self.state != self.State.INIT: raise InvalidStateError('sync not in init state') - options = HCI_LE_Periodic_Advertising_Create_Sync_Command.Options(0) + options = hci.HCI_LE_Periodic_Advertising_Create_Sync_Command.Options(0) if self.filter_duplicates: options |= ( - HCI_LE_Periodic_Advertising_Create_Sync_Command.Options.DUPLICATE_FILTERING_INITIALLY_ENABLED + hci.HCI_LE_Periodic_Advertising_Create_Sync_Command.Options.DUPLICATE_FILTERING_INITIALLY_ENABLED ) response = await self.device.send_command( - HCI_LE_Periodic_Advertising_Create_Sync_Command( + hci.HCI_LE_Periodic_Advertising_Create_Sync_Command( options=options, advertising_sid=self.sid, advertiser_address_type=self.advertiser_address.address_type, @@ -958,8 +840,8 @@ async def establish(self) -> None: sync_cte_type=0, ) ) - if response.status != HCI_Command_Status_Event.PENDING: - raise HCI_StatusError(response) + if response.status != hci.HCI_Command_Status_Event.PENDING: + raise hci.HCI_StatusError(response) self.state = self.State.PENDING @@ -970,9 +852,9 @@ async def terminate(self) -> None: if self.state == self.State.PENDING: self.state = self.State.CANCELLED response = await self.device.send_command( - HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command(), + hci.HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command(), ) - if response.return_parameters == HCI_SUCCESS: + if response.return_parameters == hci.HCI_SUCCESS: if self in self.device.periodic_advertising_syncs: self.device.periodic_advertising_syncs.remove(self) return @@ -981,7 +863,7 @@ async def terminate(self) -> None: self.state = self.State.TERMINATED if self.sync_handle is not None: await self.device.send_command( - HCI_LE_Periodic_Advertising_Terminate_Sync_Command( + hci.HCI_LE_Periodic_Advertising_Terminate_Sync_Command( sync_handle=self.sync_handle ) ) @@ -1013,7 +895,7 @@ def on_establishment( AsyncRunner.spawn(self.terminate()) return - if status == HCI_SUCCESS: + if status == hci.HCI_SUCCESS: self.sync_handle = sync_handle self.advertiser_phy = advertiser_phy self.periodic_advertising_interval = periodic_advertising_interval @@ -1026,7 +908,7 @@ def on_establishment( if self in self.device.periodic_advertising_syncs: self.device.periodic_advertising_syncs.remove(self) - if status == HCI_OPERATION_CANCELLED_BY_HOST_ERROR: + if status == hci.HCI_OPERATION_CANCELLED_BY_HOST_ERROR: self.state = self.State.CANCELLED self.emit('cancellation') return @@ -1042,7 +924,7 @@ def on_periodic_advertising_report(self, report) -> None: self.data_accumulator += report.data if ( report.data_status - == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_MORE_TO_COME + == hci.HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_MORE_TO_COME ): return @@ -1055,7 +937,7 @@ def on_periodic_advertising_report(self, report) -> None: report.rssi, is_truncated=( report.data_status - == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME + == hci.HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME ), data_bytes=self.data_accumulator, ), @@ -1278,13 +1160,13 @@ class ScoLink(CompositeEventEmitter): acl_connection: Connection handle: int link_type: int - sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None + sink: Optional[Callable[[hci.HCI_SynchronousDataPacket], Any]] = None def __post_init__(self) -> None: super().__init__() async def disconnect( - self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + self, reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR ) -> None: await self.device.disconnect(self, reason) @@ -1302,13 +1184,13 @@ class State(IntEnum): cis_id: int # CIS ID assigned by Central device cig_id: int # CIG ID assigned by Central device state: State = State.PENDING - sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None + sink: Optional[Callable[[hci.HCI_IsoDataPacket], Any]] = None def __post_init__(self) -> None: super().__init__() async def disconnect( - self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + self, reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR ) -> None: await self.device.disconnect(self, reason) @@ -1318,11 +1200,11 @@ class Connection(CompositeEventEmitter): device: Device handle: int transport: int - self_address: Address - self_resolvable_address: Optional[Address] - peer_address: Address - peer_resolvable_address: Optional[Address] - peer_le_features: Optional[LeFeatureMask] + self_address: hci.Address + self_resolvable_address: Optional[hci.Address] + peer_address: hci.Address + peer_resolvable_address: Optional[hci.Address] + peer_le_features: Optional[hci.LeFeatureMask] role: int encryption: int authenticated: bool @@ -1478,7 +1360,7 @@ async def create_l2cap_channel( return await self.device.create_l2cap_channel(connection=self, spec=spec) async def disconnect( - self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + self, reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR ) -> None: await self.device.disconnect(self, reason) @@ -1549,7 +1431,7 @@ async def transfer_periodic_sync( async def request_remote_name(self): return await self.device.request_remote_name(self) - async def get_remote_le_features(self) -> LeFeatureMask: + async def get_remote_le_features(self) -> hci.LeFeatureMask: """[LE Only] Reads remote LE supported features. Returns: @@ -1565,9 +1447,9 @@ async def __aexit__(self, exc_type, exc_value, traceback): if exc_type is None: try: await self.disconnect() - except HCI_StatusError as error: + except hci.HCI_StatusError as error: # Invalid parameter means the connection is no longer valid - if error.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR: + if error.error_code != hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR: raise def __str__(self): @@ -1594,7 +1476,7 @@ def __str__(self): class DeviceConfiguration: # Setup defaults name: str = DEVICE_DEFAULT_NAME - address: Address = Address(DEVICE_DEFAULT_ADDRESS) + address: hci.Address = hci.Address(DEVICE_DEFAULT_ADDRESS) class_of_device: int = DEVICE_DEFAULT_CLASS_OF_DEVICE scan_response_data: bytes = DEVICE_DEFAULT_SCAN_RESPONSE_DATA advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL @@ -1632,12 +1514,12 @@ def load_from_dict(self, config: Dict[str, Any]) -> None: # Load simple properties if address := config.pop('address', None): - self.address = Address(address) + self.address = hci.Address(address) # Load or synthesize an IRK if irk := config.pop('irk', None): self.irk = bytes.fromhex(irk) - elif self.address != Address(DEVICE_DEFAULT_ADDRESS): + elif self.address != hci.Address(DEVICE_DEFAULT_ADDRESS): # Construct an IRK from the address bytes # NOTE: this is not secure, but will always give the same IRK for the same # address @@ -1774,9 +1656,11 @@ def host_event_handler(function): # ----------------------------------------------------------------------------- class Device(CompositeEventEmitter): # Incomplete list of fields. - random_address: Address # Random private address that may change periodically - public_address: Address # Public address that is globally unique (from controller) - static_address: Address # Random static address that does not change once set + random_address: hci.Address # Random private address that may change periodically + public_address: ( + hci.Address + ) # Public address that is globally unique (from controller) + static_address: hci.Address # Random static address that does not change once set classic_enabled: bool name: str class_of_device: int @@ -1784,11 +1668,12 @@ class Device(CompositeEventEmitter): advertising_data: bytes scan_response_data: bytes connections: Dict[int, Connection] - pending_connections: Dict[Address, Connection] + pending_connections: Dict[hci.Address, Connection] classic_pending_accepts: Dict[ - Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]] + hci.Address, + List[asyncio.Future[Union[Connection, Tuple[hci.Address, int, int]]]], ] - advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] + advertisement_accumulators: Dict[hci.Address, AdvertisementDataAccumulator] periodic_advertising_syncs: List[PeriodicAdvertisingSync] config: DeviceConfiguration legacy_advertiser: Optional[LegacyAdvertiser] @@ -1822,7 +1707,7 @@ def on_characteristic_subscription( def with_hci( cls, name: str, - address: Address, + address: hci.Address, hci_source: TransportSource, hci_sink: TransportSink, ) -> Device: @@ -1858,7 +1743,7 @@ def from_config_file_with_hci( def __init__( self, name: Optional[str] = None, - address: Optional[Address] = None, + address: Optional[hci.Address] = None, config: Optional[DeviceConfiguration] = None, host: Optional[Host] = None, generic_access_service: bool = True, @@ -1890,8 +1775,8 @@ def __init__( self.inquiry_response = None self.address_resolver = None self.classic_pending_accepts = { - Address.ANY: [] - } # Futures, by BD address OR [Futures] for Address.ANY + hci.Address.ANY: [] + } # Futures, by BD address OR [Futures] for hci.Address.ANY # In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated. if sys.version_info >= (3, 10): @@ -1907,7 +1792,7 @@ def __init__( self.config = config self.name = config.name - self.public_address = Address.ANY + self.public_address = hci.Address.ANY self.random_address = config.address self.static_address = config.address self.class_of_device = config.class_of_device @@ -1982,7 +1867,7 @@ def __init__( # If an address is passed, override the address from the config if address: if isinstance(address, str): - address = Address(address) + address = hci.Address(address) self.random_address = address self.static_address = address @@ -2062,7 +1947,7 @@ def lookup_connection(self, connection_handle: int) -> Optional[Connection]: def find_connection_by_bd_addr( self, - bd_addr: Address, + bd_addr: hci.Address, transport: Optional[int] = None, check_address_type: bool = False, ) -> Optional[Connection]: @@ -2201,8 +2086,8 @@ async def power_on(self) -> None: await self.host.reset() # Try to get the public address from the controller - response = await self.send_command(HCI_Read_BD_ADDR_Command()) - if response.return_parameters.status == HCI_SUCCESS: + response = await self.send_command(hci.HCI_Read_BD_ADDR_Command()) + if response.return_parameters.status == hci.HCI_SUCCESS: logger.debug( color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow') ) @@ -2219,9 +2104,9 @@ async def power_on(self) -> None: smp.SMP_BR_CID, self.on_smp_pdu ) - if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND): + if self.host.supports_command(hci.HCI_WRITE_LE_HOST_SUPPORT_COMMAND): await self.send_command( - HCI_Write_LE_Host_Support_Command( + hci.HCI_Write_LE_Host_Support_Command( le_supported_host=int(self.le_enabled), simultaneous_le_host=int(self.le_simultaneous_enabled), ), @@ -2230,12 +2115,12 @@ async def power_on(self) -> None: if self.le_enabled: # Generate a random address if not set. - if self.static_address == Address.ANY_RANDOM: - self.static_address = Address.generate_static_address() + if self.static_address == hci.Address.ANY_RANDOM: + self.static_address = hci.Address.generate_static_address() # If LE Privacy is enabled, generate an RPA if self.le_privacy_enabled: - self.random_address = Address.generate_private_address(self.irk) + self.random_address = hci.Address.generate_private_address(self.irk) logger.info(f'Initial RPA: {self.random_address}') if self.le_rpa_timeout > 0: # Start a task to periodically generate a new RPA @@ -2245,15 +2130,15 @@ async def power_on(self) -> None: else: self.random_address = self.static_address - if self.random_address != Address.ANY_RANDOM: + if self.random_address != hci.Address.ANY_RANDOM: logger.debug( color( - f'LE Random Address: {self.random_address}', + f'LE Random hci.Address: {self.random_address}', 'yellow', ) ) await self.send_command( - HCI_LE_Set_Random_Address_Command( + hci.HCI_LE_Set_Random_Address_Command( random_address=self.random_address ), check_result=True, @@ -2266,7 +2151,7 @@ async def power_on(self) -> None: # Enable address resolution if self.address_resolution_offload: await self.send_command( - HCI_LE_Set_Address_Resolution_Enable_Command( + hci.HCI_LE_Set_Address_Resolution_Enable_Command( address_resolution_enable=1 ), check_result=True, @@ -2274,8 +2159,8 @@ async def power_on(self) -> None: if self.cis_enabled: await self.send_command( - HCI_LE_Set_Host_Feature_Command( - bit_number=LeFeature.CONNECTED_ISOCHRONOUS_STREAM, + hci.HCI_LE_Set_Host_Feature_Command( + bit_number=hci.LeFeature.CONNECTED_ISOCHRONOUS_STREAM, bit_value=1, ), check_result=True, @@ -2283,18 +2168,20 @@ async def power_on(self) -> None: if self.classic_enabled: await self.send_command( - HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) + hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) ) await self.send_command( - HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) + hci.HCI_Write_Class_Of_Device_Command( + class_of_device=self.class_of_device + ) ) await self.send_command( - HCI_Write_Simple_Pairing_Mode_Command( + hci.HCI_Write_Simple_Pairing_Mode_Command( simple_pairing_mode=int(self.classic_ssp_enabled) ) ) await self.send_command( - HCI_Write_Secure_Connections_Host_Support_Command( + hci.HCI_Write_Secure_Connections_Host_Support_Command( secure_connections_host_support=int(self.classic_sc_enabled) ) ) @@ -2302,14 +2189,16 @@ async def power_on(self) -> None: await self.set_discoverable(self.discoverable) if self.classic_interlaced_scan_enabled: - if self.host.supports_lmp_features(LmpFeatureMask.INTERLACED_PAGE_SCAN): + if self.host.supports_lmp_features( + hci.LmpFeatureMask.INTERLACED_PAGE_SCAN + ): await self.send_command( hci.HCI_Write_Page_Scan_Type_Command(page_scan_type=1), check_result=True, ) if self.host.supports_lmp_features( - LmpFeatureMask.INTERLACED_INQUIRY_SCAN + hci.LmpFeatureMask.INTERLACED_INQUIRY_SCAN ): await self.send_command( hci.HCI_Write_Inquiry_Scan_Type_Command(scan_type=1), @@ -2344,11 +2233,11 @@ async def update_rpa(self) -> bool: logger.debug('skipping RPA update') return False - random_address = Address.generate_private_address(self.irk) + random_address = hci.Address.generate_private_address(self.irk) response = await self.send_command( - HCI_LE_Set_Random_Address_Command(random_address=self.random_address) + hci.HCI_LE_Set_Random_Address_Command(random_address=self.random_address) ) - if response.return_parameters == HCI_SUCCESS: + if response.return_parameters == hci.HCI_SUCCESS: logger.info(f'new RPA: {random_address}') self.random_address = random_address return True @@ -2371,13 +2260,13 @@ async def refresh_resolving_list(self) -> None: self.address_resolver = smp.AddressResolver(resolving_keys) if self.address_resolution_offload or self.address_generation_offload: - await self.send_command(HCI_LE_Clear_Resolving_List_Command()) + await self.send_command(hci.HCI_LE_Clear_Resolving_List_Command()) # Add an empty entry for non-directed address generation. await self.send_command( - HCI_LE_Add_Device_To_Resolving_List_Command( - peer_identity_address_type=Address.ANY.address_type, - peer_identity_address=Address.ANY, + hci.HCI_LE_Add_Device_To_Resolving_List_Command( + peer_identity_address_type=hci.Address.ANY.address_type, + peer_identity_address=hci.Address.ANY, peer_irk=bytes(16), local_irk=self.irk, ) @@ -2385,7 +2274,7 @@ async def refresh_resolving_list(self) -> None: for irk, address in resolving_keys: await self.send_command( - HCI_LE_Add_Device_To_Resolving_List_Command( + hci.HCI_LE_Add_Device_To_Resolving_List_Command( peer_identity_address_type=address.address_type, peer_identity_address=address, peer_irk=irk, @@ -2393,16 +2282,16 @@ async def refresh_resolving_list(self) -> None: ) ) - def supports_le_features(self, feature: LeFeatureMask) -> bool: + def supports_le_features(self, feature: hci.LeFeatureMask) -> bool: return self.host.supports_le_features(feature) def supports_le_phy(self, phy: int) -> bool: - if phy == HCI_LE_1M_PHY: + if phy == hci.HCI_LE_1M_PHY: return True feature_map = { - HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY, - HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY, + hci.HCI_LE_2M_PHY: hci.LeFeatureMask.LE_2M_PHY, + hci.HCI_LE_CODED_PHY: hci.LeFeatureMask.LE_CODED_PHY, } if phy not in feature_map: raise InvalidArgumentError('invalid PHY') @@ -2411,17 +2300,17 @@ def supports_le_phy(self, phy: int) -> bool: @property def supports_le_extended_advertising(self): - return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING) + return self.supports_le_features(hci.LeFeatureMask.LE_EXTENDED_ADVERTISING) @property def supports_le_periodic_advertising(self): - return self.supports_le_features(LeFeatureMask.LE_PERIODIC_ADVERTISING) + return self.supports_le_features(hci.LeFeatureMask.LE_PERIODIC_ADVERTISING) async def start_advertising( self, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, - target: Optional[Address] = None, - own_address_type: int = OwnAddressType.RANDOM, + target: Optional[hci.Address] = None, + own_address_type: int = hci.OwnAddressType.RANDOM, auto_restart: bool = False, advertising_data: Optional[bytes] = None, scan_response_data: Optional[bytes] = None, @@ -2472,7 +2361,7 @@ async def start_advertising( raise InvalidArgumentError('directed advertising requires a target') peer_address = target else: - peer_address = Address.ANY + peer_address = hci.Address.ANY # If we're already advertising, stop now because we'll be re-creating # a new advertiser or advertising set. @@ -2494,7 +2383,7 @@ async def start_advertising( ), primary_advertising_interval_min=self.advertising_interval_min, primary_advertising_interval_max=self.advertising_interval_max, - own_address_type=OwnAddressType(own_address_type), + own_address_type=hci.OwnAddressType(own_address_type), peer_address=peer_address, ), advertising_data=( @@ -2509,7 +2398,7 @@ async def start_advertising( self.legacy_advertiser = LegacyAdvertiser( device=self, advertising_type=advertising_type, - own_address_type=OwnAddressType(own_address_type), + own_address_type=hci.OwnAddressType(own_address_type), peer_address=peer_address, auto_restart=auto_restart, ) @@ -2531,7 +2420,7 @@ async def stop_advertising(self) -> None: async def create_advertising_set( self, advertising_parameters: Optional[AdvertisingParameters] = None, - random_address: Optional[Address] = None, + random_address: Optional[hci.Address] = None, advertising_data: bytes = b'', scan_response_data: bytes = b'', periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None, @@ -2600,7 +2489,7 @@ async def create_advertising_set( # provided. if ( advertising_parameters.own_address_type - in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + in (hci.OwnAddressType.RANDOM, hci.OwnAddressType.RESOLVABLE_OR_RANDOM) and random_address is None ): random_address = self.random_address @@ -2640,11 +2529,11 @@ async def create_advertising_set( # TODO: call LE Set Periodic Advertising Data command raise NotImplementedError('periodic advertising not yet supported') - except HCI_Error as error: + except hci.HCI_Error as error: # Remove the advertising set so that it doesn't stay dangling in the # controller. await self.send_command( - HCI_LE_Remove_Advertising_Set_Command( + hci.HCI_LE_Remove_Advertising_Set_Command( advertising_handle=advertising_handle ), check_result=False, @@ -2687,9 +2576,9 @@ async def start_scanning( active: bool = True, scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms - own_address_type: int = OwnAddressType.RANDOM, + own_address_type: int = hci.OwnAddressType.RANDOM, filter_duplicates: bool = False, - scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY], + scanning_phys: List[int] = [hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY], ) -> None: # Check that the arguments are legal if scan_interval < scan_window: @@ -2709,29 +2598,29 @@ async def start_scanning( if not legacy and self.supports_le_extended_advertising: # Set the scanning parameters scan_type = ( - HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING + hci.HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING if active - else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING + else hci.HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING ) scanning_filter_policy = ( - HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY + hci.HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY ) # TODO: support other types scanning_phy_count = 0 scanning_phys_bits = 0 - if HCI_LE_1M_PHY in scanning_phys: - scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT + if hci.HCI_LE_1M_PHY in scanning_phys: + scanning_phys_bits |= 1 << hci.HCI_LE_1M_PHY_BIT scanning_phy_count += 1 - if HCI_LE_CODED_PHY in scanning_phys: - if self.supports_le_features(LeFeatureMask.LE_CODED_PHY): - scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT + if hci.HCI_LE_CODED_PHY in scanning_phys: + if self.supports_le_features(hci.LeFeatureMask.LE_CODED_PHY): + scanning_phys_bits |= 1 << hci.HCI_LE_CODED_PHY_BIT scanning_phy_count += 1 if scanning_phy_count == 0: raise InvalidArgumentError('at least one scanning PHY must be enabled') await self.send_command( - HCI_LE_Set_Extended_Scan_Parameters_Command( + hci.HCI_LE_Set_Extended_Scan_Parameters_Command( own_address_type=own_address_type, scanning_filter_policy=scanning_filter_policy, scanning_phys=scanning_phys_bits, @@ -2744,7 +2633,7 @@ async def start_scanning( # Enable scanning await self.send_command( - HCI_LE_Set_Extended_Scan_Enable_Command( + hci.HCI_LE_Set_Extended_Scan_Enable_Command( enable=1, filter_duplicates=1 if filter_duplicates else 0, duration=0, # TODO allow other values @@ -2755,25 +2644,25 @@ async def start_scanning( else: # Set the scanning parameters scan_type = ( - HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING + hci.HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active - else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING + else hci.HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING ) await self.send_command( # pylint: disable=line-too-long - HCI_LE_Set_Scan_Parameters_Command( + hci.HCI_LE_Set_Scan_Parameters_Command( le_scan_type=scan_type, le_scan_interval=int(scan_window / 0.625), le_scan_window=int(scan_window / 0.625), own_address_type=own_address_type, - scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY, + scanning_filter_policy=hci.HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY, ), check_result=True, ) # Enable scanning await self.send_command( - HCI_LE_Set_Scan_Enable_Command( + hci.HCI_LE_Set_Scan_Enable_Command( le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0 ), check_result=True, @@ -2786,14 +2675,16 @@ async def stop_scanning(self, legacy: bool = False) -> None: # Disable scanning if not legacy and self.supports_le_extended_advertising: await self.send_command( - HCI_LE_Set_Extended_Scan_Enable_Command( + hci.HCI_LE_Set_Extended_Scan_Enable_Command( enable=0, filter_duplicates=0, duration=0, period=0 ), check_result=True, ) else: await self.send_command( - HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), + hci.HCI_LE_Set_Scan_Enable_Command( + le_scan_enable=0, filter_duplicates=0 + ), check_result=True, ) @@ -2813,7 +2704,7 @@ def on_advertising_report(self, report): async def create_periodic_advertising_sync( self, - advertiser_address: Address, + advertiser_address: hci.Address, sid: int, skip: int = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP, sync_timeout: float = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT, @@ -2874,7 +2765,7 @@ def on_periodic_advertising_sync_establishment( status: int, sync_handle: int, advertising_sid: int, - advertiser_address: Address, + advertiser_address: hci.Address, advertiser_phy: int, periodic_advertising_interval: int, advertiser_clock_accuracy: int, @@ -2912,7 +2803,7 @@ def on_periodic_advertising_sync_loss( def on_periodic_advertising_report( self, periodic_advertising_sync: PeriodicAdvertisingSync, - report: HCI_LE_Periodic_Advertising_Report_Event, + report: hci.HCI_LE_Periodic_Advertising_Report_Event, ): periodic_advertising_sync.on_periodic_advertising_report(report) @@ -2921,33 +2812,35 @@ def on_periodic_advertising_report( def on_biginfo_advertising_report( self, periodic_advertising_sync: PeriodicAdvertisingSync, - report: HCI_LE_BIGInfo_Advertising_Report_Event, + report: hci.HCI_LE_BIGInfo_Advertising_Report_Event, ): periodic_advertising_sync.on_biginfo_advertising_report(report) async def start_discovery(self, auto_restart: bool = True) -> None: await self.send_command( - HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), + hci.HCI_Write_Inquiry_Mode_Command( + inquiry_mode=hci.HCI_EXTENDED_INQUIRY_MODE + ), check_result=True, ) response = await self.send_command( - HCI_Inquiry_Command( - lap=HCI_GENERAL_INQUIRY_LAP, + hci.HCI_Inquiry_Command( + lap=hci.HCI_GENERAL_INQUIRY_LAP, inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH, num_responses=0, # Unlimited number of responses. ) ) - if response.status != HCI_Command_Status_Event.PENDING: + if response.status != hci.HCI_Command_Status_Event.PENDING: self.discovering = False - raise HCI_StatusError(response) + raise hci.HCI_StatusError(response) self.auto_restart_inquiry = auto_restart self.discovering = True async def stop_discovery(self) -> None: if self.discovering: - await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) + await self.send_command(hci.HCI_Inquiry_Cancel_Command(), check_result=True) self.auto_restart_inquiry = True self.discovering = False @@ -2972,7 +2865,7 @@ async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled): scan_enable = 0x00 return await self.send_command( - HCI_Write_Scan_Enable_Command(scan_enable=scan_enable) + hci.HCI_Write_Scan_Enable_Command(scan_enable=scan_enable) ) async def set_discoverable(self, discoverable: bool = True) -> None: @@ -2993,7 +2886,7 @@ async def set_discoverable(self, discoverable: bool = True) -> None: # Update the controller await self.send_command( - HCI_Write_Extended_Inquiry_Response_Command( + hci.HCI_Write_Extended_Inquiry_Response_Command( fec_required=0, extended_inquiry_response=self.inquiry_response ), check_result=True, @@ -3013,12 +2906,12 @@ async def set_connectable(self, connectable: bool = True) -> None: async def connect( self, - peer_address: Union[Address, str], + peer_address: Union[hci.Address, str], transport: int = BT_LE_TRANSPORT, connection_parameters_preferences: Optional[ Dict[int, ConnectionParametersPreferences] ] = None, - own_address_type: int = OwnAddressType.RANDOM, + own_address_type: int = hci.OwnAddressType.RANDOM, timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT, always_resolve: bool = False, ) -> Connection: @@ -3030,7 +2923,7 @@ async def connect( Args: peer_address: - Address or name of the device to connect to. + hci.Address or name of the device to connect to. If a string is passed: If the string is an address followed by a `@` suffix, the `always_resolve` argument is implicitly set to True, so the connection is made to the @@ -3050,8 +2943,8 @@ async def connect( own_address_type: (BLE only, ignored for BR/EDR) - OwnAddressType.RANDOM to use this device's random address, or - OwnAddressType.PUBLIC to use this device's public address. + hci.OwnAddressType.RANDOM to use this device's random address, or + hci.OwnAddressType.PUBLIC to use this device's public address. timeout: Maximum time to wait for a connection to be established, in seconds. @@ -3080,13 +2973,13 @@ async def connect( if isinstance(peer_address, str): try: if transport == BT_LE_TRANSPORT and peer_address.endswith('@'): - peer_address = Address.from_string_for_transport( + peer_address = hci.Address.from_string_for_transport( peer_address[:-1], transport ) always_resolve = True logger.debug('forcing address resolution') else: - peer_address = Address.from_string_for_transport( + peer_address = hci.Address.from_string_for_transport( peer_address, transport ) except (InvalidArgumentError, ValueError): @@ -3100,11 +2993,11 @@ async def connect( # All BR/EDR addresses should be public addresses if ( transport == BT_BR_EDR_TRANSPORT - and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS + and peer_address.address_type != hci.Address.PUBLIC_DEVICE_ADDRESS ): raise InvalidArgumentError('BR/EDR addresses must be PUBLIC') - assert isinstance(peer_address, Address) + assert isinstance(peer_address, hci.Address) if transport == BT_LE_TRANSPORT and always_resolve: logger.debug('resolving address') @@ -3139,13 +3032,13 @@ def on_connection_failure(error): if connection_parameters_preferences is None: if connection_parameters_preferences is None: connection_parameters_preferences = { - HCI_LE_1M_PHY: ConnectionParametersPreferences.default + hci.HCI_LE_1M_PHY: ConnectionParametersPreferences.default } self.connect_own_address_type = own_address_type if self.host.supports_command( - HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND + hci.HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND ): # Only keep supported PHYs phys = sorted( @@ -3162,7 +3055,7 @@ def on_connection_failure(error): raise InvalidArgumentError('at least one supported PHY needed') phy_count = len(phys) - initiating_phys = phy_list_to_bits(phys) + initiating_phys = hci.phy_list_to_bits(phys) connection_interval_mins = [ int( @@ -3207,7 +3100,7 @@ def on_connection_failure(error): ] result = await self.send_command( - HCI_LE_Extended_Create_Connection_Command( + hci.HCI_LE_Extended_Create_Connection_Command( initiator_filter_policy=0, own_address_type=own_address_type, peer_address_type=peer_address.address_type, @@ -3230,12 +3123,12 @@ def on_connection_failure(error): ) ) else: - if HCI_LE_1M_PHY not in connection_parameters_preferences: + if hci.HCI_LE_1M_PHY not in connection_parameters_preferences: raise InvalidArgumentError('1M PHY preferences required') - prefs = connection_parameters_preferences[HCI_LE_1M_PHY] + prefs = connection_parameters_preferences[hci.HCI_LE_1M_PHY] result = await self.send_command( - HCI_LE_Create_Connection_Command( + hci.HCI_LE_Create_Connection_Command( le_scan_interval=int( DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625 ), @@ -3266,18 +3159,18 @@ def on_connection_failure(error): # TODO: allow passing other settings result = await self.send_command( - HCI_Create_Connection_Command( + hci.HCI_Create_Connection_Command( bd_addr=peer_address, packet_type=0xCC18, # FIXME: change - page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE, + page_scan_repetition_mode=hci.HCI_R2_PAGE_SCAN_REPETITION_MODE, clock_offset=0x0000, allow_role_switch=0x01, reserved=0, ) ) - if result.status != HCI_Command_Status_Event.PENDING: - raise HCI_StatusError(result) + if result.status != hci.HCI_Command_Status_Event.PENDING: + raise hci.HCI_StatusError(result) # Wait for the connection process to complete if transport == BT_LE_TRANSPORT: @@ -3292,10 +3185,12 @@ def on_connection_failure(error): ) except asyncio.TimeoutError: if transport == BT_LE_TRANSPORT: - await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) + await self.send_command( + hci.HCI_LE_Create_Connection_Cancel_Command() + ) else: await self.send_command( - HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) + hci.HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) ) try: @@ -3313,7 +3208,7 @@ def on_connection_failure(error): async def accept( self, - peer_address: Union[Address, str] = Address.ANY, + peer_address: Union[hci.Address, str] = hci.Address.ANY, role: int = BT_PERIPHERAL_ROLE, timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT, ) -> Connection: @@ -3330,7 +3225,7 @@ async def accept( if isinstance(peer_address, str): try: - peer_address = Address(peer_address) + peer_address = hci.Address(peer_address) except InvalidArgumentError: # If the address is not parsable, assume it is a name instead logger.debug('looking for peer by name') @@ -3338,16 +3233,16 @@ async def accept( peer_address, BT_BR_EDR_TRANSPORT ) # TODO: timeout - assert isinstance(peer_address, Address) + assert isinstance(peer_address, hci.Address) - if peer_address == Address.NIL: + if peer_address == hci.Address.NIL: raise InvalidArgumentError('accept on nil address') # Create a future so that we can wait for the request pending_request_fut = asyncio.get_running_loop().create_future() - if peer_address == Address.ANY: - self.classic_pending_accepts[Address.ANY].append(pending_request_fut) + if peer_address == hci.Address.ANY: + self.classic_pending_accepts[hci.Address.ANY].append(pending_request_fut) elif peer_address in self.classic_pending_accepts: raise InvalidStateError('accept connection already pending') else: @@ -3363,8 +3258,10 @@ async def accept( ) except Exception: # Remove future from device context - if peer_address == Address.ANY: - self.classic_pending_accepts[Address.ANY].remove(pending_request_fut) + if peer_address == hci.Address.ANY: + self.classic_pending_accepts[hci.Address.ANY].remove( + pending_request_fut + ) else: self.classic_pending_accepts.pop(peer_address) raise @@ -3376,7 +3273,7 @@ async def accept( # Otherwise, result came from `on_connection_request` peer_address, _class_of_device, _link_type = result - assert isinstance(peer_address, Address) + assert isinstance(peer_address, hci.Address) # Create a future so that we can wait for the connection's result pending_connection = asyncio.get_running_loop().create_future() @@ -3399,7 +3296,7 @@ def on_connection_failure(error): self.on('connection_failure', on_connection_failure) # Save pending connection, with the Peripheral role. - # Even if we requested a role switch in the HCI_Accept_Connection_Request + # Even if we requested a role switch in the hci.HCI_Accept_Connection_Request # command, this connection is still considered Peripheral until an eventual # role change event. self.pending_connections[peer_address] = Connection.incomplete( @@ -3409,7 +3306,9 @@ def on_connection_failure(error): try: # Accept connection request await self.send_command( - HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) + hci.HCI_Accept_Connection_Request_Command( + bd_addr=peer_address, role=role + ) ) # Wait for connection complete @@ -3444,7 +3343,7 @@ async def cancel_connection(self, peer_address=None): if not self.is_le_connecting: return await self.send_command( - HCI_LE_Create_Connection_Cancel_Command(), check_result=True + hci.HCI_LE_Create_Connection_Cancel_Command(), check_result=True ) # BR/EDR: try to cancel to ongoing connection @@ -3453,7 +3352,7 @@ async def cancel_connection(self, peer_address=None): else: if isinstance(peer_address, str): try: - peer_address = Address(peer_address) + peer_address = hci.Address(peer_address) except InvalidArgumentError: # If the address is not parsable, assume it is a name instead logger.debug('looking for peer by name') @@ -3462,7 +3361,7 @@ async def cancel_connection(self, peer_address=None): ) # TODO: timeout await self.send_command( - HCI_Create_Connection_Cancel_Command(bd_addr=peer_address), + hci.HCI_Create_Connection_Cancel_Command(bd_addr=peer_address), check_result=True, ) @@ -3476,12 +3375,14 @@ async def disconnect( # Request a disconnection result = await self.send_command( - HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) + hci.HCI_Disconnect_Command( + connection_handle=connection.handle, reason=reason + ) ) try: - if result.status != HCI_Command_Status_Event.PENDING: - raise HCI_StatusError(result) + if result.status != hci.HCI_Command_Status_Event.PENDING: + raise hci.HCI_StatusError(result) # Wait for the disconnection process to complete self.disconnecting = True @@ -3503,7 +3404,7 @@ async def set_data_length(self, connection, tx_octets, tx_time) -> None: raise InvalidArgumentError('tx_time must be between 0x0148 and 0x4290') return await self.send_command( - HCI_LE_Set_Data_Length_Command( + hci.HCI_LE_Set_Data_Length_Command( connection_handle=connection.handle, tx_octets=tx_octets, tx_time=tx_time, @@ -3545,7 +3446,7 @@ async def update_connection_parameters( raise ConnectionParameterUpdateError(l2cap_result) result = await self.send_command( - HCI_LE_Connection_Update_Command( + hci.HCI_LE_Connection_Update_Command( connection_handle=connection.handle, connection_interval_min=connection_interval_min, connection_interval_max=connection_interval_max, @@ -3555,18 +3456,18 @@ async def update_connection_parameters( max_ce_length=max_ce_length, ) ) - if result.status != HCI_Command_Status_Event.PENDING: - raise HCI_StatusError(result) + if result.status != hci.HCI_Command_Status_Event.PENDING: + raise hci.HCI_StatusError(result) async def get_connection_rssi(self, connection): result = await self.send_command( - HCI_Read_RSSI_Command(handle=connection.handle), check_result=True + hci.HCI_Read_RSSI_Command(handle=connection.handle), check_result=True ) return result.return_parameters.rssi async def get_connection_phy(self, connection): result = await self.send_command( - HCI_LE_Read_PHY_Command(connection_handle=connection.handle), + hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle), check_result=True, ) return (result.return_parameters.tx_phy, result.return_parameters.rx_phy) @@ -3574,7 +3475,7 @@ async def get_connection_phy(self, connection): async def set_connection_phy( self, connection, tx_phys=None, rx_phys=None, phy_options=None ): - if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND): + if not self.host.supports_command(hci.HCI_LE_SET_PHY_COMMAND): logger.warning('ignoring request, command not supported') return @@ -3583,21 +3484,21 @@ async def set_connection_phy( ) result = await self.send_command( - HCI_LE_Set_PHY_Command( + hci.HCI_LE_Set_PHY_Command( connection_handle=connection.handle, all_phys=all_phys_bits, - tx_phys=phy_list_to_bits(tx_phys), - rx_phys=phy_list_to_bits(rx_phys), + tx_phys=hci.phy_list_to_bits(tx_phys), + rx_phys=hci.phy_list_to_bits(rx_phys), phy_options=0 if phy_options is None else int(phy_options), ) ) - if result.status != HCI_COMMAND_STATUS_PENDING: + if result.status != hci.HCI_COMMAND_STATUS_PENDING: logger.warning( 'HCI_LE_Set_PHY_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' + f'{hci.HCI_Constant.error_name(result.status)}' ) - raise HCI_StatusError(result) + raise hci.HCI_StatusError(result) async def set_default_phy(self, tx_phys=None, rx_phys=None): all_phys_bits = (1 if tx_phys is None else 0) | ( @@ -3605,10 +3506,10 @@ async def set_default_phy(self, tx_phys=None, rx_phys=None): ) return await self.send_command( - HCI_LE_Set_Default_PHY_Command( + hci.HCI_LE_Set_Default_PHY_Command( all_phys=all_phys_bits, - tx_phys=phy_list_to_bits(tx_phys), - rx_phys=phy_list_to_bits(rx_phys), + tx_phys=hci.phy_list_to_bits(tx_phys), + rx_phys=hci.phy_list_to_bits(rx_phys), ), check_result=True, ) @@ -3617,7 +3518,7 @@ async def transfer_periodic_sync( self, connection: Connection, sync_handle: int, service_data: int = 0 ) -> None: return await self.send_command( - HCI_LE_Periodic_Advertising_Sync_Transfer_Command( + hci.HCI_LE_Periodic_Advertising_Sync_Transfer_Command( connection_handle=connection.handle, service_data=service_data, sync_handle=sync_handle, @@ -3681,7 +3582,9 @@ def on_peer_found(address, ad_data): elif transport == BT_BR_EDR_TRANSPORT and not was_discovering: await self.stop_discovery() - async def find_peer_by_identity_address(self, identity_address: Address) -> Address: + async def find_peer_by_identity_address( + self, identity_address: hci.Address + ) -> hci.Address: """ Scan for a peer with a resolvable address that can be resolved to a given identity address. @@ -3777,7 +3680,7 @@ async def get_long_term_key( return keys.ltk_peripheral.value return None - async def get_link_key(self, address: Address) -> Optional[bytes]: + async def get_link_key(self, address: hci.Address) -> Optional[bytes]: if self.keystore is None: return None @@ -3803,7 +3706,7 @@ def on_authentication(): pending_authentication.set_result(None) def on_authentication_failure(error_code): - pending_authentication.set_exception(HCI_Error(error_code)) + pending_authentication.set_exception(hci.HCI_Error(error_code)) connection.on('connection_authentication', on_authentication) connection.on('connection_authentication_failure', on_authentication_failure) @@ -3811,16 +3714,16 @@ def on_authentication_failure(error_code): # Request the authentication try: result = await self.send_command( - HCI_Authentication_Requested_Command( + hci.HCI_Authentication_Requested_Command( connection_handle=connection.handle ) ) - if result.status != HCI_COMMAND_STATUS_PENDING: + if result.status != hci.HCI_COMMAND_STATUS_PENDING: logger.warning( 'HCI_Authentication_Requested_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' + f'{hci.HCI_Constant.error_name(result.status)}' ) - raise HCI_StatusError(result) + raise hci.HCI_StatusError(result) # Wait for the authentication to complete await connection.abort_on('disconnection', pending_authentication) @@ -3841,7 +3744,7 @@ def on_encryption_change(): pending_encryption.set_result(None) def on_encryption_failure(error_code): - pending_encryption.set_exception(HCI_Error(error_code)) + pending_encryption.set_exception(hci.HCI_Error(error_code)) connection.on('connection_encryption_change', on_encryption_change) connection.on('connection_encryption_failure', on_encryption_failure) @@ -3869,11 +3772,11 @@ def on_encryption_failure(error_code): else: raise InvalidOperationError('no LTK found for peer') - if connection.role != HCI_CENTRAL_ROLE: + if connection.role != hci.HCI_CENTRAL_ROLE: raise InvalidStateError('only centrals can start encryption') result = await self.send_command( - HCI_LE_Enable_Encryption_Command( + hci.HCI_LE_Enable_Encryption_Command( connection_handle=connection.handle, random_number=rand, encrypted_diversifier=ediv, @@ -3881,26 +3784,26 @@ def on_encryption_failure(error_code): ) ) - if result.status != HCI_COMMAND_STATUS_PENDING: + if result.status != hci.HCI_COMMAND_STATUS_PENDING: logger.warning( 'HCI_LE_Enable_Encryption_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' + f'{hci.HCI_Constant.error_name(result.status)}' ) - raise HCI_StatusError(result) + raise hci.HCI_StatusError(result) else: result = await self.send_command( - HCI_Set_Connection_Encryption_Command( + hci.HCI_Set_Connection_Encryption_Command( connection_handle=connection.handle, encryption_enable=0x01 if enable else 0x00, ) ) - if result.status != HCI_COMMAND_STATUS_PENDING: + if result.status != hci.HCI_COMMAND_STATUS_PENDING: logger.warning( 'HCI_Set_Connection_Encryption_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' + f'{hci.HCI_Constant.error_name(result.status)}' ) - raise HCI_StatusError(result) + raise hci.HCI_StatusError(result) # Wait for the result await connection.abort_on('disconnection', pending_encryption) @@ -3932,32 +3835,34 @@ def on_role_change(new_role): pending_role_change.set_result(new_role) def on_role_change_failure(error_code): - pending_role_change.set_exception(HCI_Error(error_code)) + pending_role_change.set_exception(hci.HCI_Error(error_code)) connection.on('role_change', on_role_change) connection.on('role_change_failure', on_role_change_failure) try: result = await self.send_command( - HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) + hci.HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) ) - if result.status != HCI_COMMAND_STATUS_PENDING: + if result.status != hci.HCI_COMMAND_STATUS_PENDING: logger.warning( 'HCI_Switch_Role_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' + f'{hci.HCI_Constant.error_name(result.status)}' ) - raise HCI_StatusError(result) + raise hci.HCI_StatusError(result) await connection.abort_on('disconnection', pending_role_change) finally: connection.remove_listener('role_change', on_role_change) connection.remove_listener('role_change_failure', on_role_change_failure) # [Classic only] - async def request_remote_name(self, remote: Union[Address, Connection]) -> str: + async def request_remote_name(self, remote: Union[hci.Address, Connection]) -> str: # Set up event handlers pending_name = asyncio.get_running_loop().create_future() - peer_address = remote if isinstance(remote, Address) else remote.peer_address + peer_address = ( + remote if isinstance(remote, hci.Address) else remote.peer_address + ) handler = self.on( 'remote_name', @@ -3970,7 +3875,7 @@ async def request_remote_name(self, remote: Union[Address, Connection]) -> str: failure_handler = self.on( 'remote_name_failure', lambda address, error_code: ( - pending_name.set_exception(HCI_Error(error_code)) + pending_name.set_exception(hci.HCI_Error(error_code)) if address == peer_address else None ), @@ -3978,20 +3883,20 @@ async def request_remote_name(self, remote: Union[Address, Connection]) -> str: try: result = await self.send_command( - HCI_Remote_Name_Request_Command( + hci.HCI_Remote_Name_Request_Command( bd_addr=peer_address, - page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2, + page_scan_repetition_mode=hci.HCI_Remote_Name_Request_Command.R2, reserved=0, clock_offset=0, # TODO investigate non-0 values ) ) - if result.status != HCI_COMMAND_STATUS_PENDING: + if result.status != hci.HCI_COMMAND_STATUS_PENDING: logger.warning( 'HCI_Remote_Name_Request_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' + f'{hci.HCI_Constant.error_name(result.status)}' ) - raise HCI_StatusError(result) + raise hci.HCI_StatusError(result) # Wait for the result return await self.abort_on('flush', pending_name) @@ -4011,7 +3916,7 @@ async def setup_cig( retransmission_number: int, max_transport_latency: Tuple[int, int], ) -> List[int]: - """Sends HCI_LE_Set_CIG_Parameters_Command. + """Sends hci.HCI_LE_Set_CIG_Parameters_Command. Args: cig_id: CIG_ID. @@ -4029,7 +3934,7 @@ async def setup_cig( num_cis = len(cis_id) response = await self.send_command( - HCI_LE_Set_CIG_Parameters_Command( + hci.HCI_LE_Set_CIG_Parameters_Command( cig_id=cig_id, sdu_interval_c_to_p=sdu_interval[0], sdu_interval_p_to_c=sdu_interval[1], @@ -4041,8 +3946,8 @@ async def setup_cig( cis_id=cis_id, max_sdu_c_to_p=[max_sdu[0]] * num_cis, max_sdu_p_to_c=[max_sdu[1]] * num_cis, - phy_c_to_p=[HCI_LE_2M_PHY] * num_cis, - phy_p_to_c=[HCI_LE_2M_PHY] * num_cis, + phy_c_to_p=[hci.HCI_LE_2M_PHY] * num_cis, + phy_p_to_c=[hci.HCI_LE_2M_PHY] * num_cis, rtn_c_to_p=[retransmission_number] * num_cis, rtn_p_to_c=[retransmission_number] * num_cis, ), @@ -4084,12 +3989,12 @@ def on_cis_establishment(cis_link: CisLink) -> None: def on_cis_establishment_failure(cis_handle: int, status: int) -> None: if pending_future := pending_cis_establishments.get(cis_handle): - pending_future.set_exception(HCI_Error(status)) + pending_future.set_exception(hci.HCI_Error(status)) watcher.on(self, 'cis_establishment', on_cis_establishment) watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure) await self.send_command( - HCI_LE_Create_CIS_Command( + hci.HCI_LE_Create_CIS_Command( cis_connection_handle=[p[0] for p in cis_acl_pairs], acl_connection_handle=[p[1] for p in cis_acl_pairs], ), @@ -4128,13 +4033,13 @@ def on_establishment() -> None: pending_establishment.set_result(None) def on_establishment_failure(status: int) -> None: - pending_establishment.set_exception(HCI_Error(status)) + pending_establishment.set_exception(hci.HCI_Error(status)) watcher.on(cis_link, 'establishment', on_establishment) watcher.on(cis_link, 'establishment_failure', on_establishment_failure) await self.send_command( - HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), + hci.HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), check_result=True, ) @@ -4149,14 +4054,16 @@ def on_establishment_failure(status: int) -> None: async def reject_cis_request( self, handle: int, - reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) -> None: await self.send_command( - HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason), + hci.HCI_LE_Reject_CIS_Request_Command( + connection_handle=handle, reason=reason + ), check_result=True, ) - async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: + async def get_remote_le_features(self, connection: Connection) -> hci.LeFeatureMask: """[LE Only] Reads remote LE supported features. Args: @@ -4166,22 +4073,22 @@ async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: LE features supported by the remote device. """ with closing(EventWatcher()) as watcher: - read_feature_future: asyncio.Future[LeFeatureMask] = ( + read_feature_future: asyncio.Future[hci.LeFeatureMask] = ( asyncio.get_running_loop().create_future() ) def on_le_remote_features(handle: int, features: int): if handle == connection.handle: - read_feature_future.set_result(LeFeatureMask(features)) + read_feature_future.set_result(hci.LeFeatureMask(features)) def on_failure(handle: int, status: int): if handle == connection.handle: - read_feature_future.set_exception(HCI_Error(status)) + read_feature_future.set_exception(hci.HCI_Error(status)) watcher.on(self.host, 'le_remote_features', on_le_remote_features) watcher.on(self.host, 'le_remote_features_failure', on_failure) await self.send_command( - HCI_LE_Read_Remote_Features_Command( + hci.HCI_LE_Read_Remote_Features_Command( connection_handle=connection.handle ), check_result=True, @@ -4201,8 +4108,8 @@ def on_link_key(self, bd_addr, link_key, key_type): # Store the keys in the key store if self.keystore: authenticated = key_type in ( - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, + hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, + hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, ) pairing_keys = PairingKeys() pairing_keys.link_key = PairingKeys.Key( @@ -4256,7 +4163,7 @@ def on_advertising_set_termination( advertising_set.on_termination(status) - if status != HCI_SUCCESS: + if status != hci.HCI_SUCCESS: logger.debug( f'advertising set {advertising_handle} ' f'terminated with status {status}' @@ -4285,13 +4192,13 @@ def _complete_le_extended_advertising_connection( advertising_set.random_address if advertising_set.random_address is not None and advertising_set.advertising_parameters.own_address_type - in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + in (hci.OwnAddressType.RANDOM, hci.OwnAddressType.RESOLVABLE_OR_RANDOM) else self.public_address ) if advertising_set.advertising_parameters.own_address_type in ( - OwnAddressType.RANDOM, - OwnAddressType.PUBLIC, + hci.OwnAddressType.RANDOM, + hci.OwnAddressType.PUBLIC, ): connection.self_resolvable_address = None @@ -4307,11 +4214,11 @@ def _complete_le_extended_advertising_connection( def _emit_le_connection(self, connection: Connection) -> None: # If supported, read which PHY we're connected with before # notifying listeners of the new connection. - if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): + if self.host.supports_command(hci.HCI_LE_READ_PHY_COMMAND): async def read_phy(): result = await self.send_command( - HCI_LE_Read_PHY_Command(connection_handle=connection.handle), + hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle), check_result=True, ) connection.phy = ConnectionPHY( @@ -4332,24 +4239,24 @@ def on_connection( self, connection_handle: int, transport: int, - peer_address: Address, - self_resolvable_address: Optional[Address], - peer_resolvable_address: Optional[Address], + peer_address: hci.Address, + self_resolvable_address: Optional[hci.Address], + peer_resolvable_address: Optional[hci.Address], role: int, connection_parameters: ConnectionParameters, ) -> None: # Convert all-zeros addresses into None. - if self_resolvable_address == Address.ANY_RANDOM: + if self_resolvable_address == hci.Address.ANY_RANDOM: self_resolvable_address = None if ( - peer_resolvable_address == Address.ANY_RANDOM + peer_resolvable_address == hci.Address.ANY_RANDOM or not peer_address.is_resolved ): peer_resolvable_address = None logger.debug( f'*** Connection: [0x{connection_handle:04X}] ' - f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}' + f'{peer_address} {"" if role is None else hci.HCI_Constant.role_name(role)}' ) if connection_handle in self.connections: logger.warning( @@ -4373,20 +4280,20 @@ def on_connection( if peer_address.is_resolvable: resolved_address = self.address_resolver.resolve(peer_address) if resolved_address is not None: - logger.debug(f'*** Address resolved as {resolved_address}') + logger.debug(f'*** hci.Address resolved as {resolved_address}') peer_resolvable_address = peer_address peer_address = resolved_address self_address = None own_address_type: Optional[int] = None - if role == HCI_CENTRAL_ROLE: + if role == hci.HCI_CENTRAL_ROLE: own_address_type = self.connect_own_address_type assert own_address_type is not None else: if self.supports_le_extended_advertising: # We'll know the address when the advertising set terminates, # Use a temporary placeholder value for self_address. - self_address = Address.ANY_RANDOM + self_address = hci.Address.ANY_RANDOM else: # We were connected via a legacy advertisement. if self.legacy_advertiser: @@ -4401,15 +4308,15 @@ def on_connection( self.public_address if own_address_type in ( - OwnAddressType.PUBLIC, - OwnAddressType.RESOLVABLE_OR_PUBLIC, + hci.OwnAddressType.PUBLIC, + hci.OwnAddressType.RESOLVABLE_OR_PUBLIC, ) else self.random_address ) # Some controllers may return local resolvable address even not using address # generation offloading. Ignore the value to prevent SMP failure. - if own_address_type in (OwnAddressType.RANDOM, OwnAddressType.PUBLIC): + if own_address_type in (hci.OwnAddressType.RANDOM, hci.OwnAddressType.PUBLIC): self_resolvable_address = None # Create a connection. @@ -4423,11 +4330,11 @@ def on_connection( peer_resolvable_address, role, connection_parameters, - ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), + ConnectionPHY(hci.HCI_LE_1M_PHY, hci.HCI_LE_1M_PHY), ) self.connections[connection_handle] = connection - if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser: + if role == hci.HCI_PERIPHERAL_ROLE and self.legacy_advertiser: if self.legacy_advertiser.auto_restart: advertiser = self.legacy_advertiser connection.once( @@ -4437,12 +4344,12 @@ def on_connection( else: self.legacy_advertiser = None - if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: + if role == hci.HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: # We can emit now, we have all the info we need self._emit_le_connection(connection) return - if role == HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising: + if role == hci.HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising: if advertising_set := self.connecting_extended_advertising_sets.pop( connection_handle, None ): @@ -4453,7 +4360,9 @@ def on_connection( @host_event_handler def on_connection_failure(self, transport, peer_address, error_code): - logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}') + logger.debug( + f'*** Connection failed: {hci.HCI_Constant.error_name(error_code)}' + ) # For directed advertising, this means a timeout if ( @@ -4469,7 +4378,7 @@ def on_connection_failure(self, transport, peer_address, error_code): transport, peer_address, 'hci', - HCI_Constant.error_name(error_code), + hci.HCI_Constant.error_name(error_code), ) self.emit('connection_failure', error) @@ -4480,8 +4389,8 @@ def on_connection_request(self, bd_addr, class_of_device, link_type): # Handle SCO request. if link_type in ( - HCI_Connection_Complete_Event.SCO_LINK_TYPE, - HCI_Connection_Complete_Event.ESCO_LINK_TYPE, + hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE, + hci.HCI_Connection_Complete_Event.ESCO_LINK_TYPE, ): if connection := self.find_connection_by_bd_addr( bd_addr, transport=BT_BR_EDR_TRANSPORT @@ -4497,8 +4406,8 @@ def on_connection_request(self, bd_addr, class_of_device, link_type): future.set_result((bd_addr, class_of_device, link_type)) # match first pending future for ANY address - elif len(self.classic_pending_accepts[Address.ANY]) > 0: - future = self.classic_pending_accepts[Address.ANY].pop(0) + elif len(self.classic_pending_accepts[hci.Address.ANY]) > 0: + future = self.classic_pending_accepts[hci.Address.ANY].pop(0) future.set_result((bd_addr, class_of_device, link_type)) # device configuration is set to accept any incoming connection @@ -4509,7 +4418,7 @@ def on_connection_request(self, bd_addr, class_of_device, link_type): ) self.host.send_command_sync( - HCI_Accept_Connection_Request_Command( + hci.HCI_Accept_Connection_Request_Command( bd_addr=bd_addr, role=0x01 # Remain the peripheral ) ) @@ -4517,9 +4426,9 @@ def on_connection_request(self, bd_addr, class_of_device, link_type): # reject incoming connection else: self.host.send_command_sync( - HCI_Reject_Connection_Request_Command( + hci.HCI_Reject_Connection_Request_Command( bd_addr=bd_addr, - reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, + reason=hci.HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, ) ) @@ -4552,7 +4461,7 @@ def on_disconnection_failure(self, connection, error_code): connection.transport, connection.peer_address, 'hci', - HCI_Constant.error_name(error_code), + hci.HCI_Constant.error_name(error_code), ) connection.emit('disconnection_failure', error) @@ -4597,19 +4506,19 @@ def on_authentication_io_capability_request(self, connection): authentication_requirements = ( # No Bonding ( - HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, - HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, + hci.HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, + hci.HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, ), # General Bonding ( - HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, - HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, + hci.HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, + hci.HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, ), )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0] # Respond self.host.send_command_sync( - HCI_IO_Capability_Request_Reply_Command( + hci.HCI_IO_Capability_Request_Reply_Command( bd_addr=connection.peer_address, io_capability=pairing_config.delegate.classic_io_capability, oob_data_present=0x00, # Not present @@ -4659,29 +4568,29 @@ async def na() -> bool: # See Bluetooth spec @ Vol 3, Part C 5.2.2.6 methods = { - HCI_DISPLAY_ONLY_IO_CAPABILITY: { - HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, - HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, - HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: { + hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, + hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, + hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, + hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, }, - HCI_DISPLAY_YES_NO_IO_CAPABILITY: { - HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, - HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, - HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: { + hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, + hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, + hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, + hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, }, - HCI_KEYBOARD_ONLY_IO_CAPABILITY: { - HCI_DISPLAY_ONLY_IO_CAPABILITY: na, - HCI_DISPLAY_YES_NO_IO_CAPABILITY: na, - HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: { + hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: na, + hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: na, + hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, + hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, }, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { - HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm, - HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm, - HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { + hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm, + hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm, + hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm, + hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, }, } @@ -4691,7 +4600,7 @@ async def reply() -> None: try: if await connection.abort_on('disconnection', method()): await self.host.send_command( - HCI_User_Confirmation_Request_Reply_Command( + hci.HCI_User_Confirmation_Request_Reply_Command( bd_addr=connection.peer_address ) ) @@ -4700,7 +4609,7 @@ async def reply() -> None: logger.warning(f'exception while confirming: {error}') await self.host.send_command( - HCI_User_Confirmation_Request_Negative_Reply_Command( + hci.HCI_User_Confirmation_Request_Negative_Reply_Command( bd_addr=connection.peer_address ) ) @@ -4721,7 +4630,7 @@ async def reply() -> None: ) if number is not None: await self.host.send_command( - HCI_User_Passkey_Request_Reply_Command( + hci.HCI_User_Passkey_Request_Reply_Command( bd_addr=connection.peer_address, numeric_value=number ) ) @@ -4730,7 +4639,7 @@ async def reply() -> None: logger.warning(f'exception while asking for pass-key: {error}') await self.host.send_command( - HCI_User_Passkey_Request_Negative_Reply_Command( + hci.HCI_User_Passkey_Request_Negative_Reply_Command( bd_addr=connection.peer_address ) ) @@ -4747,7 +4656,7 @@ def on_pin_code_request(self, connection): io_capability = pairing_config.delegate.classic_io_capability # Respond - if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY: + if io_capability == hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: # Ask the user to enter a string async def get_pin_code(): pin_code = await connection.abort_on( @@ -4759,7 +4668,7 @@ async def get_pin_code(): pin_code_len = len(pin_code) assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes" await self.host.send_command( - HCI_PIN_Code_Request_Reply_Command( + hci.HCI_PIN_Code_Request_Reply_Command( bd_addr=connection.peer_address, pin_code_length=pin_code_len, pin_code=pin_code, @@ -4768,7 +4677,7 @@ async def get_pin_code(): else: logger.debug("delegate.get_string() returned None") await self.host.send_command( - HCI_PIN_Code_Request_Negative_Reply_Command( + hci.HCI_PIN_Code_Request_Negative_Reply_Command( bd_addr=connection.peer_address ) ) @@ -4776,7 +4685,7 @@ async def get_pin_code(): asyncio.create_task(get_pin_code()) else: self.host.send_command_sync( - HCI_PIN_Code_Request_Negative_Reply_Command( + hci.HCI_PIN_Code_Request_Negative_Reply_Command( bd_addr=connection.peer_address ) ) @@ -4852,7 +4761,9 @@ def on_sco_connection_failure( # [Classic only] @host_event_handler @experimental('Only for testing') - def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None: + def on_sco_packet( + self, sco_handle: int, packet: hci.HCI_SynchronousDataPacket + ) -> None: if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink: sco_link.sink(packet) @@ -4916,7 +4827,7 @@ def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: # [LE only] @host_event_handler @experimental('Only for testing') - def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None: + def on_iso_packet(self, handle: int, packet: hci.HCI_IsoDataPacket) -> None: if (cis_link := self.cis_links.get(handle)) and cis_link.sink: cis_link.sink(packet) @@ -4932,14 +4843,14 @@ def on_connection_encryption_change(self, connection, encryption): if ( not connection.authenticated and connection.transport == BT_BR_EDR_TRANSPORT - and encryption == HCI_Encryption_Change_Event.AES_CCM + and encryption == hci.HCI_Encryption_Change_Event.AES_CCM ): connection.authenticated = True connection.sc = True if ( not connection.authenticated and connection.transport == BT_LE_TRANSPORT - and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM + and encryption == hci.HCI_Encryption_Change_Event.E0_OR_AES_CCM ): connection.authenticated = True connection.sc = True @@ -5067,7 +4978,7 @@ def on_pairing_start(self, connection: Connection) -> None: def on_pairing( self, connection: Connection, - identity_address: Optional[Address], + identity_address: Optional[hci.Address], keys: PairingKeys, sc: bool, ) -> None: diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index aff063c1..05724139 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -39,7 +39,6 @@ AdvertisingEventProperties, AdvertisingType, Device, - Phy, ) from bumble.gatt import Service from bumble.hci import ( @@ -47,6 +46,7 @@ HCI_PAGE_TIMEOUT_ERROR, HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, Address, + Phy, ) from google.protobuf import any_pb2 # pytype: disable=pyi-error from google.protobuf import empty_pb2 # pytype: disable=pyi-error diff --git a/rust/src/wrapper/hci.rs b/rust/src/wrapper/hci.rs index 52480c81..bf5ffeb6 100644 --- a/rust/src/wrapper/hci.rs +++ b/rust/src/wrapper/hci.rs @@ -80,7 +80,7 @@ impl Address { /// Creates a new [Address] object. pub fn new(address: &str, address_type: AddressType) -> PyResult { Python::with_gil(|py| { - PyModule::import(py, intern!(py, "bumble.device"))? + PyModule::import(py, intern!(py, "bumble.hci"))? .getattr(intern!(py, "Address"))? .call1((address, address_type)) .map(|any| Self(any.into()))