-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathapi.go
163 lines (140 loc) · 5.15 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package gorqlite
// this file has low level stuff:
//
// rqliteApiGet()
// rqliteApiPost()
//
// There is some code duplication between those and they should
// probably be combined into one function.
//
// nothing public here.
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
nurl "net/url"
"strings"
)
type ParameterizedStatement struct {
Query string
Arguments []interface{}
}
// method: rqliteApiCall() - internally handles api calls,
// not supposed to be used by other files
//
// - handles retries
// - handles timeouts
func (conn *Connection) rqliteApiCall(ctx context.Context, apiOp apiOperation, method string, requestBody []byte) ([]byte, error) {
// Verify that we have at least a single peer to which we can make the request
peers := conn.cluster.PeerList()
if len(peers) < 1 {
return nil, errors.New("don't have any cluster info")
}
trace("%s: I have a peer list %d peers long", conn.ID, len(peers))
// Keep list of failed requests to each peer, return in case all peers fail to answer
var failureLog []string
for i, peer := range peers {
trace("%s: attemping to contact peer %d", conn.ID, i)
url := conn.assembleURL(apiOp, peer)
// Prepare request
var bodyReader io.Reader
if requestBody != nil {
bodyReader = bytes.NewBuffer(requestBody)
}
req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
if err != nil {
trace("%s: got error '%s' doing http.NewRequest", conn.ID, err.Error())
failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", redactURL(url), err.Error()))
continue
}
trace("%s: http.NewRequest() OK", conn.ID)
req.Header.Set("Content-Type", "application/json")
// Execute request using shared client
// We will close the response body as soon as we can to allow
// the TCP connection to escape back into client's pool
response, err := conn.client.Do(req)
if err != nil {
trace("%s: got error '%s' doing client.Do", conn.ID, err.Error())
failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", redactURL(url), err.Error()))
continue
}
// Read response body even if not a successful answer to return a descriptive error message
responseBody, err := io.ReadAll(response.Body)
if err != nil {
trace("%s: got error '%s' doing ioutil.ReadAll", conn.ID, err.Error())
failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", redactURL(url), err.Error()))
response.Body.Close()
continue
}
trace("%s: ioutil.ReadAll() OK", conn.ID)
// Check that we've got a successful answer
if response.StatusCode != http.StatusOK {
trace("%s: got code %s", conn.ID, response.Status)
failureLog = append(failureLog, fmt.Sprintf("%s failed, got: %s, message: %s", redactURL(url), response.Status, string(responseBody)))
response.Body.Close()
continue
}
response.Body.Close()
trace("%s: client.Do() OK", conn.ID)
return responseBody, nil
}
// All peers have failed to answer us, build a verbose error message
var builder strings.Builder
builder.WriteString("tried all peers unsuccessfully. here are the results:\n")
for n, v := range failureLog {
builder.WriteString(fmt.Sprintf(" peer #%d: %s\n", n, v))
}
return nil, errors.New(builder.String())
}
// redactURL redacts URL from the given parameter to be
// safely read by the client
func redactURL(url string) string {
u, err := nurl.Parse(url)
if err != nil {
return ""
}
return u.Redacted()
}
// method: rqliteApiGet() - for api_STATUS and api_NODES
//
// - lowest level interface - does not do any JSON unmarshaling
// - handles retries
// - handles timeouts
func (conn *Connection) rqliteApiGet(ctx context.Context, apiOp apiOperation) ([]byte, error) {
var responseBody []byte
trace("%s: rqliteApiGet() called", conn.ID)
// Allow only api_STATUS and api_NODES now - maybe someday BACKUP
if apiOp != api_STATUS && apiOp != api_NODES {
return responseBody, errors.New("rqliteApiGet() called for invalid api operation")
}
return conn.rqliteApiCall(ctx, apiOp, "GET", nil)
}
// method: rqliteApiPost() - for api_QUERY and api_WRITE
//
// - lowest level interface - does not do any JSON unmarshaling
// - handles retries
// - handles timeouts
func (conn *Connection) rqliteApiPost(ctx context.Context, apiOp apiOperation, sqlStatements []ParameterizedStatement) ([]byte, error) {
var responseBody []byte
// Allow only api_QUERY, api_WRITE and api_REQUEST
if apiOp != api_QUERY && apiOp != api_WRITE && apiOp != api_REQUEST {
return responseBody, errors.New("rqliteApiPost() called for invalid api operation")
}
trace("%s: rqliteApiPost() called for a QUERY of %d statements", conn.ID, len(sqlStatements))
formattedStatements := make([][]interface{}, 0, len(sqlStatements))
for _, statement := range sqlStatements {
formattedStatement := make([]interface{}, 0, len(statement.Arguments)+1)
formattedStatement = append(formattedStatement, statement.Query)
formattedStatement = append(formattedStatement, statement.Arguments...)
formattedStatements = append(formattedStatements, formattedStatement)
}
body, err := json.Marshal(formattedStatements)
if err != nil {
return nil, err
}
return conn.rqliteApiCall(ctx, apiOp, "POST", body)
}