Skip to content

Commit

Permalink
added checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
anurag4DSB committed Dec 20, 2024
1 parent 9b5f19f commit 087d5e6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
11 changes: 8 additions & 3 deletions pkg/grpcfactory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ func (s *COSIProvisionerServer) Run(ctx context.Context, registry prometheus.Reg
listenConfig := net.ListenConfig{}
listener, err := listenConfig.Listen(ctx, "unix", addr.Path)
if err != nil {
klog.ErrorS(err, "Failed to start server")
return fmt.Errorf("failed to start server: %w", err)
klog.ErrorS(err, "Failed to start listener")
return fmt.Errorf("failed to start listener: %w", err)
}
defer listener.Close()
defer func() {
klog.Info("Closing listener...")
listener.Close()
}()

s.listenOpts = append(s.listenOpts,
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor()),
Expand All @@ -74,9 +77,11 @@ func (s *COSIProvisionerServer) Run(ctx context.Context, registry prometheus.Reg
}()
select {
case <-ctx.Done():
klog.Info("Context canceled, stopping gRPC server...")
server.GracefulStop()
return ctx.Err()
case err := <-errChan:
klog.ErrorS(err, "gRPC server exited with error")
return err
}
}
47 changes: 31 additions & 16 deletions pkg/grpcfactory/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package grpcfactory_test

import (
"context"
"errors"
"fmt"
"net"
"os"
Expand All @@ -11,6 +10,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
"github.com/scality/cosi-driver/pkg/grpcfactory"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)
Expand All @@ -34,6 +34,7 @@ var _ = Describe("gRPC Factory Server", Ordered, func() {
identityServer cosi.IdentityServer
provisionerServer cosi.ProvisionerServer
server *grpcfactory.COSIProvisionerServer
registry *prometheus.Registry
)

BeforeEach(func() {
Expand All @@ -42,9 +43,13 @@ var _ = Describe("gRPC Factory Server", Ordered, func() {

identityServer = &mockIdentityServer{}
provisionerServer = &mockProvisionerServer{}

// Create a custom Prometheus registry for this test
registry = prometheus.NewRegistry()
})

AfterEach(func() {
// Clean up the Unix socket file
socketPath := strings.TrimPrefix(address, "unix://")
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
fmt.Printf("Warning: failed to remove socket file %s: %v\n", socketPath, err)
Expand All @@ -58,43 +63,53 @@ var _ = Describe("gRPC Factory Server", Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(server).NotTo(BeNil())

runErrChan := make(chan error)
go func() {
err := server.Run(ctx)
if errors.Is(err, context.Canceled) {
return // Expected when the context is canceled
}
Expect(err).NotTo(HaveOccurred())
defer GinkgoRecover()
runErrChan <- server.Run(ctx, registry) // Pass registry here for metrics registration
}()

// Allow time for the server to start
time.Sleep(100 * time.Millisecond)
}, SpecTimeout(1*time.Second))

ctxCancel, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancel()
<-ctxCancel.Done()

Expect(<-runErrChan).To(SatisfyAny(BeNil(), Equal(context.Canceled)))
}, SpecTimeout(2*time.Second))

It("should return an error when reusing the same address", func(ctx SpecContext) {
// Manually create a listener to occupy the socket
socketPath := strings.TrimPrefix(address, "unix://")
listener, err := net.Listen("unix", socketPath)
Expect(err).NotTo(HaveOccurred())
defer listener.Close()

server2, err := grpcfactory.NewCOSIProvisionerServer(address, identityServer, provisionerServer, nil)
// Pass nil instead of registry for the ServerOptions parameter
server, err = grpcfactory.NewCOSIProvisionerServer(address, identityServer, provisionerServer, nil)
Expect(err).NotTo(HaveOccurred())
Expect(server2).NotTo(BeNil())
Expect(server).NotTo(BeNil())

// Run the second server and expect it to fail
err = server2.Run(ctx)
// Run the server with the registry
err = server.Run(ctx, registry)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("address already in use"))
}, SpecTimeout(1*time.Second))

It("should return an error for unsupported address schemes", func(ctx SpecContext) {
invalidAddress := "http://invalid-scheme-address" // Address with an unsupported scheme
var (
server *grpcfactory.COSIProvisionerServer
err error
)

invalidAddress := "http://invalid-scheme-address"

server, err := grpcfactory.NewCOSIProvisionerServer(invalidAddress, identityServer, provisionerServer, nil)
server, err = grpcfactory.NewCOSIProvisionerServer(invalidAddress, identityServer, provisionerServer, nil)
Expect(err).NotTo(HaveOccurred()) // Ensure server creation succeeds
Expect(server).NotTo(BeNil())

// Wait for server.Run to return an error
err = server.Run(ctx)
// Attempt to run the server with the registry
err = server.Run(ctx, prometheus.NewRegistry()) // Pass a custom registry here
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("unsupported scheme: expected 'unix'"))
}, SpecTimeout(1*time.Second))
Expand Down

0 comments on commit 087d5e6

Please sign in to comment.