diff --git a/README.md b/README.md
index 4bbe39a..6d4af9a 100644
--- a/README.md
+++ b/README.md
@@ -110,13 +110,22 @@ binlog_replicator:
   records_per_file: 100000
 
 databases: 'database_name_pattern_*'
+tables: '*'
 ```
 
 - `mysql` MySQL connection settings
 - `clickhouse` ClickHouse connection settings
 - `binlog_replicator.data_dir` Directory for store binary log and application state
-- `databases` Databases name pattern to replicate, eg `db_*` will match `db_1` `db_2` `db_test`
+- `databases` Database name pattern to replicate, e.g. `db_*` will match `db_1`, `db_2`, `db_test`; a list of patterns is also supported
+- `tables` (__optional__) Table name pattern to replicate; a list of patterns is also supported
+
+A few more `databases` / `tables` examples:
+
+```yaml
+databases: ['my_database_1', 'my_database_2']
+tables: ['table_1', 'table_2*']
+```
 
 ### Advanced Features
 
@@ -144,13 +153,13 @@ pip install -r requirements.txt
 
 ### Running Tests
 
-For running test you will need:
-1. MySQL and ClickHouse server
-2. `tests_config.yaml` that will be used during tests
-3. Run tests with:
-
+1. Use docker compose to start the services required for the tests:
+```bash
+sudo docker compose -f docker-compose-tests.yaml up
+```
+2. Run the tests with:
 ```bash
-pytest -v -s test_mysql_ch_replicator.py
+sudo docker exec -w /app/ -it mysql_ch_replicator-replicator-1 python3 -m pytest -v -s test_mysql_ch_replicator.py
 ```
 
 ## Contribution
diff --git a/mysql_ch_replicator/binlog_replicator.py b/mysql_ch_replicator/binlog_replicator.py
index 7bb92ae..ca087f3 100644
--- a/mysql_ch_replicator/binlog_replicator.py
+++ b/mysql_ch_replicator/binlog_replicator.py
@@ -401,6 +401,12 @@ def run(self):
             log_event = LogEvent()
             if hasattr(event, 'table'):
                 log_event.table_name = event.table
+                if isinstance(log_event.table_name, bytes):
+                    log_event.table_name = log_event.table_name.decode('utf-8')
+
+                if not self.settings.is_table_matches(log_event.table_name):
+                    continue
+
             log_event.db_name = event.schema
 
             if isinstance(log_event.db_name, bytes):
diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index 7a3b4ba..cbdb53e 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -33,6 +33,7 @@ def __init__(self):
         self.clickhouse = ClickhouseSettings()
         self.binlog_replicator = BinlogReplicatorSettings()
         self.databases = ''
+        self.tables = '*'
         self.settings_file = ''
 
     def load(self, settings_file):
@@ -43,8 +44,26 @@ def load(self, settings_file):
         self.mysql = MysqlSettings(**data['mysql'])
         self.clickhouse = ClickhouseSettings(**data['clickhouse'])
         self.databases = data['databases']
-        assert isinstance(self.databases, str)
+        self.tables = data.get('tables', '*')
+        assert isinstance(self.databases, str) or isinstance(self.databases, list)
+        assert isinstance(self.tables, str) or isinstance(self.tables, list)
         self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])
 
+    @classmethod
+    def is_pattern_matches(cls, substr, pattern):
+        if not pattern or pattern == '*':
+            return True
+        if isinstance(pattern, str):
+            return fnmatch.fnmatch(substr, pattern)
+        if isinstance(pattern, list):
+            for allowed_pattern in pattern:
+                if fnmatch.fnmatch(substr, allowed_pattern):
+                    return True
+            return False
+        raise ValueError()
+
     def is_database_matches(self, db_name):
-        return fnmatch.fnmatch(db_name, self.databases)
+        return self.is_pattern_matches(db_name, self.databases)
+
+    def is_table_matches(self, table_name):
+        return self.is_pattern_matches(table_name, self.tables)
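The matching semantics added to `config.py` above are easiest to see in isolation. A minimal standalone sketch of the same rules (plain `fnmatch`, independent of the package; the helper name `matches` is just for illustration):

```python
from fnmatch import fnmatch

def matches(name, pattern):
    # Mirrors Settings.is_pattern_matches from the hunk above:
    # empty or '*' means "match everything", a string is a glob,
    # a list means "any of these globs".
    if not pattern or pattern == '*':
        return True
    if isinstance(pattern, str):
        return fnmatch(name, pattern)
    if isinstance(pattern, list):
        return any(fnmatch(name, p) for p in pattern)
    raise ValueError(f'unsupported pattern type: {type(pattern)}')

assert matches('db_1', 'db_*')                        # glob string
assert matches('table_25', ['table_1', 'table_2*'])   # list of globs
assert not matches('other', ['table_1', 'table_2*'])  # filtered out
assert matches('anything', '*')                       # default wildcard
```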
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index 1c8237f..26097fc 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -136,6 +136,9 @@ def run(self):
         self.clickhouse_api.database = self.target_database_tmp
         self.clickhouse_api.recreate_database()
         self.state.tables = self.mysql_api.get_tables()
+        self.state.tables = [
+            table for table in self.state.tables if self.config.is_table_matches(table)
+        ]
         self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
         self.state.save()
         logger.info(f'last known transaction {self.state.last_processed_transaction}')
@@ -150,6 +153,8 @@ def create_initial_structure(self):
         self.state.save()
 
     def create_initial_structure_table(self, table_name):
+        if not self.config.is_table_matches(table_name):
+            return
         mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
         mysql_structure = self.converter.parse_mysql_table_structure(
             mysql_create_statement, required_table_name=table_name,
@@ -198,6 +203,10 @@ def perform_initial_replication(self):
 
     def perform_initial_replication_table(self, table_name):
         logger.info(f'running initial replication for table {table_name}')
+        if not self.config.is_table_matches(table_name):
+            logger.info(f'skip table {table_name} - not matching any allowed table')
+            return
+
         max_primary_key = None
         if self.state.initial_replication_table == table_name:
             # continue replication from saved position
@@ -294,7 +302,8 @@ def handle_event(self, event: LogEvent):
             EventType.QUERY.value: self.handle_query_event,
         }
 
-        event_handlers[event.event_type](event)
+        if not event.table_name or self.config.is_table_matches(event.table_name):
+            event_handlers[event.event_type](event)
 
         self.stats.events_count += 1
         self.stats.last_transaction = event.transaction_id
@@ -367,6 +376,8 @@ def handle_alter_query(self, query, db_name):
 
     def handle_create_table_query(self, query, db_name):
         mysql_structure, ch_structure = self.converter.parse_create_table_query(query)
+        if not self.config.is_table_matches(mysql_structure.table_name):
+            return
         self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
         self.clickhouse_api.create_table(ch_structure)
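All of the `db_replicator.py` checks above go through the same two `Settings` methods. A usage sketch, assuming the package is importable from the repository root, with the values from the README example:

```python
from mysql_ch_replicator.config import Settings

cfg = Settings()
cfg.databases = ['my_database_1', 'my_database_2']  # values from the README example
cfg.tables = ['table_1', 'table_2*']

print(cfg.is_database_matches('my_database_1'))  # True  - replicated
print(cfg.is_database_matches('my_database_3'))  # False - ignored
print(cfg.is_table_matches('table_25'))          # True  - matches 'table_2*'
print(cfg.is_table_matches('table_3'))           # False - filtered out
```

Note that `handle_event` only applies the filter when an event carries a table name; query-level events without one are always dispatched.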
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index b749cb9..8721174 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -20,21 +20,21 @@
 
 
 class BinlogReplicatorRunner(ProcessRunner):
-    def __init__(self):
-        super().__init__(f'./main.py --config {CONFIG_FILE} binlog_replicator')
+    def __init__(self, cfg_file=CONFIG_FILE):
+        super().__init__(f'./main.py --config {cfg_file} binlog_replicator')
 
 
 class DbReplicatorRunner(ProcessRunner):
-    def __init__(self, db_name, additional_arguments=None):
+    def __init__(self, db_name, additional_arguments=None, cfg_file=CONFIG_FILE):
         additional_arguments = additional_arguments or ''
         if not additional_arguments.startswith(' '):
             additional_arguments = ' ' + additional_arguments
-        super().__init__(f'./main.py --config {CONFIG_FILE} --db {db_name} db_replicator{additional_arguments}')
+        super().__init__(f'./main.py --config {cfg_file} --db {db_name} db_replicator{additional_arguments}')
 
 
 class RunAllRunner(ProcessRunner):
-    def __init__(self, db_name):
-        super().__init__(f'./main.py --config {CONFIG_FILE} run_all --db {db_name}')
+    def __init__(self, cfg_file=CONFIG_FILE):
+        super().__init__(f'./main.py --config {cfg_file} run_all')
 
 
 def kill_process(pid, force=False):
@@ -57,15 +57,16 @@ def prepare_env(
     cfg: config.Settings,
     mysql: mysql_api.MySQLApi,
     ch: clickhouse_api.ClickhouseApi,
+    db_name: str = TEST_DB_NAME
 ):
     if os.path.exists(cfg.binlog_replicator.data_dir):
         shutil.rmtree(cfg.binlog_replicator.data_dir)
     os.mkdir(cfg.binlog_replicator.data_dir)
-    mysql.drop_database(TEST_DB_NAME)
-    mysql.create_database(TEST_DB_NAME)
-    mysql.set_database(TEST_DB_NAME)
-    ch.drop_database(TEST_DB_NAME)
-    assert_wait(lambda: TEST_DB_NAME not in ch.get_databases())
+    mysql.drop_database(db_name)
+    mysql.create_database(db_name)
+    mysql.set_database(db_name)
+    ch.drop_database(db_name)
+    assert_wait(lambda: db_name not in ch.get_databases())
 
 
 def test_e2e_regular():
@@ -299,7 +300,7 @@ def test_runner():
     mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
     mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)
 
-    run_all_runner = RunAllRunner(TEST_DB_NAME)
+    run_all_runner = RunAllRunner()
     run_all_runner.run()
 
     assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -371,4 +372,59 @@ def test_initial_only():
     ch.execute_command(f'USE {TEST_DB_NAME}')
 
     assert TEST_TABLE_NAME in ch.get_tables()
-    assert len(ch.select(TEST_TABLE_NAME)) == 2
\ No newline at end of file
+    assert len(ch.select(TEST_TABLE_NAME)) == 2
+
+
+def test_database_tables_filtering():
+    cfg = config.Settings()
+    cfg.load('tests_config_databases_tables.yaml')
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database='test_db_2',
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    mysql.drop_database('test_db_3')
+    mysql.create_database('test_db_3')
+    ch.drop_database('test_db_3')
+
+    prepare_env(cfg, mysql, ch, db_name='test_db_2')
+
+    mysql.execute(f'''
+    CREATE TABLE test_table_3 (
+        id int NOT NULL AUTO_INCREMENT,
+        name varchar(255),
+        age int,
+        PRIMARY KEY (id)
+    );
+    ''')
+
+    mysql.execute(f'''
+    CREATE TABLE test_table_2 (
+        id int NOT NULL AUTO_INCREMENT,
+        name varchar(255),
+        age int,
+        PRIMARY KEY (id)
+    );
+    ''')
+
+    mysql.execute(f"INSERT INTO test_table_3 (name, age) VALUES ('Ivan', 42);", commit=True)
+    mysql.execute(f"INSERT INTO test_table_2 (name, age) VALUES ('Ivan', 42);", commit=True)
+
+    run_all_runner = RunAllRunner(cfg_file='tests_config_databases_tables.yaml')
+    run_all_runner.run()
+
+    assert_wait(lambda: 'test_db_2' in ch.get_databases())
+    assert 'test_db_3' not in ch.get_databases()
+
+    ch.execute_command('USE test_db_2')
+
+    assert_wait(lambda: 'test_table_2' in ch.get_tables())
+    assert_wait(lambda: len(ch.select('test_table_2')) == 1)
+
+    assert 'test_table_3' not in ch.get_tables()
diff --git a/tests_config_databases_tables.yaml b/tests_config_databases_tables.yaml
new file mode 100644
index 0000000..ee1498c
--- /dev/null
+++ b/tests_config_databases_tables.yaml
@@ -0,0 +1,19 @@
+
+mysql:
+  host: 'localhost'
+  port: 9306
+  user: 'root'
+  password: 'admin'
+
+clickhouse:
+  host: 'localhost'
+  port: 9123
+  user: 'default'
+  password: 'admin'
+
+binlog_replicator:
+  data_dir: '/app/binlog/'
+  records_per_file: 100000
+
+databases: ['test_db_1*', 'test_db_2']
+tables: ['test_table_1*', 'test_table_2']
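As a quick sanity check of the new config, the expectations of `test_database_tables_filtering` can be reproduced without MySQL or ClickHouse at all; a sketch assuming it is run from the repository root where `tests_config_databases_tables.yaml` lives:

```python
from mysql_ch_replicator.config import Settings

cfg = Settings()
cfg.load('tests_config_databases_tables.yaml')

# Exactly the split the test asserts:
assert cfg.is_database_matches('test_db_2')      # replicated
assert not cfg.is_database_matches('test_db_3')  # ignored
assert cfg.is_table_matches('test_table_2')      # replicated
assert not cfg.is_table_matches('test_table_3')  # filtered out
```

Since `load()` falls back to `'*'` when the `tables` key is missing, existing configs without it keep replicating every table.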