Commit

Merge pull request #38 from datajoint-company/master
Additional file loading checks and timeseries warnings
CBroz1 authored Feb 6, 2023
2 parents 42ba453 + 8ead810 commit cebbf84
Showing 3 changed files with 100 additions and 30 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -24,6 +24,7 @@ wheels/
*.egg-info/
.installed.cfg
*.egg
.idea/

# PyInstaller
# Usually these files are written by a python script from a template
127 changes: 98 additions & 29 deletions pyopenephys/core.py
@@ -20,6 +20,7 @@
import warnings
import json
from natsort import natsorted
import re

from .tools import *
from .openephys_tools import *
@@ -159,6 +160,18 @@ def __init__(self, foldername):
self._experiments = []
for (rel_path, id) in zip(experiments_names, exp_ids):
self._experiments.append(Experiment(op.join(self._absolute_foldername, rel_path), id, self))
elif list(Path(self._absolute_foldername).rglob('structure.oebin')):
# the 'binary' format can also be detected by the presence of a `structure.oebin` file and a `continuous` folder under each recording
oebin_files = list(Path(self._absolute_foldername).rglob('structure.oebin'))
if not np.all([(oebin_file.parent / 'continuous').exists() for oebin_file in oebin_files]):
raise FileNotFoundError(f"Could not find a paired 'continuous' folder for every 'structure.oebin' under {self._absolute_foldername}")

self.format = 'binary'
experiments_names = sorted(set([oebin_file.parent.parent.name for oebin_file in oebin_files]))
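# experiment id is taken from the trailing digit of an 'experimentN' folder name; otherwise the folder index is used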
exp_ids = [int(exp[-1]) if exp.startswith('experiment') else exp_idx for exp_idx, exp in enumerate(experiments_names)]
self._experiments = []
for (rel_path, id) in zip(experiments_names, exp_ids):
self._experiments.append(Experiment(op.join(self._absolute_foldername, rel_path), id, self))
else:
raise Exception("Only 'binary' and 'openephys' format are supported by pyopenephys")

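A minimal sketch of how a 'binary' format session could be opened once the `structure.oebin` detection above is in place (the folder name is hypothetical, and the accessors assume the usual pyopenephys File/experiments/recordings pattern):

import pyopenephys

# hypothetical Open Ephys 'binary' format session containing structure.oebin files
session = pyopenephys.File('/data/2023-02-06_10-00-00')
for experiment in session.experiments:
    for recording in experiment.recordings:
        print(experiment.id, recording.sample_rate, len(recording.analog_signals))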
@@ -205,8 +218,12 @@ def __init__(self, path, id, file):
self._recordings.append(Recording(self._absolute_foldername, int(self.id), self))

elif self.file.format == 'binary':
self._path = op.dirname(path)
self._read_settings(id)
if (Path(path) / 'settings.xml').exists():
self._path = path
self._read_settings(1)
else:
self._path = op.dirname(path)
self._read_settings(id)
recording_names = natsorted([f for f in os.listdir(self._absolute_foldername)
if os.path.isdir(op.join(self._absolute_foldername, f))
and 'recording' in f])
@@ -242,20 +259,23 @@ def _read_settings(self, id):
if fname.startswith('settings') and fname.endswith('.xml') and str(id) in fname]

if not len(set_fname) == 1:
# raise IOError('Unique settings file not found')
print("settings.xml not found. Can't load signal chain information")
self._set_fname = None
self.sig_chain = None
self.setting = None
self.format = None
self.nchan = None
self._start_datetime = datetime(1970, 1, 1)
if self.file.format == 'binary':
raise IOError(f'Unique settings file not found in {self._path}')
else:
print("settings.xml not found. Can't load signal chain information")
self._set_fname = None
self.sig_chain = None
self.setting = None
self.format = None
self.nchan = None
self._start_datetime = datetime(1970, 1, 1)
else:
self._set_fname = op.join(self._path, set_fname[0])
with open(self._set_fname) as f:
xmldata = f.read()
self.settings = xmltodict.parse(xmldata)['SETTINGS']
is_v4 = LooseVersion(self.settings['INFO']['VERSION']) >= LooseVersion('0.4.0.0')
is_v6 = LooseVersion(self.settings['INFO']['VERSION']) >= LooseVersion('0.6.0')
# read date in US format
if platform.system() == 'Windows':
locale.setlocale(locale.LC_ALL, 'english')
@@ -280,7 +300,19 @@
else:
processor_iter = [sigchain['PROCESSOR']]
for processor in processor_iter:
self.sig_chain.update({processor['@name']: processor['@NodeId']})
processor_node_id = processor.get("@nodeId", processor.get("@NodeId"))
if processor_node_id is None:
raise KeyError('Neither "@nodeId" nor "@NodeId" key found')

self.sig_chain.update({processor['@name']: processor_node_id})

if is_v6 and 'Neuropix-PXI' in processor['@name']:
# No explicit "is_source" or "is_sink" in v0.6.0+
# no "CHANNELS" details, thus the "gain" has to be inferred elsewhere
self.acquisition_system = processor['@name'].split('/')[-1]
self._channel_info['gain'] = {}
continue

if is_v4:
is_source = 'CHANNEL_INFO' in processor.keys() and processor['@isSource'] == '1'
is_source_alt = 'CHANNEL' in processor.keys() and processor['@isSource'] == '1'
@@ -321,7 +353,7 @@ def _read_settings(self, id):
recorder = self.settings['RECORDENGINES']['ENGINE'][recorder_idx]['@id']
if recorder == 'OPENEPHYS':
self.format = 'openephys'
elif recorder == 'RAWBINARY':
elif recorder in ('BINARY', 'RAWBINARY'):
self.format = 'binary'
else:
self.format = None
@@ -371,7 +403,7 @@ def __init__(self, path, id, experiment):
with oebin_files[0].open('r') as f:
self._oebin = json.load(f)
elif len(oebin_files) == 0:
raise FileNotFoundError("'structre.oebin' file not found! Impossible to retrieve configuration "
raise FileNotFoundError(f"'structure.oebin' file not found in ({self.absolute_foldername})! Impossible to retrieve configuration "
"information")
else:
raise Exception("Multiple oebin files found. Impossible to retrieve configuration information")
@@ -516,22 +548,30 @@ def _read_sync_message(self):
stimes = []

if self.format == 'binary':
sync_messagefile = [f for f in self.absolute_foldername.iterdir() if 'sync_messages' in f.name][0]
sync_messagefile = [f for f in self.absolute_foldername.iterdir() if 'sync_messages' in f.name]
if sync_messagefile:
sync_messagefile = sync_messagefile[0]
else:
warnings.warn(f'No "sync_messages" file found for binary format in {self.absolute_foldername}')
return info
elif self.format == 'openephys':
if self.experiment.id == 1:
sync_messagefile = self.absolute_foldername / 'messages.events'
else:
sync_messagefile = self.absolute_foldername / f'messages_{self.experiment.id}.events'

is_v4 = LooseVersion(self.experiment.settings['INFO']['VERSION']) >= LooseVersion('0.4.0.0')
is_v6 = LooseVersion(self.experiment.settings['INFO']['VERSION']) >= LooseVersion('0.6.0')
with sync_messagefile.open("r") as fh:
info['_processor_names'] = []
info['_processor_sample_rates'] = []
info['_processor_start_frames'] = []
info['messages'] = []
info['_software_sample_rate'] = None
info['_software_start_frame'] = None
while True:
spl = [s.strip('\x00') for s in fh.readline().split()]
sync_msg_line = fh.readline()
spl = [s.strip('\x00') for s in sync_msg_line.split()]
if not spl:
break
if 'Software' in spl:
Expand Down Expand Up @@ -564,6 +604,13 @@ def _read_sync_message(self):
sr = float(_enumerated_sample_rates[int(encoded_rate) - 1])
info['_processor_sample_rates'].append(sr)
info['_processor_start_frames'].append(int(spl[-1]))
elif sync_msg_line.startswith('Start Time for') and is_v6:
self.processor = True
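# v0.6+ writes sync messages as 'Start Time for <processor> @ <sample rate> Hz: <start sample>'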
match = re.match(r'Start Time for (.*) @ (\d+) Hz: (\d+)', sync_msg_line)
p_name, sr, stime = match.groups()
info['_processor_names'].append(p_name)
info['_processor_sample_rates'].append(float(sr))
info['_processor_start_frames'].append(int(stime))
else:
message = {'time': int(spl[0]),
'message': ' '.join(spl[1:])}
@@ -577,14 +624,13 @@ def _read_messages(self):
if self.format == 'binary':
if self._events_folder is not None:
message_folder = [f for f in self._events_folder.iterdir() if 'Message_Center' in f.name][0]
text_groups = [f for f in message_folder.iterdir()]
text_groups = [f.parent for f in Path(message_folder).rglob('*text.npy')]

if self.format == 'binary':
for tg in text_groups:
text = np.load(tg / 'text.npy')
ts = np.load(tg / 'timestamps.npy')
channels = np.load(tg / 'channels.npy')

ts = ts / self.sample_rate
ts = _load_timestamps(tg / 'timestamps.npy', self.sample_rate)
ts -= self.start_time

if len(text) > 0:
@@ -625,7 +671,7 @@ def _read_events(self):
TTL_groups = [f for f in processor_folder.iterdir() if 'TTL' in f.name]
for ttl in TTL_groups:
full_words = np.load(ttl / 'full_words.npy')
ts = np.load(ttl / 'timestamps.npy')
ts = _load_timestamps(ttl / 'timestamps.npy', self.sample_rate)
channels = np.load(ttl / 'channels.npy').astype(int)
unique_channels = np.unique(channels)
channel_states = np.load(ttl / 'channel_states.npy')
@@ -649,7 +695,6 @@
else:
chan_states = None

ts_chans = ts_chans / self.sample_rate
ts_chans -= self.start_time
processor_folder_split = processor_folder.name.split("-")

@@ -669,7 +714,6 @@
binary_groups = [f for f in processor_folder.iterdir() if 'binary' in f.name]
for bg in binary_groups:
full_words = np.load(bg / 'full_words.npy')
ts = np.load(bg / 'timestamps.npy')
channels = np.load(bg / 'channels.npy').astype(int)
channel_states = np.load(bg / 'channel_states.npy')
channel_states = channel_states / np.max(channel_states).astype(int)
Expand All @@ -684,7 +728,7 @@ def _read_events(self):
else:
sample_rate = self.sample_rate

ts = ts / sample_rate
ts = _load_timestamps(bg / 'timestamps.npy', sample_rate)
ts -= self.start_time

processor_folder_split = processor_folder.name.split("-")
@@ -743,7 +787,6 @@ def _read_tracking(self):
binary_groups = [f for f in tracking_folder.iterdir()]
for bg in binary_groups:
data_array = np.load(bg / 'data_array.npy')
ts = np.load(bg / 'timestamps.npy')
channels = np.load(bg / 'channels.npy')
metadata = np.load(bg / 'metadata.npy')
data_array = np.array([struct.unpack('4f', d) for d in data_array])
Expand All @@ -753,7 +796,7 @@ def _read_tracking(self):
else:
sample_rate = self.sample_rate

ts = ts / sample_rate
ts = _load_timestamps(bg / 'timestamps.npy', sample_rate)
ts -= self.start_time

if len(ts) > 0:
@@ -790,16 +833,16 @@ def _read_analog_signals(self):
data_folder = self._continuous_folder / cont["folder_name"]
nchan = cont["num_channels"]
sample_rate = cont["sample_rate"]
datfiles = [f for f in data_folder.iterdir() if f.suffix == '.dat']
datfiles = [f for f in data_folder.iterdir() if f.name == 'continuous.dat']

if len(datfiles) == 1:
datfile = datfiles[0]
with datfile.open("rb") as fh:
anas, nsamples = read_analog_binary_signals(fh, nchan)
ts = np.load(data_folder / 'timestamps.npy') / sample_rate
ts = _load_timestamps(data_folder / 'timestamps.npy', sample_rate)
self._start_times.append(ts[0] * pq.s)
if len(ts) != nsamples:
warnings.warn('timestamps and nsamples are different!')
warnings.warn('timestamps and nsamples are different ({})!'.format(data_folder))
ts = np.arange(nsamples) / sample_rate
else:
ts -= ts[0]
@@ -841,7 +884,7 @@ def _read_analog_signals(self):
datfile = [f for f in filenames if '.dat' in f and 'continuous' in f][0]
with open(op.join(processor_folder, datfile), "rb") as fh:
anas, nsamples = read_analog_binary_signals(fh, self.nchan)
ts = np.load(op.join(processor_folder, 'timestamps.npy')) / sample_rate
ts = _load_timestamps(processor_folder / 'timestamps.npy', sample_rate)
self._start_times.append(ts[0] * pq.s)
if len(ts) != nsamples:
warnings.warn('timestamps and nsamples are different!')
@@ -1060,3 +1103,29 @@ def export_matlab(self, filename):
dict_to_save.update({'events': np.array([ev.times for ev in self.events])})

sio.savemat(filename, dict_to_save)


def _load_timestamps(ts_npy_file, sample_rate):
"""
Load a timestamps.npy file.
Detects whether the stored values are sample indices or seconds and
returns the timestamps in seconds (applying sample_rate if needed).
"""
ts = np.load(ts_npy_file)

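# integer timestamps are sample indices; convert to seconds with the provided sample rate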
if ts.dtype == np.int32 or ts.dtype == np.int64:
return ts / sample_rate

ts_diff = np.diff(ts)
if any(ts_diff <= 0):
warnings.warn('Loaded timestamps ({}) not monotonically increasing - constructing timestamps from sample rate instead!'.format(ts_npy_file))
return np.arange(len(ts)) / sample_rate

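# float timestamps with a median step of 1 are still sample numbers; otherwise verify the implied rate against sample_rate and treat the values as seconds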
period = np.median(ts_diff)
if period == 1:
return ts / sample_rate

fs = 1/period
if not np.isclose(sample_rate, fs, rtol=3e-4):
raise ValueError(f'Error loading timestamps ({ts_npy_file})\nSignificant discrepancy found in the provided sample rate ({sample_rate}) and that computed from the data ({fs})')
return ts
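
A minimal usage sketch for `_load_timestamps` (the stream folder and sample rate are hypothetical, not taken from this commit):

from pathlib import Path
from pyopenephys.core import _load_timestamps

# hypothetical continuous stream of a 'binary' format recording
stream_folder = Path('/data/recording1/continuous/Rhythm_FPGA-100.0')
sample_rate = 30000.0  # Hz, assumed acquisition rate

# returns seconds whether timestamps.npy stored sample indices (int) or seconds (float)
ts_seconds = _load_timestamps(stream_folder / 'timestamps.npy', sample_rate)
print(ts_seconds[:5])
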
2 changes: 1 addition & 1 deletion pyopenephys/version.py
@@ -1 +1 @@
version = '1.1.5'
version = '1.1.6'
