Databus Load Balancing Client
Databus client application instances often need to partition the Databus change stream for parallel processing and fault tolerance. To support this, a feature was added to the client library that allows client instances to group themselves into a cluster and have partitions assigned to them dynamically. Partition reassignment when instances join or leave the cluster is automatic, and the load (number of partitions assigned) is uniform across instances. Partitions are numbered 0..N-1; for each assigned partition i, the Databus client library instantiates connections to the Databus stream and consumes only the events whose primary key belongs to partition i. The partition function used to filter the events is MOD. The filtering itself is performed at the change stream (Databus relays and bootstrap), which results in better network bandwidth utilization.
- Client applications consume partitions of the Databus stream, numbered 0..N-1.
- Filtering is done server-side using a MOD function on the primary key (specified in the event schema); a minimal sketch of this mapping follows the list below. N is specified by the client application.
- The partitions 0 to N-1 are specified statically in configuration, which is not ideal for environments where client app instances are partition-agnostic (no partition-specific local state) during processing and app instances are added/removed often.
- Client application instances join a cluster
- Partitions dynamically assigned and redistributed uniformly to instances within a cluster
- N and clusterName are specified by client applications.
- Checkpoints are written centrally (to ZooKeeper via Helix) to enable partitions to move across instances seamlessly.
- Databus uses Helix for cluster management
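To make the MOD partition function concrete, here is a minimal sketch (illustrative only, not Databus's actual filter implementation) of how a numeric primary key maps to a partition in 0..N-1:

public final class ModPartitionSketch
{
  // An event whose numeric primary key is k belongs to partition (k mod N). The relay/bootstrap
  // applies this filter server-side, so a client subscribed to partition i receives only the
  // events that map to i.
  public static int partitionFor(long primaryKey, int numPartitions)
  {
    long p = primaryKey % numPartitions;
    return (int) (p < 0 ? p + numPartitions : p); // keep the result in [0, N-1] even for negative keys
  }

  public static void main(String[] args)
  {
    int n = 4;                                     // N, as specified by the client application
    System.out.println(partitionFor(1234567L, n)); // prints 3, since 1234567 mod 4 == 3
  }
}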
Client applications implement their event-processing logic by extending AbstractDatabusCombinedConsumer:

public class MyConsumer extends AbstractDatabusCombinedConsumer
{
  ...
}
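As a rough illustration of what the elided body typically contains (assuming the standard DatabusCombinedConsumer callbacks; exact package names may vary across Databus versions), a consumer usually overrides the data-event callbacks for both the online stream and the bootstrap:

import com.linkedin.databus.client.consumer.AbstractDatabusCombinedConsumer;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.core.DbusEvent;

public class MyConsumer extends AbstractDatabusCombinedConsumer
{
  @Override
  public ConsumerCallbackResult onDataEvent(DbusEvent event, DbusEventDecoder eventDecoder)
  {
    return processEvent(event, eventDecoder);   // events from the online (relay) stream
  }

  @Override
  public ConsumerCallbackResult onBootstrapEvent(DbusEvent event, DbusEventDecoder eventDecoder)
  {
    return processEvent(event, eventDecoder);   // events from the bootstrap stream
  }

  private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder)
  {
    // Application-specific processing goes here (e.g. decode the payload and update local state).
    // Returning SUCCESS acknowledges the event.
    return ConsumerCallbackResult.SUCCESS;
  }
}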
Next, implement a DbusClusterConsumerFactory, which the client library invokes to create the consumer instances for each partition assigned to this client instance:

/**
 * There can be 3 use-case scenarios:
 * (a) A simple case where there is one consumer instance to consume all events for a partition.
 * (b) A case where there are multiple logical sources (tables/views from a single physical DB) and separate consumer instances to handle each source for a partition.
 * (c) A case where there are multiple consumer instances that share the load of the events for a partition (Note: it is a simple round-robin based dispatch with no guarantee of source-level partitioning).
 */

/*
 * Case (a): Most commonly used
 */
public class MyConsumerFactory implements DbusClusterConsumerFactory
{
  @Override
  public Collection<DatabusCombinedConsumer> createPartitionedConsumers(DbusClusterInfo clusterInfo, DbusPartitionInfo partitionInfo)
  {
    // Set Consumers (mandatory)
    DatabusCombinedConsumer c1 = new MyConsumer();
    return Arrays.asList(c1);
  }
}

/*
 * Case (b)
 */
public class MyConsumerFactory implements DbusClusterConsumerFactory
{
  @Override
  public Collection<DatabusCombinedConsumer> createPartitionedConsumers(DbusClusterInfo clusterInfo, DbusPartitionInfo partitionInfo)
  {
    // Assume 2 sources: "com.linkedin.events.anets.Anets" and "com.linkedin.events.anets.AnetMembers"

    // Set Consumers (mandatory)
    DatabusCombinedConsumer c1 = new MyConsumer(); // For Anets
    DatabusCombinedConsumer c2 = new MyConsumer(); // For AnetMembers

    // Set Multi-Source Consumers
    Map<String, DatabusStreamConsumer> streamMap = new HashMap<String, DatabusStreamConsumer>();
    Map<String, DatabusBootstrapConsumer> bsMap = new HashMap<String, DatabusBootstrapConsumer>();
    streamMap.put("com.linkedin.events.anets.Anets", c1);
    streamMap.put("com.linkedin.events.anets.AnetMembers", c2);
    bsMap.put("com.linkedin.events.anets.Anets", c1);
    bsMap.put("com.linkedin.events.anets.AnetMembers", c2);
    DatabusMultiSourceCombinedConsumer consumer = new DatabusMultiSourceCombinedConsumer(streamMap, bsMap);
    return Arrays.asList(consumer);
  }
}

/*
 * Case (c)
 */
public class MyConsumerFactory implements DbusClusterConsumerFactory
{
  @Override
  public Collection<DatabusCombinedConsumer> createPartitionedConsumers(DbusClusterInfo clusterInfo, DbusPartitionInfo partitionInfo)
  {
    // Set Consumers (mandatory)
    DatabusCombinedConsumer c1 = new MyConsumer();
    DatabusCombinedConsumer c2 = new MyConsumer();
    return Arrays.asList(c1, c2);
  }
}
Optionally, implement a DbusPartitionListener to be notified when a partition is added to or dropped from this client instance:

public class MyListener implements DbusPartitionListener
{
  @Override
  public void onAddPartition(DbusPartitionInfo partitionInfo, DatabusRegistration reg)
  {
    // Set RegId (optional)
    reg.withRegId(new RegistrationId("myId" + partitionInfo.getPartitionId()));

    // Set Checkpoint
    // WARNING: optional. Do this only if you really want to manage the checkpoint yourself.
    Checkpoint ckpt = new Checkpoint();
    ckpt.setWindowSCN(...)
    ....
    reg.storeCheckpoint(ckpt);

    // Log and/or process this notification.
  }

  @Override
  public void onDropPartition(DbusPartitionInfo partitionInfo)
  {
    // Log and/or process this notification.
  }
}
Finally, load the client configuration, register against the cluster, and start the client:

import java.util.Properties;

import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.ConfigLoader;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.client.DatabusHttpClientImpl;

public class Databus2ClientReg
{
  private DatabusHttpClientImpl _httpClient;

  public void registerDatabus2ClientAndStart(Properties dbus2ClientProps) throws InvalidConfigException
  {
    // Instantiate DatabusHttpClient
    Config configBuilder = new Config();
    ConfigLoader<StaticConfig> configLoader = new ConfigLoader<StaticConfig>("databus2.client.", configBuilder);
    configLoader.loadConfig(dbus2ClientProps);
    StaticConfig clientConfig = configBuilder.build();
    _httpClient = new DatabusHttpClientImpl(clientConfig.getClient()); // saved in a member variable for shutdown

    String src1 = "com.linkedin.events.liar.jobrelay.LiarJobRelay";
    String src2 = "com.linkedin.events.liar.memberrelay.LiarMemberRelay";

    // Instantiate listener
    DbusPartitionListener myListener = new MyListener();

    // Instantiate consumer factory
    DbusClusterConsumerFactory myConsFactory = new MyConsumerFactory();

    // Instantiate server-side filter (MOD partition)
    DbusModPartitionedFilterFactory filterFactory = new DbusModPartitionedFilterFactory(src1, src2);

    // Register and start
    DatabusRegistration reg = _httpClient.registerCluster("myCluster", myConsFactory, filterFactory, myListener, src1, src2);
    reg.start();
    _httpClient.start();

    // Call _httpClient.shutdown() during shutdown of the client application.
  }
}
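For completeness, a hypothetical driver (the class name and the use of a properties file path as the first program argument are illustrative, not part of the Databus API) could wire this up as follows:

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class Databus2ClientMain
{
  public static void main(String[] args) throws Exception
  {
    // Load the "databus2.client.*" properties (relay/bootstrap endpoints, clientCluster(1).* settings, etc.)
    Properties props = new Properties();
    InputStream in = new FileInputStream(args[0]);
    try
    {
      props.load(in);
    }
    finally
    {
      in.close();
    }

    // Register against the cluster and start consuming. DatabusHttpClientImpl.shutdown()
    // should be invoked when the application shuts down.
    Databus2ClientReg clientReg = new Databus2ClientReg();
    clientReg.registerDatabus2ClientAndStart(props);
  }
}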
Cluster-specific configuration:
databus2.client.clientCluster(1).clusterName=<Client Cluster Name>
databus2.client.clientCluster(1).zkAddr=<Colon-separated ZKHOST:PORT>
databus2.client.clientCluster(1).numPartitions=<Number of Partitions>
# Number of clients that must be alive before partition assignment happens; default: 1
databus2.client.clientCluster(1).quorum=<Quorum Size>
databus2.client.clientCluster(1).zkSessionTimeoutMs=<ZK Session Timeout>
databus2.client.clientCluster(1).zkConnectionTimeoutMs=<ZK Connection Timeout>
databus2.client.clientCluster(1).checkpointIntervalMs=<Min time period in ms between persisting checkpoints; min value 5 min>
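As an illustration only (the host name, partition count, and timeouts below are placeholder values), the configuration for the "myCluster" cluster used in the registration example above might look like this:

# Illustrative values only
databus2.client.clientCluster(1).clusterName=myCluster
databus2.client.clientCluster(1).zkAddr=localhost:2181
databus2.client.clientCluster(1).numPartitions=10
databus2.client.clientCluster(1).quorum=1
databus2.client.clientCluster(1).zkSessionTimeoutMs=30000
databus2.client.clientCluster(1).zkConnectionTimeoutMs=10000
databus2.client.clientCluster(1).checkpointIntervalMs=300000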