-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsample.cpp
130 lines (108 loc) · 4.53 KB
/
sample.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <iostream>
#include <thread>
#include <satella/satella.hpp>
satella::post_func_t create_worker_thread(const satella::cancellation_token &token)
{
// Create job_queue
auto job_queue = std::make_shared<satella::single_consumer_queue<satella::job_func_t>>();
// Create worker thread
std::thread worker([token, job_queue] {
std::cout << "worker thread: " << std::this_thread::get_id() << std::endl;
try {
for (;;) {
// pop and execute job function
// note: pop() blocks threads until push() is executed
job_queue->pop(token)();
}
} catch (satella::cancelled_error) {
}
});
worker.detach();
// Create producer function
return [job_queue](satella::job_func_t &&job) {
job_queue->push(std::move(job));
};
}
int main()
{
std::cout << "main thread: " << std::this_thread::get_id() << std::endl;
satella::cancellation_token_source cts;
auto producer = create_worker_thread(cts.get_token());
// Create task1 that returns the value of int
auto task1 = satella::make_task(producer, [] {
std::cout << "start task1 thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "end task1" << std::endl;
return 5;
});
// task1 is task<int>
static_assert(std::is_same<decltype(task1), satella::task<int>>::value);
// Create task2 that receives the result of task1 and returns the value of double
auto task2 = std::move(task1).then([](auto &&result) {
// result is int
static_assert(std::is_same<decltype(result), int&&>::value);
std::cout << "start task2 thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "end task2" << std::endl;
if (result == 0) {
throw std::invalid_argument("result == 0");
}
return (double)result / 2;
});
// task2 is task<double>
static_assert(std::is_same<decltype(task2), satella::task<double>>::value);
// Create task3 that receives the result of task2 and returns the value of float
// note: If an exception occurs in task1, task2 will not be executed but task3 will be executed
auto task3 = std::move(task2).then_catch([](auto &&result) {
// result is variant<std::exception_ptr, double>
static_assert(std::is_same<decltype(result), satella::variant<std::exception_ptr, double>&&>::value);
// catch task2 (or task1) exception
double val;
if (satella::index(result) == 0) {
try {
std::rethrow_exception(satella::get<0>(result));
} catch (const std::invalid_argument&) {
val = 0;
}
} else {
val = satella::get<1>(result);
}
std::cout << "start task3 thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "end task3" << std::endl;
return (float)val * 3;
});
// task3 is task<float>
static_assert(std::is_same<decltype(task3), satella::task<float>>::value);
// Create producer function 2
auto producer2 = create_worker_thread(cts.get_token());
// Create task4 to be executed by worker thread 2
auto task4 = std::move(task3).then(producer2, [](auto &&result) {
// result is float
static_assert(std::is_same<decltype(result), float&&>::value);
std::cout << "start task4 thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "end task4" << std::endl;
return result * 2;
});
// task4 is task<float>
static_assert(std::is_same<decltype(task4), satella::task<float>>::value);
// Can be assigned to a convertible type task
satella::task<double> task5 = std::move(task4);
double result;
try {
// Wait for the result of task5
// note: The main thread is not blocked by the time you reach this point
// All tasks are executed in worker thread
auto val = std::move(task5).get();
// val is float
static_assert(std::is_same<decltype(val), double>::value);
result = val;
} catch (...) {
// Handles exceptions that were not caught in task3 or that occurred in task4
result = -1;
}
// result == 15
std::cout << "result: " << result << std::endl;
cts.cancel();
}