Skip to content

Databus 2.0 protocol

groelofs edited this page Jan 15, 2013 · 2 revisions

Databus v2.0 protocol

HTTP Interface for Serving

Databus2 Relay

Logical Sources-to-Id Mapping

Returns the list of all logical sources that are available on the relay.

  • Request:
GET /sources?v=<n>

The optional parameter v specifies the protocol version to use. The currently supported versions are 1 (default) and 2.

  • Success response:
{sources :
   [{ "id": <source_id>, "name":<src_name>}, {.... } ]
}
  • Example:
[{"name":"EspressoDB.EmailTest","id":402},{"name":"EspressoDB.EspressoData","id":403},{"name":"EspressoDB.Email","id":401},{"name":"EspressoDB.IdAddressPair","id":404},{"name":"EspressoDB.IdNamePair","id":405},{"name":"TestDB.Table2","id":702},{"name":"TestDB.Table3","id":703},{"name":"TestDB.Table1","id":701},{"name":"BusinessDB.Company","id":102},{"name":"BusinessDB.EmailDomain","id":103},{"name":"BusinessDB.CSAdmins","id":101},{"name":"MailboxDB.EmailDetail","id":501},{"name":"BusinessDB.Recommendation","id":108},{"name":"MailboxDB.EmailMetadata","id":502},{"name":"BusinessDB.RestrictedDomain","id":109},{"name":"MailboxDB.Mailbox","id":503},{"name":"BusinessDB.Metadata","id":106},{"name":"BusinessDB.Product","id":107},{"name":"BusinessDB.EmailTracking","id":104},{"name":"BusinessDB.Flag","id":105},{"name":"MyDB.Company","id":601}]
  • Semantics:

Returns all the sources (table/view names and id) of a given source database. The source database itself is defined in the configuration file loaded by the relay server or available through integration with the Cluster Manager.

Register for sources (Subscribe)

Returns all versions of schema for the specified sources.

  • Request:
GET /register?sources=src_id,..

The optional sources parameter is a comma-separated list of logical source ids for which to return the schemas. If the sources parameter is not specified, the schemas for all logical sources will be returned.

  • Success response:
HTTP 200 OK
{schemas :
    [{"id": src_id, "schema": schema_string}..]
}
  • Example:
[{"id":202,"version":1,"schema":"{\"type\":\"record\",\"name\":\"EmailMetadata\",\"namespace\":\"test\",\"fields\":[{\"name\":\"fromEmail\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"toFieldCount\",\"type\":\"int\"},{\"name\":\"recipientsShown\",\"type\":\"boolean\"},{\"name\":\"deliveryEmail\",\"type\":\"string\"},{\"name\":\"messageType\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"contentType\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"contentId\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"cancelKey\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"actionStatus\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"contentStatus\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"deliveryDirective\",\"type\":[\"null\",\"string\"],\"default\":\"null\",\"indexType\":\"attribute\"},{\"name\":\"actionType\",\"type\":[\"null\",\"string\"],\"default\":\"null\",\"indexType\":\"attribute\"},{\"name\":\"fromContractId\",\"type\":[\"null\",\"int\"],\"default\":\"null\",\"indexType\":\"attribute\"},{\"name\":\"isInbox\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isStarred\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isActioned\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isBounced\",\"type\":\"boolean\"},{\"name\":\"isForwarded\",\"type\":\"boolean\"},{\"name\":\"isReplied\",\"type\":\"boolean\"},{\"name\":\"isUnread\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isArchived\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isReplyMsg\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isCancelled\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isSuspended\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isWithdrawn\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"isBlocked\",\"type\":\"boolean\",\"indexType\":\"attribute\"},{\"name\":\"bounceKey\",\"type\":[\"null\",\"string\"],\"default\":\"null\",\"indexType\":\"attribute\"},{\"name\":\"batchKey\",\"type\":[\"null\",\"string\"],\"default\":\"null\",\"indexType\":\"attribute\"},{\"name\":\"createdOn\",\"type\":\"string\",\"indexType\":\"attribute\"},{\"name\":\"fromMemberID\",\"type\":\"int\",\"indexType\":\"attribute\"},{\"name\":\"fromField\",\"type\":\"string\"},{\"name\":\"toField\",\"type\":\"string\"},{\"name\":\"shortBody\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"subject\",\"type\":\"string\"},{\"name\":\"payload\",\"type\":[\"null\",\"string\"],\"default\":\"null\"},{\"name\":\"deletedState\",\"type\":[\"null\",\"string\"],\"default\":\"null\",\"indexType\":\"attribute\"},{\"name\":\"legacyId\",\"type\":[\"null\",\"string\"],\"indexType\":\"attribute\"}],\"schemaType\":\"DocumentSchema\"}"},{"id":203,"version":1,"schema":"{\"type\":\"record\",\"name\":\"Mailbox\",\"namespace\":\"test\",\"fields\":[{\"name\":\"unreadMsgCount\",\"type\":\"int\"},{\"name\":\"pendingInvCount\",\"type\":\"int\"}],\"schemaType\":\"DocumentSchema\"}"}]
  • Failure Response
HTTP 404

Consume from Source since checkpoint

Returns all events since a given checkpoint.

  • Request:
GET /stream?sources=src_id,...&streamFromLatestScn=<true_or_false>&checkPoint=<checkpoint>&size=<size>&output=json/binary&filters=<filter>
  • Response, binary format:
HTTP 200 OK
Content-Type: application/binary
 
<binary representation of an array of Databus events>
  • Response, JSON format:
HTTP 200 OK
Content-Type: application/json
 
[{"src_id":...,"schema_digest":...,"key":..,"scn":..., "windowScn":...,"partition-id":..., "value": ...},
...
]
  • Response, no events:
    HTTP 200 OK
    X-Databus-Error : no-events
        
  • Response, checkpoint too old:
    HTTP 200 OK
    X-Databus-Error : checkpoint-too-old
        
  • Detailed description
    • sources - a comma-separated list of logical source IDs
    • streamFromLastestScn -
    • checkPoint - the JSON-serialized checkpoint from which to start streaming
    • size - maximum number of bytes to return
    • output - output format for events (JSON or BINARY)
    • filters - optional parameter specifying the server-side filters to apply before returning the events
  • SCN Semantics
    • Let ScnSet be the set of SCNs in the buffer, such that for any element s in ScnSet, minScn <= s <= maxScn; and prevScn is the s that immediately preceded minScn (prevScn is not in ScnSet).
    • Client asks for events with streamFromLatestScn = true. It is equivalent to (sinceScn=lastWrittenSequence, sinceWindowOffset=0).
    • Client asks for events since a checkpoint with windowScn = sinceScn, offset = sinceWindowOffset, and streamFromLatestScn = false.
    • Client asks for events since a ‘flexible checkpoint’ with streamFromLatestScn = false. It is equivalent to (sinceScn=minScn, sinceWindowOffset=0).
      • Finding Relay Start SCN
        • relayStartScn is the SCN of the first window written by the /stream call. A negative value of relayStartScn indicates that no data will be written out.
        Condition Constraints Actions
        sinceWindowOffset -1 | 1. minScn < s <= maxScn 2. sinceScn >= prevScn if (2) is violated, throw ScnNotFound; relayStartScn = min(s) : s > sinceScn; if no such s exists, relayStartScn = -1
        sinceWindowOffset >=0 1. minScn <= s <= maxScn 2. sinceScn >= minScn if (2) is violated, throw ScnNotFound; relayStartScn = min(s) : s >= sinceScn; if no such s exists, relayStartScn = -1
  • Filter Semantics

After startSCN is determined by the above logic, all the events that can be streamed (candidate events) will be filtered based on the filter configuration for each source before being streamed out.

Format of Checkpoint

Checkpoint is an opaque marker that the fat client will create and persist on its end. The checkpoint contains information about the last message in the stream that has been successfully consumed. When presented with a checkpoint, the relay will start streaming from the next event window after the checkpoint. TODO: add link to checkpoint format

Format of Filter

The filter configuration is used for doing server-side filtering on Databus Relay and Bootstrap-Server ends. Two types of filtering are supported:

  • RANGE
  • MOD

TODO: add link or text describing RANGE and MOD syntax

Note: The partition function (range, hash) is applied to the field specified in the event schema key ‘pk’ specified in field “meta” [e.g., “meta” : “dbFieldName=forum.sy$comments_with_disc_6;pk=commentKey” ]. Otherwise the default is to use the fieldname ‘key’ specified in the source table/view. Filtering specification for 2 sources in the /stream call (one requesting RANGE type filtering and other requesting MOD type filtering).

"<src_id>":{filters:[{"keyRange":{"end":"<last_id>","start":<start_id>}}],"partitionType":"RANGE"}, "<src_id>":{filters:[{"numBuckets":<numBuckets>,"bktRange":{"end":"<last_bucket>","start":<start_bucket>}}],"partitionType":"MOD"}}

Format of subscriptions

The subs field takes the JSON serialization of an array of subscription objects:

{
  "physicalSource":{"uri":"jdbc://some/uri"},
  "physicalPartition":{"id": 1},
  "logicalPartition":{"source":{"id":1, "name":"com.linkedin.test.source1"}, "id": 0}
}

Request compression

  • Request:
GET /stream?sources="src_id,..."&checkPoint=<checkpoint>&size=<size>&output=json/binary
Accept-Encoding: gzip, deflate
  • Response: standard HTTP compression.

Databus2 Bootstrap

StartSCN Request (snapshot)

  • Request:
GET /startSCN?sources=<comma_separated_source_names>&checkPoint=<checkpoint>
  • Response:
{"<SCN>"}
  • Checkpoint fields used/set in startSCN request:
Checkpoint Field Semantics
bootstrap_since_scn The current SCN of the client == ckpt.windowScn)
consumption_mode The consumption mode will be set to BOOTSTRAP_SNAPSHOT
snapshot_source The first source which will be bootstrapped from its snapshot table (“tab” table)

Snapshot Bootstrap Request

  • Request:
GET /bootstrap?sources=<comma_separated_source_ids>&checkPoint=<checkpoint>&batchSize=<value>&output=<json|binary>&filter=<filter>
  • Response:
Header : PhaseCompleted = TRUE (if snapshot for this source done)
Body :  DbusEvents (Binary/JSON format)
  • Filtering specification for source requesting RANGE-type filtering:
"{filters:[{"keyRange":{"end":"<last_id>","start":<start_id>}}],"partitionType":"RANGE"}"
  • Filtering specification for source requesting MOD-type filtering):
"{filters:[{"numBuckets":<numBuckets>,"bktRange":{"end":"<last_bucket>","start":<start_bucket>}}],"partitionType":"MOD"}"
  • Input Parameter Description
Field Is this part of Checkpoint Semantics
sources No This contains the comma separated list of source Ids (NOTE: This is not used by bootstrap Server. The source info is got from the ckeckpoint)
batchSize No Number of bytes the client can take in its event buffer.
output No Output format of the events streamed out. Can be binary or json
filter No Server Side Filter
bootstrap_start_scn Yes The startSCN value the client got in its startSCN request
bootstrap_since_scn Yes The SCN of the client before bootstrap started.
consumption_mode Yes The mode of bootstrap. Should be set to BOOTSTRAP_SNAPSHOT.
snapshot_source Yes The current source name that is being bootstrapped (from snapshot).
snapshot_offset Yes The current rowId of the snapshot table for the source from which snapshot should continue (chunking)

TargetSCN

  • Request:
GET /targetSCN?source=com.linkedin.events.example.person.Person_V1
  • Response:
"<SCN>"

Bootstrap Catchup

  • Request:
GET /bootstrap?sources=<comma_separated_sources_list>&checkPoint=<checkpoint>&batchSize=<value>&output=<json|binary>&filter=<filter>
  • Response:
Header : PhaseCompleted = TRUE (if catchup for this source done)
Body :  DbusEvents (Binary/JSON format)
  • Input Parameter Description
Field Is this part of Checkpoint Semantics
sources No This contains the comma separated list of source Ids (NOTE: This is not used by bootstrap Server. The source info is got from the ckeckpoint)
batchSize No Number of bytes the client can take in its event buffer.
output No Output format of the events streamed out. Can be binary or json
filter No Server Side Filter
bootstrap_start_scn Yes The startSCN value the client got in its startSCN request
bootstrap_since_scn Yes The SCN of the client before bootstrap started.
bootstrap_target_scn Yes The targetSCN value the client got in its targetSCN request
consumption_mode Yes The mode of bootstrap. Should be set to BOOTSTRAP_CATCHUP.
catchup_source Yes The current source name that is being bootstrapped (from log tables).
windowOffset Yes The current rowId of the log table for the source from which catchup should continue (chunking)
windowScn Yes Current bootstrap SCN seen by the bootstrapping client

Databus2 Client

See Databus2 Client Design for details.

Client Request Processing

sinceScn is determined as follows:

  • The persisted Checkpoint is read to obtain sinceScn . If none available and bootstrap is not disabled, a ‘flexible’ sinceScn = 0. Otherwise, sinceScn is set to be ‘flexible’, in which case, the client obtains events from the beginning of the relay’s buffer. sinceScn is minScn of the relay.
  • Client requests updates from sinceScn.
  • Relay contains SCNs in the range [=minScn=, maxScn=] (both inclusive). (Metrics: =RelayInboundEventBuffer.minScn, RelayInboundEventBuffer.maxScn)
  • BootstrapDB has a snapshot up to startScn. (Metric: BootstrapApplierState.currentSCN)
  • BootstrapProducer has the latest SCN from a relay at targetScn. (Metric: BootstrapProducerState.currentSCN)
Condition Action Metric Comments
minScn < sinceScn < maxScn Read from Relay RelayOutboundEventBuffer.numDataEvents Normal operation, requested SCN is found in relay buffer
startScn > targetScn Exception Sanity check, this should never occur- snapshot cannot be ahead of bootstrap producer
sinceScn not in [minScn, maxScn]; sinceScn <= startScn <= targetScn Bootstrap inc NumReqBootstrap, NumReqSnapshot, NumReqCatchup Snapshot plus catchup
sinceScn not in [minScn, maxScn]; startScn <= targetScn < sinceScn BootstrapTooOldException inc NumErrReqDatabaseTooOld Bootstrap DB is lagging and sinceScn is not found in relay.
sinceScn not in [minScn, maxScn]; startScn <= sinceScn <= targetScn Bootstrap inc NumReqBootstrap, NumReqCatchup BootstrapApplier is lagging. Bootstrapping still done but the snapshot phase will yield no events while all the client work happens in catchup phase

HTTP Interface for State Inspection

Relay

Inspect all physical sources

  • Request:
GET /physicalSources
  • Success response (TBD; currently returns all info):
{physicalSources:
   [{"name":"pSrc_name","id":100, sources": [{ "id": 1, "name":src_name}, {.... } ]}, {...}]
}
  • Semantics: Returns all the physical sources (partition names and id) of a given relay. The sources are defined in the configuration file loaded by the relay server.

Inspect all physical buffers

  • Request:
GET /physicalBuffers
  • Success response:
{
 {physicalPartition:[{physicalSource},..]}, {...},...
}
  • Semantics: Returns the map of all the physical partitions to set of corresponding physical sources.

Get DbusEventBuffer Info by Source/Physical partition

  • Sample request:
curl "http://localhost:11150/bufferInfo/inbound/ExampleDB16/0"
  • Sample response:
{"minScn":4294967297,"maxScn":4294967305,"timestampLatestEvent":1336522355027,"timestampFirstEvent":1336522351508}
  • Semantics: This query returns the buffer info (min/max SCN and timestamps of first and latest events) to the caller.

Client

List of Databus V2 client consumer registrations and their states

Returns a map from registration id to the list of their subscriptions and their status.

  • Sample request:
GET /clientState/registrations
  • Sample response:
[ {
  "state" : "STARTED",
  "status" : "DatabusComponentStatus [_componentName=Status_example_lb_test1_d41d8cd9, _retriesCounter=BackoffTimer [_config=BackoffTimerStaticConfig [_initSleep=0, _maxSleep=0, _sleepIncFactor=0.0, _sleepIncDelta=0, _maxRetryNum=-1], _name=Status_example_lb_test1_d41d8cd9.errorRetries, _retrySleepMs=0, _retriesNum=0, _retryStartTs=-1], _status=INITIALIZING, _message=The databus component is initializing..., _lastStartTime=-1]",
  "filter" : null,
  "subs" : [ {
    "physicalPartition" : {
      "name" : "*",
      "id" : -1,
      "anyPartitionWildcard" : true,
      "wildcard" : true
    },
    "logicalSource" : {
      "name" : "com.linkedin.events.example.person.Person_V1",
      "id" : -32768,
      "wildcard" : false,
      "allSourcesWildcard" : false
    },
    "physicalSource" : {
      "anySourceWildcard" : true,
      "wildcard" : true,
      "uri" : "databus:physical-source:ANY",
      "role" : {
        "role" : "ANY"
      },
      "resourceKey" : "",
      "masterSourceWildcard" : false,
      "slaveSourceWildcard" : false
    },
    "logicalPartition" : {
      "id" : -1,
      "source" : {
        "name" : "com.linkedin.events.example.person.Person_V1",
        "id" : -32768,
        "wildcard" : false,
        "allSourcesWildcard" : false
      },
      "wildcard" : true,
      "allPartitionsWildcard" : true
    }
  }]

State of Databus V2 consumer registration

Returns information about registration, such as subscription and status.

  • Sample request:
GET /clientState/registration/<registrationId>
  • Sample response:
{
  "state" : "STARTED",
  "status" : "DatabusComponentStatus [_componentName=Status_example_lb_test1_d41d8cd9, _retriesCounter=BackoffTimer [_config=BackoffTimerStaticConfig [_initSleep=0, _maxSleep=0, _sleepIncFactor=0.0, _sleepIncDelta=0, _maxRetryNum=-1], _name=Status_example_lb_test1_d41d8cd9.errorRetries, _retrySleepMs=0, _retriesNum=0, _retryStartTs=-1], _status=INITIALIZING, _message=The databus component is initializing..., _lastStartTime=-1]",
  "filter" : null,
  "subs" : [ {
    "physicalPartition" : {
      "name" : "*",
      "id" : -1,
      "anyPartitionWildcard" : true,
      "wildcard" : true
    },
    "logicalSource" : {
      "name" : "com.linkedin.events.example.person.Person_V1",
      "id" : -32768,
      "wildcard" : false,
      "allSourcesWildcard" : false
    },
    "physicalSource" : {
      "anySourceWildcard" : true,
      "wildcard" : true,
      "uri" : "databus:physical-source:ANY",
      "role" : {
        "role" : "ANY"
      },
      "resourceKey" : "",
      "masterSourceWildcard" : false,
      "slaveSourceWildcard" : false
    },
    "logicalPartition" : {
      "id" : -1,
      "source" : {
        "name" : "com.linkedin.events.example.person.Person_V1",
        "id" : -32768,
        "wildcard" : false,
        "allSourcesWildcard" : false
      },
      "wildcard" : true,
      "allPartitionsWildcard" : true
    }
  }}

Client clusters supported by this client instance

Returns information about client clusters active in this client instance.

  • Sample request:
GET /clientState/clientClusters
  • Sample response:
[ {
  "name" : "example_lb_test1",
  "numTotalPartitions" : 10,
  "minimumActiveNodes" : 1
} ]

Client partitions active for a given cluster in a client instance

Returns information about client partitions active for a cluster in this client instance.

  • Sample request:
GET /clientState/clientPartitions/<cluster>
  • Sample response:
[ {
  "partition" : 3,
  "regId" : {
    "id" : "example_lb_test1_d41d8cd9_3"
  }
}, {
  "partition" : 4,
  "regId" : {
    "id" : "example_lb_test1_d41d8cd9_4"
  }
}, {
  "partition" : 1,
  "regId" : {
    "id" : "example_lb_test1_d41d8cd9_1"
  }
}, {
  "partition" : 2,
  "regId" : {
    "id" : "example_lb_test1_d41d8cd9_2"
  }
} ]

Client partition status for a given cluster in a client instance

Returns status of registration corresponding to a partition that is active.

  • Sample request:
GET /clientState/clientPartition/<cluster>/<partition>
  • Sample response:
{
  "state" : "STARTED",
  "status" : "DatabusComponentStatus [_componentName=Status_example_lb_test1_d41d8cd9_1, _retriesCounter=BackoffTimer [_config=BackoffTimerStaticConfig [_initSleep=0, _maxSleep=0, _sleepIncFactor=0.0, _sleepIncDelta=0, _maxRetryNum=-1], _name=Status_example_lb_test1_d41d8cd9_1.errorRetries, _retrySleepMs=0, _retriesNum=0, _retryStartTs=-1], _status=RUNNING, _message=The databus component is running normally., _lastStartTime=1356071656357]",
  "filter" : {
    "configMap" : {
      "com.linkedin.events.example.person.Person_V1" : {
        "filterConfig" : {
          "buckets" : {
            "idConfigs" : [ {
              "type" : "SINGLE",
              "idMax" : 1,
              "idMin" : 1
            } ]
          },
          "numBuckets" : 10
        },
        "partitionType" : "MOD"
      },
      "com.linkedin.events.example.group.Group_V1" : {
        "filterConfig" : {
          "buckets" : {
            "idConfigs" : [ {
              "type" : "SINGLE",
              "idMax" : 1,
              "idMin" : 1
            } ]
          },
          "numBuckets" : 10
        },
        "partitionType" : "MOD"
      }
    }
  },
  "subs" : [ {
    "physicalPartition" : {
      "name" : "*",
      "id" : -1,
      "anyPartitionWildcard" : true,
      "wildcard" : true
    },
    "logicalSource" : {
      "name" : "com.linkedin.events.example.group.Group_V1",
      "id" : -32768,
      "wildcard" : false,
      "allSourcesWildcard" : false
    },
    "physicalSource" : {
      "anySourceWildcard" : true,
      "wildcard" : true,
      "uri" : "databus:physical-source:ANY",
      "role" : {
        "role" : "ANY"
      },
      "resourceKey" : "",
      "masterSourceWildcard" : false,
      "slaveSourceWildcard" : false
    },
    "logicalPartition" : {
      "id" : -1,
      "source" : {
        "name" : "com.linkedin.events.example.group.Group_V1",
        "id" : -32768,
        "wildcard" : false,
        "allSourcesWildcard" : false
      },
      "wildcard" : true,
      "allPartitionsWildcard" : true
    }
  } ]

Get terse client information

  • Sample request:
curl http://localhost:10566/clientCommand/printClientInfo?pretty
  • Sample response:
{
  "clusterManager.relayZkConnectString" : "localhost:2181",
  "clusterManager.instanceName" : "databusTestClient:8887"
}
  • Semantics: This command can be used as a starting point to diagnose configuration or other issues in the client.

HTTP Interface for State Modification

Relay

Disconnect all clients

  • Request:
GET /relayCommand/disconnectClients
  • Success response:
{
 None
}
  • Semantics: Disconnects all the client connections that the relay is currently servicing. Note that any other command line clients, if connected, will also be disconnected.

Control Data Flow from Sources (start/stop/pause)

  • Request:
GET /controlSources/start?sources=<source-name>&scn=<sinceSCN>
"sources" :  name of the source database. It is specified in the configuration loaded by the relay server
"sinceSCN" : [optional] transfer of updates in sourceDB whose change number (SCN) is greater than the one specified  : See semantics for special sinceSCN values. [default : -1]
  • Response:
{"name" : "<source-name>", "status" : "running", "SCN" : <start-scn>" }
  • Request:
GET /controlSources/pause?sources=<source-name>
"sources" :  [optional] name of the source database specified in the configuration loaded by the relay server : defaults to all sources
  • Response:
{"name" : "<source-name>", "status" : "paused", "SCN" : <last-read-scn>" }
  • Request:
GET /controlSources/unpause?sources=<source-name>&
"sources" : [optional] name of the source database specified in the configuration loaded by the relay server : defaults to all sources
  • Response:
{"name" : "<source-name>", "status" : "running", "SCN" : <last-read-scn>" }
  • Request:
GET /controlSources/shutdown?sources=<source-name>
"sources" : [optional] name of the source database specified in the configuration loaded by the relay server : defaults to all sources
  • Response:
{"name" : "<source-name>", "status" : "running", "SCN" : <last-read-scn>" }
  • Request:
GET /controlSources/status?sources=<source-name>
"sources" :  name of the source database specified in the configuration loaded by the relay server : defaults to all sources
  • Response:
{"name" : "<source-name>", "status" : "running|paused|shutdown", "SCN" : <last-read-scn>" }
  • Semantics:
    • Start transfer of updates in sourceDB whose change number (SCN) is greater than the one specified. (See table.)
    • Pause/Unpause transfer of updates.
    • Shutdown stops the transfer of updates.
sinceSCN Action
-1, <0 default: sinceSCN = persistedSCN from disk, if available; otherwise _sinceSCN = 0
0 sinceSCN = maxSCNinDB; obtained by querying the DB
1 sinceSCN = maxSCNinDB; obtained by querying the DB
N sinceSCN = N, if N > 0

Control DB Monitoring Source

  • Request:
GET /controlSources/start?sources=dbMonitor
GET /controlSources/shutdown?sources=dbMonitor
  • Response:
{"name" : "dbMonitor", "status" : "running|shutdown", "SCN" : 0" }
  • Semantics:
    • Start / stop DB monitoring thread that monitors maxScn’s of various sources; more precisely ‘maxDBScn’ Mbean metric
    • The thread checks the maxScn of each source once every 5 seconds.

Client

Reset all relay connections

  • Request:
GET /relayCommand/resetRelayConnections
  • Success response:
{
 None
}
  • Semantics: Disconnects all the relay connections and reconnects to new relay(s) that hold the partition.

HTTP Interface for Statistics

  • In general, all statistics are available as Mbeans and as http interfaces.
  • Common interfaces:
    • Enable/Disable
  • HTTP responses appear as JSON and are described using Avro Serialization Format.

Common Container Metrics

  • These are metrics that are applicable to each of the components below (Relay, Bootstrap and Client/Consumer).
  • They are comprised of:
    • connection metrics (inbound and outbound)
    • event buffer metrics, both inbound (corresponding to events being written to) and outbound (events being read from), as well as some that are properties of the shared event buffer (minScn or freeBytes).

Connection Stats

Description
Interface
  • Outbound and Inbound Clients
GET uri://containerStats/outbound/clients
GET uri://containerStats/inbound/clients
HTTP/1.1 200 OK
["client1","client2",....]
  • Outbound and Inbound Connection Statistics
GET uri://containerStats/outbound/total
GET uri://containerStats/outbound/client/<client>
GET uri://containerStats/inbound/total

Event Buffer Statistics

Description
Interface
  • Inbound and Outbound Sources
GET uri://containerStats/outbound/events/sources
GET uri://containerStats/inbound/events/sources
HTTP/1.1 200 OK
["srdId1","srcId2",....]
  • Outbound and Inbound EventBuffer Statistics
GET uri://containerStats/outbound/events/total
GET uri://containerStats/outbound/events/client/<client>
GET uri://containerStats/outbound/events/source/<srcId>
GET uri://containerStats/inbound/events/total
GET uri://containerStats/inbound/events/client/<client>
GET uri://containerStats/inbound/events/source/<srcId>
  • Inbound and Outbound Physical Sources
GET uri://containerStats/outbound/events/psources
GET uri://containerStats/inbound/events/psources
HTTP/1.1 200 OK
["pSrdId1","pSrcId2",....]
Example:
curl hostname.foo:11140/containerStats/inbound/events/psources
["BusinessDB:36","BusinessDB:19","BusinessDB:38","BusinessDB:17","BusinessDB:32","BusinessDB:4"]
  • Outbound and Inbound EventBuffer Statistics by Physical Source
GET uri://containerStats/outbound/events/total
GET uri://containerStats/outbound/events/psource/<srcId>
GET uri://containerStats/inbound/events/total
GET uri://containerStats/inbound/events/psource/<pSrcId>
Example:
curl hostname.foo:11140/containerStats/inbound/events/psource/BusinessDB:10?pretty
{
  "freeSpace" : 0,
  "dimension" : "BusinessDB_10",
  "minScn" : 9223372036854775807,
  "prevScn" : 0,
  "maxScn" : 0,
  "sizeDataEvents" : 0,
  "numDataEvents" : 0,
  "timestampLastResetMs" : 1334597467792,
  "timeSinceLastResetMs" : 1,
  "timestampMaxScnEvent" : 0,
  "numPeers" : 0,
  "numDataEventsFiltered" : 0,
  "maxSeenWinScn" : -9223372036854775808,
  "maxFilteredWinScn" : 0,
  "minSeenWinScn" : 9223372036854775807,
  "sizeDataEventsPayload" : 0,
  "sizeDataEventsFiltered" : 0,
  "sizeDataEventsPayloadFiltered" : 0,
  "numSysEvents" : 0,
  "sizeSysEvents" : 0,
  "numInvalidEvents" : 0,
  "numHeaderErrEvents" : 0,
  "numPayloadErrEvents" : 0,
  "timeSinceLastAccess" : 1334597467793,
  "timeSinceCreation" : 1,
  "timeSpan" : -9223372036854775807,
  "timeSinceLastEvent" : 1334597467793,
  "timestampMinScnEvent" : 9223372036854775807,
  "latencyEvent" : 0,
  "enabled" : false,
  "threadSafe" : false
}

Source Metrics

  • Measured on Databus Relay
  • Metrics relevant to the interfaces supported by the event producer, e.g., an Oracle transaction reader (/controlSources?source=<>)

Interface

  • Mbean
    • Mean per source, where source refers to ‘view/table of database’, associated with id attribute of source specification.
    • maxDBScn : max SCN in DB per source
Mbean Query Attribute Example Desc
com.linkedin.databus2:name=*,type=SourceStatistics AvgEventFactoryTimeMillisPerEvent 0
com.linkedin.databus2:name=*,type=SourceStatistics AvgEventSerializedSize 0
com.linkedin.databus2:name=*,type=SourceStatistics AvgNumEventsPerNonEmptyCycle 0
com.linkedin.databus2:name=*,type=SourceStatistics MaxDBScn 0
com.linkedin.databus2:name=*,type=SourceStatistics MaxScn 0
com.linkedin.databus2:name=*,type=SourceStatistics MillisSinceLastCycleWithEvents 1300838230604
com.linkedin.databus2:name=*,type=SourceStatistics NumConsecutiveCyclesWithEvents 0
com.linkedin.databus2:name=*,type=SourceStatistics NumConsecutiveCyclesWithoutEvents 0
com.linkedin.databus2:name=*,type=SourceStatistics NumCyclesTotal 0
com.linkedin.databus2:name=*,type=SourceStatistics NumCyclesWithEvents 0
com.linkedin.databus2:name=*,type=SourceStatistics NumCyclesWithoutEvents 0
com.linkedin.databus2:name=*,type=SourceStatistics TotalEvents 0
com.linkedin.databus2:name=*,type=SourceStatistics TimeSinceLastDBAccess 0
com.linkedin.databus2:name=*,type=SourceStatistics NumErrors 0
  • HTTP: absent

Source DB Metrics

  • Measured on Databus Relay by monitoring source /controlSources?sources=dbMonitor
  • Metrics of source DB, such as maxScn per source and of the database

Interface

  • Mbean
Mbean Query Attribute Example Desc
com.linkedin.databus2:name=<db-name>,type=DBStatistics MaxDBScn 0 The largest SCN measured at DB
com.linkedin.databus2:name=<db-name>,type=DBStatistics LastUpdateTimestamp 0 Timestamp in seconds at which MaxDBScn was last updated
com.linkedin.databus2:name=<db-source-name>,type=SourceDBStatistics MaxScn 0 Largest SCN of the source (view/table) measured at DB
com.linkedin.databus2:name=<db-source-name>,type=SourceDBStatistics LastUpdateTimestamp 0 Timestamp in seconds at which MaxScn was last updated
  • HTTP: absent

Relay Metrics

  • Measured on DatabusRelay and Databus Consumers/Clients
  • Metrics relevant to the interfaces supported by the relay

Description

Interface

  • Outbound Sources
GET uri://relayStats/outbound/http/sources
HTTP/1.1 200 OK
["srdId1","srcId2",....]
  • Outbound clients
GET uri://relayStats/outbound/http/clients
HTTP/1.1 200 OK
["client1","client2",....]
  • Relay metrics
GET uri://relayStats/outbound/http/total
GET uri://relayStats/outbound/http/client/<client>
GET uri://relayStats/outbound/http/source/<srcId>

Bootstrap Metrics

  • Metrics relevant to the interfaces supported by bootstrap service

Interface

Client/Consumer Metrics

  • Statistics relevant to interfaces supported by the client

Registration Stats

  • Request:
GET /clientStats/inbound/events/registration/<RegistrationId>
  • Sample response:
curl localhost:9001/clientStats/inbound/events/registration/EspressoTestStreamConsumer_e4742e7d?pretty
{
  "name" : "EspressoTestStreamConsumer_e4742e7d",
  "sources" : [ ],
  "enabled" : true,
  "totalStats" : {
    "freeSpace" : 500000,
    "minScn" : 9223372036854775807,
    "prevScn" : 0,
    "dimension" : "EspressoTestStreamConsumer_e4742e7d.total",
    "timestampLastResetMs" : 1332873494944,
    "timeSinceLastResetMs" : 1020614,
    "timestampMaxScnEvent" : 0,
    "numPeers" : 0,
    "numDataEvents" : 0,
    "numDataEventsFiltered" : 0,
    "maxSeenWinScn" : -9223372036854775808,
    "maxFilteredWinScn" : 0,
    "minSeenWinScn" : 9223372036854775807,
    "sizeDataEvents" : 0,
    "sizeDataEventsPayload" : 0,
    "sizeDataEventsFiltered" : 0,
    "sizeDataEventsPayloadFiltered" : 0,
    "numSysEvents" : 0,
    "sizeSysEvents" : 0,
    "numInvalidEvents" : 0,
    "numHeaderErrEvents" : 0,
    "numPayloadErrEvents" : 0,
    "maxScn" : 0,
    "timeSinceLastAccess" : 1332874515558,
    "timeSinceCreation" : 1020614,
    "timeSpan" : -9223372036854775807,
    "timeSinceLastEvent" : 1332874515558,
    "timestampMinScnEvent" : 9223372036854775807,
    "latencyEvent" : 0,
    "enabled" : true,
    "threadSafe" : false
  },
  "physicalSources" : [ ],
  "peers" : [ ],
  "ownerId" : 2130716997,
  "threadSafe" : false
}

Client Http Stats

Memory statistics

Returns statistics for all, heap, and nonheap memory. The implementation uses [[http://download.oracle.com/javase/6/docs/api/java/lang/management/MemoryMXBean.html][java.lang.management.MemoryMXBean]]

  • Request:
GET /javaStats/memory
GET /javaStats/memory/heap
GET /javaStats/memory/nonheap
  • Example:
$ curl 'localhost:8080/javaStats/memory/heap'
{"init":0,"used":7672240,"committed":85000192,"max":129957888}
 
 
$ curl 'localhost:8080/javaStats/memory/nonheap'
{"init":24317952,"used":13394264,"committed":24317952,"max":138412032}
 
 
$ curl 'localhost:8080/stats/memory?pretty'
{
  nonHeapMemoryUsage : {
    init : 24317952,
    used : 13875136,
    committed : 24317952,
    max : 138412032
  },
  heapMemoryUsage : {
    init : 0,
    used : 10082504,
    committed : 85000192,
    max : 129957888
  },
  objectPendingFinalizationCount : 0,
  verbose : false,
  notificationInfo : [ {
    notifTypes : [ "java.management.memory.threshold.exceeded", "java.management.memory.collection.threshold.exceeded" ],
    name : "javax.management.Notification",
    descriptor : {
      fields : [ ],
      valid : true,
      fieldNames : [ ]
    },
    description : "Memory Notification"
  } ]
}

Garbage-collection statistics

Returns GC statistics. The implementation uses java.lang.management.GarbageCollectorMXBean

  • Returns a list of the above beans for the different types of GC.
  • Request:
GET /javaStats/gc
  • Example:
$ curl 'localhost:8080/javaStats/gc?pretty'
[ {
  collectionCount : 1,
  collectionTime : 3,
  lastGcInfo : {
    id : 1,
    startTime : 542,
    duration : 4,
    compositeType : {
      array : false,
      typeName : "sun.management.ParNew.GcInfoCompositeType",
      className : "javax.management.openmbean.CompositeData",
      description : "CompositeType for GC info for ParNew"
    },
    endTime : 546,
    memoryUsageBeforeGc : {
      CMS Perm Gen : {
        init : 21757952,
        used : 9994000,
        committed : 21757952,
        max : 88080384
      },
      Par Eden Space : {
        init : 17432576,
        used : 17432576,
        committed : 17432576,
        max : 17432576
      },
      Code Cache : {
        init : 2560000,
        used : 638400,
        committed : 2560000,
        max : 50331648
      },
      Par Survivor Space : {
        init : 2162688,
        used : 0,
        committed : 2162688,
        max : 2162688
      },
      CMS Old Gen : {
        init : 65404928,
        used : 0,
        committed : 65404928,
        max : 110362624
      }
    },
    memoryUsageAfterGc : {
      CMS Perm Gen : {
        init : 21757952,
        used : 9994000,
        committed : 21757952,
        max : 88080384
      },
      Par Eden Space : {
        init : 17432576,
        used : 0,
        committed : 17432576,
        max : 17432576
      },
      Code Cache : {
        init : 2560000,
        used : 638400,
        committed : 2560000,
        max : 50331648
      },
      Par Survivor Space : {
        init : 2162688,
        used : 1425728,
        committed : 2162688,
        max : 2162688
      },
      CMS Old Gen : {
        init : 65404928,
        used : 0,
        committed : 65404928,
        max : 110362624
      }
    }
  },
  name : "ParNew",
  valid : true,
  memoryPoolNames : [ "Par Eden Space", "Par Survivor Space" ]
}, {
  collectionCount : 0,
  collectionTime : 0,
  lastGcInfo : null,
  name : "ConcurrentMarkSweep",
  valid : true,
  memoryPoolNames : [ "Par Eden Space", "Par Survivor Space", "CMS Old Gen", "CMS Perm Gen" ]
} ]

OS-level statistics

Returns statistics at the operating-system level. The implementation uses java.lang.management.OperatingSystemMXBean

  • Request:
GET /javaStats/os
  • Example:
$ curl 'localhost:8080/javaStats/os?pretty'