
Commit

Tables and databases filtering (#6)
bakwc authored Oct 13, 2024
1 parent e93d777 commit a0ffc58
Showing 6 changed files with 143 additions and 23 deletions.
23 changes: 16 additions & 7 deletions README.md
@@ -110,13 +110,22 @@ binlog_replicator:
records_per_file: 100000

databases: 'database_name_pattern_*'
tables: '*'
```
- `mysql` MySQL connection settings
- `clickhouse` ClickHouse connection settings
- `binlog_replicator.data_dir` Directory for storing the binary log and application state
- `databases` Databases name pattern to replicate, eg `db_*` will match `db_1` `db_2` `db_test`
- `databases` Database name pattern to replicate, e.g. `db_*` will match `db_1`, `db_2` and `db_test`; a list of patterns is also supported
- `tables` (__optional__) - table name pattern to replicate; a list of patterns is also supported

A few more examples of `databases` / `tables` patterns:

```yaml
databases: ['my_database_1', 'my_database_2']
tables: ['table_1', 'table_2*']
```

### Advanced Features

@@ -144,13 +153,13 @@ pip install -r requirements.txt

### Running Tests

For running tests you will need:
1. MySQL and ClickHouse server
2. `tests_config.yaml` that will be used during tests
3. Run tests with:

1. Use docker compose to bring up all the requirements:
```bash
sudo docker compose -f docker-compose-tests.yaml up
```
2. Run tests with:
```bash
pytest -v -s test_mysql_ch_replicator.py
sudo docker exec -w /app/ -it mysql_ch_replicator-replicator-1 python3 -m pytest -v -s test_mysql_ch_replicator.py
```

## Contribution
6 changes: 6 additions & 0 deletions mysql_ch_replicator/binlog_replicator.py
@@ -401,6 +401,12 @@ def run(self):
log_event = LogEvent()
if hasattr(event, 'table'):
log_event.table_name = event.table
if isinstance(log_event.table_name, bytes):
log_event.table_name = log_event.table_name.decode('utf-8')

if not self.settings.is_table_matches(log_event.table_name):
continue

log_event.db_name = event.schema

if isinstance(log_event.db_name, bytes):
23 changes: 21 additions & 2 deletions mysql_ch_replicator/config.py
@@ -33,6 +33,7 @@ def __init__(self):
self.clickhouse = ClickhouseSettings()
self.binlog_replicator = BinlogReplicatorSettings()
self.databases = ''
self.tables = '*'
self.settings_file = ''

def load(self, settings_file):
@@ -43,8 +44,26 @@ def load(self, settings_file):
self.mysql = MysqlSettings(**data['mysql'])
self.clickhouse = ClickhouseSettings(**data['clickhouse'])
self.databases = data['databases']
assert isinstance(self.databases, str)
self.tables = data.get('tables', '*')
assert isinstance(self.databases, str) or isinstance(self.databases, list)
assert isinstance(self.tables, str) or isinstance(self.tables, list)
self.binlog_replicator = BinlogReplicatorSettings(**data['binlog_replicator'])

@classmethod
def is_pattern_matches(cls, substr, pattern):
if not pattern or pattern == '*':
return True
if isinstance(pattern, str):
return fnmatch.fnmatch(substr, pattern)
if isinstance(pattern, list):
for allowed_pattern in pattern:
if fnmatch.fnmatch(substr, allowed_pattern):
return True
return False
raise ValueError()

def is_database_matches(self, db_name):
return fnmatch.fnmatch(db_name, self.databases)
return self.is_pattern_matches(db_name, self.databases)

def is_table_matches(self, table_name):
return self.is_pattern_matches(table_name, self.tables)
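
A minimal usage sketch (not part of the commit) of how the new string / list pattern matching behaves, assuming the module is importable as `mysql_ch_replicator.config`:

```python
# Sketch only: illustrates the matching semantics of is_database_matches /
# is_table_matches added in this commit; the import path is an assumption.
from mysql_ch_replicator.config import Settings

cfg = Settings()
cfg.databases = ['my_database_1', 'db_*']   # a single fnmatch pattern or a list of patterns
cfg.tables = 'table_2*'                     # default is '*', which matches every table

assert cfg.is_database_matches('my_database_1')    # exact entry in the list
assert cfg.is_database_matches('db_test')          # matched by the 'db_*' wildcard
assert not cfg.is_database_matches('other_db')     # not covered by any pattern

assert cfg.is_table_matches('table_2_copy')        # matched by 'table_2*'
assert not cfg.is_table_matches('table_1')         # filtered out
```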
13 changes: 12 additions & 1 deletion mysql_ch_replicator/db_replicator.py
@@ -136,6 +136,9 @@ def run(self):
self.clickhouse_api.database = self.target_database_tmp
self.clickhouse_api.recreate_database()
self.state.tables = self.mysql_api.get_tables()
self.state.tables = [
table for table in self.state.tables if self.config.is_table_matches(table)
]
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
self.state.save()
logger.info(f'last known transaction {self.state.last_processed_transaction}')
@@ -150,6 +153,8 @@ def create_initial_structure(self):
self.state.save()

def create_initial_structure_table(self, table_name):
if not self.config.is_table_matches(table_name):
return
mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
mysql_structure = self.converter.parse_mysql_table_structure(
mysql_create_statement, required_table_name=table_name,
@@ -198,6 +203,9 @@ def perform_initial_replication(self):
def perform_initial_replication_table(self, table_name):
logger.info(f'running initial replication for table {table_name}')

if not self.config.is_table_matches(table_name):
logger.info(f'skip table {table_name} - not matching any allowed table')
return

max_primary_key = None
if self.state.initial_replication_table == table_name:
# continue replication from saved position
@@ -294,7 +302,8 @@ def handle_event(self, event: LogEvent):
EventType.QUERY.value: self.handle_query_event,
}

event_handlers[event.event_type](event)
if not event.table_name or self.config.is_table_matches(event.table_name):
event_handlers[event.event_type](event)

self.stats.events_count += 1
self.stats.last_transaction = event.transaction_id
@@ -367,6 +376,8 @@ def handle_alter_query(self, query, db_name):

def handle_create_table_query(self, query, db_name):
mysql_structure, ch_structure = self.converter.parse_create_table_query(query)
if not self.config.is_table_matches(mysql_structure.table_name):
return
self.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
self.clickhouse_api.create_table(ch_structure)

82 changes: 69 additions & 13 deletions test_mysql_ch_replicator.py
@@ -20,21 +20,21 @@


class BinlogReplicatorRunner(ProcessRunner):
def __init__(self):
super().__init__(f'./main.py --config {CONFIG_FILE} binlog_replicator')
def __init__(self, cfg_file=CONFIG_FILE):
super().__init__(f'./main.py --config {cfg_file} binlog_replicator')


class DbReplicatorRunner(ProcessRunner):
def __init__(self, db_name, additional_arguments=None):
def __init__(self, db_name, additional_arguments=None, cfg_file=CONFIG_FILE):
additional_arguments = additional_arguments or ''
if not additional_arguments.startswith(' '):
additional_arguments = ' ' + additional_arguments
super().__init__(f'./main.py --config {CONFIG_FILE} --db {db_name} db_replicator{additional_arguments}')
super().__init__(f'./main.py --config {cfg_file} --db {db_name} db_replicator{additional_arguments}')


class RunAllRunner(ProcessRunner):
def __init__(self, db_name):
super().__init__(f'./main.py --config {CONFIG_FILE} run_all --db {db_name}')
def __init__(self, cfg_file=CONFIG_FILE):
super().__init__(f'./main.py --config {cfg_file} run_all')


def kill_process(pid, force=False):
@@ -57,15 +57,16 @@ def prepare_env(
cfg: config.Settings,
mysql: mysql_api.MySQLApi,
ch: clickhouse_api.ClickhouseApi,
db_name: str = TEST_DB_NAME
):
if os.path.exists(cfg.binlog_replicator.data_dir):
shutil.rmtree(cfg.binlog_replicator.data_dir)
os.mkdir(cfg.binlog_replicator.data_dir)
mysql.drop_database(TEST_DB_NAME)
mysql.create_database(TEST_DB_NAME)
mysql.set_database(TEST_DB_NAME)
ch.drop_database(TEST_DB_NAME)
assert_wait(lambda: TEST_DB_NAME not in ch.get_databases())
mysql.drop_database(db_name)
mysql.create_database(db_name)
mysql.set_database(db_name)
ch.drop_database(db_name)
assert_wait(lambda: db_name not in ch.get_databases())


def test_e2e_regular():
@@ -299,7 +300,7 @@ def test_runner():
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True)
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True)

run_all_runner = RunAllRunner(TEST_DB_NAME)
run_all_runner = RunAllRunner()
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
@@ -371,4 +372,59 @@ def test_initial_only():
ch.execute_command(f'USE {TEST_DB_NAME}')

assert TEST_TABLE_NAME in ch.get_tables()
assert len(ch.select(TEST_TABLE_NAME)) == 2
assert len(ch.select(TEST_TABLE_NAME)) == 2


def test_database_tables_filtering():
cfg = config.Settings()
cfg.load('tests_config_databases_tables.yaml')

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database='test_db_2',
clickhouse_settings=cfg.clickhouse,
)

mysql.drop_database('test_db_3')
mysql.create_database('test_db_3')
ch.drop_database('test_db_3')

prepare_env(cfg, mysql, ch, db_name='test_db_2')

mysql.execute(f'''
CREATE TABLE test_table_3 (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
age int,
PRIMARY KEY (id)
);
''')

mysql.execute(f'''
CREATE TABLE test_table_2 (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
age int,
PRIMARY KEY (id)
);
''')

mysql.execute(f"INSERT INTO test_table_3 (name, age) VALUES ('Ivan', 42);", commit=True)
mysql.execute(f"INSERT INTO test_table_2 (name, age) VALUES ('Ivan', 42);", commit=True)

run_all_runner = RunAllRunner(cfg_file='tests_config_databases_tables.yaml')
run_all_runner.run()

assert_wait(lambda: 'test_db_2' in ch.get_databases())
assert 'test_db_3' not in ch.get_databases()

ch.execute_command('USE test_db_2')

assert_wait(lambda: 'test_table_2' in ch.get_tables())
assert_wait(lambda: len(ch.select('test_table_2')) == 1)

assert 'test_table_3' not in ch.get_tables()
19 changes: 19 additions & 0 deletions tests_config_databases_tables.yaml
@@ -0,0 +1,19 @@

mysql:
host: 'localhost'
port: 9306
user: 'root'
password: 'admin'

clickhouse:
host: 'localhost'
port: 9123
user: 'default'
password: 'admin'

binlog_replicator:
data_dir: '/app/binlog/'
records_per_file: 100000

databases: ['test_db_1*', 'test_db_2']
tables: ['test_table_1*', 'test_table_2']
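
For reference, a small sketch (not part of the commit) of how these test patterns resolve, assuming the config above is loadable with the `Settings` API from `mysql_ch_replicator/config.py`:

```python
# Sketch only: shows which of the test databases / tables pass the filters
# defined in tests_config_databases_tables.yaml.
from mysql_ch_replicator.config import Settings

cfg = Settings()
cfg.load('tests_config_databases_tables.yaml')

# 'test_db_1*' is a wildcard, 'test_db_2' is an exact name, 'test_db_3' matches neither.
assert cfg.is_database_matches('test_db_2')
assert cfg.is_database_matches('test_db_1_copy')
assert not cfg.is_database_matches('test_db_3')

# 'test_table_2' is replicated, 'test_table_3' is filtered out by the test.
assert cfg.is_table_matches('test_table_2')
assert not cfg.is_table_matches('test_table_3')
```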
