-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.js
164 lines (135 loc) · 4.79 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
var http = require('http')
, WebSocketServer = require('ws').Server
, express = require('express')
, util = require('util')
, app = express()
, config = require('config')
, sockets = require('./driver-zmq.js')
, client = require('restler');
// Start the HTTP server (wrapping the Express app) on the configured port.
var serverConfig = config.get('server');
var server = http.createServer(app);
server.listen(serverConfig.port);

// Authenticated driver websockets, keyed by the driver's hash key.
var connections = {};
// Per-driver running count of accepted connections; lets a socket's close
// handler tell whether it is still the most recent connection for that driver.
var connectionCounter = {};

// ZMQ sockets supplied by the driver: one to publish on, one to receive commands.
var pub_socket = sockets.pub_socket;
var router_socket = sockets.router_socket;

// Websocket server sharing the HTTP server above.
var wss = new WebSocketServer({ server: server });

// Driver REST API details: the API key is sent as the username with an empty
// password, and the auth token is appended to apiUrl by the connection handler.
var restConfig = config.get('driver-api');
var restOpts = { username: restConfig.api_key, password: '' };
var apiUrl = util.format('http://%s:%s/drivers/auth/', restConfig.host, restConfig.port);
/*
 * Handle a new websocket connection from a driver.
 *
 * The client must pass `auth_token` in the query string of the upgrade URL.
 * The token is checked against the driver REST API; on success the socket is
 * registered in `connections` under the driver's hash key (replacing and
 * closing any previous socket for that driver), an 'event_new_driver' message
 * is published, and every subsequent websocket message is published on the
 * topic 'gps_<hash key>'. On any failure the client receives a JSON
 * {"error": ...} frame and the socket is closed.
 */
wss.on('connection', function connection(ws) {
  // readyState of an open socket (the OPEN constant of the WebSocket API).
  var WS_OPEN = 1;

  // Only the query string matters; the host is a placeholder so url.parse
  // receives an absolute URL.
  var query = require('url').parse('http://abc.com' + ws.upgradeReq.url, true).query;

  // Builds a REST failure callback that reports `msg` to the client and closes
  // the socket. Send/close errors are logged rather than thrown because the
  // socket may already be gone.
  var errorHandler = function(msg){
    return function(data, response){
      try{
        ws.send( JSON.stringify({error: msg}) );
        console.log(msg + ' Error:' + data);
        ws.close();
      }catch(e){
        console.log(e);
      }
      return;
    };
  };

  if (!(query && query.auth_token)){
    ws.send('{"error":"Authentication token not provided"}');
    ws.close();
    return;
  }

  // Authenticate with the driver backend. The token is client-supplied and was
  // URL-decoded by url.parse, so it is re-encoded before being placed in the
  // request path; otherwise characters such as '/', '?' or '#' could change
  // which backend URL is requested.
  client.get(apiUrl + encodeURIComponent(query.auth_token), restOpts)
    .on('2XX', function(driver){
      // The client may have disconnected while the auth request was pending.
      // Registering a closed socket would leave a stale entry in `connections`,
      // because its 'close' event has already fired and would never clean up.
      if (ws.readyState !== WS_OPEN){
        console.log('Websocket closed before authentication finished');
        return;
      }

      // Tolerate an empty or unparsed response body instead of throwing.
      var hashKey = driver && driver.hash_key;
      console.log('websocket authenticating ...' + hashKey);
      if (!hashKey){
        ws.send('{"error":"Server error"}');
        console.log('Server error');
        ws.close();
        return;
      }

      // connection already exists for the driver
      if (hashKey in connections){
        try {
          console.log('Driver already connected. Closing old connection.');
          connections[hashKey].close();
        } catch (ex) {
          console.log(ex);
        }
      }

      // Mark the socket as authenticated and make it the current connection.
      ws.driver = hashKey;
      ws.topic = 'gps_' + hashKey;
      ws.authenticated = true;
      connectionCounter[ws.driver] = connectionCounter[ws.driver] || 0;
      connectionCounter[ws.driver] += 1;
      // Remember which generation this socket is, so that a replaced socket's
      // close handler does not unregister its successor.
      ws.count = connectionCounter[ws.driver];
      connections[ws.driver] = ws;

      ws.on('message', function incoming(message) {
        if (!ws.authenticated){
          ws.close();
          return;
        }
        console.log('received: %s', message);
        pub_socket.send([ws.topic, message]);
      });

      ws.on('close', function() {
        console.log('My count: ' + ws.count + ' global: ' + connectionCounter[ws.driver]);
        // Only the most recent connection for a driver unregisters it and
        // announces the drop.
        if (ws.driver && connectionCounter[ws.driver] && connectionCounter[ws.driver] == ws.count){
          delete connections[ws.driver];
          ws.authenticated = false;
          pub_socket.send(['event_drop_driver', JSON.stringify({'driver_id': ws.driver}) ]);
          console.log('Driver disconnected. ID: ' + ws.driver + ' Time: ' + new Date().toISOString());
        }
      });

      /*
      ws.on('ping', function(data, flags){
        if (ws.authenticated == true){
          try {
            ws.pong(data);
          } catch (ex) {
            console.log(ex);
          }
        }
      });
      */

      // fire the event for driver connection
      var msg = {'node': serverConfig.id, 'driver_id': ws.driver};
      pub_socket.send(['event_new_driver', JSON.stringify(msg) ]);
      console.log('Driver connected. ID: ' + ws.driver + ' Time: ' + new Date().toISOString() +
        ' counter: ' + connectionCounter[ws.driver]);
    })
    .on('4XX', errorHandler('Authentication failed'))
    .on('5XX', errorHandler('Server error'))
    .on('error', errorHandler('Error'));
});
/*
 * Listen for commands from the driver backend on the router socket and forward
 * them to the addressed driver's websocket.
 *
 * Each message arrives as (envelope, event, resourceId, msgId, data). Every
 * reply is sent as [envelope, event, msgId, status code, message]:
 *   200 - data was written to the websocket registered under resourceId
 *   410 - no websocket is registered under resourceId, or sending on it threw
 *   422 - event is neither 'driver' nor 'allDrivers'
 */
router_socket.on('message', function(envelope, event, resourceId, msgId, data) {
  console.log('Router received ' + event + ' ' + resourceId + ' ' + msgId + ' ' + data);
  //console.log('Connections: ' + Object.keys(connections) );

  // Reply prefix [event, id]; built per message and kept local so that every
  // branch below, including the 422 one, answers for the current message.
  var res = [event, msgId];

  /* sends a reply back through the router socket to the requesting peer */
  var zmqResponse = function(payload){
    router_socket.send([envelope].concat(payload));
    console.log('Router response: ' + payload);
  };

  // NOTE(review): loose equality is kept on purpose. If the socket delivers
  // frames as Buffers (as the data.toString() call below suggests), a strict
  // comparison against a string would never match; confirm before tightening.
  if (event == 'driver' || event == 'allDrivers'){
    // response format expected for the driver event:
    // [event, id, status code, message]
    try{
      if (resourceId in connections){
        connections[resourceId].send(data.toString());
        zmqResponse(res.concat([200, 'Success']));
      }else{
        // A plain string is thrown so the 410 reply text stays exactly
        // 'Connection broken:Not found:undefined' for existing consumers.
        throw 'Not found';
      }
    }catch(e){
      zmqResponse(res.concat([410, 'Connection broken:' + e + ':' + e.stack]));
    }
  }else{
    zmqResponse(res.concat([422, 'Error with message format']));
  }
});
// Expose the Express application object as this module's export.
module.exports = app;