This is a proxy gateway based on kafka-rest[1] (2.0.1) and atmosphere[2] (2.4.x) to transparently add a Websocket based transport to kafka-rest’s HTTP REST proxy gateway. As a result, the provided services of kafka-rest can be invoked over HTTP or Websocket and possibly other protocols supported by atmosphere. In addition, new services utilizing Websocket can also be supported in the same gateway.
The advantage of using Websocket is its support for asynchronous duplex communication: messages can be sent and received asynchronously over a single connection. This advantage is best exploited when the protocol gateway is designed and optimized specifically for the Websocket transport.
In contrast, this gateway takes a different approach: it reuses kafka-rest’s existing REST services and transparently enables them to be invoked over Websocket and other protocols such as Server-Sent Events (SSE) using atmosphere. In addition, new consumer services are added to push messages over the socket continuously. This approach has advantages and disadvantages. For more details, please refer to section Advantages and Disadvantages.
The shell scripts bin/kafka-rest-atmosphere-start and bin/kafka-rest-atmosphere-stop are available in the package’s bin folder. These scripts are based on kafka-rest’s shell scripts and start and stop the standalone kafka-rest-atmosphere server, respectively.
A binary package is available from here.
tar xzf kafka-rest-atmosphere-0.0.3-package.tgz
cd kafka-rest-atmosphere-0.0.3-package
To change the default setting, customize properties in etc/kafka-rest/kafka-rest.properties. In addition to the properties supported by kafka-rest, the following properties can be set.
Name | Description | Default |
---|---|---|
atmosphere.simple-rest.protocol.detached | uses the detached content mode | false |
Use your websocket client to connect to ws://localhost:8082. For example, you can use the Echo demo at http://www.websocket.org/echo.html as your client, connect to the server at ws://localhost:8082/, and paste the appropriate JSON messages into its Message text field. There is also a node client based on atmosphere.js available in the package’s share/utils/node.
One may add additional Atmosphere options in the query parameters on the initial request. One specific option supported by kafka-rest-atmosphere is parameter X-Atmosphere-SimpleRestProtocolDetached to dynamically enable or disable the detached mode per connection.
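As a rough illustration of passing this option on the initial request, the following sketch appends it to the connection URL as a query parameter. The helper name `connection_url` is hypothetical, and the values `true` and `false` are an assumption about how the option is written; the text above only names the parameter.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def connection_url(base_url, detached):
    """Return base_url with the per-connection detached-mode option appended.

    Assumption: the option takes the strings "true" or "false".
    """
    scheme, netloc, path, query, fragment = urlsplit(base_url)
    option = urlencode(
        {"X-Atmosphere-SimpleRestProtocolDetached": str(bool(detached)).lower()}
    )
    # Keep any query parameters that are already present on the URL.
    query = f"{query}&{option}" if query else option
    return urlunsplit((scheme, netloc, path or "/", query, fragment))


print(connection_url("ws://localhost:8082/", True))
# ws://localhost:8082/?X-Atmosphere-SimpleRestProtocolDetached=true
```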
Any of the REST operations that are supported by kafka-rest may be invoked over HTTP or Websocket. When invoking an operation over Websocket, the message must be serialized in the Websocket binding format using the following JSON format.
{"id": "<identifier>", "method": "<method>", "path": "<path>", "type": "<type_header>", "accept": "<accept_header>", "headers": <headers_map>, "continue": <continue>}<content>
where <identifier> represents an identifier to correlate the response to its original request, <method> represents the request method, <path> represents the request path, <type_header> and <accept_header> represent the optional content-type and accept headers, and <headers_map> represents the optional map containing additional headers. The content is detached from the JSON envelope and placed directly after it. Finally, the optional <continue> property may be set to true to indicate that the message is part of a sequence. Each sequence is identified by a unique <identifier>. The continued messages must be reassembled at the receiving end to produce the original data stream.
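The reassembly of continued messages might be sketched as follows. This is only an illustration under stated assumptions: the class name `Reassembler` is hypothetical, each message is assumed to arrive as one text frame consisting of the JSON envelope followed by its content, and the last message of a sequence is assumed to carry `"continue": false` or to omit the property.

```python
import json


class Reassembler:
    """Collects the content of continued messages, keyed by envelope id."""

    def __init__(self):
        self._parts = {}  # id -> list of content fragments received so far

    def feed(self, text):
        """Consume one message; return (id, content) once a sequence is complete."""
        # raw_decode parses the leading JSON envelope and reports where it ends,
        # so the detached content is everything after that offset.
        envelope, end = json.JSONDecoder().raw_decode(text)
        msg_id = envelope["id"]
        self._parts.setdefault(msg_id, []).append(text[end:])
        if envelope.get("continue"):
            return None  # more fragments are expected for this id
        return msg_id, "".join(self._parts.pop(msg_id))


r = Reassembler()
print(r.feed('{"id": "7", "continue": true}[{"value":'))      # None
print(r.feed('{"id": "7", "continue": false}"S2Fma2E="}]'))   # ('7', '[{"value":"S2Fma2E="}]')
```

Keying the buffer by id lets fragments of different sequences interleave on the same connection.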
For example, an HTTP request to list topics
GET /topics HTTP/1.1
Host: localhost:8082
Accept: */*
will correspond to a JSON message of
{"id": "...", "method": "GET", "path": "/topics"}
Similarly, an HTTP request to publish a message "Kafka" to topic "test"
POST /topics/test HTTP/1.1
Host: localhost:8082
Accept: */*
Content-Type: application/vnd.kafka.binary.v1+json
Content-Length: 34

{"records":[{"value":"S2Fma2E="}]}
will correspond to a JSON message of
{"id": "...", "method": "POST", "path": "/topics/test", "type": "application/vnd.kafka.binary.v1+json"}{"records":[{"value":"S2Fma2E="}]}
For details and more samples, refer to kafka-rest-websocket-binding [3]. For available operations, refer to the kafka-rest documentation [4].
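The mapping from an HTTP request to the Websocket binding format can be sketched in a few lines of Python. The function names `encode_message` and `binary_records` are hypothetical helpers for illustration, not part of the gateway or its client. The sketch writes the envelope first and appends the detached content, as in the two examples above.

```python
import base64
import json


def encode_message(msg_id, method, path, content="", type_header=None,
                   accept_header=None, headers=None, cont=None):
    """Serialize a request as a JSON envelope followed by the detached content."""
    envelope = {"id": msg_id, "method": method, "path": path}
    # Optional properties are only written when they are set.
    if type_header is not None:
        envelope["type"] = type_header
    if accept_header is not None:
        envelope["accept"] = accept_header
    if headers:
        envelope["headers"] = headers
    if cont is not None:
        envelope["continue"] = cont
    return json.dumps(envelope) + content


def binary_records(values):
    """Build a kafka-rest binary payload; values are base64-encoded bytes."""
    records = [{"value": base64.b64encode(v).decode("ascii")} for v in values]
    return json.dumps({"records": records}, separators=(",", ":"))


print(encode_message("1", "GET", "/topics"))
# {"id": "1", "method": "GET", "path": "/topics"}
print(encode_message("2", "POST", "/topics/test",
                     content=binary_records([b"Kafka"]),
                     type_header="application/vnd.kafka.binary.v1+json"))
```

The second call produces the same message as the publish example above, with "2" in place of the elided id.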
Additional operations are specifically included for the Websocket binding to allow a consumer to subscribe to a topic and get messages pushed asynchronously over Websocket. This feature requires a modified version of kafka-rest [5].
To subscribe to a Binary topic
{"id": "...", "method": "GET", "path": "/ws/consumers/<consumer_group>/instances/<instance_name>/topics/<topic>", "accept": "application/vnd.kafka.binary.v1+json"}
To subscribe to a JSON topic
{"id": "...", "method": "GET", "path": "/ws/consumers/<consumer_group>/instances/<instance_name>/topics/<topic>", "accept": "application/vnd.kafka.json.v1+json"}
To unsubscribe from the topic
{"id": "...", "method": "DELETE", "path": "/ws/consumers/<consumer_group>/instances/<instance_name>/topics/<topic>"}
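The three subscription messages above differ only in method, path parameters, and accept header, so a client might generate them with small helpers like the ones below. This is a sketch: the function names and the `FORMATS` table are hypothetical, and percent-encoding the path segments is an added precaution rather than something the binding is documented to require.

```python
import json
from urllib.parse import quote

# Accept headers taken from the subscribe examples above.
FORMATS = {
    "binary": "application/vnd.kafka.binary.v1+json",
    "json": "application/vnd.kafka.json.v1+json",
}


def subscription_path(group, instance, topic):
    """Build the /ws/consumers path for a consumer instance and topic."""
    parts = [quote(p, safe="") for p in (group, instance, topic)]
    return "/ws/consumers/{}/instances/{}/topics/{}".format(*parts)


def subscribe(msg_id, group, instance, topic, fmt="binary"):
    """Envelope that subscribes to a topic in the given embedded format."""
    return json.dumps({"id": msg_id, "method": "GET",
                       "path": subscription_path(group, instance, topic),
                       "accept": FORMATS[fmt]})


def unsubscribe(msg_id, group, instance, topic):
    """Envelope that cancels the subscription for the same path."""
    return json.dumps({"id": msg_id, "method": "DELETE",
                       "path": subscription_path(group, instance, topic)})


print(subscribe("1", "my_group", "my_instance", "test", fmt="json"))
print(unsubscribe("2", "my_group", "my_instance", "test"))
```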
In the following, the advantages and disadvantages of this particular approach are described.
- The client can use the same payload as used in kafka-rest’s HTTP proxy.
- The existing kafka-rest services can be reused directly and invoked over either HTTP or Websocket.
- The consumer-specific subscription services are integrated into kafka-rest’s existing services so that messages can be transferred to the subscribers asynchronously.
- The client can choose its preferred protocol based on its use cases.
- The same transport security setting can be reused for both protocols.
- atmosphere supports additional protocols such as SSE, which can also be supported along with Websocket.
Currently, Server-Sent Events (SSE) is also supported to receive responses over a single connection that is kept open.
For HTTP, please refer to the documentation for kafka-rest [4].
- 0.1.0-SNAPSHOT
  - use only the detached mode
  - allow additional headers to be included using the "headers" attribute
  - upgrade atmosphere to 2.4.26
  - upgrade atmosphere-client to 2.3.5
- 0.0.3
  - made the detached mode configurable per connection (using atmosphere-2.4.6)
  - support chunked request handling (using atmosphere-2.4.6)
  - support aggregation of records in the subscription mode (using kafka-rest-atmosphere-2.0.1-elakito-03)
  - made the websocket buffer size and other websocket properties configurable
- 0.0.2
  - added start/stop scripts
  - added the detached mode to the server’s response generation
  - added detached mode response processing to the js client
  - switched to the new protocol interceptor added in atmosphere-2.4.4
  - use atmosphere’s completion-aware feature to generate the correct continue flag in chunked responses
- 0.0.1
  - the initial version using atmosphere-2.4.3 with a custom protocol interceptor