diff --git a/pyxtf/vendors/kongsberg.py b/pyxtf/vendors/kongsberg.py index 206b091..9a14380 100644 --- a/pyxtf/vendors/kongsberg.py +++ b/pyxtf/vendors/kongsberg.py @@ -1,6 +1,7 @@ ''' -This file contains some of the kongsberg datagram formats. +This file contains some of the kongsberg datagram formats. Only used for the raw datagrams Based on Kongsberg Document 850-160692 (EM Series Datagram Formats) Rev.U (June 2015) +Note: use pyall if you're trying to read all-files The following echosounders: EM3002, EM710, EM302, EM122, EM2040, EM2040C and ME70BO use little endian byte ordering. Beware that others might not. @@ -12,7 +13,6 @@ from enum import IntEnum, unique from io import IOBase, BytesIO import numpy as np -from datetime import date from pyxtf.xtf_ctypes import XTFBase import warnings @@ -85,12 +85,6 @@ class KMOutputDatagramHeader(KMBase): ('Time', ctypes.c_uint32) # Time since midnight in milliseconds ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - class KMRawRangeAngle78_TX(XTFBase): ''' @@ -109,12 +103,6 @@ class KMRawRangeAngle78_TX(XTFBase): ('SignalBandwidth', ctypes.c_float) # In Hz ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - class KMRawRangeAngle78_RX(XTFBase): ''' @@ -134,12 +122,6 @@ class KMRawRangeAngle78_RX(XTFBase): ('Spare', ctypes.c_uint8) ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - def has_valid_detection(self): return not bool(self.DetectionInfo & 0b10000000) @@ -196,59 +178,55 @@ class KMRawRangeAngle78(KMBase): ('Checksum', ctypes.c_uint16) # Checksum of data between STX and ETX ] - def __init__(self, buffer=None, *args, **kwargs): - super().__init__(*args, **kwargs) - if not buffer: - self.StartID = 0x02 - self.DatagramType = KMDatagramType.raw_range_and_angle_78.value - self.EndID = 0x03 - - def __new__(cls, buffer: IOBase = None): - if buffer: - if type(buffer) in [bytes, bytearray]: - buffer = BytesIO(buffer) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + if type(buffer) in [bytes, bytearray]: + buffer = BytesIO(buffer) # Read bytes up until the variable-sized data - base_bytes = buffer.read(cls.TX.offset) - n_bytes = ctypes.c_uint32.from_buffer_copy(base_bytes, cls.NumberOfBytes.offset).value - n_tx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Ntx.offset).value - n_rx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Nrx.offset).value - - # Read remaining bytes - remaining_bytes = buffer.read(n_bytes - cls.TX.offset + cls.NumberOfBytes.size) - - # Create new class dynamically with string array at the correct size - new_name = cls.__name__ + '_ntx{}_nrx{}'.format(n_tx, n_rx) - new_fields = cls._fields_.copy() - tx_idx = [i for i, (name, type) in enumerate(cls._fields_) if name == 'TX'][0] - rx_idx = [i for i, (name, type) in enumerate(cls._fields_) if name == 'RX'][0] - new_fields[tx_idx] = ('TX', KMRawRangeAngle78_TX * n_tx) - new_fields[rx_idx] = ('RX', KMRawRangeAngle78_RX * n_rx) - new_cls = type(new_name, (ctypes.LittleEndianStructure,), { - '__str__': cls.__str__, - '_pack_': cls._pack_, - '_fields_': new_fields - }) - - all_bytes = base_bytes + remaining_bytes - obj = new_cls.from_buffer_copy(all_bytes) - - # Checksum (not crc16, but a straight sum of bytes with overflow) - chk = (sum(all_bytes[new_cls.DatagramType.offset:new_cls.EndID.offset]) & 0xFFFF) - if chk != obj.Checksum: - warning_str = '{}: Checksum failed'.format(cls.__name__) - warnings.warn(warning_str) - - else: - obj = super().__new__(cls) + base_bytes = buffer.read(cls.TX.offset) + n_bytes = ctypes.c_uint32.from_buffer_copy(base_bytes, cls.NumberOfBytes.offset).value + n_tx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Ntx.offset).value + n_rx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Nrx.offset).value + + # Read remaining bytes + remaining_bytes = buffer.read(n_bytes - cls.TX.offset + cls.NumberOfBytes.size) + + # Create new class dynamically with string array at the correct size + new_name = cls.__name__ + '_ntx{}_nrx{}'.format(n_tx, n_rx) + new_fields = cls._fields_.copy() + tx_idx = [i for i, (name, fieldtype) in enumerate(cls._fields_) if name == 'TX'][0] + rx_idx = [i for i, (name, fieldtype) in enumerate(cls._fields_) if name == 'RX'][0] + new_fields[tx_idx] = ('TX', KMRawRangeAngle78_TX * n_tx) + new_fields[rx_idx] = ('RX', KMRawRangeAngle78_RX * n_rx) + new_cls = type(new_name, (ctypes.LittleEndianStructure,), { + '__str__': cls.__str__, + '_pack_': cls._pack_, + '_fields_': new_fields + }) + + all_bytes = base_bytes + remaining_bytes + obj = new_cls.from_buffer_copy(all_bytes) + + # Checksum (not crc16, but a straight sum of bytes with overflow) + chk = (sum(all_bytes[new_cls.DatagramType.offset:new_cls.EndID.offset]) & 0xFFFF) + if chk != obj.Checksum: + warning_str = '{}: Checksum failed'.format(cls.__name__) + warnings.warn(warning_str) return obj + def __init__(self): + super().__init__() + self.StartID = 0x02 + self.DatagramType = KMDatagramType.raw_range_and_angle_78.value + self.EndID = 0x03 + if __name__ == '__main__': from pyxtf.xtf_io import xtf_read from pyxtf.xtf_ctypes import XTFHeaderType - test_path = r'..\..\data\Survey\27apr\EM2040\em2040-0007-l02-20160427-124929_RAW.xtf' + test_path = r'input.xtf' (fh, p) = xtf_read(test_path) print('The following (supported) packets are present (XTFHeaderType:count): \n\t{}\n'.format( @@ -258,6 +236,6 @@ def __new__(cls, buffer: IOBase = None): if XTFHeaderType.multibeam_raw_beam_angle in p: # The KMRawRangeAngle78 data is stored as raw bytes in the data field of a XTFPingHeader type data_bytes = p[XTFHeaderType.multibeam_raw_beam_angle][0].data - decoded_data = KMRawRangeAngle78(data_bytes) + decoded_data = KMRawRangeAngle78.create_from_buffer(buffer=data_bytes) print(decoded_data) diff --git a/pyxtf/xtf_ctypes.py b/pyxtf/xtf_ctypes.py index 955762c..a24ad83 100644 --- a/pyxtf/xtf_ctypes.py +++ b/pyxtf/xtf_ctypes.py @@ -63,23 +63,33 @@ class XTFBase(ctypes.LittleEndianStructure): Exposes basic utility like printing of fields and constructing class from a buffer. """ - def __new__(cls, buffer: IOBase = None): - if buffer: - if type(buffer) in [bytes, bytearray]: - buffer = BytesIO(buffer) - - header_bytes = buffer.read(ctypes.sizeof(cls)) - if not header_bytes: - raise RuntimeError('XTF file shorter than expected (end hit while reading {})'.format(cls.__name__)) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + """ + Initializes the XTF structure by copying from the target buffer. + Note: not to be confused with .from_buffer and .from_buffer_copy which are the direct ctypes functions + :param buffer: Input bytes + :param file_header: XTFFileHeader, only necessary for XTFPingHeader + :return: + """ + if type(buffer) in [bytes, bytearray]: + buffer = BytesIO(buffer) - obj = cls.from_buffer_copy(header_bytes) - else: - obj = super().__new__(cls) + header_bytes = buffer.read(ctypes.sizeof(cls)) + if not header_bytes: + raise RuntimeError('XTF file shorter than expected (end hit while reading {})'.format(cls.__name__)) - return obj + return cls.from_buffer_copy(header_bytes) - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(*args, **kwargs) + @classmethod + def view_from_buffer(cls, buffer: IOBase, file_header=None): + """ + Returns a view of the XTF structure in the buffer without copying. + :param buffer: Input bytes + :param file_header: XTFFileHeader, only necessary for XTFPingHeader + :return: + """ + raise NotImplementedError("Views has not been implemented") def __str__(self): """ @@ -142,13 +152,9 @@ class XTFChanInfo(XTFBase): ('ReservedArea2', ctypes.c_uint8 * 53) ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer, *args, **kwargs) - if not buffer: - self.Reserved = 1024 # For compatibility reasons with old viewers + def __init__(self): + super().__init__() + self.Reserved = 1024 # For compatibility reasons with old viewers class XTFFileHeader(XTFBase): @@ -215,25 +221,27 @@ def channel_count(self, verbose: bool = False) -> int: return n_channels - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) + def __init__(self): + super().__init__() + self.FileFormat = 0x7B + self.SystemType = 1 + # Set ChanInfo[i].Reserved to 1024 for compatibility reasons (used to be NumSamples) + for i in range(0, 6): + self.ChanInfo[i].Reserved = 1024 + self.RecordingProgramName = b'pyxtf' + self.RecordingProgramVersion = b'223' + + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + obj = super().create_from_buffer(buffer) + + # Initialize additional fields + obj.subbottom_info = [x for x in obj.ChanInfo if x.TypeOfChannel == XTFChannelType.subbottom] + sonar_types = (XTFChannelType.port.value, XTFChannelType.stbd.value) + obj.sonar_info = [x for x in obj.ChanInfo if x.TypeOfChannel in sonar_types][:obj.NumberOfSonarChannels] + obj.bathy_info = [x for x in obj.ChanInfo if x.TypeOfChannel == XTFChannelType.bathy][:obj.NumberOfBathymetryChannels] - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer, *args, **kwargs) - - if buffer: - self.subbottom_info = [x for x in self.ChanInfo if x.TypeOfChannel == XTFChannelType.subbottom] - sonar_types = (XTFChannelType.port.value, XTFChannelType.stbd.value) - self.sonar_info = [x for x in self.ChanInfo if x.TypeOfChannel in sonar_types][:self.NumberOfSonarChannels] - self.bathy_info = [x for x in self.ChanInfo if x.TypeOfChannel == XTFChannelType.bathy][:self.NumberOfBathymetryChannels] - else: - self.FileFormat = 0x7B - self.SystemType = 1 - # Set ChanInfo[i].Reserved to 1024 for compatibility reasons (used to be NumSamples) - for i in range(0, 6): - self.ChanInfo[i].Reserved = 1024 - self.RecordingProgramName = b'pyxtf' - self.RecordingProgramVersion = b'223' + return obj class XTFPacket(XTFBase): @@ -244,12 +252,6 @@ class XTFPacket(XTFBase): _pack_ = 1 _fields_ = [] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - def get_time(self): # All XTF packets has the fields Year, Month, Day, Hour, Minute, Second # Some packets come with SourceEpoch (time since 1970-1-1) which is used if present @@ -304,17 +306,20 @@ class XTFPacketStart(XTFPacket): ('NumBytesThisRecord', ctypes.c_uint32) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header: XTFFileHeader=None): + obj = super().create_from_buffer(buffer) - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if buffer: - if self.MagicNumber != 0xFACE: - raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - else: - self.MagicNumber = 0xFACE - self.HeaderType = XTFHeaderType.user_defined + if obj.MagicNumber != 0xFACE: + raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') + + return obj + + def __init__(self): + super().__init__() + + self.MagicNumber = 0xFACE + self.HeaderType = XTFHeaderType.user_defined class XTFAttitudeData(XTFPacketStart): @@ -339,13 +344,9 @@ class XTFAttitudeData(XTFPacketStart): ('Reserved3', ctypes.c_uint8) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) - - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if not buffer: - self.HeaderType = XTFHeaderType.attitude + def __init__(self): + super().__init__() + self.HeaderType = XTFHeaderType.attitude class XTFNotesHeader(XTFPacketStart): @@ -361,13 +362,9 @@ class XTFNotesHeader(XTFPacketStart): ('NotesText', ctypes.c_char * 200) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) - - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if not buffer: - self.HeaderType = XTFHeaderType.notes.value + def __init__(self): + super().__init__() + self.HeaderType = XTFHeaderType.notes.value class XTFRawSerialHeader(XTFPacketStart): @@ -385,19 +382,19 @@ class XTFRawSerialHeader(XTFPacketStart): ('StringSize', ctypes.c_uint16) # After this, the number of ascii bytes follow ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - obj = super().__new__(cls, buffer=buffer, file_header=file_header, *args, **kwargs) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header: XTFFileHeader=None): + obj = super().create_from_buffer(buffer) + # TODO: Make getters/setters that updates StringSize when changed + obj.RawAsciiData = buffer.read(ctypes.sizeof(ctypes.c_char) * obj.StringSize.value) - if buffer: - # TODO: Make getters/setters that updates StringSize when changed - obj.RawAsciiData = buffer.read(ctypes.sizeof(ctypes.c_char) * obj.StringSize.value) - else: - obj.RawAsciiData = b'' + return obj - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if not buffer: - self.HeaderType = XTFHeaderType.raw_serial.value + def __init__(self): + super().__init__() + + self.RawAsciiData = b'' + self.HeaderType = XTFHeaderType.raw_serial.value # Serialport and subchannelnumber is the same variable # The documentation uses serialport, so this redirection is added to match the docs @@ -410,7 +407,6 @@ def SerialPort(self, value): self.SubChannelNumber = value # Just declaration of variables to more easily generate entries in the .pyi file - # TODO: Generate it automatically by inspecing __init__ _typing_static_ = [] _typing_instance_ = [ ('SerialPort', 'ctypes.c_uint8'), @@ -447,12 +443,6 @@ class XTFPingChanHeader(XTFBase): ('ReservedSpace', ctypes.c_uint8 * 4) ] - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - class XTFPingHeader(XTFPacketStart): _pack_ = 1 @@ -546,102 +536,100 @@ class XTFPingHeader(XTFPacketStart): XTFHeaderType.multibeam_raw_beam_angle ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - p_header = super().__new__(cls, buffer=buffer, file_header=file_header) - - p_header.ping_chan_headers = [] # type: List[XTFPingChanHeader] - p_header.data = None - - if buffer: - if not file_header: - raise RuntimeError('Initialization of XTFPingHeader from buffer requires file_header to be passed.') - - # Sonar and bathy has a different data structure following the header - if p_header.HeaderType == XTFHeaderType.sonar: - p_header.data = [] # type: List[np.ndarray] - - for i in range(0, p_header.NumChansToFollow): - # Retrieve XTFPingChanHeader for this channel - p_chan = XTFPingChanHeader(buffer=buffer) - p_header.ping_chan_headers.append(p_chan) - - # Backwards-compatibility: retrive from NumSamples if possible, else use old field - n_samples = p_chan.NumSamples if p_chan.NumSamples > 0 else file_header.sonar_info[i].Reserved - - # Calculate number of bytes to read - n_bytes = n_samples * file_header.sonar_info[i].BytesPerSample - n_bytes_remaining = p_header.NumBytesThisRecord - \ - ctypes.sizeof(XTFPingHeader) - \ - ctypes.sizeof(XTFPingChanHeader) - if n_bytes > n_bytes_remaining: - raise RuntimeError('Number of bytes to read exceeds the number of bytes remaining in packet.') - - # Read the data and output as a numpy array of the specified bytes-per-sample - samples = buffer.read(n_samples * file_header.sonar_info[i].BytesPerSample) - if not samples: - raise RuntimeError('File ended while reading data packets (file corrupt?)') - - # Favor getting the sample format from the dedicated field added in X41. - # If the field is not populated deduce the type from the bytes per sample field. - if file_header.sonar_info[i].SampleFormat in sample_format_dtype: - sample_format = sample_format_dtype[file_header.sonar_info[i].SampleFormat] - else: - sample_format = xtf_dtype[file_header.sonar_info[i].BytesPerSample] + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header: XTFFileHeader=None): + if not file_header: + raise RuntimeError('Initialization of XTFPingHeader from buffer requires file_header to be passed.') + + obj = super().create_from_buffer(buffer) - samples = np.frombuffer(samples, dtype=sample_format) - p_header.data.append(samples) + obj.ping_chan_headers = [] # type: List[XTFPingChanHeader] + obj.data = None - elif p_header.HeaderType == XTFHeaderType.bathy_xyza: - # Bathymetry uses the same header as sonar, but without the XTFPingChanHeaders + # Sonar and bathy has a different data structure following the header + if obj.HeaderType == XTFHeaderType.sonar: + obj.data = [] # type: List[np.ndarray] - # TODO: Should the sub-channel number be used to index chan_info (?) - #sub_chan = p_header.SubChannelNumber + for i in range(0, obj.NumChansToFollow): + # Retrieve XTFPingChanHeader for this channel + p_chan = XTFPingChanHeader(buffer=buffer) + obj.ping_chan_headers.append(p_chan) + + # Backwards-compatibility: retrive from NumSamples if possible, else use old field + n_samples = p_chan.NumSamples if p_chan.NumSamples > 0 else file_header.sonar_info[i].Reserved - # Read the data that follows - n_bytes = p_header.NumBytesThisRecord - ctypes.sizeof(XTFPingHeader) - samples = buffer.read(n_bytes) + # Calculate number of bytes to read + n_bytes = n_samples * file_header.sonar_info[i].BytesPerSample + n_bytes_remaining = obj.NumBytesThisRecord - \ + ctypes.sizeof(XTFPingHeader) - \ + ctypes.sizeof(XTFPingChanHeader) + if n_bytes > n_bytes_remaining: + raise RuntimeError('Number of bytes to read exceeds the number of bytes remaining in packet.') + + # Read the data and output as a numpy array of the specified bytes-per-sample + samples = buffer.read(n_samples * file_header.sonar_info[i].BytesPerSample) if not samples: - warn('XTFBathyHeader without any data encountered.') + raise RuntimeError('File ended while reading data packets (file corrupt?)') - # Processed bathy data consists of repeated XTFBeamXYZA structures - # Note: Using a ctypes array is a _lot_ faster than constructing a list of BeamXYZA - num_xyza = n_bytes // ctypes.sizeof(XTFBeamXYZA) - xyza_array_type = XTFBeamXYZA * num_xyza - xyza_array_type._pack_ = 1 - p_header.data = xyza_array_type.from_buffer_copy(samples) + # Favor getting the sample format from the dedicated field added in X41. + # If the field is not populated deduce the type from the bytes per sample field. + if file_header.sonar_info[i].SampleFormat in sample_format_dtype: + sample_format = sample_format_dtype[file_header.sonar_info[i].SampleFormat] + else: + sample_format = xtf_dtype[file_header.sonar_info[i].BytesPerSample] - elif p_header.HeaderType == XTFHeaderType.reson_7018_watercolumn: - # 7018 water column consists of XTFPingHeader followed by (one?) XTFPingChanHeader, then vendor data + samples = np.frombuffer(samples, dtype=sample_format) + obj.data.append(samples) - # Retrieve XTFPingChanHeader - p_chan = XTFPingChanHeader(buffer=buffer) - p_header.ping_chan_headers.append(p_chan) + elif obj.HeaderType == XTFHeaderType.bathy_xyza: + # Bathymetry uses the same header as sonar, but without the XTFPingChanHeaders - # Read the data that follows - n_bytes = p_header.NumBytesThisRecord - ctypes.sizeof(XTFPingHeader) - ctypes.sizeof(XTFPingChanHeader) - samples = buffer.read(n_bytes) - if not samples: - warn('XTFPingHeader (Reson7018) without any data encountered.') + # TODO: Should the sub-channel number be used to index chan_info (?) + # sub_chan = obj.SubChannelNumber + + # Read the data that follows + n_bytes = obj.NumBytesThisRecord - ctypes.sizeof(XTFPingHeader) + samples = buffer.read(n_bytes) + if not samples: + warn('XTFBathyHeader without any data encountered.') + + # Processed bathy data consists of repeated XTFBeamXYZA structures + # Note: Using a ctypes array is a _lot_ faster than constructing a list of BeamXYZA + num_xyza = n_bytes // ctypes.sizeof(XTFBeamXYZA) + xyza_array_type = XTFBeamXYZA * num_xyza + xyza_array_type._pack_ = 1 + obj.data = xyza_array_type.from_buffer_copy(samples) + + elif obj.HeaderType == XTFHeaderType.reson_7018_watercolumn: + # 7018 water column consists of XTFPingHeader followed by (one?) XTFPingChanHeader, then vendor data - p_header.data = samples + # Retrieve XTFPingChanHeader + p_chan = XTFPingChanHeader(buffer=buffer) + obj.ping_chan_headers.append(p_chan) - else: - # Generic XTFPingHeader construction - n_bytes = p_header.NumBytesThisRecord - ctypes.sizeof(XTFPingHeader) - samples = buffer.read(n_bytes) - if not samples and n_bytes > 0: - warn('XTFPingHeader without any data encountered.') + # Read the data that follows + n_bytes = obj.NumBytesThisRecord - ctypes.sizeof(XTFPingHeader) - ctypes.sizeof(XTFPingChanHeader) + samples = buffer.read(n_bytes) + if not samples: + warn('XTFPingHeader (Reson7018) without any data encountered.') - # The data is the raw bytes following the header - p_header.data = samples + obj.data = samples - return p_header + else: + # Generic XTFPingHeader construction + n_bytes = obj.NumBytesThisRecord - ctypes.sizeof(XTFPingHeader) + samples = buffer.read(n_bytes) + if not samples and n_bytes > 0: + warn('XTFPingHeader without any data encountered.') + + # The data is the raw bytes following the header + obj.data = samples - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) + return obj - if not buffer: - self.HeaderType = XTFHeaderType.sonar.value + def __init__(self): + super().__init__() + self.HeaderType = XTFHeaderType.sonar.value class XTFPosRawNavigation(XTFPacketStart): @@ -664,13 +652,9 @@ class XTFPosRawNavigation(XTFPacketStart): ('Reserved2', ctypes.c_uint8) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) - - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if not buffer: - self.HeaderType = XTFHeaderType.pos_raw_navigation.value + def __init__(self): + super().__init__() + self.HeaderType = XTFHeaderType.pos_raw_navigation.value class XTFQPSSingleBeam(XTFPacketStart): @@ -692,13 +676,9 @@ class XTFQPSSingleBeam(XTFPacketStart): ('Reserved2', ctypes.c_uint8 * 7) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) - - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if not buffer: - self.HeaderType = XTFHeaderType.q_singlebeam.value + def __init__(self): + super().__init__() + self.HeaderType = XTFHeaderType.q_singlebeam.value class XTFQPSMultiTXEntry(XTFBase): @@ -715,12 +695,6 @@ class XTFQPSMultiTXEntry(XTFBase): ('Reserved', ctypes.c_float * 4) ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - class XTFQPSMBEEntry(XTFBase): _pack_ = 1 @@ -736,12 +710,6 @@ class XTFQPSMBEEntry(XTFBase): ('Reserved', ctypes.c_float * 4) ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer=buffer, *args, **kwargs) - class XTFRawCustomHeader(XTFPacket): _pack_ = 1 @@ -768,17 +736,18 @@ class XTFRawCustomHeader(XTFPacket): ('Reserved2', ctypes.c_uint8 * 7) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + obj = super().create_from_buffer(buffer) + if obj.MagicNumber != 0xFACE: + raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if buffer: - if self.MagicNumber != 0xFACE: - raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - else: - self.MagicNumber = 0xFACE - self.HeaderType = XTFHeaderType.custom_vendor_data.value + return obj + + def __init__(self): + super().__init__() + self.MagicNumber = 0xFACE + self.HeaderType = XTFHeaderType.custom_vendor_data.value class XTFHeaderNavigation(XTFPacket): @@ -804,17 +773,18 @@ class XTFHeaderNavigation(XTFPacket): ('Reserved2', ctypes.c_uint8 * 6) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + obj = super().create_from_buffer(buffer) + if obj.MagicNumber != 0xFACE: + raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if buffer: - if self.MagicNumber != 0xFACE: - raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - else: - self.MagicNumber = 0xFACE - self.HeaderType = XTFHeaderType.navigation.value + return obj + + def __init__(self): + super().__init__() + self.MagicNumber = 0xFACE + self.HeaderType = XTFHeaderType.navigation.value class XTFHeaderGyro(XTFPacket): @@ -838,17 +808,18 @@ class XTFHeaderGyro(XTFPacket): ('Reserved1', ctypes.c_uint8 * 26) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + obj = super().create_from_buffer(buffer) + if obj.MagicNumber != 0xFACE: + raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if buffer: - if self.MagicNumber != 0xFACE: - raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - else: - self.MagicNumber = 0xFACE - self.HeaderType = XTFHeaderType.gyro.value + return obj + + def __init__(self): + super().__init__() + self.MagicNumber = 0xFACE + self.HeaderType = XTFHeaderType.gyro.value class XTFHighSpeedSensor(XTFPacketStart): @@ -866,13 +837,9 @@ class XTFHighSpeedSensor(XTFPacketStart): ('Reserved3', ctypes.c_uint8 * 34) ] - def __new__(cls, buffer: IOBase = None, file_header: XTFFileHeader = None): - return super().__new__(cls, buffer=buffer, file_header=file_header) - - def __init__(self, buffer: IOBase = None, file_header: XTFFileHeader = None, *args, **kwargs): - super().__init__(buffer=buffer, file_header=file_header, *args, **kwargs) - if not buffer: - self.HeaderType = XTFHeaderType.highspeed_sensor2.value + def __init__(self): + super().__init__() + self.HeaderType = XTFHeaderType.highspeed_sensor2.value class XTFBeamXYZA(XTFBase): @@ -886,12 +853,6 @@ class XTFBeamXYZA(XTFBase): ('ucQuality', ctypes.c_uint8) ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer, *args, **kwargs) - class SNP0(XTFBase): _pack_ = 1 @@ -930,16 +891,17 @@ class SNP0(XTFBase): ('BeamCnt', ctypes.c_uint16) # Number of beams ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + obj = super().create_from_buffer(buffer) + if obj.ID != 0x534E5030: + raise RuntimeError('XTF packet does not start with the correct identifier (0x534E5030).') - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer, *args, **kwargs) - if buffer: - if self.ID != 0x534E5030: - raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - else: - self.ID = 0x534E5030 + return obj + + def __init__(self): + super().__init__() + self.ID = 0x534E5030 class SNP1(XTFBase): @@ -957,19 +919,17 @@ class SNP1(XTFBase): ('FragSamples', ctypes.c_uint16) # Fragment size, samples ] - def __new__(cls, buffer: IOBase = None): - return super().__new__(cls, buffer=buffer) - - def __init__(self, buffer: IOBase = None, *args, **kwargs): - super().__init__(buffer, *args, **kwargs) - if buffer: - if self.ID != 0x534E5031: - raise RuntimeError('XTF packet does not start with the correct identifier (0xFACE).') - else: - self.ID = 0x534E5031 + @classmethod + def create_from_buffer(cls, buffer: IOBase, file_header=None): + obj = super().create_from_buffer(buffer) + if obj.ID != 0x534E5031: + raise RuntimeError('XTF packet does not start with the correct identifier (0x534E5031).') + return obj -#endregion C-Types + def __init__(self): + super().__init__() + self.ID = 0x534E5031 # Mapping from enumerated header type to the class implementation @@ -978,7 +938,7 @@ def __init__(self, buffer: IOBase = None, *args, **kwargs): XTFHeaderType.sonar: XTFPingHeader, XTFHeaderType.bathy: XTFPingHeader, XTFHeaderType.bathy_xyza: XTFPingHeader, - XTFHeaderType.multibeam_raw_beam_angle: XTFPingHeader, + XTFHeaderType.multibeam_raw_beam_angle: XTFPingHeader, # Raw vendor data is returned as raw bytes XTFHeaderType.reson_7125_snippet: XTFPingHeader, # The custom vendor data is returned as raw bytes XTFHeaderType.reson_7125: XTFPingHeader, # The custom vendor data is returned as raw bytes XTFHeaderType.reson_7018_watercolumn: XTFPingHeader, # The custom vendor data is returned as raw bytes diff --git a/pyxtf/xtf_io.py b/pyxtf/xtf_io.py index 6722c1a..03c73e2 100644 --- a/pyxtf/xtf_io.py +++ b/pyxtf/xtf_io.py @@ -4,6 +4,7 @@ from os.path import splitext, isfile from typing import Tuple, Any, Dict, Union, Generator, Iterable from warnings import warn +import ctypes from pyxtf.xtf_ctypes import * @@ -57,7 +58,7 @@ def xtf_read_gen(path: str, types: List[XTFHeaderType] = None) -> Generator[ # Read XTF file with open(path, 'rb') as f: # Read initial file header - file_header = XTFFileHeader(buffer=f) + file_header = XTFFileHeader.create_from_buffer(buffer=f) n_channels = file_header.channel_count() if n_channels > 6: @@ -76,7 +77,7 @@ def xtf_read_gen(path: str, types: List[XTFHeaderType] = None) -> Generator[ # How to read and construct each type is implemented in the class (default impl. in XTFBase.__new__) p_class = XTFPacketClasses.get(p_headertype, None) if p_class: - p_header = p_class(buffer=f, file_header=file_header) + p_header = p_class.create_from_buffer(buffer=f, file_header=file_header) yield p_header else: warning_str = 'Unsupported packet type \'{}\' encountered'.format(str(p_headertype)) @@ -107,7 +108,7 @@ def xtf_read_gen(path: str, types: List[XTFHeaderType] = None) -> Generator[ # How to read and construct each type is implemented in the class (default impl. in XTFBase.__new__) p_class = XTFPacketClasses.get(p_headertype, None) if p_class: - p_header = p_class(buffer=f, file_header=file_header) + p_header = p_class.create_from_buffer(buffer=f, file_header=file_header) yield p_header else: warning_str = 'Unsupported packet type \'{}\' encountered'.format(str(p_headertype))