Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new prop to use map for routing #223

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props.";

private static final String CATALOG_NAME_PROP = "iceberg.catalog";
private static final String TOPIC_TO_TABLES_MAPPING_PROP =
"iceberg.tables.topic-to-table-mapping";
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic-enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.route-field";
Expand Down Expand Up @@ -117,6 +119,12 @@ public static String version() {

private static ConfigDef newConfigDef() {
ConfigDef configDef = new ConfigDef();
configDef.define(
TOPIC_TO_TABLES_MAPPING_PROP,
Type.LIST,
null,
Importance.LOW,
"Comma-delimited list of topics:tables mappings");
configDef.define(
TABLES_PROP,
Type.LIST,
Expand Down Expand Up @@ -329,6 +337,18 @@ public List<String> tables() {
return getList(TABLES_PROP);
}

public Map<String, String> topicToTableMap() {
Map<String, String> topicToTableMap = Maps.newHashMap();
for (String topicToTable : getList(TOPIC_TO_TABLES_MAPPING_PROP)) {
String[] propsplit = topicToTable.trim().split(":");
if (propsplit.length == 2) {
topicToTableMap.put(propsplit[0].trim(), propsplit[1].trim());
}
}
LOG.debug("Config: topicToTableMap: {}", topicToTableMap);
return topicToTableMap;
}

public boolean dynamicTablesEnabled() {
return getBoolean(TABLES_DYNAMIC_PROP);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,26 @@ private void save(SinkRecord record) {

if (config.dynamicTablesEnabled()) {
routeRecordDynamically(record);
} else if (config.topicToTableMap().size() > 0) {
routeRecordByMap(record);
} else {
routeRecordStatically(record);
}
}

private void routeRecordByMap(SinkRecord record) {
Map<String, String> topicToTableMap = config.topicToTableMap();
String topicName = record.topic();
String tableName = topicToTableMap.get(topicName);

if (tableName == null) {
routeRecordStatically(record);
return;
}

writerForTable(tableName, record, false).write(record);
}

private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ public void testDynamicRoute() {
workerTest(config, value);
}

@Test
public void testTopicToTableMapRoute() {
when(config.topicToTableMap()).thenReturn(ImmutableMap.of(SRC_TOPIC_NAME, TABLE_NAME));
Map<String, Object> value = ImmutableMap.of(SRC_TOPIC_NAME, TABLE_NAME);
workerTest(value);
}

private void workerTest(Map<String, Object> value) {
SinkTaskContext context = mock(SinkTaskContext.class);
when(context.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(SRC_TOPIC_NAME, 0)));

private void workerTest(IcebergSinkConfig config, Map<String, Object> value) {
WriterResult writeResult =
new WriterResult(
Expand Down