Skip to content

Commit

Permalink
implemented ScanData1D class
Browse files Browse the repository at this point in the history
  • Loading branch information
Bing Li committed Oct 14, 2024
1 parent 4e8570c commit ecd7390
Show file tree
Hide file tree
Showing 12 changed files with 504 additions and 284 deletions.
2 changes: 2 additions & 0 deletions src/tavi/data/nxentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def _read_recursively(nexus_entry, items=None):
value = str(value.asstr()[...])
except TypeError: # arrays instead
value = value[...]
if not np.shape(value): # single element
value = value.tolist()

if not attr_dict: # empty attributes
items.update({key: {"dataset": value}})
Expand Down
196 changes: 50 additions & 146 deletions src/tavi/data/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from tavi.data.nxentry import NexusEntry
from tavi.data.plotter import Plot1D
from tavi.data.scan_data import ScanData1D
from tavi.sample.xtal import Xtal
from tavi.utilities import spice_to_mantid

Expand Down Expand Up @@ -166,110 +167,25 @@ def get_data(self) -> dict:
data_dict.update({name: self._nexus_dict.get(name)})
return data_dict

def _rebin_tol(
self,
x_raw: np.ndarray,
y_raw: np.ndarray,
y_str: str,
rebin_params: tuple,
norm_channel: Literal["time", "monitor", "mcu", None],
norm_val: float,
):
"""Rebin with tolerance"""

rebin_min, rebin_max, rebin_step = rebin_params
if rebin_min is None:
rebin_min = np.min(x_raw)
if rebin_max is None:
rebin_max = np.max(x_raw)

x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
x = np.zeros_like(x_grid)
y = np.zeros_like(x_grid)
counts = np.zeros_like(x_grid)
weights = np.zeros_like(x_grid)
yerr = None

if norm_channel is None: # rebin, no renorm
weight_channel = self.scan_info.preset_channel
weight = self.data[weight_channel]
for i, x0 in enumerate(x_raw):
idx = np.nanargmax(x_grid + rebin_step / 2 >= x0)
y[idx] += y_raw[i]
x[idx] += x_raw[i] * weight[i]
weights[idx] += weight[i]
counts[idx] += 1

# errror bars for detector only
if "detector" in y_str:
yerr = np.sqrt(y) / counts
y = y / counts
x = x / weights
return (x, y, yerr)

# rebin and renorm
norm = self.data[norm_channel]
for i, x0 in enumerate(x_raw):
idx = np.nanargmax(x_grid + rebin_step / 2 >= x0)
y[idx] += y_raw[i]
x[idx] += x_raw[i] * norm[i]
counts[idx] += norm[i]

# errror bars for detector only
if "detector" in y_str:
yerr = np.sqrt(y) / counts * norm_val
y = y / counts * norm_val
x = x / counts
return (x, y, yerr)

def _rebin_grid(
self,
x_raw: np.ndarray,
y_raw: np.ndarray,
y_str: str,
rebin_params: tuple,
norm_channel: Literal["time", "monitor", "mcu", None],
norm_val: float,
):
"""Rebin with a regular grid"""

rebin_min, rebin_max, rebin_step = rebin_params
if rebin_min is None:
rebin_min = np.min(x_raw)
if rebin_max is None:
rebin_max = np.max(x_raw)

x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
y = np.zeros_like(x)
cts = np.zeros_like(x)
yerr = None
# rebin, no renorm
if norm_channel is None:
for i, x0 in enumerate(x_raw):
idx = np.nanargmax(x + rebin_step / 2 >= x0)
y[idx] += y_raw[i]
cts[idx] += 1

# errror bars for detector only
if "detector" in y_str:
yerr = np.sqrt(y) / cts
y = y / cts
return (x, y, yerr)

# rebin and renorm
norm = self.data[norm_channel]
for i, x0 in enumerate(x_raw):
idx = np.nanargmax(x + rebin_step / 2 >= x0)
y[idx] += y_raw[i]
cts[idx] += norm[i]

# errror bars for detector only
if "detector" in y_str:
yerr = np.sqrt(y) / cts * norm_val
y = y / cts * norm_val
return (x, y, yerr)

def generate_curve(
@staticmethod
def validate_rebin_params(rebin_params):
if isinstance(rebin_params, tuple):
if len(rebin_params) != 3:
raise ValueError("Rebin parameters should have the form (min, max, step)")
rebin_min, rebin_max, rebin_step = rebin_params
if (rebin_min >= rebin_max) or (rebin_step < 0):
raise ValueError(f"Nonsensical rebin parameters {rebin_params}")

elif isinstance(rebin_params, float | int):
if rebin_params < 0:
raise ValueError("Rebin step needs to be greater than zero.")
rebin_params = (None, None, float(rebin_params))

else:
raise ValueError(f"Unrecogonized rebin parameters {rebin_params}")
return rebin_params

def get_plot_data(
self,
x_str: Optional[str] = None,
y_str: Optional[str] = None,
Expand All @@ -291,63 +207,51 @@ def generate_curve(
take as (min, max, step) if a tuple of size 3 is given
"""

if x_str is None:
x_str = self.scan_info.def_x

if y_str is None:
y_str = self.scan_info.def_y
x_str = self.scan_info.def_x if x_str is None else x_str
y_str = self.scan_info.def_y if y_str is None else y_str

x_raw = self.data[x_str]
y_raw = self.data[y_str]

yerr = None
scan_data = ScanData1D(x=self.data[x_str], y=self.data[y_str])

if rebin_type is None: # no rebin
x = x_raw
y = y_raw
# errror bars for detector only
if "detector" in y_str:
yerr = np.sqrt(y)
# normalize y-axis without rebining along x-axis
if norm_channel is not None:
norm = self.data[norm_channel] / norm_val
y = y / norm
if yerr is not None:
yerr = yerr / norm

plot1d = Plot1D(x=x, y=y, yerr=yerr)
if norm_channel is not None: # normalize y-axis without rebining along x-axis
scan_data.renorm(norm_col=self.data[norm_channel] / norm_val)

plot1d = Plot1D(x=scan_data.x, y=scan_data.y, yerr=scan_data.err)
plot1d.make_labels(x_str, y_str, norm_channel, norm_val, self.scan_info)

return plot1d

# validate rebin params
if isinstance(rebin_params, tuple):
if len(rebin_params) != 3:
raise ValueError("Rebin parameters should have the form (min, max, step)")
rebin_min, rebin_max, rebin_step = rebin_params
if (rebin_min >= rebin_max) or (rebin_step < 0):
raise ValueError(f"Nonsensical rebin parameters {rebin_params}")

elif isinstance(rebin_params, float | int):
if rebin_params < 0:
raise ValueError("Rebin step needs to be greater than zero.")
rebin_params = (None, float(rebin_params), None)
else:
raise ValueError(f"Unrecogonized rebin parameters {rebin_params}")
# Rebin, first validate rebin params
rebin_params_tuple = Scan.validate_rebin_params(rebin_params)

match rebin_type:
case "tol": # x weighted by normalization channel
x, y, yerr = self._rebin_tol(x_raw, y_raw, y_str, rebin_params, norm_channel, norm_val)
case "tol":
if norm_channel is None: # x weighted by preset channel
weight_channel = self.scan_info.preset_channel
scan_data.rebin_tol(rebin_params_tuple, weight_col=self.data[weight_channel])
else: # x weighted by normalization channel
scan_data.rebin_tol_renorm(
rebin_params_tuple,
norm_col=self.data[norm_channel],
norm_val=norm_val,
)
case "grid":
x, y, yerr = self._rebin_grid(x_raw, y_raw, y_str, rebin_params, norm_channel, norm_val)
if norm_channel is None:
scan_data.rebin_grid(rebin_params_tuple)
else:
scan_data.rebin_grid_renorm(
rebin_params_tuple,
norm_col=self.data[norm_channel],
norm_val=norm_val,
)
case _:
raise ValueError('Unrecogonized rebin type. Needs to be "tol" or "grid".')

plot1d = Plot1D(x=x, y=y, yerr=yerr)
plot1d = Plot1D(x=scan_data.x, y=scan_data.y, yerr=scan_data.err)
plot1d.make_labels(x_str, y_str, norm_channel, norm_val, self.scan_info)
return plot1d

def plot_curve(
def plot(
self,
x_str: Optional[str] = None,
y_str: Optional[str] = None,
Expand All @@ -358,7 +262,7 @@ def plot_curve(
):
"""Plot a 1D curve gnerated from a singal scan in a new window"""

plot1d = self.generate_curve(x_str, y_str, norm_channel, norm_val, rebin_type, rebin_step)
plot1d = self.get_plot_data(x_str, y_str, norm_channel, norm_val, rebin_type, rebin_step)

fig, ax = plt.subplots()
plot1d.plot_curve(ax)
Expand Down
110 changes: 110 additions & 0 deletions src/tavi/data/scan_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-


import numpy as np


class ScanData1D(object):

ZERO = 1e-6

def __init__(self, x: np.ndarray, y: np.ndarray) -> None:

self.ind = np.argsort(x)
self.x = x[self.ind]
self.y = y[self.ind]
self.err = np.sqrt(y)

def renorm(self, norm_col: np.ndarray, norm_val: float = 1.0):
"""Renormalized to norm_val"""
norm_col = norm_col[self.ind]
self.y = self.y / norm_col * norm_val
self.err = self.err / norm_col * norm_val

def rebin_tol(self, rebin_params: tuple, weight_col: np.ndarray):
"""Rebin with tolerance"""

rebin_min, rebin_max, rebin_step = rebin_params
rebin_min = np.min(self.x) if rebin_min is None else rebin_min
rebin_max = np.max(self.x) if rebin_max is None else rebin_max

x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
x = np.zeros_like(x_grid)
y = np.zeros_like(x_grid)
counts = np.zeros_like(x_grid)
weights = np.zeros_like(x_grid)

for i, x0 in enumerate(self.x):
idx = np.nanargmax(x_grid + rebin_step / 2 + ScanData1D.ZERO >= x0)
y[idx] += self.y[i]
x[idx] += self.x[i] * weight_col[i]
weights[idx] += weight_col[i]
counts[idx] += 1

self.err = np.sqrt(y) / counts
self.y = y / counts
self.x = x / weights

def rebin_tol_renorm(self, rebin_params: tuple, norm_col: np.ndarray, norm_val: float = 1.0):
"""Rebin with tolerance and renormalize"""
rebin_min, rebin_max, rebin_step = rebin_params
rebin_min = np.min(self.x) if rebin_min is None else rebin_min
rebin_max = np.max(self.x) if rebin_max is None else rebin_max

x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
x = np.zeros_like(x_grid)
y = np.zeros_like(x_grid)
counts = np.zeros_like(x_grid)

norm_col = norm_col[self.ind]

for i, x0 in enumerate(self.x):
idx = np.nanargmax(x_grid + rebin_step / 2 + ScanData1D.ZERO >= x0)
y[idx] += self.y[i]
x[idx] += self.x[i] * norm_col[i]
counts[idx] += norm_col[i]

self.err = np.sqrt(y) / counts * norm_val
self.y = y / counts * norm_val
self.x = x / counts

def rebin_grid(self, rebin_params: tuple):
"""Rebin with a regular grid"""
rebin_min, rebin_max, rebin_step = rebin_params
rebin_min = np.min(self.x) if rebin_min is None else rebin_min
rebin_max = np.max(self.x) if rebin_max is None else rebin_max

x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
y = np.zeros_like(x)
counts = np.zeros_like(x)

for i, x0 in enumerate(self.x):
idx = np.nanargmax(x + rebin_step / 2 + ScanData1D.ZERO >= x0)
y[idx] += self.y[i]
counts[idx] += 1

self.x = x
self.err = np.sqrt(y) / counts
self.y = y / counts

def rebin_grid_renorm(self, rebin_params: tuple, norm_col: np.ndarray, norm_val: float = 1.0):
"""Rebin with a regular grid and renormalize"""

rebin_min, rebin_max, rebin_step = rebin_params
rebin_min = np.min(self.x) if rebin_min is None else rebin_min
rebin_max = np.max(self.x) if rebin_max is None else rebin_max

x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
y = np.zeros_like(x)
counts = np.zeros_like(x)

norm_col = norm_col[self.ind]

for i, x0 in enumerate(self.x): # plus ZERO helps improve precision
idx = np.nanargmax(x + rebin_step / 2 + ScanData1D.ZERO >= x0)
y[idx] += self.y[i]
counts[idx] += norm_col[i]

self.x = x
self.err = np.sqrt(y) / counts * norm_val
self.y = y / counts * norm_val
Loading

0 comments on commit ecd7390

Please sign in to comment.