Skip to content

Commit

Permalink
feat(network): introduce PacketProxy to handle UDP traffic (#27)
Browse files Browse the repository at this point in the history
This PR introduces a network stack neutral `network.PacketProxy` that can be used to **relay** (e.g. a UDP proxy server) or **process** (e.g. a local DNS server) UDP packets. It is more efficient than the `transport.PacketListener` when you try to implement a local UDP server that can process UDP requests and get the responses without going to the internet (see #26 ).
  • Loading branch information
jyyi1 authored Jul 19, 2023
1 parent 33ceec5 commit 76434db
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 99 deletions.
9 changes: 0 additions & 9 deletions network/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@ package network

import (
"fmt"
"os"
"syscall"
)

// Portable analogs of some common errors.
//
// Errors returned from this package and all sub-packages may be tested against these errors with [errors.Is].
var (
// ErrClosed is the error returned by an I/O call on a network device that has already been closed, or that is closed
// by another goroutine before the I/O is completed. This may be wrapped in another error, and should normally be
// tested using errors.Is(err, network.ErrClosed).
ErrClosed = fmt.Errorf("the network device is closed: %w", os.ErrClosed)

// ErrMsgSize is the error returned by a Write on a network device that the size of the message to be sent is bigger
// than the maximum message size the device can process.
ErrMsgSize = fmt.Errorf("packet size is too big: %w", syscall.EMSGSIZE)
Expand Down
5 changes: 3 additions & 2 deletions network/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

/*
The network package defines interfaces and provides utilities for network layer (OSI layer 3) functionalities. For
example, the IPDevice interface can be used to read and write IP packets from a physical or virtual network device.
example, the [IPDevice] interface can be used to read and write IP packets from a physical or virtual network device.
In addition, user-space network stack implementations are also included in the sub-packages (such as
network/lwip2transport) that can translate raw IP packets into TCP/UDP connections.
network/lwip2transport) that can translate raw IP packets into TCP/UDP flows. You can implement a [PacketProxy]
to handle UDP traffic, and a [transport.StreamDialer] to handle TCP traffic.
*/
package network
28 changes: 28 additions & 0 deletions network/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package network

import (
"errors"
)

// Portable analogs of some common errors.
//
// Errors returned from this package and all sub-packages may be tested against these errors with [errors.Is].

// ErrClosed is the error returned by an I/O call on a network device or proxy that has already been closed, or that is
// closed by another goroutine before the I/O is completed. This may be wrapped in another error, and should normally
// be tested using errors.Is(err, network.ErrClosed).
var ErrClosed = errors.New("network device already closed")
29 changes: 14 additions & 15 deletions network/lwip2transport/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"errors"
"io"
"sync"
"time"

"github.com/Jigsaw-Code/outline-internal-sdk/network"
"github.com/Jigsaw-Code/outline-internal-sdk/transport"
Expand Down Expand Up @@ -47,29 +46,29 @@ type lwIPDevice struct {
var instMu sync.Mutex
var inst *lwIPDevice = nil

// ConfigureDevice configures the singleton LwIPTransportDevice using the [transport.StreamDialer] sd to handle TCP
// streams and the [transport.PacketListener] pl to handle UDP packets.
// ConfigureDevice configures the singleton LwIP device using the [transport.StreamDialer] to handle TCP streams and
// the [transport.PacketProxy] to handle UDP packets.
//
// LwIPTransportDevice is a [network.IPDevice] that can translate IP packets to TCP/UDP traffic and vice versa. It uses
// the [lwIP library] to perform the translation.
// LwIP device is a [network.IPDevice] that can translate IP packets to TCP/UDP traffic and vice versa. It uses the
// [lwIP library] to perform the translation.
//
// LwIPTransportDevice must be a singleton object due to limitations in [lwIP library]. If you try to call
// ConfigureDevice more than once, we will Close the previous device and reconfigures it.
// LwIP device must be a singleton object due to limitations in [lwIP library]. If you try to call ConfigureDevice more
// than once, we will Close the previous device and reconfigures it.
//
// To use a LwIPTransportDevice:
// To use a LwIP device:
// 1. Call [ConfigureDevice] with two handlers for TCP and UDP traffic.
// 2. Write IP packets to the device. The device will translate the IP packets to TCP/UDP traffic and send them to the
// appropriate handlers.
// 3. Read IP packets from the device to get the TCP/UDP responses.
//
// A LwIPTransportDevice is NOT thread-safe. However it is safe to use Write, Read/WriteTo and Close in different
// goroutines. But keep in mind that only one goroutine can call Write at a time; and only one goroutine can use either
// Read or WriteTo at a time.
// A LwIP device is NOT thread-safe. However it is safe to use Write, Read/WriteTo and Close in different goroutines.
// But keep in mind that only one goroutine can call Write at a time; and only one goroutine can use either Read or
// WriteTo at a time.
//
// [lwIP library]: https://savannah.nongnu.org/projects/lwip/
func ConfigureDevice(sd transport.StreamDialer, pl transport.PacketListener) (network.IPDevice, error) {
if sd == nil || pl == nil {
return nil, errors.New("both sd and pl are required")
func ConfigureDevice(sd transport.StreamDialer, pp network.PacketProxy) (network.IPDevice, error) {
if sd == nil || pp == nil {
return nil, errors.New("both sd and pp are required")
}

instMu.Lock()
Expand All @@ -80,7 +79,7 @@ func ConfigureDevice(sd transport.StreamDialer, pl transport.PacketListener) (ne
}
inst = &lwIPDevice{
tcp: newTCPHandler(sd),
udp: newUDPHandler(pl, 30*time.Second),
udp: newUDPHandler(pp),
stack: lwip.NewLWIPStack(),
done: make(chan struct{}),
rdBuf: make(chan []byte),
Expand Down
15 changes: 10 additions & 5 deletions network/lwip2transport/device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"net"
"os"
"syscall"
"testing"

"github.com/Jigsaw-Code/outline-internal-sdk/network"
Expand All @@ -34,11 +35,15 @@ func TestStackClosedWriteError(t *testing.T) {
n, err := t2s.Write([]byte{0x01})
require.Exactly(t, 0, n)
require.ErrorIs(t, err, network.ErrClosed)
require.ErrorIs(t, err, os.ErrClosed) // network.ErrClosed should wrap os.ErrClosed

// network.ErrClosed should not wrap golang's ErrClosed errors
require.NotErrorIs(t, err, os.ErrClosed)
require.NotErrorIs(t, err, net.ErrClosed)
require.NotErrorIs(t, err, syscall.ESHUTDOWN)
}

func reConfigurelwIPDeviceForTest(t *testing.T, sd transport.StreamDialer, pl transport.PacketListener) *lwIPDevice {
t2s, err := ConfigureDevice(sd, pl)
func reConfigurelwIPDeviceForTest(t *testing.T, sd transport.StreamDialer, pp network.PacketProxy) *lwIPDevice {
t2s, err := ConfigureDevice(sd, pp)
require.NoError(t, err)
t2sInternal, ok := t2s.(*lwIPDevice)
require.True(t, ok)
Expand All @@ -49,10 +54,10 @@ type errTcpUdpHandler struct {
err error
}

func (h *errTcpUdpHandler) Dial(ctx context.Context, raddr string) (transport.StreamConn, error) {
func (h *errTcpUdpHandler) Dial(context.Context, string) (transport.StreamConn, error) {
return nil, h.err
}

func (h *errTcpUdpHandler) ListenPacket(ctx context.Context) (net.PacketConn, error) {
func (h *errTcpUdpHandler) NewSession(network.PacketResponseReceiver) (network.PacketRequestSender, error) {
return nil, h.err
}
134 changes: 66 additions & 68 deletions network/lwip2transport/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,103 +15,101 @@
package lwip2transport

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/Jigsaw-Code/outline-internal-sdk/transport"
"github.com/Jigsaw-Code/outline-internal-sdk/network"
lwip "github.com/eycorsican/go-tun2socks/core"
)

// Compilation guard against interface implementation
var _ lwip.UDPConnHandler = (*udpHandler)(nil)
var _ network.PacketResponseReceiver = (*udpConnResponseWriter)(nil)

type udpHandler struct {
// Protects the connections map
sync.Mutex

// Used to establish connections to the proxy
listener transport.PacketListener

// How long to wait for a packet from the proxy. Longer than this and the connection
// is closed.
timeout time.Duration

// Maps local UDP addresses (IPv4:port/[IPv6]:port) to connections to the proxy.
conns map[string]net.PacketConn
mu sync.Mutex // Protects the senders field
proxy network.PacketProxy // A network stack neutral implementation of UDP PacketProxy
senders map[string]network.PacketRequestSender // Maps local lwIP UDP socket to PacketRequestSender
}

// newUDPHandler returns a lwIP UDP connection handler.
//
// `pl` is an UDP packet listener.
// `timeout` is the UDP read and write timeout.
func newUDPHandler(pl transport.PacketListener, timeout time.Duration) *udpHandler {
// `pktProxy` is a PacketProxy that handles UDP packets.
func newUDPHandler(pktProxy network.PacketProxy) *udpHandler {
return &udpHandler{
listener: pl,
timeout: timeout,
conns: make(map[string]net.PacketConn, 8),
proxy: pktProxy,
senders: make(map[string]network.PacketRequestSender, 8),
}
}

func (h *udpHandler) Connect(tunConn lwip.UDPConn, target *net.UDPAddr) error {
proxyConn, err := h.listener.ListenPacket(context.Background())
if err != nil {
return err
}
h.Lock()
h.conns[tunConn.LocalAddr().String()] = proxyConn
h.Unlock()
go h.relayPacketsFromProxy(tunConn, proxyConn)
// Connect does nothing. New UDP sessions will be created in ReceiveTo.
func (h *udpHandler) Connect(tunConn lwip.UDPConn, _ *net.UDPAddr) error {
return nil
}

// relayPacketsFromProxy relays packets from the proxy to the TUN device.
func (h *udpHandler) relayPacketsFromProxy(tunConn lwip.UDPConn, proxyConn net.PacketConn) {
buf := lwip.NewBytes(lwip.BufSize)
defer func() {
h.close(tunConn)
lwip.FreeBytes(buf)
}()
for {
proxyConn.SetDeadline(time.Now().Add(h.timeout))
n, sourceAddr, err := proxyConn.ReadFrom(buf)
if err != nil {
return
}
// No resolution will take place, the address sent by the proxy is a resolved IP.
sourceUDPAddr, err := net.ResolveUDPAddr("udp", sourceAddr.String())
if err != nil {
return
}
_, err = tunConn.WriteFrom(buf[:n], sourceUDPAddr)
if err != nil {
// ReceiveTo relays packets from the lwIP TUN device to the proxy. It's called by lwIP. ReceiveTo will also create a
// new UDP session if `data` is the first packet from the `tunConn`.
func (h *udpHandler) ReceiveTo(tunConn lwip.UDPConn, data []byte, destAddr *net.UDPAddr) (err error) {
laddr := tunConn.LocalAddr().String()

h.mu.Lock()
reqSender, ok := h.senders[laddr]
if !ok {
if reqSender, err = h.newSession(tunConn); err != nil {
return
}
h.senders[laddr] = reqSender
}
h.mu.Unlock()

_, err = reqSender.WriteTo(data, destAddr)
return
}

// ReceiveTo relays packets from the TUN device to the proxy. It's called by tun2socks.
func (h *udpHandler) ReceiveTo(tunConn lwip.UDPConn, data []byte, destAddr *net.UDPAddr) error {
h.Lock()
proxyConn, ok := h.conns[tunConn.LocalAddr().String()]
h.Unlock()
if !ok {
return fmt.Errorf("connection %v->%v does not exist", tunConn.LocalAddr(), destAddr)
// newSession creates a new PacketRequestSender related to conn. The caller needs to put the new PacketRequestSender
// to the h.senders map.
func (h *udpHandler) newSession(conn lwip.UDPConn) (network.PacketRequestSender, error) {
respWriter := &udpConnResponseWriter{conn, h}
reqSender, err := h.proxy.NewSession(respWriter)
if err != nil {
respWriter.Close()
}
return reqSender, err
}

// closeSession cleans up resources related to conn.
func (h *udpHandler) closeSession(conn lwip.UDPConn) error {
h.mu.Lock()
defer h.mu.Unlock()

laddr := conn.LocalAddr().String()
err := conn.Close()
if reqSender, ok := h.senders[laddr]; ok {
reqSender.Close()
delete(h.senders, laddr)
}
proxyConn.SetDeadline(time.Now().Add(h.timeout))
_, err := proxyConn.WriteTo(data, destAddr)
return err
}

func (h *udpHandler) close(tunConn lwip.UDPConn) {
laddr := tunConn.LocalAddr().String()
tunConn.Close()
h.Lock()
defer h.Unlock()
if proxyConn, ok := h.conns[laddr]; ok {
proxyConn.Close()
delete(h.conns, laddr)
// The PacketResponseWriter that will write responses to the lwip network stack.
type udpConnResponseWriter struct {
conn lwip.UDPConn
h *udpHandler
}

// Write relays packets from the proxy to the lwIP TUN device.
func (r *udpConnResponseWriter) WriteFrom(p []byte, source net.Addr) (int, error) {
// net.Addr -> *net.UDPAddr, because r.conn.WriteFrom requires *net.UDPAddr
// and this is more reliable than type assertion
// also the source address host will be an IP address, no actual resolution will be done
srcAddr, err := net.ResolveUDPAddr("udp", source.String())
if err != nil {
return 0, err
}
return r.conn.WriteFrom(p, srcAddr)
}

// Close informs the udpHandler to close the UDPConn and clean up the UDP session.
func (r *udpConnResponseWriter) Close() error {
return r.h.closeSession(r.conn)
}
Loading

0 comments on commit 76434db

Please sign in to comment.