diff --git a/bumble/profiles/vocs.py b/bumble/profiles/vocs.py new file mode 100644 index 00000000..71210d3a --- /dev/null +++ b/bumble/profiles/vocs.py @@ -0,0 +1,244 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import struct + +from typing import Optional + +from bumble import att +from bumble import utils +from bumble import device +from bumble import gatt +from bumble import gatt_client +from bumble.profiles.bap import AudioLocation + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- + +MIN_VOLUME_OFFSET = -255 +MAX_VOLUME_OFFSET = 255 +CHANGE_COUNTER_MAX_VALUE = 0xFF + +SET_VOLUME_OFFSET_OPCODE = 0x01 + + +class ErrorCode(utils.OpenIntEnum): + """ + See Volume Offset Control Service 1.6. Application error codes. + """ + + INVALID_CHANGE_COUNTER = 0x80 + OPCODE_NOT_SUPPORTED = 0x81 + VALUE_OUT_OF_RANGE = 0x82 + + +# ----------------------------------------------------------------------------- +class VolumeOffsetControlService(gatt.TemplateService): + UUID = gatt.GATT_VOLUME_OFFSET_CONTROL_SERVICE + + volume_offset_state: gatt.Characteristic + audio_location: gatt.Characteristic + volume_offset_control_point: gatt.Characteristic + audio_output_description: gatt.Characteristic + + def __init__( + self, + volume_offset: int = 0, + change_counter: int = 0, + audio_channel_location: AudioLocation = AudioLocation.NOT_ALLOWED, + audio_output_description_value: str = '', + ) -> None: + + self.volume_offset = volume_offset + self.change_counter = change_counter + self.audio_channel_location = audio_channel_location + self.audio_output_description_value = audio_output_description_value + + characteristics = [] + + self.volume_offset_state = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC, + properties=( + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + ), + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue(read=self._on_read_volume_offset_state), + ) + characteristics.append(self.volume_offset_state) + + self.audio_location = gatt.Characteristic( + uuid=gatt.GATT_AUDIO_LOCATION_CHARACTERISTIC, + properties=( + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE + ), + permissions=( + gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION + | gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION + ), + value=gatt.CharacteristicValue( + read=self._on_read_audio_location, + write=self._on_write_audio_location, + ), + ) + characteristics.append(self.audio_location) + + self.volume_offset_control_point = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.WRITE, + permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue( + write=self._on_write_volume_offset_control_point + ), + ) + characteristics.append(self.volume_offset_control_point) + + self.audio_output_description = gatt.Characteristic( + uuid=gatt.GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC, + properties=( + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE + ), + permissions=( + gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION + | gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION + ), + value=gatt.CharacteristicValue( + read=self._on_read_audio_output_description, + write=self._on_write_audio_output_description, + ), + ) + characteristics.append(self.audio_output_description) + + super().__init__(characteristics=characteristics, primary=False) + + @property + def volume_offset_state_bytes(self) -> bytes: + return struct.pack(' None: + self.volume_offset, self.change_counter = struct.unpack(' bytes: + return self.volume_offset_state_bytes + + @property + def audio_location_bytes(self) -> bytes: + return struct.pack(' None: + self.audio_channel_location = struct.unpack(' bytes: + return self.audio_location_bytes + + async def _on_write_audio_location( + self, connection: Optional[device.Connection], value: bytes + ) -> None: + assert connection + + self.audio_location_bytes = value + await connection.device.notify_subscribers( + attribute=self.audio_channel_location, + value=self.audio_location_bytes, + ) + self.emit('audio_location_change') + + async def _on_write_volume_offset_control_point( + self, connection: Optional[device.Connection], value: bytes + ) -> None: + assert connection + + opcode = value[0] + + if opcode != SET_VOLUME_OFFSET_OPCODE: + raise att.ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED) + + change_counter, volume_offset = struct.unpack(' bytes: + return self.audio_output_description_value.encode('utf-8') + + async def _on_write_audio_output_description(self, connection, value) -> None: + self.audio_output_description_value = value.decode('utf-8') + await connection.device.notify_subscribers( + attribute=self.audio_output_description, + value=self.audio_output_description_value.encode('utf-8'), + ) + self.emit('audio_output_description_change') + + +# ----------------------------------------------------------------------------- +# Client +# ----------------------------------------------------------------------------- +class VolumeOffsetControlServiceProxy(gatt_client.ProfileServiceProxy): + SERVICE_CLASS = VolumeOffsetControlService + + volume_offset_control_point: gatt_client.CharacteristicProxy + + def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: + self.service_proxy = service_proxy + + self.volume_offset_state = gatt.PackedCharacteristicAdapter( + service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC + )[0], + '