Skip to content

Commit

Permalink
Add etcd tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Dec 21, 2023
1 parent 038ae32 commit ba90be4
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 4 deletions.
6 changes: 3 additions & 3 deletions proxy_config_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ func (p *proxyConfigEtcd) EtcdKeyUpdated(client *EtcdClient, key string, data []
if found && info.Address != prev.Address {
// Address of a proxy has changed.
p.removeEtcdProxyLocked(key)
found = false
}

if otherKey, found := p.urlToKey[info.Address]; found && otherKey != key {
if otherKey, otherFound := p.urlToKey[info.Address]; otherFound && otherKey != key {
log.Printf("Address %s is already registered for key %s, ignoring %s", info.Address, otherKey, key)
return
}

if _, found := p.keyInfos[info.Address]; found {
if found {
p.keyInfos[key] = &info
p.urlToKey[info.Address] = key
p.proxy.KeepConnection(info.Address)
} else {
if err := p.proxy.AddConnection(false, info.Address); err != nil {
Expand Down
86 changes: 86 additions & 0 deletions proxy_config_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package signaling

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/dlintw/goconf"
"go.etcd.io/etcd/server/v3/embed"
)

type TestProxyInformationEtcd struct {
Address string `json:"address"`

OtherData string `json:"otherdata,omitempty"`
}

func newProxyConfigEtcd(t *testing.T, proxy McuProxy) (*embed.Etcd, ProxyConfig) {
t.Helper()
etcd, client := NewEtcdClientForTest(t)
cfg := goconf.NewConfigFile()
cfg.AddOption("mcu", "keyprefix", "proxies/")
p, err := NewProxyConfigEtcd(cfg, client, proxy)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
p.Stop()
})
return etcd, p
}

func SetEtcdProxy(t *testing.T, etcd *embed.Etcd, path string, proxy *TestProxyInformationEtcd) {
t.Helper()
data, err := json.Marshal(proxy)
if err != nil {
t.Fatal(err)
}
SetEtcdValue(etcd, path, data)
}

func TestProxyConfigEtcd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
proxy := newMcuProxyForConfig(t)
etcd, config := newProxyConfigEtcd(t, proxy)
SetEtcdProxy(t, etcd, "proxies/a", &TestProxyInformationEtcd{
Address: "https://foo/",
})
proxy.Expect("add", "https://foo/")
if err := config.Start(); err != nil {
t.Fatal(err)
}
proxy.WaitForEvents(ctx)

proxy.Expect("add", "https://bar/")
SetEtcdProxy(t, etcd, "proxies/b", &TestProxyInformationEtcd{
Address: "https://bar/",
})
proxy.WaitForEvents(ctx)

proxy.Expect("keep", "https://bar/")
SetEtcdProxy(t, etcd, "proxies/b", &TestProxyInformationEtcd{
Address: "https://bar/",
OtherData: "ignore-me",
})
proxy.WaitForEvents(ctx)

proxy.Expect("remove", "https://foo/")
DeleteEtcdValue(etcd, "proxies/a")
proxy.WaitForEvents(ctx)

proxy.Expect("remove", "https://bar/")
proxy.Expect("add", "https://baz/")
SetEtcdProxy(t, etcd, "proxies/b", &TestProxyInformationEtcd{
Address: "https://baz/",
})
proxy.WaitForEvents(ctx)

// Adding the same hostname multiple times should not trigger an event.
SetEtcdProxy(t, etcd, "proxies/c", &TestProxyInformationEtcd{
Address: "https://baz/",
})
time.Sleep(100 * time.Millisecond)
}
52 changes: 51 additions & 1 deletion proxy_config_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package signaling

import (
"context"
"net"
"reflect"
"runtime"
"strings"
"sync"
"testing"
)

Expand All @@ -29,25 +31,58 @@ type proxyConfigEvent struct {
type mcuProxyForConfig struct {
t *testing.T
expected []proxyConfigEvent
mu sync.Mutex
waiters []chan struct{}
}

func newMcuProxyForConfig(t *testing.T) *mcuProxyForConfig {
return &mcuProxyForConfig{
proxy := &mcuProxyForConfig{
t: t,
}
t.Cleanup(func() {
if len(proxy.expected) > 0 {
t.Errorf("expected events %+v were not triggered", proxy.expected)
}
})
return proxy
}

func (p *mcuProxyForConfig) Expect(action string, url string, ips ...net.IP) {
if len(ips) == 0 {
ips = nil
}

p.mu.Lock()
defer p.mu.Unlock()

p.expected = append(p.expected, proxyConfigEvent{
action: action,
url: url,
ips: ips,
})
}

func (p *mcuProxyForConfig) WaitForEvents(ctx context.Context) {
p.t.Helper()

p.mu.Lock()
defer p.mu.Unlock()

if len(p.expected) == 0 {
return
}

waiter := make(chan struct{})
p.waiters = append(p.waiters, waiter)
p.mu.Unlock()
defer p.mu.Lock()
select {
case <-ctx.Done():
p.t.Error(ctx.Err())
case <-waiter:
}
}

func (p *mcuProxyForConfig) checkEvent(event *proxyConfigEvent) {
p.t.Helper()
pc := make([]uintptr, 32)
Expand All @@ -70,6 +105,21 @@ func (p *mcuProxyForConfig) checkEvent(event *proxyConfigEvent) {
return
}

defer func() {
if len(p.expected) == 0 {
p.mu.Lock()
waiters := p.waiters
p.waiters = nil
p.mu.Unlock()

for _, ch := range waiters {
ch <- struct{}{}
}
}
}()

p.mu.Lock()
defer p.mu.Unlock()
expected := p.expected[0]
p.expected = p.expected[1:]
if !reflect.DeepEqual(expected, *event) {
Expand Down

0 comments on commit ba90be4

Please sign in to comment.