Skip to content

Commit

Permalink
fix!: update bigquery task / task_runs schemas to use snake case
Browse files: browse the repository at this point in the history
  • Loading branch information
ahal committed Jun 20, 2024
1 parent 6d86705 commit bfab253
Showing 1 changed file with 46 additions and 45 deletions.
91 changes: 46 additions & 45 deletions src/fxci_etl/pulse/handlers/bigquery.py
Original file line number | Diff line number | Diff line change
@@ -8,23 +8,23 @@

@dataclass
class Run(Record):
reasonCreated: str
reasonResolved: str
reason_created: str
reason_resolved: str
resolved: str
runId: int
run_id: int
scheduled: str
started: str
state: str
taskId: str
workerGroup: str
workerId: str
task_id: str
worker_group: str
worker_id: str

@classmethod
def table_name(cls):
return "task_runs"

def __str__(self):
return f"{self.taskId} run {self.runId}"
return f"{self.task_id} run {self.run_id}"


@dataclass
@@ -35,18 +35,18 @@ class Tag:

@dataclass
class Task(Record):
schedulerId: str
taskGroupId: str
taskId: str
taskQueueId: str
scheduler_id: str
task_group_id: str
task_id: str
task_queue_id: str
tags: list[Tag]

@classmethod
def table_name(cls):
return "tasks"

def __str__(self):
return self.taskId
return self.task_id


@register()
@@ -61,40 +61,41 @@ def process_events(self, events):
records = []
for event in events:
data = event.data
run_record = {"taskId": data["status"]["taskId"]}

for key in ("runId", "workerGroup", "workerId"):
run_record[key] = data[key]

for key in (
"reasonCreated",
"reasonResolved",
"resolved",
"scheduled",
"started",
"state",
"workerGroup",
"workerId",
):
run_record[key] = data["status"]["runs"][-1][key]
records.append(Run.from_dict(run_record))

if run_record["runId"] == 0:
status = data["status"]
run = data["status"]["runs"][-1]
records.append(
Run.from_dict(
{
"task_id": status["taskId"],
"reason_created": run["reasonCreated"],
"reason_resolved": run["reasonResolved"],
"resolved": run["resolved"],
"run_id": data["runId"],
"scheduled": run["scheduled"],
"started": run["started"],
"state": run["state"],
"worker_group": run["workerGroup"],
"worker_id": run["workerId"],
}
)
)

if data["runId"] == 0:
# Only insert the task record for run 0 to avoid duplicate records.
task_record = {
"tags": [
{"key": k, "value": v} for k, v in data["task"]["tags"].items()
]
}
for key in (
"schedulerId",
"taskGroupId",
"taskId",
"taskQueueId",
):
task_record[key] = data["status"][key]

records.append(Task.from_dict(task_record))
records.append(
Task.from_dict(
{
"scheduler_id": status["schedulerId"],
"tags": [
{"key": k, "value": v}
for k, v in data["task"]["tags"].items()
],
"task_group_id": status["taskGroupId"],
"task_id": status["taskId"],
"task_queue_id": status["taskQueueId"],
}
)
)

if records:
self.loader.insert(records)

0 comments on commit bfab253

Please sign in to comment.