-
Notifications
You must be signed in to change notification settings - Fork 2
/
example_test.go
183 lines (160 loc) · 3.91 KB
/
example_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package bluto_test
import (
"fmt"
"log"
"github.com/alibaba-go/bluto/bluto"
"github.com/alibaba-go/bluto/commander"
"github.com/gomodule/redigo/redis"
)
func Example_commandChain() {
bluto, _ := bluto.New(bluto.Config{
Address: "localhost:6379",
ConnectTimeoutSeconds: 10,
ReadTimeoutSeconds: 10,
})
defer bluto.ClosePool()
key1 := "SomeKey"
key2 := "Other"
var selectResult string
var setResult1 string
var setResult2 string
var keysResult []string
var delResult int
var getResult1 int
var getResult2 int
var flushResult string
errCmd := bluto.Borrow().
Select(&selectResult, 0).
Set(&setResult1, key1, 9, commander.SetOptionNX{}, commander.SetOptionEX{EX: 2}).
Set(&setResult2, key2, 9).
Keys(&keysResult, "*Key*").
Del(&delResult, key1, "NotExistKey").
Get(&getResult1, key1).
FlushAll(&flushResult, commander.FlushAllOptionAsync{}).
Get(&getResult2, key2).
Commit()
if errCmd != nil {
log.Fatal(errCmd)
}
fmt.Println(keysResult)
// Output: [SomeKey]
}
// Fields are the properties of each consumed message
type Fields struct {
Key string `redis:"Key"`
}
// Message represents each consumed message
type Message struct {
ID string
Fields *Fields
}
// RedisScan is the redis.Scanner interface implementation
func (m *Message) RedisScan(src interface{}) error {
// each message has two parts: 1-id, 2-fields
message, err := redis.Values(src, nil)
if err != nil {
return err
}
messageID, err := redis.String(message[0], nil)
if err != nil {
return err
}
m.ID = messageID
msgFieldDetails, err := redis.Values(message[1], nil)
if err != nil {
return err
}
var msgField Fields
err = redis.ScanStruct(msgFieldDetails, &msgField)
if err != nil {
return err
}
m.Fields = &msgField
return nil
}
// Stream represents each stream
type Stream struct {
Name string
Messages []*Message
}
// RedisScan is the redis.Scanner interface implementation
func (s *Stream) RedisScan(src interface{}) error {
// each stream has two parts: 1-name, 2-messages
stream, err := redis.Values(src, nil)
if err != nil {
return err
}
// set stream name
name, err := redis.String(stream[0], nil)
if err != nil {
return err
}
s.Name = name
// set stream messages
messages, err := redis.Values(stream[1], nil)
if err != nil {
return err
}
// each message has two parts: 1-id, 2-fields
for i := range messages {
message := messages[i]
msgDetails, err := redis.Values(message, nil)
if err != nil {
return err
}
// set message id
msgID, err := redis.String(msgDetails[0], nil)
if err != nil {
return err
}
// set message field
msgFieldDetails, err := redis.Values(msgDetails[1], nil)
if err != nil {
return err
}
var msgField Fields
err = redis.ScanStruct(msgFieldDetails, &msgField)
if err != nil {
return err
}
// set messages
s.Messages = append(s.Messages, &Message{
ID: msgID,
Fields: &msgField,
})
}
return nil
}
func Example_scanner() {
bluto, _ := bluto.New(bluto.Config{
Address: "localhost:6379",
ConnectTimeoutSeconds: 10,
ReadTimeoutSeconds: 10,
})
defer bluto.ClosePool()
groupName := "testGroup"
consumerName := "testConsumer"
key := "SomeKey"
var flushResult string
var xgroupCreateResult string
var xaddResult string
var xreadgroupResult []Stream
err := bluto.Borrow().FlushAll(&flushResult).Commit()
if err != nil {
log.Panic(err)
}
err = bluto.Borrow().XGroupCreate(&xgroupCreateResult, "testStream", groupName, "0-0", commander.XGroupCreateOptionMKStream{}).Commit()
if err != nil {
log.Panic(err)
}
err = bluto.Borrow().XAdd(&xaddResult, "testStream", "*", &Fields{Key: key}).Commit()
if err != nil {
log.Panic(err)
}
err = bluto.Borrow().XReadGroup(&xreadgroupResult, groupName, consumerName, []string{"testStream"}, []string{">"}).Commit()
if err != nil {
log.Panic(err)
}
fmt.Println(xreadgroupResult[0].Messages[0].Fields.Key)
// Output: SomeKey
}