Skip to content

Commit

Permalink
Refactoring: Use class method instead of overloading __new__
Browse files Browse the repository at this point in the history
  • Loading branch information
oysstu committed Jul 22, 2018
1 parent 03371b7 commit 6b2c094
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 326 deletions.
108 changes: 43 additions & 65 deletions pyxtf/vendors/kongsberg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'''
This file contains some of the kongsberg datagram formats.
This file contains some of the kongsberg datagram formats. Only used for the raw datagrams
Based on Kongsberg Document 850-160692 (EM Series Datagram Formats) Rev.U (June 2015)
Note: use pyall if you're trying to read all-files
The following echosounders: EM3002, EM710, EM302, EM122, EM2040, EM2040C and ME70BO
use little endian byte ordering. Beware that others might not.
Expand All @@ -12,7 +13,6 @@
from enum import IntEnum, unique
from io import IOBase, BytesIO
import numpy as np
from datetime import date

from pyxtf.xtf_ctypes import XTFBase
import warnings
Expand Down Expand Up @@ -85,12 +85,6 @@ class KMOutputDatagramHeader(KMBase):
('Time', ctypes.c_uint32) # Time since midnight in milliseconds
]

def __new__(cls, buffer: IOBase = None):
return super().__new__(cls, buffer=buffer)

def __init__(self, buffer: IOBase = None, *args, **kwargs):
super().__init__(buffer=buffer, *args, **kwargs)


class KMRawRangeAngle78_TX(XTFBase):
'''
Expand All @@ -109,12 +103,6 @@ class KMRawRangeAngle78_TX(XTFBase):
('SignalBandwidth', ctypes.c_float) # In Hz
]

def __new__(cls, buffer: IOBase = None):
return super().__new__(cls, buffer=buffer)

def __init__(self, buffer: IOBase = None, *args, **kwargs):
super().__init__(buffer=buffer, *args, **kwargs)


class KMRawRangeAngle78_RX(XTFBase):
'''
Expand All @@ -134,12 +122,6 @@ class KMRawRangeAngle78_RX(XTFBase):
('Spare', ctypes.c_uint8)
]

def __new__(cls, buffer: IOBase = None):
return super().__new__(cls, buffer=buffer)

def __init__(self, buffer: IOBase = None, *args, **kwargs):
super().__init__(buffer=buffer, *args, **kwargs)

def has_valid_detection(self):
return not bool(self.DetectionInfo & 0b10000000)

Expand Down Expand Up @@ -196,59 +178,55 @@ class KMRawRangeAngle78(KMBase):
('Checksum', ctypes.c_uint16) # Checksum of data between STX and ETX
]

def __init__(self, buffer=None, *args, **kwargs):
super().__init__(*args, **kwargs)
if not buffer:
self.StartID = 0x02
self.DatagramType = KMDatagramType.raw_range_and_angle_78.value
self.EndID = 0x03

def __new__(cls, buffer: IOBase = None):
if buffer:
if type(buffer) in [bytes, bytearray]:
buffer = BytesIO(buffer)
@classmethod
def create_from_buffer(cls, buffer: IOBase, file_header=None):
if type(buffer) in [bytes, bytearray]:
buffer = BytesIO(buffer)

# Read bytes up until the variable-sized data
base_bytes = buffer.read(cls.TX.offset)
n_bytes = ctypes.c_uint32.from_buffer_copy(base_bytes, cls.NumberOfBytes.offset).value
n_tx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Ntx.offset).value
n_rx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Nrx.offset).value

# Read remaining bytes
remaining_bytes = buffer.read(n_bytes - cls.TX.offset + cls.NumberOfBytes.size)

# Create new class dynamically with string array at the correct size
new_name = cls.__name__ + '_ntx{}_nrx{}'.format(n_tx, n_rx)
new_fields = cls._fields_.copy()
tx_idx = [i for i, (name, type) in enumerate(cls._fields_) if name == 'TX'][0]
rx_idx = [i for i, (name, type) in enumerate(cls._fields_) if name == 'RX'][0]
new_fields[tx_idx] = ('TX', KMRawRangeAngle78_TX * n_tx)
new_fields[rx_idx] = ('RX', KMRawRangeAngle78_RX * n_rx)
new_cls = type(new_name, (ctypes.LittleEndianStructure,), {
'__str__': cls.__str__,
'_pack_': cls._pack_,
'_fields_': new_fields
})

all_bytes = base_bytes + remaining_bytes
obj = new_cls.from_buffer_copy(all_bytes)

# Checksum (not crc16, but a straight sum of bytes with overflow)
chk = (sum(all_bytes[new_cls.DatagramType.offset:new_cls.EndID.offset]) & 0xFFFF)
if chk != obj.Checksum:
warning_str = '{}: Checksum failed'.format(cls.__name__)
warnings.warn(warning_str)

else:
obj = super().__new__(cls)
base_bytes = buffer.read(cls.TX.offset)
n_bytes = ctypes.c_uint32.from_buffer_copy(base_bytes, cls.NumberOfBytes.offset).value
n_tx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Ntx.offset).value
n_rx = ctypes.c_uint16.from_buffer_copy(base_bytes, cls.Nrx.offset).value

# Read remaining bytes
remaining_bytes = buffer.read(n_bytes - cls.TX.offset + cls.NumberOfBytes.size)

# Create new class dynamically with string array at the correct size
new_name = cls.__name__ + '_ntx{}_nrx{}'.format(n_tx, n_rx)
new_fields = cls._fields_.copy()
tx_idx = [i for i, (name, fieldtype) in enumerate(cls._fields_) if name == 'TX'][0]
rx_idx = [i for i, (name, fieldtype) in enumerate(cls._fields_) if name == 'RX'][0]
new_fields[tx_idx] = ('TX', KMRawRangeAngle78_TX * n_tx)
new_fields[rx_idx] = ('RX', KMRawRangeAngle78_RX * n_rx)
new_cls = type(new_name, (ctypes.LittleEndianStructure,), {
'__str__': cls.__str__,
'_pack_': cls._pack_,
'_fields_': new_fields
})

all_bytes = base_bytes + remaining_bytes
obj = new_cls.from_buffer_copy(all_bytes)

# Checksum (not crc16, but a straight sum of bytes with overflow)
chk = (sum(all_bytes[new_cls.DatagramType.offset:new_cls.EndID.offset]) & 0xFFFF)
if chk != obj.Checksum:
warning_str = '{}: Checksum failed'.format(cls.__name__)
warnings.warn(warning_str)

return obj

def __init__(self):
super().__init__()
self.StartID = 0x02
self.DatagramType = KMDatagramType.raw_range_and_angle_78.value
self.EndID = 0x03


if __name__ == '__main__':
from pyxtf.xtf_io import xtf_read
from pyxtf.xtf_ctypes import XTFHeaderType
test_path = r'..\..\data\Survey\27apr\EM2040\em2040-0007-l02-20160427-124929_RAW.xtf'
test_path = r'input.xtf'
(fh, p) = xtf_read(test_path)

print('The following (supported) packets are present (XTFHeaderType:count): \n\t{}\n'.format(
Expand All @@ -258,6 +236,6 @@ def __new__(cls, buffer: IOBase = None):
if XTFHeaderType.multibeam_raw_beam_angle in p:
# The KMRawRangeAngle78 data is stored as raw bytes in the data field of a XTFPingHeader type
data_bytes = p[XTFHeaderType.multibeam_raw_beam_angle][0].data
decoded_data = KMRawRangeAngle78(data_bytes)
decoded_data = KMRawRangeAngle78.create_from_buffer(buffer=data_bytes)
print(decoded_data)

Loading

0 comments on commit 6b2c094

Please sign in to comment.