This repository has been archived by the owner on Jan 15, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
poller.go
140 lines (127 loc) · 3.07 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Package poller provides level triggered readiness notification and
// reliable closing of file descriptors.
package poller
import (
"io"
"syscall"
)
// A Poller provides readiness notification and reliable closing of
// registered file descriptors.
type Poller struct {
poller
}
// Register registers a file describtor with the Poller and returns a
// Pollable which can be used for reading/writing as well as readiness
// notification.
//
// File descriptors registered with the poller will be placed into
// non-blocking mode.
func (p *Poller) Register(fd uintptr) (*Pollable, error) {
if err := syscall.SetNonblock(int(fd), true); err != nil {
return nil, err
}
return p.register(fd)
}
// Pollable represents a file descriptor that can be read/written
// and polled/waited for readiness notification.
type Pollable struct {
fd uintptr
cr, cw chan error
poller
}
// Read reads up to len(b) bytes from the underlying fd. It returns the number of
// bytes read and an error, if any. EOF is signaled by a zero count with
// err set to io.EOF.
//
// Callers to Read will block if there is no data available to read.
func (p *Pollable) Read(b []byte) (int, error) {
n, e := p.read(b)
if n < 0 {
n = 0
}
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
}
if e != nil {
return n, e
}
return n, nil
}
func (p *Pollable) read(b []byte) (int, error) {
for {
n, e := syscall.Read(int(p.fd), b)
if e != syscall.EAGAIN {
return n, e
}
if err := p.WaitRead(); err != nil {
return 0, err
}
}
}
// Write writes len(b) bytes to the fd. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
//
// Callers to Write will block if there is no buffer capacity available.
func (p *Pollable) Write(b []byte) (int, error) {
n, e := p.write(b)
if n < 0 {
n = 0
}
if n != len(b) {
return n, io.ErrShortWrite
}
if e != nil {
return n, e
}
return n, nil
}
func (p *Pollable) write(b []byte) (int, error) {
for {
// TODO(dfc) this is wrong
n, e := syscall.Write(int(p.fd), b)
if e != syscall.EAGAIN {
return n, e
}
if err := p.WaitWrite(); err != nil {
return 0, err
}
}
}
// Close deregisters the Pollable and closes the underlying file descriptor.
func (p *Pollable) Close() error {
err := p.deregister(p)
// p.fd = uintptr(-1) // TODO(dfc) fix
return err
}
// WaitRead waits for the Pollable to become ready for
// reading.
func (p *Pollable) WaitRead() error {
debug("pollable: %p, fd: %v waitread", p, p.fd)
if err := p.poller.waitRead(p); err != nil {
return err
}
return <-p.cr
}
// WaitWrite waits for the Pollable to become ready for
// writing.
func (p *Pollable) WaitWrite() error {
if err := p.poller.waitWrite(p); err != nil {
return err
}
return <-p.cw
}
func (p *Pollable) wake(mode int, err error) {
debug("pollable: %p, fd: %v wake: %c, %v", p, p.fd, mode, err)
if mode == 'r' {
p.cr <- err
} else {
p.cw <- err
}
}
type poller interface {
register(fd uintptr) (*Pollable, error)
waitRead(*Pollable) error
waitWrite(*Pollable) error
deregister(*Pollable) error
}