-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.js
70 lines (57 loc) · 1.78 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Location of the protobuf definition, resolved relative to this file.
const PROTO_PATH = __dirname + '/randomNum.proto';

const grpc = require('grpc');
const avro = require('avsc');
const kafka = require('kafka-node');
const locationSchema = require('./location_schema.js');

// `pckgRandom` is the package read from the loaded proto definition; it
// exposes the `srvRandom` service registered in main().
const random_proto = grpc.load(PROTO_PATH).pckgRandom;

// Avro type used to encode location records before they are sent to Kafka.
const myAvro = avro.parse(locationSchema);

// Declared exactly once (the original file repeated this declaration).
const HighLevelProducer = kafka.HighLevelProducer;

// The Kafka client connects through ZooKeeper, which listens on port 2181 by
// default. '001' is the client id.
// NOTE(review): the third argument is presumably the ZooKeeper client options
// (timeouts in milliseconds) - confirm against the kafka-node version in use.
const kafkaClient = new kafka.Client('localhost:2181', '001', {
  sessionTimeout: 300,
  spinDelay: 100,
  retries: 2,
});

const myProducer = new HighLevelProducer(kafkaClient);

// Keeps every received location in memory for the lifetime of the process.
const inMemoryLocations = [];
/**
 * Appends a record to the end of the module-level `inMemoryLocations` list.
 *
 * @param {*} data - Record to remember; stored by reference, not copied.
 * @returns {undefined}
 */
function saveInMemory(data) {
  const nextSlot = inMemoryLocations.length;
  inMemoryLocations[nextSlot] = data;
}
// Report Kafka client errors on stderr.
kafkaClient.on('error', (clientError) => {
  console.error(clientError);
});
/**
 * gRPC handler for `getLocations`.
 *
 * Reads `longitude` and `latitude` from the request, stores them via
 * `saveInMemory`, Avro-encodes them with `myAvro` and publishes the buffer to
 * the `LOCATION_CHANGE` Kafka topic through `myProducer`. The received
 * coordinates are also written to stdout.
 *
 * @param {Object} call - gRPC call object; only `call.request.longitude` and
 *   `call.request.latitude` are read.
 * @param {Function} [callback] - Optional Node-style `(error, response)`
 *   completion callback. When supplied it is invoked exactly once: with the
 *   error if encoding or the Kafka send fails, otherwise with `(null, {})`.
 *   When omitted the function behaves as it did before the parameter existed.
 * @throws Whatever `myAvro.toBuffer` throws, but only when no callback was
 *   supplied (with a callback the error is passed to it instead).
 */
function getLocations(call, callback) {
  const { longitude, latitude } = call.request;
  const data = { longitude, latitude };
  const hasCallback = typeof callback === 'function';

  saveInMemory(data);

  let buffer;
  try {
    buffer = myAvro.toBuffer(data);
  } catch (error) {
    // With nobody to notify, keep the original behaviour and let it propagate.
    if (!hasCallback) {
      throw error;
    }
    console.error(error);
    callback(error);
    return;
  }

  // Create a new payload.
  const payload = [
    {
      topic: 'LOCATION_CHANGE',
      messages: buffer,
    },
  ];

  // Send the payload to Kafka, log any error and complete the call so the
  // client is not left waiting for a reply.
  myProducer.send(payload, (error) => {
    if (error) {
      console.error(error);
    }
    if (!hasCallback) {
      return;
    }
    if (error) {
      callback(error);
      return;
    }
    // NOTE(review): replies with an empty message; assumes the response type
    // in randomNum.proto has no required fields - confirm.
    callback(null, {});
  });

  // Show the data that was sent by the client.
  console.log(`Longitude: ${longitude}\nLatitude: ${latitude}`);
}
/**
 * Creates the gRPC server, registers the `srvRandom` service with
 * `getLocations` as its handler, binds it to 0.0.0.0:50051 using insecure
 * credentials and starts it.
 *
 * If the address cannot be bound, the failure is logged and the server is not
 * started.
 */
function main() {
  const address = '0.0.0.0:50051';
  const server = new grpc.Server();
  server.addService(random_proto.srvRandom.service, { getLocations });

  // bind() signals failure through its return value rather than by throwing:
  // a positive number is the bound port.
  // NOTE(review): zero/negative is treated as failure - confirm against the
  // installed grpc version.
  const boundPort = server.bind(address, grpc.ServerCredentials.createInsecure());
  if (!(boundPort > 0)) {
    console.error(`Failed to bind gRPC server to ${address}`);
    return;
  }

  server.start();
}
// Report Kafka producer errors on stderr.
myProducer.on('error', function onProducerError(producerError) {
  console.error(producerError);
});

// Start the gRPC server when the Kafka producer signals that it is ready.
myProducer.on('ready', function onProducerReady() {
  main();
});