Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check version of cluster nodes and log warning if different. #898

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ func (c *GrpcClient) SetSelf(self bool) {
c.isSelf.Store(self)
}

func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) {
func (c *GrpcClient) GetServerId(ctx context.Context) (string, string, error) {
statsGrpcClientCalls.WithLabelValues("GetServerId").Inc()
response, err := c.impl.GetServerId(ctx, &GetServerIdRequest{}, grpc.WaitForReady(true))
if err != nil {
return "", err
return "", "", err
}

return response.GetServerId(), nil
return response.GetServerId(), response.GetVersion(), nil
}

func (c *GrpcClient) LookupResumeId(ctx context.Context, resumeId string) (*LookupResumeIdReply, error) {
Expand Down Expand Up @@ -398,7 +398,8 @@ type grpcClientsList struct {
}

type GrpcClients struct {
mu sync.RWMutex
mu sync.RWMutex
version string

clientsMap map[string]*grpcClientsList
clients []*GrpcClient
Expand All @@ -421,10 +422,11 @@ type GrpcClients struct {
closeFunc context.CancelFunc
}

func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) {
func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) {
initializedCtx, initializedFunc := context.WithCancel(context.Background())
closeCtx, closeFunc := context.WithCancel(context.Background())
result := &GrpcClients{
version: version,
dnsMonitor: dnsMonitor,
etcdClient: etcdClient,
initializedCtx: initializedCtx,
Expand Down Expand Up @@ -499,12 +501,12 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool
return false
}

func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, error) {
func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, string, error) {
ctx2, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

id, err := client.GetServerId(ctx2)
return id, err
id, version, err := client.GetServerId(ctx2)
return id, version, err
}

func (c *GrpcClients) checkIsSelf(ctx context.Context, target string, client *GrpcClient) {
Expand All @@ -522,7 +524,7 @@ loop:
return
}

id, err := c.getServerIdWithTimeout(ctx, client)
id, version, err := c.getServerIdWithTimeout(ctx, client)
if err != nil {
if errors.Is(err, context.Canceled) {
return
Expand All @@ -539,8 +541,10 @@ loop:
log.Printf("GRPC target %s is this server, removing", client.Target())
c.closeClient(client)
client.SetSelf(true)
} else if version != c.version {
log.Printf("WARNING: Node %s is runing different version %s than local node (%s)", client.Target(), version, c.version)
} else {
log.Printf("Checked GRPC server id of %s", client.Target())
log.Printf("Checked GRPC server id of %s running version %s", client.Target(), version)
}
break loop
}
Expand Down
4 changes: 2 additions & 2 deletions grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} {

func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) {
dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually
client, err := NewGrpcClients(config, etcdClient, dnsMonitor)
client, err := NewGrpcClients(config, etcdClient, dnsMonitor, "0.0.0")
require.NoError(t, err)
t.Cleanup(func() {
client.Close()
Expand Down Expand Up @@ -336,7 +336,7 @@ func Test_GrpcClients_Encryption(t *testing.T) {
require.NoError(clients.WaitForInitialized(ctx))

for _, client := range clients.GetClients() {
_, err := client.GetServerId(ctx)
_, _, err := client.GetServerId(ctx)
require.NoError(err)
}
})
Expand Down
34 changes: 22 additions & 12 deletions grpc_internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ message GetServerIdRequest {

message GetServerIdReply {
string serverId = 1;
string version = 2;
}
5 changes: 4 additions & 1 deletion grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type GrpcServer struct {
UnimplementedRpcMcuServer
UnimplementedRpcSessionsServer

version string
creds credentials.TransportCredentials
conn *grpc.Server
listener net.Listener
Expand All @@ -78,7 +79,7 @@ type GrpcServer struct {
hub GrpcServerHub
}

func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {
func NewGrpcServer(config *goconf.ConfigFile, version string) (*GrpcServer, error) {
var listener net.Listener
if addr, _ := GetStringOptionWithEnv(config, "grpc", "listen"); addr != "" {
var err error
Expand All @@ -95,6 +96,7 @@ func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) {

conn := grpc.NewServer(grpc.Creds(creds))
result := &GrpcServer{
version: version,
creds: creds,
conn: conn,
listener: listener,
Expand Down Expand Up @@ -265,6 +267,7 @@ func (s *GrpcServer) GetServerId(ctx context.Context, request *GetServerIdReques
statsGrpcServerCalls.WithLabelValues("GetServerId").Inc()
return &GetServerIdReply{
ServerId: s.serverId,
Version: s.version,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewGrpcServerForTestWithConfig(t *testing.T, config *goconf.ConfigFile) (se
addr = net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
config.AddOption("grpc", "listen", addr)
var err error
server, err = NewGrpcServer(config)
server, err = NewGrpcServer(config, "0.0.0")
if isErrorAddressAlreadyInUse(err) {
continue
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func Test_GrpcServer_ReloadCA(t *testing.T) {
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
defer cancel1()

_, err = client1.GetServerId(ctx1)
_, _, err = client1.GetServerId(ctx1)
require.NoError(err)

org2 := "Updated client"
Expand All @@ -245,6 +245,6 @@ func Test_GrpcServer_ReloadCA(t *testing.T) {
defer cancel2()

// This will fail if the CA certificate has not been reloaded by the server.
_, err = client2.GetServerId(ctx2)
_, _, err = client2.GetServerId(ctx2)
require.NoError(err)
}
4 changes: 2 additions & 2 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func main() {
}
}()

rpcServer, err := signaling.NewGrpcServer(config)
rpcServer, err := signaling.NewGrpcServer(config, version)
if err != nil {
log.Fatalf("Could not create RPC server: %s", err)
}
Expand All @@ -213,7 +213,7 @@ func main() {
}()
defer rpcServer.Close()

rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor)
rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor, version)
if err != nil {
log.Fatalf("Could not create RPC clients: %s", err)
}
Expand Down
Loading