From d35a1feb9da315afe6bf6060ede536f8a32956f3 Mon Sep 17 00:00:00 2001
From: Christoph Deil
Date: Fri, 8 Oct 2021 17:43:35 +0200
Subject: [PATCH] Write header parser with NumPy

---
 test_whisper_pandas.py |  26 ++++++++--
 whisper_pandas.py      | 105 +++++++++++++++++++++++++++++++----------
 2 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/test_whisper_pandas.py b/test_whisper_pandas.py
index 3d6e774..89f9a69 100644
--- a/test_whisper_pandas.py
+++ b/test_whisper_pandas.py
@@ -2,7 +2,7 @@
 import pandas as pd
 from numpy.testing import assert_allclose
 
-from whisper_pandas import WhisperFile, WhisperFileMeta
+from whisper_pandas import WhisperFile, WhisperFileMeta, WhisperArchiveMeta
 
 
 @pytest.fixture(scope="session")
@@ -22,9 +22,27 @@ def test_meta(meta):
     assert meta.max_retention == 315363600
     assert_allclose(meta.x_files_factor, 0.5)
 
-    assert meta.archives[0].seconds_per_point == 10
-    assert meta.archives[1].seconds_per_point == 60
-    assert meta.archives[2].seconds_per_point == 3600
+    assert meta.header_size == 52
+    assert meta.file_size == 82785664
+    assert meta.file_size_actual == 82785664
+    assert meta.file_size_mismatch is False
+
+    assert len(meta.archives) == 3
+    assert meta.archives[0] == WhisperArchiveMeta(
+        index=0, offset=52, seconds_per_point=10, points=1555200
+    )
+    assert meta.archives[1] == WhisperArchiveMeta(
+        index=1,
+        offset=18662452,
+        seconds_per_point=60,
+        points=5256000,
+    )
+    assert meta.archives[2] == WhisperArchiveMeta(
+        index=2,
+        offset=81734452,
+        seconds_per_point=3600,
+        points=87601,
+    )
 
 
 def test_data_archive_1(wsp):
diff --git a/whisper_pandas.py b/whisper_pandas.py
index 226e609..ec6b4e3 100755
--- a/whisper_pandas.py
+++ b/whisper_pandas.py
@@ -6,7 +6,6 @@ from typing import List
 
 import numpy as np
 import pandas as pd
-import whisper
 
 __all__ = [
     "WhisperFile",
@@ -16,11 +15,28 @@
 
 # Whisper file element formats
 # See https://graphite.readthedocs.io/en/latest/whisper.html#database-format
-
+FMT_FILE_META = np.dtype(
+    [
+        ("aggregation_type", ">u4"),
+        ("max_retention", ">u4"),
+        ("x_files_factor", ">f4"),
+        ("archive_count", ">u4"),
+    ]
+)
+FMT_ARCHIVE_META = np.dtype(
+    [("offset", ">u4"), ("seconds_per_point", ">u4"), ("points", ">u4")]
+)
 FMT_POINT = np.dtype([("time", ">u4"), ("val", ">f8")])
-# TODO: change to format dtype and use it for parsing header
-N_ARCHIVE = 12
-N_META = 16
+AGGREGATION_TYPE_TO_METHOD = {
+    1: "average",
+    2: "sum",
+    3: "last",
+    4: "max",
+    5: "min",
+    6: "avg_zero",
+    7: "absmax",
+    8: "absmin",
+}
 
 
 @dataclasses.dataclass
@@ -31,10 +47,23 @@ class WhisperArchiveMeta:
     offset: int
     seconds_per_point: int
    points: int
-    retention: int
+
+    @classmethod
+    def _from_fh(cls, fh, index: int):
+        meta = np.fromfile(fh, dtype=FMT_ARCHIVE_META, count=1)[0]
+        return cls(
+            index=index,
+            offset=int(meta["offset"]),
+            seconds_per_point=int(meta["seconds_per_point"]),
+            points=int(meta["points"]),
+        )
+
+    @property
+    def retention(self) -> int:
+        return self.seconds_per_point * self.points
 
     @property
-    def size(self):
+    def size(self) -> int:
         return FMT_POINT.itemsize * self.points
 
     def print_info(self):
@@ -56,32 +85,37 @@ class WhisperFileMeta:
     x_files_factor: float
     archives: List[WhisperArchiveMeta]
 
+    @staticmethod
+    def _meta_from_fh(fh):
+        meta = np.fromfile(fh, dtype=FMT_FILE_META, count=1)[0]
+        aggregation_method = AGGREGATION_TYPE_TO_METHOD[int(meta["aggregation_type"])]
+        return {
+            "aggregation_method": aggregation_method,
+            "max_retention": int(meta["max_retention"]),
+            "x_files_factor": float(meta["x_files_factor"]),
+            "archive_count": int(meta["archive_count"]),
+        }
+
     @classmethod
-    def read(cls, path) -> "WhisperFileMeta":
-        info = whisper.info(path)
+    def _from_fh(cls, fh, path) -> "WhisperFileMeta":
+        file_meta = cls._meta_from_fh(fh)
         archives = []
-        for index, _ in enumerate(info["archives"]):
-            archive = WhisperArchiveMeta(
-                index=index,
-                offset=_["offset"],
-                seconds_per_point=_["secondsPerPoint"],
-                points=_["points"],
-                retention=_["retention"],
-            )
-            archives.append(archive)
+        for idx in range(file_meta["archive_count"]):
+            archive_meta = WhisperArchiveMeta._from_fh(fh, idx)
+            archives.append(archive_meta)
 
         return cls(
             path=str(path),
-            aggregation_method=info["aggregationMethod"],
-            max_retention=info["maxRetention"],
-            x_files_factor=info["xFilesFactor"],
+            aggregation_method=file_meta["aggregation_method"],
+            max_retention=file_meta["max_retention"],
+            x_files_factor=file_meta["x_files_factor"],
             archives=archives,
         )
 
     @property
     def header_size(self) -> int:
         """Whisper file header size in bytes"""
-        return N_META + N_ARCHIVE * len(self.archives)
+        return FMT_FILE_META.itemsize + FMT_ARCHIVE_META.itemsize * len(self.archives)
 
     @property
     def file_size(self) -> int:
@@ -123,7 +157,11 @@ class WhisperFile:
 
     @classmethod
     def read(
-        cls, path, archives: List[int] = None, dtype: str = "float32"
+        cls,
+        path,
+        archives: List[int] = None,
+        dtype: str = "float32",
+        meta_only: bool = False,
     ) -> "WhisperFile":
         """Read Whisper archive into a pandas.Series.
 
@@ -137,12 +175,19 @@
             Default: all
         dtype : {"float32", "float64"}
             Value float data type
+        meta_only : bool
+            Only read metadata from file header
         """
-        meta = WhisperFileMeta.read(path)
+        with Path(path).open("rb") as fh:
+            meta = WhisperFileMeta._from_fh(fh, path=path)
+
+        if meta_only:
+            return cls(meta=meta, data=[])
 
         data = []
         for archive_id in range(len(meta.archives)):
            if archives is None or archive_id in archives:
+                # TODO: pass fh here, avoid 2x file open
                 series = read_whisper_archive(
                     path, info=meta.archives[archive_id], dtype=dtype
                 )
@@ -182,6 +227,18 @@
     return pd.Series(val, index).sort_index()
 
 
+def read_whisper_archive_dataframe(
+    path: str, archive_id: int, dtype: str = "float32"
+) -> pd.DataFrame:
+    info = WhisperFile.read(path, meta_only=True).meta.archives[archive_id]
+    data = np.fromfile(path, dtype=FMT_POINT, count=info.points, offset=info.offset)
+    data = data[data["time"] != 0]
+    value = data["val"].astype(dtype)
+    time = data["time"].astype("uint32")
+    df = pd.DataFrame({"timestamp": time, "value": value}).sort_values("timestamp")
+    return df
+
+
 def main():
     """Command line tool"""
     parser = argparse.ArgumentParser()