-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathysb_nodes.hpp
271 lines (245 loc) · 7.84 KB
/
ysb_nodes.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/******************************************************************************
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
******************************************************************************
*/
/*
* Classes of the Yahoo! Streaming Benchmark (TBB FlowGraph version)
*
* This version of the Yahoo! Streaming Benchmark is the one modified in the
* StreamBench project available in GitHub:
* https://github.com/lsds/StreamBench
*/
#ifndef YSB_NODES
#define YSB_NODES
// include
#include <tuple>
#include <mutex>
#include <atomic>
#include <cassert>
#include <sys/time.h>
#include <functional>
#include <unordered_map>
#include <ysb_common.hpp>
#include <campaign_generator.hpp>
using namespace std;
// global variable: starting time of the execution
volatile unsigned long start_time_usec;
// global variable: number of generated events
std::atomic<long> sentCounter;
// global variable: constant for the EOS
const unsigned long EOS = (unsigned long) -1;
/**
* \brief Function to return the number of microseconds from the epoch
*
* This function returns the number of microseconds from the epoch using
* the clock_gettime() call.
*/
static inline unsigned long current_time_usecs() __attribute__((always_inline));
static inline unsigned long current_time_usecs()
{
struct timespec t;
clock_gettime(CLOCK_REALTIME, &t);
return (t.tv_sec)*1000000L + (t.tv_nsec / 1000);
}
// Source functor
class YSBSource
{
private:
unsigned long execution_time_sec; // total execution time of the benchmark
unsigned long **ads_arrays;
unsigned int adsPerCampaign; // number of ads per campaign
size_t num_sent;
volatile unsigned long current_time_us;
unsigned int value;
bool eos = false;
public:
// constructor
YSBSource(unsigned long _time_sec, unsigned long **_ads_arrays, unsigned int _adsPerCampaign):
execution_time_sec(_time_sec), ads_arrays(_ads_arrays), adsPerCampaign(_adsPerCampaign), num_sent(0), value(0) {}
// source function
bool operator()(event_t *&event)
{
if (eos) return false; // stopping
event = new event_t();
current_time_us = current_time_usecs();
// fill the event's fields
event->ts = current_time_usecs() - start_time_usec;
event->user_id = 0; // not meaningful
event->page_id = 0; // not meaningful
event->ad_id = ads_arrays[(value % 100000) % (N_CAMPAIGNS * adsPerCampaign)][1];
event->ad_type = (value % 100000) % 5;
event->event_type = (value % 100000) % 3;
event->ip = 1; // not meaningful
value++;
num_sent++;
//volatile long mytime = current_time_usecs();
//while(current_time_usecs() - mytime <= 10);
double elapsed_time_sec = (current_time_us - start_time_usec) / 1000000.0;
if (elapsed_time_sec >= execution_time_sec) {
//cout << "[EventSource] Generated " << num_sent << " events" << endl;
sentCounter.fetch_add(num_sent);
eos = true;
event->ts = EOS;
}
return true;
}
};
// Filter functor
class YSBFilter
{
private:
unsigned int event_type; // forward only tuples with event_type
public:
// constructor
YSBFilter(unsigned int _event_type=0): event_type(_event_type) {}
// filter function
void operator()(event_t *event, filter_node_t::output_ports_type &op) {
if(event->event_type == event_type || event->ts == EOS) {
if (!std::get<0>(op).try_put(event)) abort();
}
}
};
// Join functor
class YSBJoin
{
private:
unordered_map<unsigned long, unsigned int> ↦ // hashmap
campaign_record *relational_table; // relational table
vector<window_node_t*> &workers;
public:
// constructor
YSBJoin(vector<window_node_t*> &_workers, unordered_map<unsigned long, unsigned int> &_map, campaign_record *_relational_table):
workers(_workers), map(_map), relational_table(_relational_table) {}
// constructor
YSBJoin(const YSBJoin &other):
workers(other.workers), map(other.map), relational_table(other.relational_table) {}
// join function
continue_msg operator()(event_t *event) {
if (event->ts == EOS) {
for(long i=0; i<workers.size(); ++i) {
joined_event_t *out = new joined_event_t();
out->ts = EOS;
if (!workers[i]->try_put(out)) abort();
}
delete event;
return continue_msg();
}
// check inside the hashmap
auto it = map.find(event->ad_id);
if (it != map.end()) {
joined_event_t *out = new joined_event_t();
out->ts = event->ts;
out->ad_id = event->ad_id;
campaign_record record = relational_table[(*it).second];
out->relational_ad_id = record.ad_id;
out->cmp_id = record.cmp_id;
// parte eseguita dal KF_Emitter (joined_event_t --> joined_event_t)
{
auto key = std::get<0>(out->getControlFields()); // key
size_t hashcode = hash<decltype(key)>()(key); // compute the hashcode of the key
// evaluate the routing function
size_t dest_w = hashcode % workers.size(); // routing_func(hashcode, pardegree);
// routing the data on the basis of a key value
if (!workers[dest_w]->try_put(out)) abort();
}
// input cleanup
delete event;
}
return continue_msg(); // keep going on
}
};
// Window operator
class WinAggregate
{
private:
long myid;
long pardegree1;
unordered_map<unsigned long, Window*> hashmap;
int eos_received = 0;
public:
// constructor
WinAggregate(long _myid, long _pardegree1): myid(_myid), pardegree1(_pardegree1) {}
// window function
void operator()(joined_event_t *in, window_node_t::output_ports_type &op) {
if (in->ts == EOS) { // end-of-stream management
if (++eos_received == pardegree1) {
for (auto& it: hashmap) {
if (it.second != nullptr) {
unsigned long cmp_id = it.first;
Window &win = *(it.second);
win_result *out = new win_result();
assert(out);
out->setControlFields(cmp_id, 0, win.last_ts);
out->count = win.count;
out->lastUpdate = win.last_ts;
if (!std::get<0>(op).try_put(out)) abort();
}
}
// forward EOS
win_result *out = new win_result();
assert(out);
out->ts = EOS;
if (!std::get<0>(op).try_put(out)) abort();
}
return;
}
unsigned long cmp_id = in->cmp_id;
unsigned long ts = in->ts;
auto it = hashmap.find(cmp_id);
if (it != hashmap.end()) {
Window &win = *(it->second);
if ((ts - win.initial_ts) >= 10000000) { // <--- 10s
win_result *out = new win_result();
assert(out);
out->setControlFields(cmp_id, 0, win.last_ts);
out->count = win.count;
out->lastUpdate = win.last_ts;
if (!std::get<0>(op).try_put(out)) abort();
// reset the window
win.set(1, ts, ts);
}
else {
win.count++;
win.last_ts = ts;
}
}
else {
Window *w = new Window(1, ts, ts);
hashmap[cmp_id] = w;
}
delete in;
}
};
// Sink functor
class YSBSink
{
private:
size_t received;
public:
// constructor
YSBSink(): received(0) {}
// sink function
long operator()(win_result *res) {
if (res->ts == EOS) {
delete res;
return 0;
}
received++;
delete res;
return 0;
}
// get the number of received results
size_t rcvResults() { return received; }
};
#endif