From a5467aba1998f949b4d3c6e98afec513c5bdca44 Mon Sep 17 00:00:00 2001 From: tzvonimir Date: Fri, 11 Oct 2024 11:53:26 +0200 Subject: [PATCH] Update lda process metrics --- client/client.go | 77 ++++++++++++++++++++++++++++-------- cmd/collect.go | 1 + collector/collector.go | 1 + config/config.example.toml | 4 ++ config/config.go | 2 + proto/api/v1/collector.proto | 1 + user/user.go | 30 ++++++++++++-- 7 files changed, 96 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index 2e64407..5181e4c 100644 --- a/client/client.go +++ b/client/client.go @@ -9,8 +9,10 @@ import ( "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" ) // Config holds configuration for the client connection. @@ -26,19 +28,37 @@ type Client struct { conn *grpc.ClientConn logger *zerolog.Logger timeout time.Duration + config Config } -// NewClient creates a new client and returns a pointer to it and an error +// NewClient creates a new client with connection management and returns a pointer to it and an error func NewClient(config Config) (*Client, error) { + client := &Client{ + logger: &logging.Log, + timeout: time.Duration(config.Timeout) * time.Second, + config: config, + } + + // Establish the initial connection + err := client.connect() + if err != nil { + return nil, err + } + + return client, nil +} + +// connect handles connection establishment and configuration +func (c *Client) connect() error { var opts []grpc.DialOption // Setup connection security based on config creds := grpc.WithTransportCredentials(insecure.NewCredentials()) - if config.SecureConnection { - if config.CertFile != "" { - tlsFromFile, err := credentials.NewClientTLSFromFile(config.CertFile, "") + if c.config.SecureConnection { + if c.config.CertFile != "" { + tlsFromFile, err := credentials.NewClientTLSFromFile(c.config.CertFile, "") if err != nil { - return nil, fmt.Errorf("failed to create TLS credentials: %w", err) + return fmt.Errorf("failed to create TLS credentials: %w", err) } creds = grpc.WithTransportCredentials(tlsFromFile) } else { @@ -47,28 +67,49 @@ func NewClient(config Config) (*Client, error) { } opts = append(opts, creds) - conn, err := grpc.Dial(config.Address, opts...) + // Adding keepalive parameters to manage connection health + keepAliveParams := grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, // Ping the server every 10 seconds to keep the connection alive + Timeout: 5 * time.Second, // Wait 5 seconds for a pong before closing the connection + PermitWithoutStream: true, // Send pings even without active RPCs + }) + opts = append(opts, keepAliveParams) + + // Dial the server + conn, err := grpc.Dial(c.config.Address, opts...) if err != nil { - return nil, fmt.Errorf("failed to connect to server: %w", err) + return fmt.Errorf("failed to connect to server: %w", err) } - // Set a default timeout of 60 seconds if not provided - if config.Timeout == 0 { - config.Timeout = 60 - } + // Set the connection on the client + c.conn = conn + return nil +} - client := &Client{ - conn: conn, - logger: &logging.Log, - timeout: time.Duration(config.Timeout) * time.Second, +// Reconnect attempts to reconnect if the connection is down +func (c *Client) Reconnect() error { + if c.conn != nil { + c.Close() } + return c.connect() +} - return client, nil +// CheckAndReconnect checks connection health and reconnects if necessary +func (c *Client) CheckAndReconnect() error { + if c.conn.GetState() == connectivity.TransientFailure || c.conn.GetState() == connectivity.Shutdown { + c.logger.Warn().Msg("Connection lost. Attempting to reconnect...") + return c.Reconnect() + } + return nil } // SendCommands sends a list of commands to the server func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error { + if err := c.CheckAndReconnect(); err != nil { + return fmt.Errorf("failed to reconnect: %w", err) + } + client := gen.NewCollectorServiceClient(c.conn) req := &gen.SendCommandsRequest{ @@ -90,6 +131,10 @@ func (c *Client) SendCommands(commands []*gen.Command, auth *gen.Auth) error { // SendProcesses sends a list of processes to the server func (c *Client) SendProcesses(processes []*gen.Process, auth *gen.Auth) error { + if err := c.CheckAndReconnect(); err != nil { + return fmt.Errorf("failed to reconnect: %w", err) + } + client := gen.NewCollectorServiceClient(c.conn) req := &gen.SendProcessesRequest{ diff --git a/cmd/collect.go b/cmd/collect.go index 6f63d04..8cccab0 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -86,6 +86,7 @@ func collect(cmd *cobra.Command, _ []string) error { UserID: config.AppConfig.UserID, TeamID: config.AppConfig.TeamID, WorkspaceID: config.AppConfig.WorkspaceID, + UserEmail: config.AppConfig.UserEmail, } if autoCredentials { diff --git a/collector/collector.go b/collector/collector.go index 2957a2f..16b97d5 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -46,6 +46,7 @@ type AuthConfig struct { TeamID string UserID string WorkspaceID string + UserEmail string } // collectionConfig contains the configuration for the collection process diff --git a/config/config.example.toml b/config/config.example.toml index 2c0e199..4ca7fc4 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -81,6 +81,10 @@ secure_connection = true # Default: (empty) # user_id = "" +# Specifies the user identifier that will be used to make the collection of data for that user +# Default: (empty) +# user_email = "" + # Specifies the user identifier that will be used to make the collection of data for that workspace # Default: (empty) # workspace_id = "" diff --git a/config/config.go b/config/config.go index 4a16c41..9381bc4 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,8 @@ type Config struct { TeamID string `mapstructure:"team_id"` // UserID is the user identifier for the workspace UserID string `mapstructure:"user_id"` + // UserEmail is the user identifier for the workspace + UserEmail string `mapstructure:"user_email"` // WorkspaceID is the workspace identifier WorkspaceID string `mapstructure:"workspace_id"` } diff --git a/proto/api/v1/collector.proto b/proto/api/v1/collector.proto index 6412199..42c6976 100644 --- a/proto/api/v1/collector.proto +++ b/proto/api/v1/collector.proto @@ -13,6 +13,7 @@ message Auth { string user_id = 1; // Unique identifer for user that is processing the data string team_id = 2; // Unique identifier for users team optional string workspace_id = 3; // Unique identifier of the Workspace that is running the request + string user_email = 4; // Unique identifier of user that is processing the data } // Define a message representing a command, including its metadata and timing information. diff --git a/user/user.go b/user/user.go index 746079d..ec8741a 100644 --- a/user/user.go +++ b/user/user.go @@ -275,9 +275,11 @@ func ReadDZWorkspaceConfig() (collector.AuthConfig, error) { devzeroTeamFile = "DEVZERO_TEAM_ID" devzeroUserFile = "DEVZERO_USER_ID" devzeroWorkspaceFile = "DEVZERO_WORKSPACE_ID" + devzeroEmailFile = "DEVZERO_WORKSPACE_OWNER_EMAIL" ) userId := "" + userEmail := "" teamId := "" workspaceId := "" @@ -297,6 +299,14 @@ func ReadDZWorkspaceConfig() (collector.AuthConfig, error) { } } + emailPath := filepath.Join(devzeroConfigPath, devzeroEmailFile) + if util.FileExists(emailPath) { + data, err := os.ReadFile(emailPath) + if err == nil && len(data) > 0 { + userEmail = string(data) + } + } + workspacePath := filepath.Join(devzeroConfigPath, devzeroWorkspaceFile) if util.FileExists(workspacePath) { data, err := os.ReadFile(workspacePath) @@ -309,18 +319,21 @@ func ReadDZWorkspaceConfig() (collector.AuthConfig, error) { UserID: userId, TeamID: teamId, WorkspaceID: workspaceId, + UserEmail: userEmail, }, nil } // ReadDZCliConfig reads the DevZero workspace configuration func ReadDZCliConfig(path string) (collector.AuthConfig, error) { const ( - localUserFile = "user_id.txt" - localTeamFile = "team_id.txt" + localUserFile = "user_id.txt" + localTeamFile = "team_id.txt" + localEmailFile = "user_email.txt" ) userId := "" teamId := "" + userEmail := "" localUserPath := filepath.Join(path, localUserFile) if util.FileExists(localUserPath) { @@ -338,8 +351,17 @@ func ReadDZCliConfig(path string) (collector.AuthConfig, error) { } } + localEmailPath := filepath.Join(path, localEmailFile) + if util.FileExists(localEmailPath) { + data, err := os.ReadFile(localEmailPath) + if err == nil && len(data) > 0 { + userEmail = string(data) + } + } + return collector.AuthConfig{ - UserID: userId, - TeamID: teamId, + UserID: userId, + TeamID: teamId, + UserEmail: userEmail, }, nil }