Skip to content

Commit

Permalink
Write logs to file, split by database (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
bakwc authored Oct 27, 2024
1 parent bfaa5b0 commit f5fdbe7
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
17 changes: 16 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class Statistics:
insert_records_count: int = 0
erase_events_count: int = 0
erase_records_count: int = 0
no_events_count: int = 0


class DbReplicator:
Expand Down Expand Up @@ -124,6 +125,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.last_touch_time = 0

def run(self):
logger.info('launched db_replicator')
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
self.run_realtime_replication()
return
Expand Down Expand Up @@ -226,6 +228,9 @@ def perform_initial_replication_table(self, table_name):
primary_key_index = field_names.index(primary_key)
primary_key_type = field_types[primary_key_index]

stats_number_of_records = 0
last_stats_dump_time = time.time()

while True:

query_start_value = max_primary_key
Expand Down Expand Up @@ -258,6 +263,14 @@ def perform_initial_replication_table(self, table_name):
self.save_state_if_required()
self.prevent_binlog_removal()

stats_number_of_records += len(records)
curr_time = time.time()
if curr_time - last_stats_dump_time >= 60.0:
last_stats_dump_time = curr_time
logger.info(
f'replicating {table_name}, replicated {stats_number_of_records}, primary key: {max_primary_key}',
)

def run_realtime_replication(self):
if self.initial_only:
logger.info('skip running realtime replication, only initial replication was requested')
Expand All @@ -277,6 +290,8 @@ def run_realtime_replication(self):
if event is None:
time.sleep(DbReplicator.READ_LOG_INTERVAL)
self.upload_records_if_required(table_name=None)
self.stats.no_events_count += 1
self.log_stats_if_required()
continue
assert event.db_name == self.database
if self.database != self.target_database:
Expand Down Expand Up @@ -402,7 +417,7 @@ def log_stats_if_required(self):
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
return
self.last_dump_stats_time = curr_time
logger.info(f'statistics:\n{json.dumps(self.stats.__dict__, indent=3)}')
logger.info(f'statistics:\n{json.dumps(self.stats.__dict__)}')
self.stats = Statistics()

def upload_records_if_required(self, table_name):
Expand Down
52 changes: 48 additions & 4 deletions mysql_ch_replicator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import argparse
import logging
from logging.handlers import RotatingFileHandler
import sys
import os

from .config import Settings
from .db_replicator import DbReplicator
Expand All @@ -10,15 +13,38 @@
from .runner import Runner


def set_logging_config(tags, log_file=None):
    """Configure root logging to stderr and, optionally, a rotating log file.

    Args:
        tags: Text placed at the start of the bracketed prefix of every
            log line (before the timestamp and level).
        log_file: Path of a log file to write in addition to stderr. When
            ``None`` (the default) only the stderr handler is installed.
            Otherwise a ``RotatingFileHandler`` is added that rolls over at
            50 MiB and keeps 3 backup files.
    """
    handlers = [logging.StreamHandler(sys.stderr)]
    if log_file is not None:
        handlers.append(
            RotatingFileHandler(
                filename=log_file,
                maxBytes=50 * 1024 * 1024,  # 50 MiB
                backupCount=3,
                encoding='utf-8',
                delay=False,  # open the file right away rather than on first emit
            )
        )

    logging.basicConfig(
        level=logging.INFO,
        format=f'[{tags} %(asctime)s %(levelname)8s] %(message)s',
        handlers=handlers,
    )


def run_binlog_replicator(args, config: Settings):
set_logging_config('binlogrepl')
if not os.path.exists(config.binlog_replicator.data_dir):
os.mkdir(config.binlog_replicator.data_dir)

log_file = os.path.join(
config.binlog_replicator.data_dir,
'binlog_replicator.log',
)

set_logging_config('binlogrepl', log_file=log_file)
binlog_replicator = BinlogReplicator(
settings=config,
)
Expand All @@ -29,11 +55,29 @@ def run_db_replicator(args, config: Settings):
if not args.db:
raise Exception("need to pass --db argument")

set_logging_config(f'dbrepl {args.db}')
db_name = args.db

if not os.path.exists(config.binlog_replicator.data_dir):
os.mkdir(config.binlog_replicator.data_dir)

db_dir = os.path.join(
config.binlog_replicator.data_dir,
db_name,
)

if not os.path.exists(db_dir):
os.mkdir(db_dir)

log_file = os.path.join(
db_dir,
'db_replicator.log',
)

set_logging_config(f'dbrepl {args.db}', log_file=log_file)

db_replicator = DbReplicator(
config=config,
database=args.db,
database=db_name,
target_database=getattr(args, 'target_db', None),
initial_only=args.initial_only,
)
Expand Down

0 comments on commit f5fdbe7

Please sign in to comment.