diff --git a/bumble/device.py b/bumble/device.py index 3d4b59a2..01caddb5 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -34,7 +34,6 @@ Tuple, Type, TypeVar, - Set, Union, cast, overload, diff --git a/bumble/helpers.py b/bumble/helpers.py index a3633f1b..c6a6d2ec 100644 --- a/bumble/helpers.py +++ b/bumble/helpers.py @@ -21,7 +21,13 @@ from typing import cast, Any, Optional import logging +from bumble import avc +from bumble import avctp from bumble import avdtp +from bumble import avrcp +from bumble import crypto +from bumble import rfcomm +from bumble import sdp from bumble.colors import color from bumble.att import ATT_CID, ATT_PDU from bumble.smp import SMP_CID, SMP_Command @@ -47,13 +53,6 @@ HCI_AclDataPacket, HCI_Disconnection_Complete_Event, ) -from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM -from bumble.sdp import SDP_PDU, SDP_PSM -from bumble import crypto -from bumble.avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM -from bumble.avctp import MessageAssembler as AVCTP_MessageAssembler, AVCTP_PSM -from bumble.avrcp import AVRCP_PID -from bumble.avc import Frame as AVC_Frame # ----------------------------------------------------------------------------- @@ -64,14 +63,14 @@ # ----------------------------------------------------------------------------- PSM_NAMES = { - RFCOMM_PSM: 'RFCOMM', - SDP_PSM: 'SDP', - AVDTP_PSM: 'AVDTP', - AVCTP_PSM: 'AVCTP' + rfcomm.RFCOMM_PSM: 'RFCOMM', + sdp.SDP_PSM: 'SDP', + avdtp.AVDTP_PSM: 'AVDTP', + avctp.AVCTP_PSM: 'AVCTP' # TODO: add more PSM values } -AVCTP_PID_NAMES = {AVRCP_PID: 'AVRCP'} +AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'} # ----------------------------------------------------------------------------- class PacketTracer: @@ -79,13 +78,13 @@ class AclStream: psms: MutableMapping[int, int] peer: Optional[PacketTracer.AclStream] avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler] + avctp_assemblers: MutableMapping[int, avctp.MessageAssembler] def __init__(self, analyzer: PacketTracer.Analyzer) -> None: self.analyzer = analyzer self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid self.avctp_assemblers = {} # AVCTP assemblers, by source_cid - self.avrcp_assemblers = {} # AVRCP assemblers, by source_cid self.psms = {} # PSM, by source_cid self.peer = None @@ -115,9 +114,7 @@ def on_acl_pdu(self, pdu: bytes) -> None: == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL ): if self.peer and ( - psm := self.peer.psms.get( - connection_response.source_cid - ) + psm := self.peer.psms.get(connection_response.source_cid) ): # Found a pending connection self.psms[connection_response.destination_cid] = psm @@ -130,41 +127,37 @@ def on_acl_pdu(self, pdu: bytes) -> None: ] = avdtp.MessageAssembler(self.on_avdtp_message) self.peer.avdtp_assemblers[ connection_response.destination_cid - ] = avdtp.MessageAssembler( - self.peer.on_avdtp_message - ) - elif psm == AVCTP_PSM: + ] = avdtp.MessageAssembler(self.peer.on_avdtp_message) + elif psm == avctp.AVCTP_PSM: self.avctp_assemblers[ connection_response.source_cid - ] = AVCTP_MessageAssembler(self.on_avctp_message) + ] = avctp.MessageAssembler(self.on_avctp_message) self.peer.avctp_assemblers[ connection_response.destination_cid - ] = AVCTP_MessageAssembler(self.peer.on_avctp_message) + ] = avctp.MessageAssembler(self.peer.on_avctp_message) else: # Try to find the PSM associated with this PDU if self.peer and (psm := self.peer.psms.get(l2cap_pdu.cid)): - if psm == SDP_PSM: - sdp_pdu = SDP_PDU.from_bytes(l2cap_pdu.payload) + if psm == sdp.SDP_PSM: + sdp_pdu = sdp.SDP_PDU.from_bytes(l2cap_pdu.payload) self.analyzer.emit(sdp_pdu) - elif psm == RFCOMM_PSM: - rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload) + elif psm == rfcomm.RFCOMM_PSM: + rfcomm_frame = rfcomm.RFCOMM_Frame.from_bytes(l2cap_pdu.payload) self.analyzer.emit(rfcomm_frame) elif psm == avdtp.AVDTP_PSM: self.analyzer.emit( f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, ' f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}' ) - assembler = self.avdtp_assemblers.get(l2cap_pdu.cid) - if assembler: - assembler.on_pdu(l2cap_pdu.payload) - elif psm == AVCTP_PSM: + if avdtp_assembler := self.avdtp_assemblers.get(l2cap_pdu.cid): + avdtp_assembler.on_pdu(l2cap_pdu.payload) + elif psm == avctp.AVCTP_PSM: self.analyzer.emit( f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, ' f'PSM=AVCTP]: {l2cap_pdu.payload.hex()}' ) - assembler = self.avctp_assemblers.get(l2cap_pdu.cid) - if assembler: - assembler.on_pdu(l2cap_pdu.payload) + if avctp_assembler := self.avctp_assemblers.get(l2cap_pdu.cid): + avctp_assembler.on_pdu(l2cap_pdu.payload) else: psm_string = name_or_number(PSM_NAMES, psm) self.analyzer.emit( @@ -181,9 +174,16 @@ def on_avdtp_message( f'{color("AVDTP", "green")} [{transaction_label}] {message}' ) - def on_avctp_message(self, transaction_label: int, is_command: bool, ipid: bool, pid: int, payload: bytes): - if pid == AVRCP_PID: - avc_frame = AVC_Frame.from_bytes(payload) + def on_avctp_message( + self, + transaction_label: int, + is_command: bool, + ipid: bool, + pid: int, + payload: bytes, + ): + if pid == avrcp.AVRCP_PID: + avc_frame = avc.Frame.from_bytes(payload) details = str(avc_frame) else: details = payload.hex() diff --git a/bumble/utils.py b/bumble/utils.py index 1bb84c62..7d6db5ab 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -35,7 +35,6 @@ Union, overload, ) -import traceback from pyee import EventEmitter @@ -459,7 +458,7 @@ def experimental(msg: str): """ def wrapper(function): - @wraps(function) + @functools.wraps(function) def inner(*args, **kwargs): warnings.warn(msg, FutureWarning) return function(*args, **kwargs)