-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsseread.go
53 lines (50 loc) · 1.59 KB
/
sseread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package sseread
import (
"bufio"
"bytes"
"io"
)
// Read reads from an io.Reader, parses the data as Server-Sent Events, and invokes the provided callback function for each event.
// It returns an error if any occurs during reading or parsing the events.
func Read(responseBody io.Reader, callback func(event *Event)) (err error) {
scanner := bufio.NewScanner(responseBody)
ev := new(Event)
for scanner.Scan() {
line := scanner.Bytes()
firstColonIndex := bytes.IndexByte(line, ':')
if firstColonIndex == -1 {
callback(ev)
//start another new server sent event
ev = new(Event)
} else {
lineType, lineData := string(line[:firstColonIndex]), line[firstColonIndex+1:]
//parse event filed(line)
ev.ParseEventLine(lineType, lineData)
}
}
return scanner.Err()
}
// ReadCh reads from an io.Reader, parses the data as Server-Sent Events, and sends each event on a channel.
// It returns the channel of events and an error if any occurs during reading or parsing the events.
func ReadCh(responseBody io.Reader) (messages <-chan *Event, err error) {
channel := make(chan *Event)
scanner := bufio.NewScanner(responseBody)
go func() {
defer close(channel)
ev := new(Event)
for scanner.Scan() {
line := scanner.Bytes()
firstColonIndex := bytes.IndexByte(line, ':')
if firstColonIndex == -1 {
channel <- ev
//start another new server sent event
ev = new(Event)
} else {
lineType, lineData := string(line[:firstColonIndex]), line[firstColonIndex+1:]
//parse event filed(line)
ev.ParseEventLine(lineType, lineData)
}
}
}()
return channel, scanner.Err()
}