-
Notifications
You must be signed in to change notification settings - Fork 5
/
udpservice.go
173 lines (158 loc) · 4.3 KB
/
udpservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//
package tnet
import (
"context"
"errors"
"fmt"
"net"
"trpc.group/trpc-go/tnet/internal/netutil"
"trpc.group/trpc-go/tnet/internal/poller"
goreuseport "trpc.group/trpc-go/tnet/internal/reuseport"
"trpc.group/trpc-go/tnet/log"
)
// NewUDPService builds a UDP service on top of the given listeners.
//
// The listeners are checked first: the slice must not be empty and every
// listener must report the same local address. Any validation error is
// returned as-is; otherwise construction is delegated to newUDPService.
func NewUDPService(lns []PacketConn, handler UDPHandler, opt ...Option) (Service, error) {
	err := validateListeners(lns)
	if err != nil {
		return nil, err
	}
	return newUDPService(lns, handler, opt...)
}
// newUDPService assembles a udpservice without validating listener addresses.
//
// Options start from their defaults and are then applied in the order given.
// Every listener must be a *udpconn; the first one that is not causes an error
// and no service is returned. Each accepted connection has its maximum packet
// size set from the resolved options.
func newUDPService(lns []PacketConn, handler UDPHandler, opt ...Option) (Service, error) {
	var opts options
	opts.setDefault()
	for i := range opt {
		opt[i].f(&opts)
	}

	svc := &udpservice{
		reqHandle: handler,
		opts:      opts,
		hupCh:     make(chan struct{}),
	}
	for _, ln := range lns {
		uc, isUDPConn := ln.(*udpconn)
		if !isUDPConn {
			return nil, fmt.Errorf("listeners are not of udpconn type: %T, they should be created by tnet.ListenPackets", ln)
		}
		uc.SetMaxPacketSize(svc.opts.maxUDPPacketSize)
		svc.conns = append(svc.conns, uc)
	}
	return svc, nil
}
// NewPacketConn wraps a net.PacketConn into a tnet.PacketConn.
//
// conn is first checked with netutil.ValidateUDP; a failure is wrapped and
// returned. The doc contract is that conn listens on UDP and implements
// syscall.Conn.
func NewPacketConn(conn net.PacketConn) (PacketConn, error) {
	err := netutil.ValidateUDP(conn)
	if err != nil {
		return nil, fmt.Errorf("validate listener fail: %w", err)
	}

	wrapped, err := newUDPConn(conn)
	if err != nil {
		// Return a literal nil so the interface result is truly nil rather
		// than a non-nil interface holding a nil *udpconn.
		return nil, err
	}
	return wrapped, nil
}
// listenUDP opens the UDP listeners for network/address.
//
// Without reuseport a single socket is opened with net.ListenPacket. With
// reuseport, poller.NumPollers() sockets are opened with
// goreuseport.ListenPacket. After each successful listen, address is replaced
// by the socket's actual local address, so every later socket binds to the same
// concrete port even if the caller asked for a random one.
//
// If any listen or wrap step fails, the listeners opened by earlier iterations
// are closed before the error is returned, so no sockets are leaked. A listen
// failure is wrapped with %w so callers can inspect it with errors.Is/As.
func listenUDP(network string, address string, reuseport bool) ([]PacketConn, error) {
	n := 1
	listenPacket := net.ListenPacket
	if reuseport {
		n = poller.NumPollers()
		listenPacket = goreuseport.ListenPacket
	}

	var (
		lns    []PacketConn
		opened []*udpconn
	)
	// closeOpened releases everything created so far; close errors are ignored
	// because the original failure is the one reported to the caller.
	closeOpened := func() {
		for _, c := range opened {
			_ = c.Close()
		}
	}

	for i := 0; i < n; i++ {
		rawConn, err := listenPacket(network, address)
		if err != nil {
			closeOpened()
			return nil, fmt.Errorf("udp listen error:%w", err)
		}
		// newUDPConn closes rawConn itself when it fails, so only the
		// previously opened connections need to be cleaned up here.
		conn, err := newUDPConn(rawConn)
		if err != nil {
			closeOpened()
			return nil, err
		}
		opened = append(opened, conn)
		lns = append(lns, conn)
		// Set the address with a specified port to prevent the user from listening on a random port.
		address = rawConn.LocalAddr().String()
	}
	return lns, nil
}
// newUDPConn wraps listener in a udpconn.
//
// The file descriptor is obtained through netutil.GetFD; if that fails the
// listener is closed and the error returned. On success the connection's netFD
// is filled from the listener (fd, UDP fd type, socket, network, local address
// and the default UDP buffer size), and its read trigger and in/out buffers
// are initialized.
func newUDPConn(listener net.PacketConn) (*udpconn, error) {
	fd, err := netutil.GetFD(listener)
	if err != nil {
		listener.Close()
		return nil, err
	}

	uc := &udpconn{readTrigger: make(chan struct{}, 1)}
	nfd := &uc.nfd
	nfd.fd = fd
	nfd.fdtype = fdUDP
	nfd.sock = listener
	nfd.network = listener.LocalAddr().Network()
	nfd.laddr = listener.LocalAddr()
	nfd.udpBufferSize = defaultUDPBufferSize

	uc.inBuffer.Initialize()
	uc.outBuffer.Initialize()
	return uc, nil
}
// udpservice is the Service implementation returned by newUDPService; it
// serves UDP requests on a set of udpconn connections.
type udpservice struct {
// reqHandle is the handler installed on every connection when Serve starts.
reqHandle UDPHandler
// hupCh unblocks Serve with a "service is closed" error when a receive on it
// succeeds. NOTE(review): nothing in this file sends on or closes it — confirm
// where it is signalled.
hupCh chan struct{}
// conns are the connections collected from the listeners at construction.
conns []*udpconn
// opts holds the defaults with the caller's options applied on top.
opts options
}
// Serve configures and schedules every connection, then blocks until the
// context is done or the service's hang-up channel fires.
//
// For each connection the request handler and non-blocking mode are set, the
// optional on-closed callback is installed when configured, and the connection
// is scheduled. The first setup error aborts Serve and is returned. All
// connections are closed when Serve returns, whatever the reason.
//
// Returns ctx.Err() when the context ends, or a "service is closed" error when
// the hang-up channel is signalled.
func (s *udpservice) Serve(ctx context.Context) error {
	defer s.close()

	for i := range s.conns {
		c := s.conns[i]
		err := c.SetOnRequest(s.reqHandle)
		if err != nil {
			return err
		}
		c.SetNonBlocking(s.opts.nonblocking)
		if onClosed := s.opts.onUDPClosed; onClosed != nil {
			c.SetOnClosed(onClosed)
		}
		err = c.schedule()
		if err != nil {
			return err
		}
	}

	numPollers := poller.NumPollers()
	log.Infof("tnet udp service started, current number of pollers: %d, use tnet.SetNumPollers to change it\n",
		numPollers)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-s.hupCh:
		return errors.New("service is closed")
	}
}
// close closes every connection owned by the service.
//
// All connections are closed even when some of them fail to close, so one
// failing connection cannot leave the remaining sockets open. The first error
// encountered, if any, is returned; nil means every Close succeeded.
func (s *udpservice) close() error {
	var firstErr error
	for _, conn := range s.conns {
		if err := conn.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// validateListeners checks that lns is usable for a single service.
//
// It returns an error when lns is empty, or when any listener's local address
// (compared by its string form) differs from that of the first listener.
func validateListeners(lns []PacketConn) error {
	if len(lns) == 0 {
		return errors.New("listeners can't be nil")
	}

	// Ensure that all listeners are listening to the same address as the first one.
	firstAddr := lns[0].LocalAddr()
	for _, ln := range lns[1:] {
		addr := ln.LocalAddr()
		if addr.String() == firstAddr.String() {
			continue
		}
		return fmt.Errorf("listeners have different local address: %s, %s", firstAddr, addr)
	}
	return nil
}