From dc0a24de08f6324fb9ac896685be7d54c52ad84e Mon Sep 17 00:00:00 2001 From: Filipp Ozinov Date: Sun, 27 Oct 2024 23:50:26 +0400 Subject: [PATCH] Write exceptions to log files --- mysql_ch_replicator/binlog_replicator.py | 16 ++++++++- mysql_ch_replicator/db_replicator.py | 46 +++++++++++++----------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/mysql_ch_replicator/binlog_replicator.py b/mysql_ch_replicator/binlog_replicator.py index 5e35174..b71b500 100644 --- a/mysql_ch_replicator/binlog_replicator.py +++ b/mysql_ch_replicator/binlog_replicator.py @@ -385,11 +385,22 @@ def run(self): killer = GracefulKiller() + last_log_time = time.time() + total_processed_events = 0 + while not killer.kill_now: try: + curr_time = time.time() + if curr_time - last_log_time > 60: + last_log_time = curr_time + logger.info( + f'last transaction id: {last_transaction_id}, processed events: {total_processed_events}', + ) + last_read_count = 0 for event in self.stream: last_read_count += 1 + total_processed_events += 1 transaction_id = (self.stream.log_file, self.stream.log_pos) last_transaction_id = transaction_id @@ -457,8 +468,11 @@ def run(self): time.sleep(BinlogReplicator.READ_LOG_INTERVAL) except OperationalError as e: - print('=== operational error', e) + logger.error(f'operational error {str(e)}', exc_info=True) time.sleep(15) + except Exception as e: + logger.error(f'unhandled error {str(e)}', exc_info=True) + raise logger.info('stopping binlog_replicator') self.data_writer.close_all() diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 3a567c8..f529e8e 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -125,28 +125,32 @@ def __init__(self, config: Settings, database: str, target_database: str = None, self.last_touch_time = 0 def run(self): - logger.info('launched db_replicator') - if self.state.status == Status.RUNNING_REALTIME_REPLICATION: - self.run_realtime_replication() 
- return - if self.state.status == Status.PERFORMING_INITIAL_REPLICATION: + try: + logger.info('launched db_replicator') + if self.state.status == Status.RUNNING_REALTIME_REPLICATION: + self.run_realtime_replication() + return + if self.state.status == Status.PERFORMING_INITIAL_REPLICATION: + self.perform_initial_replication() + self.run_realtime_replication() + return + + logger.info('recreating database') + self.clickhouse_api.database = self.target_database_tmp + self.clickhouse_api.recreate_database() + self.state.tables = self.mysql_api.get_tables() + self.state.tables = [ + table for table in self.state.tables if self.config.is_table_matches(table) + ] + self.state.last_processed_transaction = self.data_reader.get_last_transaction_id() + self.state.save() + logger.info(f'last known transaction {self.state.last_processed_transaction}') + self.create_initial_structure() self.perform_initial_replication() self.run_realtime_replication() - return - - logger.info('recreating database') - self.clickhouse_api.database = self.target_database_tmp - self.clickhouse_api.recreate_database() - self.state.tables = self.mysql_api.get_tables() - self.state.tables = [ - table for table in self.state.tables if self.config.is_table_matches(table) - ] - self.state.last_processed_transaction = self.data_reader.get_last_transaction_id() - self.state.save() - logger.info(f'last known transaction {self.state.last_processed_transaction}') - self.create_initial_structure() - self.perform_initial_replication() - self.run_realtime_replication() + except Exception: + logger.error('unhandled exception', exc_info=True) + raise def create_initial_structure(self): self.state.status = Status.CREATING_INITIAL_STRUCTURES @@ -417,7 +421,7 @@ def log_stats_if_required(self): if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL: return self.last_dump_stats_time = curr_time - logger.info(f'statistics:\n{json.dumps(self.stats.__dict__)}') + logger.info(f'stats: 
{json.dumps(self.stats.__dict__)}') self.stats = Statistics() def upload_records_if_required(self, table_name):