-
Notifications
You must be signed in to change notification settings - Fork 737
Databus 2.0 Client Event Model and Consumer API
Term | Definition |
Databus source | An abstraction for a data source in a database that Databus consumers can monitor for changes. You can think of the Databus source as a SQL table or view. |
Databus relay | A component that serves as a router between the monitored Databus sources and the consumers which monitor for changes in the sources. The relay makes sure that a consumer sees all changes the consumer is interested in and only those changes. |
Databus consumer | An object that implements the Databus callback API and listens to a stream of data events |
Databus consumer group | A group of consumers that process a single stream of data events as a group |
Databus client | The Databus provided library that lets the consumer communicate with one or more Databus relays. |
Data change event | An object that encapsulates the change of the data associated with a given Databus source key that is transmitted to the Databus consumers |
Consistency window | A sequence of events that preserve the consistency of the data, i.e. if one started from a consistent state of the data and applied all events, they will finish in a consistent state. |
Data change events encapsulate the changes in the source database for a given primary key. At a high-level a data change event has three parts:
- Meta data
- Key
- Data
The meta data of an event consists of:
- opcode :- specifies if the change is an UPSERT (i.e. update or insert) or DELETE
- sequence :- specifies the sequence number of event in the update timeline of the event
The full API is available here https://raw.github.com/linkedin/databus/master/databus-core/databus-core-impl/src/main/java/com/linkedin/databus/core/DbusEvent.java
- StartDataEventSequence - denotes the start of a sequence of data events from an events consistency window.
- StartSource - denotes the start of data events from the same Databus source
- DataEvent - denotes a data change event for the current Databus source
- EndSource - denotes the end of data events from the same Databus source
- EndDataEventSequence - denotes the end of a sequence of data events with the same SCN
- Rollback - an irrecoverable error occurred while obtaining the current event consistency window
A typical sequence of the above events can look like.
StartDataEventSequence(startSCN) --> StartSource(Source1) --> DataEvent(Source1.data1) --> ... --> DataEvent(Source1.dataN) --> EndSource(Source1) --> StartSource(Source2) --> DataEvent(Source2.data1) --> ... --> DataEvent(Source2.dataM) --> EndSource(Source2) --> ... --> EndDataEventSequence(endSCN)
Intuitively, the Databus client communicates with the consumer: “Here is the next batch of changes in the watched tables (sources). The changes are broken down by tables. Here are the changes in the first table, then the changes to the next table, etc. All the changes represent the delta from the previous consistent state of the database to the following consistent state.”
The above Databus event sequence is specific for a given consumer. Each consumer specifies an order of Databus sources it is interested in. The Databus client ensures that the data change events for the different sources are always presented to the consumer in that order.
It is good question to wonder about the order of the DataEvent events for a given source. We have also discussed and considered the addition of a Checkpoint event during the processing of data change events, but decided not to include checkpoint eventing in the databus2 api at this point. The idea is to allow the consumer to save intermediate state during processing. If the consumer supports such an event and a failure occurs, the client will try to resume the data events from the latest checkpoint. This can be helpful in processing large data event sequences, say after an update to the database that touched 20% of the rows.
Anywhere during the above event sequence, the client can send a Rollback message which means that an irrecoverable error occurred while obtaining the current event consistency window. The processing will resume with another window. The new window may start the same or earlier SCN to guarantee that no events are missed but some of the events may be replayed.
The Rollback event allows the Databus client to store minimal state while processing incoming data change events by shifting some of the responsibility for recovery to the consumer. The Databus client will provide a standard capability to buffer incoming events until an EndDataEventSequence event is seen. In this case, the consumer does not have to deal with the Rollback event but some of the available memory will be used by the Databus client for buffering and thus the memory will be unavailable to the consumer.
The event state transition is given below.
If the Databus consumer uses the multi-threaded API, each thread will see all StartDataEventSequence, EndDataEventSequence, StartSource, EndSource events and only a portion of the DataEvent events. There is a barrier synchronization across all threads at each StartSource and EndSource event.
For StartDataEventSequence, the SCN is the first SCN in the data event sequence. For EndDataEventSequence, the SCN is the last SVN in the data event sequence. For Rollback, the SCN is the SCN to which the rollback is being done, i.e. the SCN from the latest StartDataEventSequence event.
interface DbusEvent { SCN getScn(); long getTimestamp(); byte[] getKeyBytes(); byte[] getValueBytes(); long getLongKey(); String getStringKey(); <V> V getTypedValue(V reuse, Class<V extends SpecificRecord> targetClass, Schema sourceSchema); }
public static <T extends SpecificRecord> T readRecordJson(byte[] recBytes, T dest, Class<T> aClass, Schema sourceSchema) throws Exception { T reuse = null != dest ? dest : aClass.newInstance(); JsonDecoder jsonDec = new JsonDecoder(sourceSchema, new ByteArrayInputStream(recBytes)); ResolvingDecoder resDec = new ResolvingDecoder(sourceSchema, reuse.getSchema(), jsonDec); SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<SpecificRecord>(reuse.getSchema()); return aClass.cast(reader.read(reuse, resDec)); }
The user will have to pass in the source schema (as obtained from the SourceEvent) or we have to store this as part of {{DbusEvent}}. {note}
If a listener returns false to a DbusEvent, the Databus client has to stop the event processing loop. Consumer should try to process all exceptions in their logic. If the Databus client encounters an exception from the consumer, it will stop the event processing loop.
The client retry semantics are as follows:
- Retry all DataEvents from the event consistency window.
Each consumer callback receives all of the following callbacks: {{on
- onDataEvent callbacks are distributed across consumers in a group
- onStartDataEventSequence, onEndDataEventSequence, onCheckpoint, onError, onEndSource have barriers before the invocation and after the completion of the callback
- onStartSource has a barrier before the invocation of the callback
- onRollback has a barrier after the completion of the callback
Here is a sample distribution of callbacks across a group of 3 consumers, a single consumer and a group of 2 consumers registered for the same source(s). We have illustrated only a single event window with a single source and 4 data events.