diff --git a/src/fxci_etl/pulse/handlers/bigquery.py b/src/fxci_etl/pulse/handlers/bigquery.py index 96e6693..653e25b 100644 --- a/src/fxci_etl/pulse/handlers/bigquery.py +++ b/src/fxci_etl/pulse/handlers/bigquery.py @@ -8,23 +8,23 @@ @dataclass class Run(Record): - reasonCreated: str - reasonResolved: str + reason_created: str + reason_resolved: str resolved: str - runId: int + run_id: int scheduled: str started: str state: str - taskId: str - workerGroup: str - workerId: str + task_id: str + worker_group: str + worker_id: str @classmethod def table_name(cls): return "task_runs" def __str__(self): - return f"{self.taskId} run {self.runId}" + return f"{self.task_id} run {self.run_id}" @dataclass @@ -35,10 +35,10 @@ class Tag: @dataclass class Task(Record): - schedulerId: str - taskGroupId: str - taskId: str - taskQueueId: str + scheduler_id: str + task_group_id: str + task_id: str + task_queue_id: str tags: list[Tag] @classmethod @@ -46,7 +46,7 @@ def table_name(cls): return "tasks" def __str__(self): - return self.taskId + return self.task_id @register() @@ -61,40 +61,41 @@ def process_events(self, events): records = [] for event in events: data = event.data - run_record = {"taskId": data["status"]["taskId"]} - - for key in ("runId", "workerGroup", "workerId"): - run_record[key] = data[key] - - for key in ( - "reasonCreated", - "reasonResolved", - "resolved", - "scheduled", - "started", - "state", - "workerGroup", - "workerId", - ): - run_record[key] = data["status"]["runs"][-1][key] - records.append(Run.from_dict(run_record)) - - if run_record["runId"] == 0: + status = data["status"] + run = data["status"]["runs"][-1] + records.append( + Run.from_dict( + { + "task_id": status["taskId"], + "reason_created": run["reasonCreated"], + "reason_resolved": run["reasonResolved"], + "resolved": run["resolved"], + "run_id": data["runId"], + "scheduled": run["scheduled"], + "started": run["started"], + "state": run["state"], + "worker_group": run["workerGroup"], + "worker_id": run["workerId"], + } + ) + ) + + if data["runId"] == 0: # Only insert the task record for run 0 to avoid duplicate records. - task_record = { - "tags": [ - {"key": k, "value": v} for k, v in data["task"]["tags"].items() - ] - } - for key in ( - "schedulerId", - "taskGroupId", - "taskId", - "taskQueueId", - ): - task_record[key] = data["status"][key] - - records.append(Task.from_dict(task_record)) + records.append( + Task.from_dict( + { + "scheduler_id": status["schedulerId"], + "tags": [ + {"key": k, "value": v} + for k, v in data["task"]["tags"].items() + ], + "task_group_id": status["taskGroupId"], + "task_id": status["taskId"], + "task_queue_id": status["taskQueueId"], + } + ) + ) if records: self.loader.insert(records)