Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Expand UDP Single Port mode (UDPMux) to ServerReflexive and Relay #403

Closed
wants to merge 12 commits into from
69 changes: 47 additions & 22 deletions gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type closeable interface {
}

// Close a net.Conn and log if we have a failure
func closeConnAndLog(c closeable, log logging.LeveledLogger, msg string) {
if c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
func closeConnAndLog(c closeable, isUDPMux bool, log logging.LeveledLogger, msg string) {
if c == nil || isUDPMux || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
log.Warnf("Conn is not allocated (%s)", msg)
return
}
Expand Down Expand Up @@ -111,6 +111,7 @@ func (a *Agent) gatherCandidates(ctx context.Context) {
wg.Add(1)
go func() {
a.gatherCandidatesRelay(ctx, a.urls)

wg.Done()
}()
case CandidateTypePeerReflexive, CandidateTypeUnspecified:
Expand Down Expand Up @@ -203,13 +204,13 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ

c, err := NewCandidateHost(&hostConfig)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
closeConnAndLog(conn, false, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
continue
}

if a.mDNSMode == MulticastDNSModeQueryAndGather {
if err = c.setIP(ip); err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
closeConnAndLog(conn, false, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
continue
}
}
Expand Down Expand Up @@ -262,7 +263,7 @@ func (a *Agent) gatherCandidatesLocalUDPMux(ctx context.Context) error {

c, err := NewCandidateHost(&hostConfig)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host mux candidate: %s %d: %v\n", candidateIP, port, err))
closeConnAndLog(conn, true, a.log, fmt.Sprintf("Failed to create host mux candidate: %s %d: %v\n", candidateIP, port, err))
// already logged error
return nil
}
Expand Down Expand Up @@ -292,16 +293,26 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []
go func() {
defer wg.Done()

conn, err := listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
if err != nil {
a.log.Warnf("Failed to listen %s: %v\n", network, err)
return
}
var conn net.PacketConn
var err error

if a.udpMux != nil {
conn, err = a.udpMux.GetConn(a.localUfrag)
if err != nil {
a.log.Warnf("Failed to listen for %s: %v\n", network, err)
return
}
} else {
conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
if err != nil {
closeConnAndLog(conn, false, a.log, fmt.Sprintf("Failed to listen for %s: %v\n", network, err))
return
}
}
laddr := conn.LocalAddr().(*net.UDPAddr)
mappedIP, err := a.extIPMapper.findExternalIP(laddr.IP.String())
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("1:1 NAT mapping is enabled but no external IP is found for %s\n", laddr.IP.String()))
closeConnAndLog(conn, false, a.log, fmt.Sprintf("1:1 NAT mapping is enabled but no external IP is found for %s\n", laddr.IP.String()))
return
}

Expand All @@ -315,7 +326,7 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []
}
c, err := NewCandidateServerReflexive(&srflxConfig)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n",
closeConnAndLog(conn, false, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n",
network,
mappedIP.String(),
laddr.Port,
Expand Down Expand Up @@ -347,22 +358,29 @@ func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkT
go func(url URL, network string) {
defer wg.Done()

var conn net.PacketConn

hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
if err != nil {
a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err)
return
}

conn, err := listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
isUDPMux := a.udpMux != nil
switch isUDPMux {
case true:
conn, err = a.udpMux.GetConn(a.localUfrag)
default:
conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
}
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to listen for %s: %v\n", serverAddr.String(), err))
closeConnAndLog(conn, false, a.log, fmt.Sprintf("Failed to listen for %s: %v\n", serverAddr.String(), err))
return
}

xoraddr, err := getXORMappedAddr(conn, serverAddr, stunGatherTimeout)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("could not get server reflexive address %s %s: %v\n", network, url, err))
closeConnAndLog(conn, isUDPMux, a.log, fmt.Sprintf("could not get server reflexive address %s %s: %v\n", network, url, err))
return
}

Expand All @@ -380,7 +398,7 @@ func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkT
}
c, err := NewCandidateServerReflexive(&srflxConfig)
if err != nil {
closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
closeConnAndLog(conn, isUDPMux, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
return
}

Expand Down Expand Up @@ -422,11 +440,18 @@ func (a *Agent) gatherCandidatesRelay(ctx context.Context, urls []*URL) { //noli
RelAddr string
RelPort int
relayProtocol string
isUDPMux bool
)

switch {
case url.Proto == ProtoTypeUDP && url.Scheme == SchemeTypeTURN:
if locConn, err = a.net.ListenPacket(network, "0.0.0.0:0"); err != nil {
isUDPMux = a.udpMux != nil
if isUDPMux {
locConn, err = a.udpMux.GetConn(a.localUfrag)
if err != nil {
a.log.Warnf("Failed to listen %s: %v\n", network, err)
}
} else if locConn, err = a.net.ListenPacket(network, "0.0.0.0:0"); err != nil {
a.log.Warnf("Failed to listen %s: %v\n", network, err)
return
}
Expand Down Expand Up @@ -514,20 +539,20 @@ func (a *Agent) gatherCandidatesRelay(ctx context.Context, urls []*URL) { //noli
Net: a.net,
})
if err != nil {
closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to build new turn.Client %s %s\n", TURNServerAddr, err))
closeConnAndLog(locConn, isUDPMux, a.log, fmt.Sprintf("Failed to build new turn.Client %s %s\n", TURNServerAddr, err))
return
}

if err = client.Listen(); err != nil {
client.Close()
closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to listen on turn.Client %s %s\n", TURNServerAddr, err))
closeConnAndLog(locConn, isUDPMux, a.log, fmt.Sprintf("Failed to listen on turn.Client %s %s\n", TURNServerAddr, err))
return
}

relayConn, err := client.Allocate()
if err != nil {
client.Close()
closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to allocate on turn.Client %s %s\n", TURNServerAddr, err))
closeConnAndLog(locConn, isUDPMux, a.log, fmt.Sprintf("Failed to allocate on turn.Client %s %s\n", TURNServerAddr, err))
return
}

Expand Down Expand Up @@ -555,7 +580,7 @@ func (a *Agent) gatherCandidatesRelay(ctx context.Context, urls []*URL) { //noli
relayConnClose()

client.Close()
closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to create relay candidate: %s %s: %v\n", network, raddr.String(), err))
closeConnAndLog(locConn, isUDPMux, a.log, fmt.Sprintf("Failed to create relay candidate: %s %s: %v\n", network, raddr.String(), err))
return
}

Expand Down
5 changes: 3 additions & 2 deletions gather_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !js
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused about this build tag comment here. Whats the purpose of the // +build comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to keep compatibility with older versions as the new directive was introduced in Go v1.7

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mlsmaycon Thanks :)

// +build !js

package ice
Expand Down Expand Up @@ -408,10 +409,10 @@ func TestCloseConnLog(t *testing.T) {
a, err := NewAgent(&AgentConfig{})
assert.NoError(t, err)

closeConnAndLog(nil, a.log, "normal nil")
closeConnAndLog(nil, false, a.log, "normal nil")

var nc *net.UDPConn
closeConnAndLog(nc, a.log, "nil ptr")
closeConnAndLog(nc, false, a.log, "nil ptr")

assert.NoError(t, a.Close())
}
Expand Down