-
Notifications
You must be signed in to change notification settings - Fork 0
/
change_tracker.py
150 lines (125 loc) · 6.69 KB
/
change_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import psycopg2
import time
import sys
import argparse
# Aggregate functions applied to every monitored column, in output order.
_AGGREGATES = ('first', 'last', 'min', 'max', 'avg')


def _parse_columns(columns):
    """Split a comma-separated column list, stripping whitespace and dropping empty entries."""
    return [col.strip() for col in columns.split(',') if col.strip()]


def _build_aggregation(column_list):
    """Build the SELECT list and the matching header names for ``column_list``.

    Both are ordered column-major (every aggregate of the first column, then
    the next column), so ``headers[i]`` labels the i-th value of the result row.

    Returns:
        ``(select_list, headers)``, e.g. for ``["a"]``:
        ``("first(a) AS a_first, last(a) AS a_last, ...", ["a_first", "a_last", ...])``.
    """
    pairs = [(col, agg) for col in column_list for agg in _AGGREGATES]
    select_list = ', '.join(f"{agg}({col}) AS {col}_{agg}" for col, agg in pairs)
    headers = [f"{col}_{agg}" for col, agg in pairs]
    return select_list, headers


def _latest_wal_txn(cur, table_name):
    """Return ``(sequencerTxn, structureVersion)`` of the newest WAL transaction of ``table_name``.

    Returns ``(0, None)`` when the table has no WAL transactions yet, so every
    transaction that shows up later is treated as new.
    """
    cur.execute(
        "SELECT sequencerTxn, structureVersion FROM wal_transactions(%s) "
        "ORDER BY sequencerTxn DESC LIMIT 1",
        (table_name,),
    )
    row = cur.fetchone()
    # NOTE(review): the 0 fallback assumes sequencer txn ids are positive — TODO confirm.
    return (row[0], row[1]) if row else (0, None)


def _resume_from_tracking(conn, cur, table_name, tracking_table, tracking_id):
    """Create ``tracking_table`` if needed and return the ``(txn_id, structure_version)`` to resume from.

    Falls back to the newest WAL transaction when no record exists for this
    ``(tracking_id, table_name)`` pair.
    """
    # tracking_table is an identifier and cannot be bound as a parameter; it must be trusted.
    cur.execute(f"""
        CREATE TABLE IF NOT EXISTS {tracking_table} (
            timestamp TIMESTAMP,
            trackingId SYMBOL,
            tableName SYMBOL,
            sequencerTxn LONG
        ) timestamp (timestamp) PARTITION BY DAY WAL DEDUP UPSERT KEYS(timestamp, trackingId, tableName);
    """)
    conn.commit()
    # Filter on tableName as well: LATEST ON ... PARTITION BY tableName yields one
    # row per table, and a shared tracking id must not resume from another table's txn.
    cur.execute(f"""
        SELECT tableName, sequencerTxn
        FROM {tracking_table}
        WHERE trackingId = %s AND tableName = %s
        LATEST ON timestamp
        PARTITION BY tableName;
    """, (tracking_id, table_name))
    record = cur.fetchone()
    if not record:
        return _latest_wal_txn(cur, table_name)
    latest_txn_id = record[1]
    cur.execute(
        "SELECT structureVersion FROM wal_transactions(%s) WHERE sequencerTxn = %s LIMIT 1",
        (table_name, latest_txn_id),
    )
    row = cur.fetchone()
    # The recorded txn may no longer be listed; keep its id and treat the
    # structure version as unknown instead of crashing on None.
    return latest_txn_id, (row[0] if row else None)


def main(table_name, columns, dbname='qdb', user='admin', host='127.0.0.1', port=8812,
         password='quest', row_threshold=1000, check_interval=30, timestamp_column='timestamp',
         tracking_table=None, tracking_id=None):
    """Poll a QuestDB WAL table and print column aggregates once enough new rows accumulate.

    Every ``check_interval`` seconds the table's ``wal_transactions`` are read.
    When the transactions committed since the last report contain at least
    ``row_threshold`` rows, ``first/last/min/max/avg`` of each column in
    ``columns`` are printed for the timestamp range those transactions cover,
    and the resume point advances past them. Structure-version changes are
    printed as they are first seen. Runs until interrupted; the connection is
    closed on exit.

    Args:
        table_name: WAL table to monitor. Also used as an SQL identifier, so it must be trusted.
        columns: Comma-separated column names to aggregate (trusted identifiers).
        dbname, user, host, port, password: PostgreSQL-wire connection settings.
        row_threshold: Minimum number of new rows before an aggregation is run.
        check_interval: Seconds to sleep between polls.
        timestamp_column: Timestamp column used for the range filter (trusted identifier).
        tracking_table: Optional table that persists the last processed transaction.
        tracking_id: Identifier of this run inside ``tracking_table``. Tracking is
            enabled only when both ``tracking_table`` and ``tracking_id`` are set.

    Raises:
        ValueError: if ``columns`` names no columns.
    """
    # Loop-invariant: parse columns and build the SELECT list/headers once.
    column_list = _parse_columns(columns)
    if not column_list:
        raise ValueError("columns must name at least one column")
    aggregations, headers = _build_aggregation(column_list)

    tracking = bool(tracking_table and tracking_id)
    if not tracking and (tracking_table or tracking_id):
        print("Warning: tracking_table and tracking_id must both be set to enable tracking; "
              "tracking is disabled.", file=sys.stderr)

    conn = psycopg2.connect(
        dbname=dbname,
        user=user,
        host=host,
        port=port,
        password=password
    )
    try:
        with conn.cursor() as cur:
            if tracking:
                latest_txn_id, latest_structure_version = _resume_from_tracking(
                    conn, cur, table_name, tracking_table, tracking_id)
            else:
                latest_txn_id, latest_structure_version = _latest_wal_txn(cur, table_name)
            print(f"Starting from transaction ID: {latest_txn_id} with structure version: {latest_structure_version}")

            # Highest txn already inspected for structure changes. Transactions are
            # re-fetched while rows accumulate below the threshold, so without this
            # the same change would be reported again (and spuriously reversed).
            structure_checked_txn = latest_txn_id
            while True:
                time.sleep(check_interval)
                # ORDER BY guarantees new_transactions[-1] is the newest transaction.
                cur.execute(
                    "SELECT sequencerTxn, minTimestamp, maxTimestamp, rowCount, structureVersion "
                    "FROM wal_transactions(%s) WHERE sequencerTxn > %s ORDER BY sequencerTxn",
                    (table_name, latest_txn_id),
                )
                new_transactions = cur.fetchall()
                if not new_transactions:
                    continue

                for txn in new_transactions:
                    if txn[0] > structure_checked_txn and txn[4] != latest_structure_version:
                        print(f"Structure version changed from {latest_structure_version} to {txn[4]} on transaction {txn[0]}")
                        latest_structure_version = txn[4]
                structure_checked_txn = new_transactions[-1][0]

                # Aggregate the number of rows in new transactions, ignoring None values.
                total_new_rows = sum(txn[3] for txn in new_transactions if txn[3] is not None)
                if total_new_rows < row_threshold:
                    continue

                # Timestamp range covered by the new transactions, ignoring None values.
                min_timestamp = min((txn[1] for txn in new_transactions if txn[1] is not None), default=None)
                max_timestamp = max((txn[2] for txn in new_transactions if txn[2] is not None), default=None)
                if min_timestamp is None or max_timestamp is None:
                    continue

                # Identifiers cannot be bound as parameters. The timestamps come
                # from the database itself, not from user input.
                cur.execute(f"""
                    SELECT {aggregations}
                    FROM {table_name}
                    WHERE {timestamp_column} BETWEEN '{min_timestamp}' AND '{max_timestamp}'
                """)
                results = cur.fetchone()

                print(f"Aggregated results from {min_timestamp} to {max_timestamp}:")
                print(f"Included Transactions: {new_transactions[0][0]} to {new_transactions[-1][0]}")
                print(f"Total Rows: {total_new_rows}")
                # Headers are built in the same column-major order as the SELECT list.
                print(", ".join(headers))
                print(", ".join(map(str, results)))

                latest_txn_id = new_transactions[-1][0]
                if tracking:
                    # QuestDB interprets zone-less timestamp literals as UTC, so record UTC.
                    # NOTE(review): rows written by older runs in local time may sort
                    # ahead of these until that offset has elapsed.
                    timestamp_now = time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime())
                    cur.execute(f"""
                        INSERT INTO {tracking_table} (timestamp, trackingId, tableName, sequencerTxn)
                        VALUES (%s, %s, %s, %s)
                    """, (timestamp_now, tracking_id, table_name, latest_txn_id))
                    conn.commit()
    finally:
        # Reached on any exit from the polling loop (e.g. KeyboardInterrupt).
        conn.close()
if __name__ == "__main__":
    # Command-line entry point. Every flag's argparse dest equals a keyword
    # parameter of main(), so the parsed namespace is forwarded as-is.
    cli = argparse.ArgumentParser(description='Monitor and aggregate changes in a QuestDB table.')
    option_specs = [
        ('--table_name', {'required': True, 'help': 'The name of the table to monitor.'}),
        ('--row_threshold', {'type': int, 'default': 1000, 'help': 'The number of rows to trigger aggregation.'}),
        ('--check_interval', {'type': int, 'default': 30, 'help': 'The interval (in seconds) to check for new transactions.'}),
        ('--columns', {'required': True, 'help': 'Comma-separated list of columns to aggregate.'}),
        ('--timestamp_column', {'default': 'timestamp', 'help': 'The name of the timestamp column.'}),
        ('--dbname', {'default': 'qdb', 'help': 'The name of the database.'}),
        ('--user', {'default': 'admin', 'help': 'The database user.'}),
        ('--host', {'default': '127.0.0.1', 'help': 'The database host.'}),
        ('--port', {'type': int, 'default': 8812, 'help': 'The database port.'}),
        ('--password', {'default': 'quest', 'help': 'The database password.'}),
        ('--tracking_table', {'help': 'The name of the tracking table.'}),
        ('--tracking_id', {'help': 'The tracking ID for this run.'}),
    ]
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    main(**vars(cli.parse_args()))