-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkafka_consumer.js
35 lines (29 loc) · 912 Bytes
/
kafka_consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
var avro = require('avsc');
var kafka = require('kafka-node');
var locationSchema = require('./location_schema.js');
var myAvro = avro.parse(locationSchema);
var HighLevelConsumer = kafka.HighLevelConsumer;
var kafkaClient = new kafka.Client('localhost:2181');
var topics = [{
topic: 'LOCATION_CHANGE'
}];
var options = {
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: 'buffer'
}
var consumer = new HighLevelConsumer (kafkaClient, topics, options);
consumer.on('message', (message) => {
const buffer = new Buffer(message.value, 'binary'); // Read string into a buffer.
const decodedMessage = myAvro.fromBuffer(buffer.slice(0)); // Skip prefix.
console.log(decodedMessage);
});
consumer.on('error', (err) => {
console.log('error', err);
});
process.on('SIGINT', () => {
consumer.close(true, () => {
process.exit();
});
});