Skip to content

Commit

Permalink
add closed/failed tcp connection #10
Browse files Browse the repository at this point in the history
  • Loading branch information
umputun committed Aug 4, 2019
1 parent 218af03 commit 87dd79c
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 3 deletions.
23 changes: 21 additions & 2 deletions app/agent/syslog/syslog_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// GetWriter returns syslog writer for non-win platform
func GetWriter(ctx context.Context, host, proto, prefix, containerName string) (res io.WriteCloser, err error) {

var wr *syslog.Writer
switch proto {
case "udp", "udp4":
e := repeater.NewDefault(10, time.Second).Do(ctx, func() error {
Expand All @@ -24,10 +25,10 @@ func GetWriter(ctx context.Context, host, proto, prefix, containerName string) (
return res, e
case "tcp", "tcp4":
e := repeater.NewDefault(10, time.Second).Do(ctx, func() error {
res, err = syslog.Dial("tcp4", host, syslog.LOG_WARNING|syslog.LOG_DAEMON, prefix+containerName)
wr, err = syslog.Dial("tcp4", host, syslog.LOG_WARNING|syslog.LOG_DAEMON, prefix+containerName)
return err
})
return res, e
return &syslogRetryWriter{swr: wr}, e
}
return nil, errors.Errorf("unknown syslog protocol %s", proto)
}
Expand All @@ -36,3 +37,21 @@ func GetWriter(ctx context.Context, host, proto, prefix, containerName string) (
func IsSupported() bool {
return true
}

// syslogRetryWriter wraps syslog.Writer with connection close on write errors and causes con=nil
// syslog.Write will redial if conn=nil
type syslogRetryWriter struct {
swr *syslog.Writer
}

func (s *syslogRetryWriter) Write(p []byte) (n int, err error) {
n, err = s.swr.Write(p)
if err != nil {
_ = s.swr.Close()
}
return n, err
}

func (s *syslogRetryWriter) Close() error {
return s.swr.Close()
}
6 changes: 5 additions & 1 deletion app/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

docker "github.com/fsouza/go-dockerclient"
log "github.com/go-pkgz/lgr"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"gopkg.in/natefinch/lumberjack.v2"

Expand Down Expand Up @@ -143,7 +144,10 @@ func (a AgentCmd) makeLogWriters(ctx context.Context, containerName, group strin
}

if len(logWriters) == 0 {
return nil, nil, errors.Errorf("all log writers failed. file %+v, syslog %+v", fileErr, syslogErr)
errs := new(multierror.Error)
errs = multierror.Append(errs, fileErr)
errs = multierror.Append(errs, syslogErr)
return nil, nil, errors.Errorf("all log writers failed, %+v", errs.Error())
}

return lw, ew, nil
Expand Down
73 changes: 73 additions & 0 deletions app/cmd/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package cmd

import (
"bytes"
"context"
"io/ioutil"
"net"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -160,3 +165,71 @@ func Test_makeLogWritersSyslogPassed(t *testing.T) {
_, err = errWr.Write([]byte("xxx123 line 2\n"))
assert.NoError(t, err)
}

func Test_makeLogWritersSyslogTCP(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
time.AfterFunc(1*time.Second, cancel)

wg := sync.WaitGroup{}
buf := bytes.Buffer{}
ts, err := net.Listen("tcp4", "localhost:5514")
var accepted int32
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
l, ok := ts.(*net.TCPListener)
if ok {
require.NoError(t, l.SetDeadline(time.Now().Add(10*time.Millisecond)))
}
conn, err := l.Accept()
if err != nil {
continue
}
b := make([]byte, 1000)
_, err = conn.Read(b)
require.NoError(t, err)
buf.Write(b)
require.NoError(t, conn.Close())
atomic.AddInt32(&accepted, 1)
}
}
}()

time.Sleep(10 * time.Millisecond)

opts := AgentOpts{EnableSyslog: true, SyslogHost: "127.0.0.1:5514", SyslogProt: "tcp", SyslogPrefix: "docker/"}
a := AgentCmd{AgentOpts: opts}

stdWr, errWr, err := a.makeLogWriters(ctx, "container1", "gr1")
require.NoError(t, err)
assert.Equal(t, stdWr, errWr, "same writer for out and err in syslog")

// write to out writer
_, err = stdWr.Write([]byte("abc line 1\n"))
assert.NoError(t, err)
_, err = stdWr.Write([]byte("xxx123 line 2\n"))
assert.NoError(t, err)

// write to err writer
_, err = errWr.Write([]byte("err line 1\n"))
assert.NoError(t, err)
_, err = errWr.Write([]byte("err xxx123 line 2345\n"))
assert.NoError(t, err)

wg.Wait()
t.Log(buf.String())
res := strings.Split(buf.String(), "\n")
assert.Equal(t, 5, len(res), "4 messages + final eol")
assert.Contains(t, res[0], "docker/container1")
assert.Contains(t, res[0], ": abc line 1")
assert.Contains(t, res[3], "docker/container1")
assert.Contains(t, res[3], ": err xxx123 line 2345")

assert.Equal(t, int32(1), atomic.LoadInt32(&accepted))
}

0 comments on commit 87dd79c

Please sign in to comment.