diff --git a/grpc_client.go b/grpc_client.go index 24d2b0cd..1d33e62c 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -179,14 +179,14 @@ func (c *GrpcClient) SetSelf(self bool) { c.isSelf.Store(self) } -func (c *GrpcClient) GetServerId(ctx context.Context) (string, error) { +func (c *GrpcClient) GetServerId(ctx context.Context) (string, string, error) { statsGrpcClientCalls.WithLabelValues("GetServerId").Inc() response, err := c.impl.GetServerId(ctx, &GetServerIdRequest{}, grpc.WaitForReady(true)) if err != nil { - return "", err + return "", "", err } - return response.GetServerId(), nil + return response.GetServerId(), response.GetVersion(), nil } func (c *GrpcClient) LookupResumeId(ctx context.Context, resumeId string) (*LookupResumeIdReply, error) { @@ -398,7 +398,8 @@ type grpcClientsList struct { } type GrpcClients struct { - mu sync.RWMutex + mu sync.RWMutex + version string clientsMap map[string]*grpcClientsList clients []*GrpcClient @@ -421,10 +422,11 @@ type GrpcClients struct { closeFunc context.CancelFunc } -func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor) (*GrpcClients, error) { +func NewGrpcClients(config *goconf.ConfigFile, etcdClient *EtcdClient, dnsMonitor *DnsMonitor, version string) (*GrpcClients, error) { initializedCtx, initializedFunc := context.WithCancel(context.Background()) closeCtx, closeFunc := context.WithCancel(context.Background()) result := &GrpcClients{ + version: version, dnsMonitor: dnsMonitor, etcdClient: etcdClient, initializedCtx: initializedCtx, @@ -499,12 +501,12 @@ func (c *GrpcClients) isClientAvailable(target string, client *GrpcClient) bool return false } -func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, error) { +func (c *GrpcClients) getServerIdWithTimeout(ctx context.Context, client *GrpcClient) (string, string, error) { ctx2, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - id, err := client.GetServerId(ctx2) - return id, err + id, version, err := client.GetServerId(ctx2) + return id, version, err } func (c *GrpcClients) checkIsSelf(ctx context.Context, target string, client *GrpcClient) { @@ -522,7 +524,7 @@ loop: return } - id, err := c.getServerIdWithTimeout(ctx, client) + id, version, err := c.getServerIdWithTimeout(ctx, client) if err != nil { if errors.Is(err, context.Canceled) { return @@ -539,8 +541,10 @@ loop: log.Printf("GRPC target %s is this server, removing", client.Target()) c.closeClient(client) client.SetSelf(true) + } else if version != c.version { + log.Printf("WARNING: Node %s is runing different version %s than local node (%s)", client.Target(), version, c.version) } else { - log.Printf("Checked GRPC server id of %s", client.Target()) + log.Printf("Checked GRPC server id of %s running version %s", client.Target(), version) } break loop } diff --git a/grpc_client_test.go b/grpc_client_test.go index 3606aec2..b536a341 100644 --- a/grpc_client_test.go +++ b/grpc_client_test.go @@ -53,7 +53,7 @@ func (c *GrpcClients) getWakeupChannelForTesting() <-chan struct{} { func NewGrpcClientsForTestWithConfig(t *testing.T, config *goconf.ConfigFile, etcdClient *EtcdClient) (*GrpcClients, *DnsMonitor) { dnsMonitor := newDnsMonitorForTest(t, time.Hour) // will be updated manually - client, err := NewGrpcClients(config, etcdClient, dnsMonitor) + client, err := NewGrpcClients(config, etcdClient, dnsMonitor, "0.0.0") require.NoError(t, err) t.Cleanup(func() { client.Close() @@ -336,7 +336,7 @@ func Test_GrpcClients_Encryption(t *testing.T) { require.NoError(clients.WaitForInitialized(ctx)) for _, client := range clients.GetClients() { - _, err := client.GetServerId(ctx) + _, _, err := client.GetServerId(ctx) require.NoError(err) } }) diff --git a/grpc_internal.pb.go b/grpc_internal.pb.go index 7e4dc524..eee099c3 100644 --- a/grpc_internal.pb.go +++ b/grpc_internal.pb.go @@ -77,6 +77,7 @@ func (*GetServerIdRequest) Descriptor() ([]byte, []int) { type GetServerIdReply struct { state protoimpl.MessageState `protogen:"open.v1"` ServerId string `protobuf:"bytes,1,opt,name=serverId,proto3" json:"serverId,omitempty"` + Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -118,26 +119,35 @@ func (x *GetServerIdReply) GetServerId() string { return "" } +func (x *GetServerIdReply) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + var File_grpc_internal_proto protoreflect.FileDescriptor var file_grpc_internal_proto_rawDesc = []byte{ 0x0a, 0x13, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x22, 0x14, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x2e, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x48, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x32, 0x5a, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x49, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, 0x4b, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x1d, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, - 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, - 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, - 0x22, 0x00, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x73, 0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, - 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x32, 0x5a, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x12, + 0x4b, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1d, + 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, + 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x3c, 0x5a, 0x3a, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x6b, + 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, + 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, + 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/grpc_internal.proto b/grpc_internal.proto index 6a6978ae..6093f786 100644 --- a/grpc_internal.proto +++ b/grpc_internal.proto @@ -34,4 +34,5 @@ message GetServerIdRequest { message GetServerIdReply { string serverId = 1; + string version = 2; } diff --git a/grpc_server.go b/grpc_server.go index 44674954..77c6aec3 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -70,6 +70,7 @@ type GrpcServer struct { UnimplementedRpcMcuServer UnimplementedRpcSessionsServer + version string creds credentials.TransportCredentials conn *grpc.Server listener net.Listener @@ -78,7 +79,7 @@ type GrpcServer struct { hub GrpcServerHub } -func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) { +func NewGrpcServer(config *goconf.ConfigFile, version string) (*GrpcServer, error) { var listener net.Listener if addr, _ := GetStringOptionWithEnv(config, "grpc", "listen"); addr != "" { var err error @@ -95,6 +96,7 @@ func NewGrpcServer(config *goconf.ConfigFile) (*GrpcServer, error) { conn := grpc.NewServer(grpc.Creds(creds)) result := &GrpcServer{ + version: version, creds: creds, conn: conn, listener: listener, @@ -265,6 +267,7 @@ func (s *GrpcServer) GetServerId(ctx context.Context, request *GetServerIdReques statsGrpcServerCalls.WithLabelValues("GetServerId").Inc() return &GetServerIdReply{ ServerId: s.serverId, + Version: s.version, }, nil } diff --git a/grpc_server_test.go b/grpc_server_test.go index ffa6ceb9..e985fd26 100644 --- a/grpc_server_test.go +++ b/grpc_server_test.go @@ -66,7 +66,7 @@ func NewGrpcServerForTestWithConfig(t *testing.T, config *goconf.ConfigFile) (se addr = net.JoinHostPort("127.0.0.1", strconv.Itoa(port)) config.AddOption("grpc", "listen", addr) var err error - server, err = NewGrpcServer(config) + server, err = NewGrpcServer(config, "0.0.0") if isErrorAddressAlreadyInUse(err) { continue } @@ -218,7 +218,7 @@ func Test_GrpcServer_ReloadCA(t *testing.T) { ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) defer cancel1() - _, err = client1.GetServerId(ctx1) + _, _, err = client1.GetServerId(ctx1) require.NoError(err) org2 := "Updated client" @@ -245,6 +245,6 @@ func Test_GrpcServer_ReloadCA(t *testing.T) { defer cancel2() // This will fail if the CA certificate has not been reloaded by the server. - _, err = client2.GetServerId(ctx2) + _, _, err = client2.GetServerId(ctx2) require.NoError(err) } diff --git a/server/main.go b/server/main.go index b8ace972..e372bd9c 100644 --- a/server/main.go +++ b/server/main.go @@ -202,7 +202,7 @@ func main() { } }() - rpcServer, err := signaling.NewGrpcServer(config) + rpcServer, err := signaling.NewGrpcServer(config, version) if err != nil { log.Fatalf("Could not create RPC server: %s", err) } @@ -213,7 +213,7 @@ func main() { }() defer rpcServer.Close() - rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor) + rpcClients, err := signaling.NewGrpcClients(config, etcdClient, dnsMonitor, version) if err != nil { log.Fatalf("Could not create RPC clients: %s", err) }