Skip to content

Commit

Permalink
Merge pull request #898 from strukturag/check-cluster-versions
Browse files Browse the repository at this point in the history
Check version of cluster nodes and log warning if different.
  • Loading branch information
fancycode authored Jan 16, 2025
2 parents 11fc1be + ce14835 commit 7b7f9ac
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 30 deletions.
24 changes: 14 additions & 10 deletions grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ func (c *GrpcClient) SetSelf(self bool) {
c.isSelf.Store(self)
}

func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) {
func (c *GrpcClient) GetServerId(ctx context.Context) (string, string, error) {
statsGrpcClientCalls.WithLabelValues("GetServerId").Inc()
response, err := c.impl.GetServerId(ctx, &GetServerIdRequest{}, grpc.WaitForReady(true))
if err != nil {
return "", err
return "", "", err
}

return response.GetServerId(), nil
return response.GetServerId(), response.GetVersion(), nil
}

func (c *GrpcClient) LookupResumeId(ctx context.Context, resumeId string) (*LookupResumeIdReply, error) {
Expand Down Expand Up @@ -398,7 +398,8 @@ type grpcClientsList struct {
}

type GrpcClients struct {
mu sync.RWMutex
mu sync.RWMutex
version string

clientsMap map[string]*grpcClientsList
clients []*GrpcClient
Expand All @@ -421,10 +422,11 @@ type GrpcClients struct {
closeFunc context.CancelFunc
}

func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) {
func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) {
initializedCtx, initializedFunc := context.WithCancel(context.Background())
closeCtx, closeFunc := context.WithCancel(context.Background())
result := &GrpcClients{
version: version,
dnsMonitor: dnsMonitor,
etcdClient: etcdClient,
initializedCtx: initializedCtx,
Expand Down Expand Up @@ -499,12 +501,12 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool
return false
}

func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, error) {
func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, string, error) {
ctx2, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

id, err := client.GetServerId(ctx2)
return id, err
id, version, err := client.GetServerId(ctx2)
return id, version, err
}

func (c *GrpcClients) checkIsSelf(ctx context.Context, target string, client *GrpcClient) {
Expand All @@ -522,7 +524,7 @@ loop:
return
}

id, err := c.getServerIdWithTimeout(ctx, client)
id, version, err := c.getServerIdWithTimeout(ctx, client)
if err != nil {
if errors.Is(err, context.Canceled) {
return
Expand All @@ -539,8 +541,10 @@ loop:
log.Printf("GRPC target %s is this server, removing", client.Target())
c.closeClient(client)
client.SetSelf(true)
} else if version != c.version {
log.Printf("WARNING: Node %s is runing different version %s than local node (%s)", client.Target(), version, c.version)
} else {
log.Printf("Checked GRPC server id of %s", client.Target())
log.Printf("Checked GRPC server id of %s running version %s", client.Target(), version)
}
break loop
}
Expand Down
4 changes: 2 additions & 2 deletions grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {

func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) {
dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually
client, err := NewGrpcClients(config, etcdClient, dnsMonitor)
client, err := NewGrpcClients(config, etcdClient, dnsMonitor, "0.0.0")
require.NoError(t, err)
t.Cleanup(func() {
client.Close()
Expand Down Expand Up @@ -336,7 +336,7 @@ func Test_GrpcClients_Encryption(t *testing.T) {
require.NoError(clients.WaitForInitialized(ctx))

for _, client := range clients.GetClients() {
_, err := client.GetServerId(ctx)
_, _, err := client.GetServerId(ctx)
require.NoError(err)
}
})
Expand Down
34 changes: 22 additions & 12 deletions grpc_internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ message GetServerIdRequest {

message GetServerIdReply {
string serverId = 1;
string version = 2;
}
5 changes: 4 additions & 1 deletion grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type GrpcServer struct {
UnimplementedRpcMcuServer
UnimplementedRpcSessionsServer

version string
creds credentials.TransportCredentials
conn *grpc.Server
listener net.Listener
Expand All @@ -78,7 +79,7 @@ type GrpcServer struct {
hub GrpcServerHub
}

func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {
func NewGrpcServer(config *goconf.ConfigFile, version string) (*GrpcServer, error) {
var listener net.Listener
if addr, _ := GetStringOptionWithEnv(config, "grpc", "listen"); addr != "" {
var err error
Expand All @@ -95,6 +96,7 @@ func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {

conn := grpc.NewServer(grpc.Creds(creds))
result := &GrpcServer{
version: version,
creds: creds,
conn: conn,
listener: listener,
Expand Down Expand Up @@ -265,6 +267,7 @@ func (s *GrpcServer) GetServerId(ctx context.Context, request *GetServerIdReques
statsGrpcServerCalls.WithLabelValues("GetServerId").Inc()
return &GetServerIdReply{
ServerId: s.serverId,
Version: s.version,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewGrpcServerForTestWithConfig(t *testing.T, config *goconf.ConfigFile) (se
addr = net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
config.AddOption("grpc", "listen", addr)
var err error
server, err = NewGrpcServer(config)
server, err = NewGrpcServer(config, "0.0.0")
if isErrorAddressAlreadyInUse(err) {
continue
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func Test_GrpcServer_ReloadCA(t *testing.T) {
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
defer cancel1()

_, err = client1.GetServerId(ctx1)
_, _, err = client1.GetServerId(ctx1)
require.NoError(err)

org2 := "Updated client"
Expand All @@ -245,6 +245,6 @@ func Test_GrpcServer_ReloadCA(t *testing.T) {
defer cancel2()

// This will fail if the CA certificate has not been reloaded by the server.
_, err = client2.GetServerId(ctx2)
_, _, err = client2.GetServerId(ctx2)
require.NoError(err)
}
4 changes: 2 additions & 2 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func main() {
}
}()

rpcServer, err := signaling.NewGrpcServer(config)
rpcServer, err := signaling.NewGrpcServer(config, version)
if err != nil {
log.Fatalf("Could not create RPC server: %s", err)
}
Expand All @@ -213,7 +213,7 @@ func main() {
}()
defer rpcServer.Close()

rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor)
rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor, version)
if err != nil {
log.Fatalf("Could not create RPC clients: %s", err)
}
Expand Down

0 comments on commit 7b7f9ac

Please sign in to comment.