-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtaskManager.go
74 lines (61 loc) · 1.7 KB
/
taskManager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package messagix
import (
"encoding/json"
"strconv"
"github.com/0xzer/messagix/methods"
"github.com/0xzer/messagix/socket"
)
// TaskManager queues socket tasks on behalf of a Client and serializes the
// queue into a single task payload (see FinalizePayload).
type TaskManager struct {
	// client is the Client this manager was created from; the methods below
	// use its Logger, its configs.VersionId and its GetTaskId.
	client *Client
	// currTasks holds the tasks appended by AddNewTask that have not yet been
	// consumed by FinalizePayload.
	currTasks []socket.TaskData
	// traceId is copied into the payload's DataTraceId field.
	traceId string
}
// NewTaskManager returns a TaskManager bound to c. The task queue starts out
// empty but non-nil, and the trace id is the empty string.
func (c *Client) NewTaskManager() *TaskManager {
	tm := new(TaskManager)
	tm.client = c
	// Keep the queue non-nil so an untouched queue is an empty slice, not nil.
	tm.currTasks = []socket.TaskData{}
	return tm
}
// FinalizePayload builds a socket.TaskPayload from the queued tasks, the
// current trace id, an epoch id obtained from methods.GenerateEpochId and the
// client's configured version id, and returns the payload JSON-encoded.
//
// The task queue is replaced with a new empty slice before encoding, so the
// queued tasks are removed from the manager whether or not json.Marshal
// succeeds. The returned error is the one reported by json.Marshal.
func (tm *TaskManager) FinalizePayload() ([]byte, error) {
	// Format through int64 rather than int: on 32-bit builds int is only 32
	// bits wide and would silently truncate a larger version id.
	versionID := strconv.FormatInt(int64(tm.client.configs.VersionId), 10)

	p := &socket.TaskPayload{
		EpochId:     methods.GenerateEpochId(),
		Tasks:       tm.currTasks,
		DataTraceId: tm.traceId,
		VersionId:   versionID,
	}

	// p.Tasks still references the old slice; only the manager's view is reset.
	tm.currTasks = make([]socket.TaskData, 0)
	return json.Marshal(p)
}
// setTraceId stores traceId on the manager; FinalizePayload copies it into
// the payload's DataTraceId field.
func (tm *TaskManager) setTraceId(traceId string) {
	tm.traceId = traceId
}
// AddNewTask converts task into a socket.TaskData and appends it to the
// pending queue.
//
// The payload returned by task.Create is JSON-encoded and stored as a string.
// When Create's third result is true, the queue name is JSON-encoded too and
// stored as the resulting string; otherwise it is stored as returned.
// If either encoding fails, the error is logged and the task is not queued;
// no error is returned to the caller.
func (tm *TaskManager) AddNewTask(task socket.Task) {
	payload, queueName, marshalQueueName := task.Create()
	label := task.GetLabel()

	encodedPayload, err := json.Marshal(payload)
	if err != nil {
		tm.client.Logger.Err(err).Any("label", label).Msg("failed to marshal task payload")
		return
	}

	if marshalQueueName {
		encodedName, encErr := json.Marshal(queueName)
		if encErr != nil {
			tm.client.Logger.Err(encErr).Any("label", label).Msg("failed to marshal queueName information for task")
			return
		}
		queueName = string(encodedName)
	}

	// The task id is requested only once all encoding has succeeded.
	taskID := tm.GetTaskId()
	entry := socket.TaskData{
		FailureCount: nil,
		Label:        label,
		Payload:      string(encodedPayload),
		QueueName:    queueName,
		TaskId:       taskID,
	}

	tm.client.Logger.Debug().
		Any("label", label).
		Any("payload", payload).
		Any("queueName", queueName).
		Any("taskId", taskID).
		Msg("Creating task")

	tm.currTasks = append(tm.currTasks, entry)
}
// GetTaskId returns the result of the client's GetTaskId converted to int64.
// NOTE(review): each call presumably yields a new id from the client; confirm
// against Client.GetTaskId before calling this more than once per task.
func (tm *TaskManager) GetTaskId() int64 {
	return int64(tm.client.GetTaskId())
}