-
Notifications
You must be signed in to change notification settings - Fork 1
/
crawl.go
285 lines (237 loc) · 7.32 KB
/
crawl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"sync"
"time"
"golang.org/x/net/websocket"
)
// ReqKind10002 connects to relayURL, subscribes to kind 10002 events, and
// hands every reply to receiveMessages.
//
// The whole exchange runs under a 3-second context. The budget starts before
// dialing, so time spent connecting reduces the time left for reading.
// Connection errors are returned unchanged; a failed REQ send is wrapped.
func ReqKind10002(relayURL string) error {
	budget, stop := context.WithTimeout(context.Background(), 3*time.Second)
	defer stop()

	conn, dialErr := establishWebSocketConnection(relayURL)
	if dialErr != nil {
		return dialErr
	}
	defer conn.Close()

	if sendErr := sendREQMessage(conn); sendErr != nil {
		return fmt.Errorf("failed to send REQ message: %v", sendErr)
	}

	return receiveMessages(budget, conn)
}
// establishWebSocketConnection dials relayURL as a WebSocket client, sending
// "http://localhost/" as the Origin.
//
// A bad URL is reported as "config error: ..." and a failed connection or
// handshake as "dial error: ...". On error the returned connection is nil.
func establishWebSocketConnection(relayURL string) (*websocket.Conn, error) {
	cfg, cfgErr := websocket.NewConfig(relayURL, "http://localhost/")
	if cfgErr != nil {
		return nil, fmt.Errorf("config error: %v", cfgErr)
	}

	conn, dialErr := websocket.DialConfig(cfg)
	if dialErr != nil {
		return nil, fmt.Errorf("dial error: %v", dialErr)
	}
	return conn, nil
}
// sendREQMessage writes the JSON frame
// ["REQ", "crawlr", {"kinds": [10002], "limit": 100}] to ws and returns any
// error from encoding or sending it.
func sendREQMessage(ws *websocket.Conn) error {
	const subscriptionID = "crawlr"

	filter := map[string]interface{}{
		"kinds": []int{10002},
		"limit": 100,
	}
	frame := []interface{}{"REQ", subscriptionID, filter}

	return websocket.JSON.Send(ws, frame)
}
// receiveMessages reads messages from ws and passes each one to handleMessage
// until the relay sends "EOSE", closes the connection, or ctx's deadline passes.
//
// Return values:
//   - nil after an "EOSE" message or a clean close (io.EOF).
//   - "timeout: no response from relay" when ctx is done or the read deadline
//     derived from it expires.
//   - "receive error: ..." for any other read failure.
//
// Errors from handleMessage are logged through logError and do not stop the loop.
func receiveMessages(ctx context.Context, ws *websocket.Conn) error {
	// websocket.Message.Receive blocks and cannot observe ctx. Without a read
	// deadline, a relay that stays silent (e.g. after EOSE) would hang forever.
	if deadline, ok := ctx.Deadline(); ok {
		if err := ws.SetReadDeadline(deadline); err != nil {
			return fmt.Errorf("set read deadline: %v", err)
		}
	}

	for {
		// Catches cancellation without a deadline, between messages.
		if ctx.Err() != nil {
			return fmt.Errorf("timeout: no response from relay")
		}

		var msg []byte
		if err := websocket.Message.Receive(ws, &msg); err != nil {
			if err == io.EOF {
				return nil // Connection closed normally.
			}
			// A read deadline expiry surfaces as a network error with Timeout() == true.
			if te, ok := err.(interface{ Timeout() bool }); ok && te.Timeout() {
				return fmt.Errorf("timeout: no response from relay")
			}
			return fmt.Errorf("receive error: %v", err)
		}

		if err := handleMessage(msg); err != nil {
			logError(fmt.Sprintf("Error handling message: %v", err))
		}

		// EOSE marks the end of stored events: stop instead of waiting for more.
		var head []interface{}
		if json.Unmarshal(msg, &head) == nil && len(head) > 0 && head[0] == "EOSE" {
			return nil
		}
	}
}
// handleMessage decodes msg as a JSON array. It returns nil without further
// work when the first element is "EOSE"; otherwise it delegates the raw bytes
// to parseRelayList.
//
// A message that is not a JSON array yields "unmarshal error: ...". The return
// value does not tell the caller whether an EOSE was seen.
func handleMessage(msg []byte) error {
	var frame []interface{}
	if err := json.Unmarshal(msg, &frame); err != nil {
		return fmt.Errorf("unmarshal error: %v", err)
	}

	if len(frame) == 0 || frame[0] != "EOSE" {
		return parseRelayList(msg)
	}
	return nil // EOSE: nothing to parse.
}
// logError writes message, followed by a newline, to standard output.
//
// It is a single hook so the destination can later be switched to a logging
// channel or an external system without touching callers.
func logError(message string) {
	fmt.Printf("%s\n", message)
}
// parseRelayList extracts relay URLs from a relay "EVENT" message and
// classifies each one.
//
// Expected frame shape: ["EVENT", <subscription id>, {"tags": [...], ...}, ...].
// Every tag of the form ["r", <string>, ...] contributes its second element.
// Frames with fewer than three elements, or whose first element is not
// "EVENT", are ignored.
//
// Errors: "failed to parse message" (not a JSON array), "invalid event data
// format" (third element not an object), "invalid tags format" ("tags"
// missing or not an array).
//
// Classification happens while holding mu, because classifyRelay mutates the
// shared relay maps.
func parseRelayList(message []byte) error {
	var frame []interface{}
	if err := json.Unmarshal(message, &frame); err != nil {
		return fmt.Errorf("failed to parse message: %v", err)
	}
	if len(frame) < 3 || frame[0] != "EVENT" {
		return nil
	}

	event, isObject := frame[2].(map[string]interface{})
	if !isObject {
		return fmt.Errorf("invalid event data format")
	}
	rawTags, isArray := event["tags"].([]interface{})
	if !isArray {
		return fmt.Errorf("invalid tags format")
	}

	// Collect before locking so the critical section covers only map updates.
	var found []string
	for _, raw := range rawTags {
		tag, isTag := raw.([]interface{})
		if !isTag || len(tag) < 2 || tag[0] != "r" {
			continue
		}
		if relay, isString := tag[1].(string); isString {
			found = append(found, relay)
		}
	}

	mu.Lock()
	defer mu.Unlock()
	for _, relay := range found {
		classifyRelay(relay)
	}
	return nil
}
// classifyRelay normalizes relayURL and increments its counter in exactly one
// bucket. The first matching test wins, in this order: malformed, local,
// onion, API. Anything left over counts as a clearnet relay in clearOnline.
//
// It does not lock mu itself. Its caller in this file, parseRelayList, holds
// mu while calling it.
func classifyRelay(relayURL string) {
	u := normalizeURL(relayURL)

	switch {
	case isMalformedRelay(u):
		malformed[u]++
	case isLocalRelay(u):
		local[u]++
	case isOnionRelay(u):
		onion[u]++
	case isAPIRelay(u):
		clearAPI[u]++
	default:
		clearOnline[u]++
	}
}
// crawlClearOnlineRelays crawls every relay in clearOnline that is not yet
// marked in crawledRelays. It runs at most concurrency crawls at once and
// blocks until all of them finish. A concurrency below 1 is treated as 1.
//
// Each relay gets up to maxTries calls to attemptCrawl, with backoffDuration
// between consecutive attempts, and every failed attempt is reported on
// logChannel. Outcomes:
//   - Success: the relay is marked crawled and stays in clearOnline.
//   - All attempts fail: the relay is moved, with its count, from clearOnline
//     to clearOffline and marked crawled so later passes skip it.
//   - maxTries < 1: the relay is left untouched.
func crawlClearOnlineRelays(concurrency int) {
	if concurrency < 1 {
		// A zero-capacity semaphore would block the first send below forever.
		concurrency = 1
	}

	// Snapshot the work list under the lock; the crawl goroutines mutate
	// clearOnline while they run.
	mu.Lock()
	relays := make([]string, 0, len(clearOnline))
	for relay := range clearOnline {
		if !crawledRelays[relay] {
			relays = append(relays, relay)
		}
	}
	mu.Unlock()

	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup
	for _, relay := range relays {
		wg.Add(1)
		sem <- struct{}{} // Block when reaching concurrency limit
		go func(r string) {
			defer wg.Done()
			defer func() { <-sem }() // Release semaphore after task

			var lastErr error
			for attempt := 0; attempt < maxTries; attempt++ {
				if attempt > 0 {
					// Back off only between attempts, not after the final one.
					time.Sleep(backoffDuration)
				}
				lastErr = attemptCrawl(r)
				if lastErr == nil {
					logChannel <- fmt.Sprintf("Successfully crawled relay: %s", r)
					mu.Lock()
					crawledRelays[r] = true // Mark it as crawled after success
					mu.Unlock()
					return
				}
				logChannel <- fmt.Sprintf("Failed to crawl relay %s: %v", r, lastErr)
			}
			if lastErr == nil {
				return // maxTries < 1: nothing was attempted.
			}

			// Every attempt failed. Move the relay exactly once, while its
			// clearOnline count still exists, so the count is not lost.
			mu.Lock()
			clearOffline[r] = clearOnline[r]
			delete(clearOnline, r)
			crawledRelays[r] = true
			mu.Unlock()
		}(relay)
	}
	wg.Wait() // Wait for all goroutines to finish
}
// attemptCrawl handles the crawl attempt and returns an error if unsuccessful
func attemptCrawl(relayURL string) error {
ctx, cancel := context.WithTimeout(context.Background(), crawlTimeout)
defer cancel()
wsConfig, err := websocket.NewConfig(relayURL, "http://localhost/")
if err != nil {
return fmt.Errorf("config error: %v", err)
}
ws, err := websocket.DialConfig(wsConfig)
if err != nil {
return fmt.Errorf("dial error: %v", err)
}
defer ws.Close()
// Send REQ message
subscriptionID := "crawlr"
req := []interface{}{
"REQ", subscriptionID, map[string]interface{}{
"kinds": []int{10002},
"limit": 100,
},
}
err = websocket.JSON.Send(ws, req)
if err != nil {
return fmt.Errorf("failed to send REQ message: %v", err)
}
// Wait for response or timeout
select {
case <-ctx.Done():
return fmt.Errorf("timeout: no response from relay")
default:
var msg []byte
err := websocket.Message.Receive(ws, &msg)
if err != nil {
return fmt.Errorf("receive error: %v", err)
}
// Parse response
var response []interface{}
if err := json.Unmarshal(msg, &response); err != nil {
return fmt.Errorf("failed to parse message: %v", err)
}
if len(response) > 0 && response[0] == "EOSE" {
return nil // Successfully reached end of stream
}
// Handle any other messages or continue to parse...
}
return nil
}
// logRelayEvents prints every message received on logChannel until the
// channel is closed, so it is meant to run in its own goroutine.
//
// Each message is prefixed with ESC "[F", which moves the cursor to the start
// of the previous line. The message therefore overwrites that line, and the
// trailing newline puts the cursor back on the line the status bar uses.
// NOTE(review): the sequence does not erase the line, so a message shorter than
// what was there may leave trailing characters. Confirm against the
// status-bar code.
func logRelayEvents() {
	for {
		msg, open := <-logChannel
		if !open {
			return
		}
		fmt.Printf("\033[F%s\n", msg)
	}
}