From fd302f2524f43ad4c08a52e605dfd338ceabe029 Mon Sep 17 00:00:00 2001 From: Max Kureikin Date: Mon, 27 May 2024 21:32:06 +0400 Subject: [PATCH] Issue-43 Add Node Health Prom Metrics per Chain (#44) --- .../node_selector_service/models/qos_node.go | 4 +- .../cached_session_registry_service.go | 64 ++++++++++++++++++- 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/internal/node_selector_service/models/qos_node.go b/internal/node_selector_service/models/qos_node.go index 29f2786..9b01d68 100644 --- a/internal/node_selector_service/models/qos_node.go +++ b/internal/node_selector_service/models/qos_node.go @@ -69,7 +69,7 @@ func NewQosNode(morseNode *models.Node, pocketSession *models.Session, appSigner } func (n *QosNode) IsHealthy() bool { - return !n.isInTimeout() && n.IsSynced() + return !n.IsInTimeout() && n.IsSynced() } func (n *QosNode) IsSynced() bool { @@ -80,7 +80,7 @@ func (n *QosNode) SetSynced(synced bool) { n.synced = synced } -func (n *QosNode) isInTimeout() bool { +func (n *QosNode) IsInTimeout() bool { return !n.timeoutUntil.IsZero() && time.Now().Before(n.timeoutUntil) } diff --git a/internal/session_registry/cached_session_registry_service.go b/internal/session_registry/cached_session_registry_service.go index 81519f4..82cef02 100644 --- a/internal/session_registry/cached_session_registry_service.go +++ b/internal/session_registry/cached_session_registry_service.go @@ -20,6 +20,9 @@ import ( var ( counterSessionRequest *prometheus.CounterVec histogramSessionRequestLatency *prometheus.HistogramVec + healthyNodesPerChainGauge *prometheus.GaugeVec + syncedNodesPerChainGauge *prometheus.GaugeVec + timeoutNodesPerChainGauge *prometheus.GaugeVec ErrRecentlyFailed = errors.New("dispatch recently failed, returning early") ) @@ -27,6 +30,7 @@ const ( blocksPerSession = 4 sessionPrimerInterval = time.Second * 5 ttlCacheCleanerInterval = time.Second * 15 + nodeMetricsExporterInterval = time.Second * 20 reasonSessionSuccessCached = "session_cached" reasonSessionSuccessColdHit = "session_cold_hit" reasonSessionFailedBackoff = "session_failed_backoff" @@ -53,7 +57,28 @@ func init() { []string{"cached"}, ) - prometheus.MustRegister(counterSessionRequest, histogramSessionRequestLatency) + healthyNodesPerChainGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cached_client_session_healthy_nodes", + Help: "Number of healthy (synced + not in timeout) nodes per chain", + }, + []string{"chain_id"}, + ) + syncedNodesPerChainGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cached_client_session_synced_nodes", + Help: "Number of synced nodes per chain", + }, + []string{"chain_id"}, + ) + timeoutNodesPerChainGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "cached_client_session_timeout_nodes", + Help: "Number of nodes in timeout per chain", + }, + []string{"chain_id"}, + ) + prometheus.MustRegister(counterSessionRequest, histogramSessionRequestLatency, healthyNodesPerChainGauge, syncedNodesPerChainGauge, timeoutNodesPerChainGauge) } type CachedSessionRegistryService struct { @@ -77,6 +102,7 @@ func NewCachedSessionRegistryService(poktClient pokt_v0.PocketService, appRegist go nodeCache.Start() cachedRegistry.startTTLCacheCleaner() cachedRegistry.startSessionUpdater() + cachedRegistry.startNodeMetricsExporter() return cachedRegistry } @@ -282,3 +308,39 @@ func (c *CachedSessionRegistryService) shouldBackoffDispatchFailure() bool { func getSessionCacheKey(req *models.GetSessionRequest) string { return fmt.Sprintf("%s-%s-%d", req.AppPubKey, req.Chain, req.SessionHeight) } + +func (c *CachedSessionRegistryService) exportNodeMetrics() { + nodesMap := c.GetNodesMap() + for sessionKey, sessionItem := range nodesMap { + chainId := sessionKey.Chain + var healthyNodesCount, syncedNodesCount, timeoutNodesCount int + + for _, node := range sessionItem.Value() { + if node.IsHealthy() { + healthyNodesCount++ + } + if node.IsSynced() { + syncedNodesCount++ + } + if node.IsInTimeout() { + timeoutNodesCount++ + } + } + + healthyNodesPerChainGauge.WithLabelValues(chainId).Set(float64(healthyNodesCount)) + syncedNodesPerChainGauge.WithLabelValues(chainId).Set(float64(syncedNodesCount)) + timeoutNodesPerChainGauge.WithLabelValues(chainId).Set(float64(timeoutNodesCount)) + } +} + +func (c *CachedSessionRegistryService) startNodeMetricsExporter() { + ticker := time.Tick(nodeMetricsExporterInterval) + go func() { + for { + select { + case <-ticker: + c.exportNodeMetrics() + } + } + }() +}