Skip to content

Commit

Permalink
BIG
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Nov 16, 2024
1 parent c07e854 commit 698aa8d
Show file tree
Hide file tree
Showing 4 changed files with 570 additions and 1 deletion.
324 changes: 324 additions & 0 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
DEVICE_MAX_LE_RSSI = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_MIN_BIG_HANDLE = 0x00
DEVICE_MAX_BIG_HANDLE = 0xEF

DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
Expand Down Expand Up @@ -992,6 +994,132 @@ def __str__(self) -> str:
)


# -----------------------------------------------------------------------------
@dataclass
class BigParameters:
num_bis: int
sdu_interval: int
max_sdu: int
max_transport_latency: int
rtn: int
phy: hci.PhyBit = hci.PhyBit.LE_2M
packing: int = 0
framing: int = 0
encryption: int = 0
broadcast_code: bytes = bytes(16)


# -----------------------------------------------------------------------------
@dataclass
class Big(EventEmitter):
class State(IntEnum):
PENDING = 0
ACTIVE = 1
TERMINTED = 2

class Event(str, Enum):
ESTABLISHMENT = 'establishment'
ESTABLISHMENT_FAILURE = 'establishment_failure'
TERMINATION = 'termination'

big_handle: int
advertising_set: AdvertisingSet
parameters: BigParameters
state: State = State.PENDING

# Attributes provided by BIG Create Complete event
big_sync_delay: int = 0
transport_latency_big: int = 0
phy: int = 0
nse: int = 0
bn: int = 0
pto: int = 0
irc: int = 0
max_pdu: int = 0
iso_interval: int = 0
bis_handles: Iterable[int] = ()

def __post_init__(self) -> None:
super().__init__()
self.device = self.advertising_set.device

async def terminate(
self,
reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
if self.state != Big.State.ACTIVE:
logger.error('BIG %d is not active.', self.big_handle)
return

with closing(EventWatcher()) as watcher:
terminated = asyncio.Event()
watcher.once(self, Big.Event.TERMINATION, lambda _: terminated.set())
await self.device.send_command(
hci.HCI_LE_Terminate_BIG_Command(
big_handle=self.big_handle, reason=reason
),
check_result=True,
)
await terminated.wait()


# -----------------------------------------------------------------------------
@dataclass
class BigSyncParameters:
encryption: int
broadcast_code: int
mse: int
big_sync_timeout: int
bis: list[int]


# -----------------------------------------------------------------------------
@dataclass
class BigSync(EventEmitter):
class State(IntEnum):
PENDING = 0
ACTIVE = 1
TERMINTED = 2

class Event(str, Enum):
ESTABLISHMENT = 'establishment'
ESTABLISHMENT_FAILURE = 'establishment_failure'
TERMINATION = 'termination'

big_handle: int
pa_sync: PeriodicAdvertisingSync
parameters: BigSyncParameters
state: State = State.PENDING

# Attributes provided by BIG Create Sync Complete event
transport_latency_big: int = 0
nse: int = 0
bn: int = 0
pto: int = 0
irc: int = 0
max_pdu: int = 0
iso_interval: int = 0
bis_handles: Iterable[int] = ()

def __post_init__(self) -> None:
super().__init__()
self.device = self.pa_sync.device

async def terminate(self) -> None:
if self.state != BigSync.State.ACTIVE:
logger.error('BIG Sync %d is not active.', self.big_handle)
return

with closing(EventWatcher()) as watcher:
terminated = asyncio.Event()
watcher.once(self, BigSync.Event.TERMINATION, lambda _: terminated.set())
await self.device.send_command(
hci.HCI_LE_BIG_Terminate_Sync_Command(big_handle=self.big_handle),
check_result=True,
)
await terminated.wait()


# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
Expand Down Expand Up @@ -1709,6 +1837,8 @@ class Device(CompositeEventEmitter):
legacy_advertiser: Optional[LegacyAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
bigs = dict[int, Big]()
big_syncs = dict[int, BigSync]()
_pending_cis: Dict[int, Tuple[int, int]]

@composite_listener
Expand Down Expand Up @@ -2005,6 +2135,17 @@ def lookup_periodic_advertising_sync(
None,
)

def next_big_handle(self) -> int | None:
return next(
(
handle
for handle in range(DEVICE_MIN_BIG_HANDLE, DEVICE_MAX_BIG_HANDLE + 1)
if handle not in self.bigs.keys()
and handle not in self.big_syncs.keys()
),
None,
)

@deprecated("Please use create_l2cap_server()")
def register_l2cap_server(self, psm, server) -> int:
return self.l2cap_channel_manager.register_server(psm, server)
Expand Down Expand Up @@ -4109,6 +4250,93 @@ async def reject_cis_request(
check_result=True,
)

# [LE only]
@experimental('Only for testing.')
async def create_big(
self, advertising_set: AdvertisingSet, parameters: BigParameters
) -> Big:
if big_handle := self.next_big_handle() is None:
raise core.OutOfResourcesError("All valid BIG handles already in use")

with closing(EventWatcher()) as watcher:
await self.send_command(
hci.HCI_LE_Create_BIG_Command(
big_handle=big_handle,
advertising_handle=advertising_set.advertising_handle,
num_bis=parameters.num_bis,
sdu_interval=parameters.sdu_interval,
max_sdu=parameters.max_sdu,
max_transport_latency=parameters.max_transport_latency,
rtn=parameters.rtn,
phy=parameters.phy,
packing=parameters.packing,
framing=parameters.framing,
encryption=parameters.encryption,
broadcast_code=parameters.broadcast_code,
),
check_result=True,
)

big = Big(
big_handle=big_handle,
parameters=parameters,
advertising_set=advertising_set,
)
self.bigs[big_handle] = big
established = asyncio.get_running_loop().create_future()
watcher.once(big, 'establishment', lambda: established.set_result(None))
watcher.once(
big,
'establishment_failure',
lambda status: established.set_exception(hci.HCI_Error(status)),
)
await established

return big

# [LE only]
@experimental('Only for testing.')
async def create_big_sync(
self, pa_sync: PeriodicAdvertisingSync, parameters: BigSyncParameters
) -> BigSync:
if big_handle := self.next_big_handle() is None:
raise core.OutOfResourcesError("All valid BIG handles already in use")

if pa_sync_handle := pa_sync.sync_handle is None:
raise core.InvalidStateError("PA Sync is not established")

with closing(EventWatcher()) as watcher:
await self.send_command(
hci.HCI_LE_BIG_Create_Sync_Command(
big_handle=big_handle,
sync_handle=pa_sync_handle,
encryption=parameters.encryption,
broadcast_code=parameters.broadcast_code,
mse=parameters.mse,
big_sync_timeout=parameters.big_sync_timeout,
),
check_result=True,
)

big_sync = BigSync(
big_handle=big_handle,
parameters=parameters,
pa_sync=pa_sync,
)
self.big_syncs[big_handle] = big_sync
established = asyncio.get_running_loop().create_future()
watcher.once(
big_sync, 'establishment', lambda: established.set_result(None)
)
watcher.once(
big_sync,
'establishment_failure',
lambda status: established.set_exception(hci.HCI_Error(status)),
)
await established

return big_sync

async def get_remote_le_features(self, connection: Connection) -> hci.LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
Expand Down Expand Up @@ -4230,6 +4458,102 @@ def on_advertising_set_termination(
)
self.connecting_extended_advertising_sets[connection_handle] = advertising_set

@host_event_handler
def on_big_establishment(
self,
status: int,
big_handle: int,
bis_handles: List[int],
big_sync_delay: int,
transport_latency_big: int,
phy: int,
nse: int,
bn: int,
pto: int,
irc: int,
max_pdu: int,
iso_interval: int,
) -> None:
if not (big := self.bigs.get(big_handle)):
logger.warning('BIG %d not found', big_handle)
return

if status != hci.HCI_SUCCESS:
del self.bigs[big_handle]
logger.debug('Unable to create BIG %d', big_handle)
big.state = Big.State.TERMINTED
big.emit(Big.Event.ESTABLISHMENT_FAILURE, status)
return

big.bis_handles = bis_handles[:]
big.big_sync_delay = big_sync_delay
big.transport_latency_big = transport_latency_big
big.phy = phy
big.nse = nse
big.bn = bn
big.pto = pto
big.irc = irc
big.max_pdu = max_pdu
big.iso_interval = iso_interval
big.state = Big.State.ACTIVE

big.emit(Big.Event.ESTABLISHMENT)

@host_event_handler
def on_big_termination(self, reason: int, big_handle: int) -> None:
if not (big := self.bigs.pop(big_handle, None)):
logger.warning('BIG %d not found', big_handle)
return

big.state = Big.State.TERMINTED
big.emit(Big.Event.TERMINATION, reason)

@host_event_handler
def on_big_sync_establishment(
self,
status: int,
big_handle: int,
transport_latency_big: int,
nse: int,
bn: int,
pto: int,
irc: int,
max_pdu: int,
iso_interval: int,
bis_handles: list[int],
) -> None:
if not (big_sync := self.big_syncs.get(big_handle)):
logger.warning('BIG Sync %d not found', big_handle)
return

if status != hci.HCI_SUCCESS:
del self.big_syncs[big_handle]
logger.debug('Unable to create BIG Sync %d', big_handle)
big_sync.state = BigSync.State.TERMINTED
big_sync.emit(BigSync.Event.ESTABLISHMENT_FAILURE, status)
return

big_sync.transport_latency_big = transport_latency_big
big_sync.nse = nse
big_sync.bn = bn
big_sync.pto = pto
big_sync.irc = irc
big_sync.max_pdu = max_pdu
big_sync.iso_interval = iso_interval
big_sync.bis_handles = bis_handles
big_sync.state = BigSync.State.ACTIVE

big_sync.emit(BigSync.Event.ESTABLISHMENT)

@host_event_handler
def on_big_sync_lost(self, big_handle: int, reason: int) -> None:
if not (big_sync := self.big_syncs.pop(big_handle, None)):
logger.warning('BIG %d not found', big_handle)
return

big_sync.state = BigSync.State.TERMINTED
big_sync.emit(BigSync.Event.TERMINATION, reason)

def _complete_le_extended_advertising_connection(
self, connection: Connection, advertising_set: AdvertisingSet
) -> None:
Expand Down
Loading

0 comments on commit 698aa8d

Please sign in to comment.