Skip to content

Commit

Permalink
feat: add end of run workflow with data validation
Browse files Browse the repository at this point in the history
  • Loading branch information
maffettone committed Jul 1, 2024
1 parent 55fe525 commit 6de49ac
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
28 changes: 28 additions & 0 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import time

from prefect import flow, get_run_logger, task
from tiled.client import from_profile

tiled_client = from_profile("nsls2")


@task(retries=2, retry_delay_seconds=10)
def read_all_streams(uid, beamline_acronym="ucal"):
logger = get_run_logger()
run = tiled_client[beamline_acronym]["raw"][uid]
logger.info(f"Validating uid {run.start['uid']}")
start_time = time.monotonic()
for stream in run:
logger.info(f"{stream}:")
stream_start_time = time.monotonic()
stream_data = run[stream].read()
stream_elapsed_time = time.monotonic() - stream_start_time
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
elapsed_time = time.monotonic() - start_time
logger.info(f"{elapsed_time = }")


@flow
def general_data_validation(uid, beamline_acronym="ucal"):
read_all_streams(uid, beamline_acronym)
17 changes: 17 additions & 0 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from prefect import flow, get_run_logger, task

from data_validation import general_data_validation


@task
def log_completion():
logger = get_run_logger()
logger.info("Complete")


@flow
def end_of_run_workflow(stop_doc):
uid = stop_doc["run_start"]
general_data_validation(uid)
# Here is where exporters could be added
log_completion()

0 comments on commit 6de49ac

Please sign in to comment.