forked from dardevelin/taiga-events-js
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrabbit.coffee
126 lines (103 loc) · 3.53 KB
/
rabbit.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
amqp = require('amqplib')
Promise = require('bluebird')
amqpUrl = require('./events-config').config.url
# Static AMQP settings used by this module:
#   exchange - name, type and options passed to channel.assertExchange
#   queue    - name and options passed to channel.assertQueue
#   channel  - holds the noAck flag
config =
    exchange:
        name: "events"
        type: "topic"
        options:
            durable: false
            autoDelete: true
    queue:
        name: ""
        options:
            autoDelete: true
            exclusive: true
    channel:
        noAck: true
# Return the shared AMQP connection, opening it on first use.
#
# The connect promise itself is cached, so callers that arrive while the
# connection is still being opened share that single attempt instead of each
# opening a connection of their own. If connecting fails, the returned promise
# rejects (rather than never settling) and the cache is cleared so that the
# next call tries again.
#
# Returns a promise that resolves to the connection.
getConnection = do ->
    connecting = null
    return () ->
        if !connecting
            # Promise.resolve keeps the result a promise of the module-level
            # `Promise` type regardless of what amqp.connect hands back.
            connecting = Promise.resolve(amqp.connect(amqpUrl)).catch((err) ->
                # Do not cache a failed attempt.
                connecting = null
                throw err
            )
        return connecting
# Per-client channel registry.
#
#   get(client_id)          - promise for the client's channel, creating it on
#                             the shared connection when it does not exist yet.
#   removeClient(client_id) - closes and forgets the client's channel.
channels = do ->
    # Open channels, keyed by client id.
    chs = {}
    # Channel creations still in flight, keyed by client id, so that
    # concurrent get() calls for one client share a single createChannel().
    pendingChannels = {}

    # Resolve to the channel of `client_id`, creating it if needed.
    # Rejects when the connection or the channel cannot be opened.
    get = (client_id) ->
        if chs[client_id]
            return Promise.resolve(chs[client_id])
        if !pendingChannels[client_id]
            pendingChannels[client_id] = Promise.resolve(getConnection())
                .then((connection) -> connection.createChannel())
                .then((channel) ->
                    chs[client_id] = channel
                    return channel
                )
                .finally(->
                    # Clear the in-flight marker on success AND on failure;
                    # otherwise one failed attempt would be handed to every
                    # later caller for this client.
                    delete pendingChannels[client_id]
                    return
                )
        return pendingChannels[client_id]

    # Close the channel of `client_id` and drop it from the registry.
    # Resolves immediately when the client has no channel (none is created
    # just to be closed); otherwise resolves once close() has completed.
    removeClient = (client_id) ->
        if !chs[client_id] and !pendingChannels[client_id]
            return Promise.resolve()
        return get(client_id).then((channel) ->
            # Forget the channel before closing it so a concurrent get()
            # cannot be handed a channel that is shutting down.
            delete chs[client_id]
            return channel.close()
        )

    return {
        removeClient: removeClient
        get: get
    }
# Queue factory: asserts the configured exchange and queue on a channel.
queues = do ->
    # Assert the exchange described by config.exchange on `ch`.
    declareExchange = (ch) ->
        {name, type, options} = config.exchange
        return ch.assertExchange(name, type, options)

    # Assert the queue described by config.queue on `ch`; resolves to the
    # `queue` field of the reply.
    declareQueue = (ch) ->
        {name, options} = config.queue
        return ch.assertQueue(name, options).then((reply) -> reply.queue)

    # Resolves to the queue name. `client_id` and `routing_key` are accepted
    # but not used.
    create = (channel, client_id, routing_key) ->
        return declareExchange(channel).then(-> declareQueue(channel))

    return {create: create}
# Subscription bookkeeping: binds queues to routing keys on the events
# exchange and remembers the consumer tag of every (client, routing key) pair
# so the consumer can be cancelled later.
subscriptions = do ->
    # subs[client_id][routing_key] -> consumerTag
    subs = {}

    # Bind `queue` to the exchange for `routing_key` and, once the bind has
    # succeeded, start consuming with `cb`. Resolves to the consume reply.
    # A failed bind now rejects the returned promise instead of being dropped.
    bindAndSubscribe = (channel, queue, routing_key, cb) ->
        return channel.bindQueue(queue, config.exchange.name, routing_key)
            .then(-> channel.consume(queue, cb, config.channel))

    # Remember the consumer tag; returns the tag.
    registerSubscription = (client_id, routing_key, consumerTag) ->
        subs[client_id] = subs[client_id] || {}
        subs[client_id][routing_key] = consumerTag

    # Subscribe `client_id` to `routing_key`, delivering messages to `cb`.
    # Resolves to the consumer tag of the new subscription.
    subscribe = (client_id, routing_key, cb) ->
        return channels.get(client_id).then((channel) ->
            return queues.create(channel)
                .then((queue) -> bindAndSubscribe(channel, queue, routing_key, cb))
                .then((ok) -> registerSubscription(client_id, routing_key, ok.consumerTag))
        )

    # Cancel the consumer registered for (client_id, routing_key).
    # Resolves without doing anything when no such subscription is recorded,
    # including when the client is unknown.
    unsubscribe = (client_id, routing_key) ->
        consumerTag = subs[client_id]?[routing_key]
        if !consumerTag?
            return Promise.resolve()
        # Drop the record first so a repeated unsubscribe does not cancel the
        # same tag twice.
        delete subs[client_id][routing_key]
        return channels.get(client_id).then((channel) -> channel.cancel(consumerTag))

    # Forget every subscription recorded for `client_id`.
    removeClient = (client_id) ->
        delete subs[client_id]

    return {
        subscribe: subscribe
        unsubscribe: unsubscribe
        removeClient: removeClient
    }
# Public API of the module.

# Drop the subscription records of `client_id`, then remove its channel.
# Returns whatever channels.removeClient returns.
exports.destroy = (client_id) ->
    subscriptions.removeClient(client_id)
    return channels.removeClient(client_id)

# subscribe / unsubscribe are re-exported unchanged from `subscriptions`.
for apiName in ['subscribe', 'unsubscribe']
    exports[apiName] = subscriptions[apiName]