Skip to content

Commit

Permalink
BUG/MAJOR: runtime: fix race condition when calling runtime socket
Browse files Browse the repository at this point in the history
Switched from using a channel and a goroutine to simple mutex, to avoid
race condition when the goroutine is finished and a channel is being
written to, making it block forever. Now a simple mutex is controlling
the read/write to the runtime unix socket
  • Loading branch information
mjuraga committed Oct 8, 2024
1 parent 496b7ba commit 8500ef9
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 129 deletions.
26 changes: 6 additions & 20 deletions runtime/acls_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package runtime

import (
"context"
"reflect"
"testing"
"time"

"github.com/haproxytech/client-native/v6/models"
)
Expand Down Expand Up @@ -52,9 +50,7 @@ func TestSingleRuntime_ShowACLS(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -159,9 +155,7 @@ func TestSingleRuntime_GetACL(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -253,9 +247,7 @@ func TestSingleRuntime_ShowACLFileEntries(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -329,9 +321,7 @@ func TestSingleRuntime_GetACLFileEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -436,9 +426,7 @@ func TestSingleRuntime_AddACLFileEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -521,9 +509,7 @@ func TestSingleRuntime_DeleteACLFileEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down
38 changes: 8 additions & 30 deletions runtime/certs_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package runtime

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -56,9 +55,7 @@ func TestSingleRuntime_ShowCerts(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -146,9 +143,7 @@ func TestSingleRuntime_GetCert(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -231,9 +226,7 @@ func TestSingleRuntime_ShowCertEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -256,7 +249,6 @@ func TestSingleRuntime_NewCertEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -308,9 +300,7 @@ func TestSingleRuntime_NewCertEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -328,7 +318,6 @@ func TestSingleRuntime_SetCertEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -396,9 +385,7 @@ func TestSingleRuntime_SetCertEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -416,7 +403,6 @@ func TestSingleRuntime_CommitCertEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -470,9 +456,7 @@ func TestSingleRuntime_CommitCertEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -490,7 +474,6 @@ func TestSingleRuntime_AbortCertEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -542,9 +525,7 @@ func TestSingleRuntime_AbortCertEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -562,7 +543,6 @@ func TestSingleRuntime_DeleteCertEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -614,9 +594,7 @@ func TestSingleRuntime_DeleteCertEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down
25 changes: 5 additions & 20 deletions runtime/crt-lists_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package runtime

import (
"context"
"reflect"
"testing"
"time"
)

func TestSingleRuntime_ShowCrtLists(t *testing.T) {
Expand Down Expand Up @@ -50,9 +48,7 @@ func TestSingleRuntime_ShowCrtLists(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down Expand Up @@ -136,9 +132,7 @@ func TestSingleRuntime_GetCrtList(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -161,7 +155,6 @@ func TestSingleRuntime_ShowCrtListEntries(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -236,9 +229,7 @@ func TestSingleRuntime_ShowCrtListEntries(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -263,7 +254,6 @@ func TestSingleRuntime_AddCrtListEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -338,9 +328,7 @@ func TestSingleRuntime_AddCrtListEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand All @@ -358,7 +346,6 @@ func TestSingleRuntime_DeleteCrtListEntry(t *testing.T) {
defer haProxy.Stop()

type fields struct {
jobs chan Task
socketPath string
masterWorkerMode bool
}
Expand Down Expand Up @@ -407,9 +394,7 @@ func TestSingleRuntime_DeleteCrtListEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
haProxy.SetResponses(&tt.socketResponse)
s := &SingleRuntime{}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second))
defer cancel()
err := s.Init(ctx, tt.fields.socketPath, tt.fields.masterWorkerMode)
err := s.Init(tt.fields.socketPath, tt.fields.masterWorkerMode)
if err != nil {
t.Errorf("SingleRuntime.Init() error = %v", err)
return
Expand Down
6 changes: 3 additions & 3 deletions runtime/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type Runtime interface {
IsStatsSocket() bool
}

func New(ctx context.Context, opt ...options.RuntimeOption) (Runtime, error) {
func New(_ context.Context, opt ...options.RuntimeOption) (Runtime, error) {
c := &client{
options: options.RuntimeOptions{},
}
Expand All @@ -163,9 +163,9 @@ func New(ctx context.Context, opt ...options.RuntimeOption) (Runtime, error) {
}

if c.options.MasterSocketData != nil {
err = c.initWithMasterSocket(ctx, c.options)
err = c.initWithMasterSocket(c.options)
} else {
err = c.initWithSockets(ctx, c.options)
err = c.initWithSockets(c.options)
}
if err != nil {
return nil, err
Expand Down
15 changes: 7 additions & 8 deletions runtime/runtime_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package runtime

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -36,7 +35,7 @@ import (
type client struct {
haproxyVersion *HAProxyVersion
options options.RuntimeOptions
runtime SingleRuntime
runtime *SingleRuntime
}

const (
Expand All @@ -47,12 +46,12 @@ const (
maxBufSize = 8192
)

func (c *client) initWithSockets(ctx context.Context, opt options.RuntimeOptions) error {
func (c *client) initWithSockets(opt options.RuntimeOptions) error {
socketPath := opt.Socket

runtime := SingleRuntime{}
runtime := &SingleRuntime{}
masterWorkerMode := false
err := runtime.Init(ctx, socketPath, masterWorkerMode, opt)
err := runtime.Init(socketPath, masterWorkerMode, opt)
if err != nil {
return err
}
Expand All @@ -61,15 +60,15 @@ func (c *client) initWithSockets(ctx context.Context, opt options.RuntimeOptions
return nil
}

func (c *client) initWithMasterSocket(ctx context.Context, opt options.RuntimeOptions) error {
func (c *client) initWithMasterSocket(opt options.RuntimeOptions) error {
masterSocketPath := opt.MasterSocketData.MasterSocketPath

if masterSocketPath == "" {
return errors.New("master socket not configured")
}
runtime := SingleRuntime{}
runtime := &SingleRuntime{}
masterWorkerMode := true
err := runtime.Init(ctx, masterSocketPath, masterWorkerMode, opt)
err := runtime.Init(masterSocketPath, masterWorkerMode, opt)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 8500ef9

Please sign in to comment.