Skip to content

Commit

Permalink
Write exceptions to log files
Browse files Browse the repository at this point in the history
  • Loading branch information
bakwc committed Oct 27, 2024
1 parent f02b958 commit dc0a24d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
16 changes: 15 additions & 1 deletion mysql_ch_replicator/binlog_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,22 @@ def run(self):

        # Install a SIGINT/SIGTERM handler so the replication loop can shut
        # down cleanly instead of dying mid-event.
        killer = GracefulKiller()

        # Progress heartbeat: log the last seen binlog position and the total
        # event count at most once per minute.
        last_log_time = time.time()
        total_processed_events = 0

        while not killer.kill_now:
            try:
                curr_time = time.time()
                if curr_time - last_log_time > 60:
                    last_log_time = curr_time
                    # NOTE(review): last_transaction_id is assigned inside the
                    # event loop below — verify it is also initialized before
                    # this while loop (in the elided lines above), otherwise the
                    # first heartbeat on an idle stream raises NameError.
                    logger.info(
                        f'last transaction id: {last_transaction_id}, processed events: {total_processed_events}',
                    )

                # last_read_count tracks events read in this pass of the
                # stream; total_processed_events accumulates across passes.
                last_read_count = 0
                for event in self.stream:
                    last_read_count += 1
                    total_processed_events += 1
                    # Binlog position (file name + offset) uniquely identifies
                    # the transaction we have processed up to.
                    transaction_id = (self.stream.log_file, self.stream.log_pos)
                    last_transaction_id = transaction_id
Expand Down Expand Up @@ -457,8 +468,11 @@ def run(self):
time.sleep(BinlogReplicator.READ_LOG_INTERVAL)

except OperationalError as e:
print('=== operational error', e)
logger.error(f'operational error {str(e)}', exc_info=True)
time.sleep(15)
except Exception:
logger.error(f'unhandled error {str(e)}', exc_info=True)
raise

logger.info('stopping binlog_replicator')
self.data_writer.close_all()
Expand Down
46 changes: 25 additions & 21 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,32 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.last_touch_time = 0

def run(self):
logger.info('launched db_replicator')
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
self.run_realtime_replication()
return
if self.state.status == Status.PERFORMING_INITIAL_REPLICATION:
try:
logger.info('launched db_replicator')
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
self.run_realtime_replication()
return
if self.state.status == Status.PERFORMING_INITIAL_REPLICATION:
self.perform_initial_replication()
self.run_realtime_replication()
return

logger.info('recreating database')
self.clickhouse_api.database = self.target_database_tmp
self.clickhouse_api.recreate_database()
self.state.tables = self.mysql_api.get_tables()
self.state.tables = [
table for table in self.state.tables if self.config.is_table_matches(table)
]
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
self.state.save()
logger.info(f'last known transaction {self.state.last_processed_transaction}')
self.create_initial_structure()
self.perform_initial_replication()
self.run_realtime_replication()
return

logger.info('recreating database')
self.clickhouse_api.database = self.target_database_tmp
self.clickhouse_api.recreate_database()
self.state.tables = self.mysql_api.get_tables()
self.state.tables = [
table for table in self.state.tables if self.config.is_table_matches(table)
]
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
self.state.save()
logger.info(f'last known transaction {self.state.last_processed_transaction}')
self.create_initial_structure()
self.perform_initial_replication()
self.run_realtime_replication()
except Exception:
logger.error(f'unhandled exception', exc_info=True)
raise

def create_initial_structure(self):
self.state.status = Status.CREATING_INITIAL_STRUCTURES
Expand Down Expand Up @@ -417,7 +421,7 @@ def log_stats_if_required(self):
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
return
self.last_dump_stats_time = curr_time
logger.info(f'statistics:\n{json.dumps(self.stats.__dict__)}')
logger.info(f'stats: {json.dumps(self.stats.__dict__)}')
self.stats = Statistics()

def upload_records_if_required(self, table_name):
Expand Down

0 comments on commit dc0a24d

Please sign in to comment.