From 0767d66d12b98e0977879794204b1a561b6579e6 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Mon, 27 May 2024 10:53:53 -0400 Subject: [PATCH 01/12] Add gateway metric for roundtrip ai times by model and pipeline --- monitor/census.go | 30 ++++++++++++++++++++++++++++++ server/ai_mediaserver.go | 6 ++++++ 2 files changed, 36 insertions(+) diff --git a/monitor/census.go b/monitor/census.go index f89d1a41ae..27e100dc91 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -113,6 +113,9 @@ type ( kOrchestratorURI tag.Key kOrchestratorAddress tag.Key kFVErrorType tag.Key + kPipeline tag.Key + kModelName tag.Key + mInferenceTime *stats.Float64Measure mSegmentSourceAppeared *stats.Int64Measure mSegmentEmerged *stats.Int64Measure mSegmentEmergedUnprocessed *stats.Int64Measure @@ -254,6 +257,8 @@ func InitCensus(nodeType NodeType, version string) { census.kOrchestratorAddress = tag.MustNewKey("orchestrator_address") census.kFVErrorType = tag.MustNewKey("fverror_type") census.kSegClassName = tag.MustNewKey("seg_class_name") + census.kModelName = tag.MustNewKey("model_name") + census.kPipeline = tag.MustNewKey("pipeline") census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID)) if err != nil { glog.Exit("Error creating context", err) @@ -289,6 +294,7 @@ func InitCensus(nodeType NodeType, version string) { census.mSuccessRate = stats.Float64("success_rate", "Success rate", "per") census.mSuccessRatePerStream = stats.Float64("success_rate_per_stream", "Success rate, per stream", "per") census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec") + census.mInferenceTime = stats.Float64("inference_time_seconds", "Inference time", "sec") census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds", "Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec") census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec") @@ -548,6 +554,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: append([]tag.Key{census.kProfiles, census.kTrusted, census.kVerified}, baseTags...), Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, + { + Name: "inference_time_seconds", + Measure: census.mInferenceTime, + Description: "InferenceTime, seconds", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), + }, { Name: "transcode_overall_latency_seconds", Measure: census.mTranscodeOverallLatency, @@ -1355,6 +1368,23 @@ func SegmentTranscoded(ctx context.Context, nonce, seqNo uint64, sourceDur time. 
census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles, trusted, verified) } +func AiJobProcessed(ctx context.Context, Pipeline string, Model string, responseDuration time.Duration) { + + census.aiJobProcessed(Pipeline, Model, responseDuration) +} + +func (cen *censusMetricsCounter) aiJobProcessed(Pipeline string, Model string, responseDuration time.Duration) { + cen.lock.Lock() + defer cen.lock.Unlock() + + ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)) + if err != nil { + glog.Error("Error creating context", err) + return + } + + stats.Record(ctx, census.mInferenceTime.M(responseDuration.Seconds())) +} func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string, trusted, verified bool) { diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 62e5b7ad39..806b10919b 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -13,6 +13,7 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-tools/drivers" middleware "github.com/oapi-codegen/nethttp-middleware" "github.com/oapi-codegen/runtime" @@ -108,6 +109,11 @@ func (ls *LivepeerServer) TextToImage() http.Handler { took := time.Since(start) clog.Infof(ctx, "Processed TextToImage request prompt=%v model_id=%v took=%v", req.Prompt, *req.ModelId, took) + //Log round trip time for text-to-image job + if monitor.Enabled { + monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, took) + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) From e03879161d1392253895ee22aae9e74e7cc62f41 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 28 May 2024 08:29:02 -0400 Subject: [PATCH 02/12] Rename metrics and add unique manifest --- monitor/census.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 27e100dc91..3078cd6100 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -115,7 +115,7 @@ type ( kFVErrorType tag.Key kPipeline tag.Key kModelName tag.Key - mInferenceTime *stats.Float64Measure + mAIRoundtripTime *stats.Float64Measure mSegmentSourceAppeared *stats.Int64Measure mSegmentEmerged *stats.Int64Measure mSegmentEmergedUnprocessed *stats.Int64Measure @@ -294,7 +294,7 @@ func InitCensus(nodeType NodeType, version string) { census.mSuccessRate = stats.Float64("success_rate", "Success rate", "per") census.mSuccessRatePerStream = stats.Float64("success_rate_per_stream", "Success rate, per stream", "per") census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec") - census.mInferenceTime = stats.Float64("inference_time_seconds", "Inference time", "sec") + census.mAIRoundtripTime = stats.Float64("inference_time_seconds", "Inference time", "sec") census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds", "Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec") census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec") @@ -555,10 +555,10 @@ func InitCensus(nodeType NodeType, version string) { Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), 
}, { - Name: "inference_time_seconds", - Measure: census.mInferenceTime, - Description: "InferenceTime, seconds", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + Name: "ai_roundtrip_time_seconds", + Measure: census.mAIRoundtripTime, + Description: "AIRoundtripTime, seconds", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, { @@ -1383,7 +1383,7 @@ func (cen *censusMetricsCounter) aiJobProcessed(Pipeline string, Model string, r return } - stats.Record(ctx, census.mInferenceTime.M(responseDuration.Seconds())) + stats.Record(ctx, census.mAIRoundtripTime.M(responseDuration.Seconds())) } func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, From f959f07aa64f7e3c5457783f301e305938f8da46 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Tue, 28 May 2024 11:18:50 -0400 Subject: [PATCH 03/12] Fix name mismatch --- monitor/census.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 3078cd6100..df795a7c70 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -294,7 +294,7 @@ func InitCensus(nodeType NodeType, version string) { census.mSuccessRate = stats.Float64("success_rate", "Success rate", "per") census.mSuccessRatePerStream = stats.Float64("success_rate_per_stream", "Success rate, per stream", "per") census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec") - census.mAIRoundtripTime = stats.Float64("inference_time_seconds", "Inference time", "sec") + census.mAIRoundtripTime = stats.Float64("ai_roundtrip_time_seconds", "AI Roundtrip time", "sec") census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds", "Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec") census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec") @@ -557,7 +557,7 @@ func InitCensus(nodeType NodeType, version string) { { Name: "ai_roundtrip_time_seconds", Measure: census.mAIRoundtripTime, - Description: "AIRoundtripTime, seconds", + Description: "AI Roundtrip time, seconds", TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, From 9221e733fb3f64302b89ef9a53869c79b273edf9 Mon Sep 17 00:00:00 2001 From: Elite Encoder Date: Fri, 31 May 2024 15:29:36 -0400 Subject: [PATCH 04/12] modelsRequested not working correctly --- monitor/census.go | 23 +++++++++++++++++++++++ server/ai_mediaserver.go | 11 +++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index df795a7c70..201551d75b 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -116,6 +116,7 @@ type ( kPipeline tag.Key kModelName tag.Key mAIRoundtripTime *stats.Float64Measure + mModelsRequested *stats.Int64Measure mSegmentSourceAppeared *stats.Int64Measure mSegmentEmerged *stats.Int64Measure mSegmentEmergedUnprocessed *stats.Int64Measure @@ -294,6 +295,7 @@ func InitCensus(nodeType NodeType, version string) { census.mSuccessRate = stats.Float64("success_rate", "Success rate", "per") census.mSuccessRatePerStream = 
stats.Float64("success_rate_per_stream", "Success rate, per stream", "per") census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec") + census.mModelsRequested = stats.Int64("ai_models_requested", "Number of models requested over time", "tot") census.mAIRoundtripTime = stats.Float64("ai_roundtrip_time_seconds", "AI Roundtrip time", "sec") census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds", "Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec") @@ -561,6 +563,13 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, + { + Name: "ai_models_requested", + Measure: census.mModelsRequested, + Description: "Count of Models Requested over time", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestID...), + Aggregation: view.LastValue(), + }, { Name: "transcode_overall_latency_seconds", Measure: census.mTranscodeOverallLatency, @@ -968,6 +977,7 @@ func LogDiscoveryError(ctx context.Context, uri, code string) { []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kOrchestratorURI, uri)}, census.mDiscoveryError.M(1)); err != nil { + //0530 18:08:28.399899 1767400 census.go:965] clientIP=192.168.10.155 request_id=d5303ff3 Error recording metrics err="invalid value: only ASCII characters accepted; max length must be 255 characters" clog.Errorf(ctx, "Error recording metrics err=%q", err) } } @@ -1373,6 +1383,10 @@ func AiJobProcessed(ctx context.Context, Pipeline string, Model string, response census.aiJobProcessed(Pipeline, Model, responseDuration) } +func RecordModelRequested(Pipeline string, Model string) { + census.recordModelRequested(Pipeline, Model) +} + func (cen *censusMetricsCounter) aiJobProcessed(Pipeline string, Model string, responseDuration time.Duration) { cen.lock.Lock() defer cen.lock.Unlock() @@ -1386,6 +1400,15 @@ func (cen *censusMetricsCounter) aiJobProcessed(Pipeline string, Model string, r stats.Record(ctx, census.mAIRoundtripTime.M(responseDuration.Seconds())) } +func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) { + ctx, err := tag.New(cen.ctx, tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelName)) + if err != nil { + glog.Errorf("Failed to create context with tags: %v", err) + return + } + stats.Record(ctx, census.mModelsRequested.M(1)) +} + func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string, trusted, verified bool) { diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index 806b10919b..d28b9d1a35 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -87,6 +87,7 @@ func (ls *LivepeerServer) TextToImage() http.Handler { } clog.V(common.VERBOSE).Infof(r.Context(), "Received TextToImage request prompt=%v model_id=%v", req.Prompt, *req.ModelId) + monitor.RecordModelRequested("text-to-image", *req.ModelId) params := aiRequestParams{ node: ls.LivepeerNode, @@ -109,7 +110,6 @@ func (ls *LivepeerServer) TextToImage() http.Handler { took := time.Since(start) clog.Infof(ctx, "Processed TextToImage request prompt=%v model_id=%v took=%v", req.Prompt, *req.ModelId, took) - //Log round trip time for 
text-to-image job if monitor.Enabled { monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, took) } @@ -140,6 +140,7 @@ func (ls *LivepeerServer) ImageToImage() http.Handler { } clog.V(common.VERBOSE).Infof(ctx, "Received ImageToImage request imageSize=%v prompt=%v model_id=%v", req.Image.FileSize(), req.Prompt, *req.ModelId) + monitor.RecordModelRequested("image-to-image", *req.ModelId) params := aiRequestParams{ node: ls.LivepeerNode, @@ -161,6 +162,9 @@ func (ls *LivepeerServer) ImageToImage() http.Handler { took := time.Since(start) clog.V(common.VERBOSE).Infof(ctx, "Processed ImageToImage request imageSize=%v prompt=%v model_id=%v took=%v", req.Image.FileSize(), req.Prompt, *req.ModelId, took) + if monitor.Enabled { + monitor.AiJobProcessed(ctx, "image-to-image", *req.ModelId, took) + } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -194,6 +198,7 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler { } clog.V(common.VERBOSE).Infof(ctx, "Received ImageToVideo request imageSize=%v model_id=%v async=%v", req.Image.FileSize(), *req.ModelId, async) + monitor.RecordModelRequested("image-to-video", *req.ModelId) params := aiRequestParams{ node: ls.LivepeerNode, @@ -218,7 +223,9 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler { took := time.Since(start) clog.Infof(ctx, "Processed ImageToVideo request imageSize=%v model_id=%v took=%v", req.Image.FileSize(), *req.ModelId, took) - + if monitor.Enabled { + monitor.AiJobProcessed(ctx, "image-to-video", *req.ModelId, took) + } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) From a456c045b405bb885720a78d6ed2df94dd83ccdd Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Tue, 25 Jun 2024 19:41:14 +0100 Subject: [PATCH 05/12] feat: add initial POC AI gateway metrics This commit adds the initial AI gateway metrics so that they can be reviewed by others. The code still needs to be cleaned up and the buckets adjusted.
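For readers following the series, every patch here relies on the same OpenCensus measure/view/tag pattern: define a measure, register a view that picks an aggregation and the tag keys to export, then record measurements under a tagged context. The sketch below is illustrative only, not code from this PR; the measure name, tag keys, model ID, and bucket bounds are stand-ins for the ones defined in monitor/census.go.

```go
package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	// Stand-in measure and tag keys, mirroring the shape used in census.go.
	mLatency  = stats.Float64("ai_roundtrip_time_seconds", "AI roundtrip time", "sec")
	kPipeline = tag.MustNewKey("pipeline")
	kModel    = tag.MustNewKey("model_name")
)

func main() {
	// A view defines how measurements are aggregated and which tag keys are
	// exported alongside them.
	if err := view.Register(&view.View{
		Name:        "ai_roundtrip_time_seconds",
		Measure:     mLatency,
		Description: "AI roundtrip time, seconds",
		TagKeys:     []tag.Key{kPipeline, kModel},
		Aggregation: view.Distribution(0, .25, .5, 1, 2.5, 5, 10), // illustrative buckets
	}); err != nil {
		log.Fatalf("failed to register view: %v", err)
	}

	// Record one measurement under a context tagged with pipeline and model.
	ctx, err := tag.New(context.Background(),
		tag.Insert(kPipeline, "text-to-image"),
		tag.Insert(kModel, "example/model"), // hypothetical model ID
	)
	if err != nil {
		log.Fatalf("failed to tag context: %v", err)
	}
	stats.Record(ctx, mLatency.M(1.37))
}
```

Note that measurements recorded against a measure with no registered view are silently dropped, which is why each new measure in these patches comes paired with a view entry.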
--- monitor/census.go | 148 ++++++++++++++++++++++++++------------- server/ai_mediaserver.go | 15 +--- server/ai_process.go | 93 ++++++++++++++++++++++++ 3 files changed, 194 insertions(+), 62 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 201551d75b..759216fdff 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -115,8 +115,6 @@ type ( kFVErrorType tag.Key kPipeline tag.Key kModelName tag.Key - mAIRoundtripTime *stats.Float64Measure - mModelsRequested *stats.Int64Measure mSegmentSourceAppeared *stats.Int64Measure mSegmentEmerged *stats.Int64Measure mSegmentEmergedUnprocessed *stats.Int64Measure @@ -194,6 +192,12 @@ type ( mSegmentClassProb *stats.Float64Measure mSceneClassification *stats.Int64Measure + // Metrics for AI jobs + mAIModelsRequested *stats.Int64Measure + mAILatencyScore *stats.Float64Measure + mAIPricePerUnit *stats.Float64Measure + mAIRequestError *stats.Int64Measure + lock sync.Mutex emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo success map[uint64]*segmentsAverager @@ -221,6 +225,11 @@ type ( removedAt time.Time tries map[uint64]tryData // seqNo:try } + + AIJobInfo struct { + LatencyScore float64 + PricePerUnit float64 + } ) // Exporter Prometheus exporter that handles `/metrics` endpoint @@ -295,8 +304,6 @@ func InitCensus(nodeType NodeType, version string) { census.mSuccessRate = stats.Float64("success_rate", "Success rate", "per") census.mSuccessRatePerStream = stats.Float64("success_rate_per_stream", "Success rate, per stream", "per") census.mTranscodeTime = stats.Float64("transcode_time_seconds", "Transcoding time", "sec") - census.mModelsRequested = stats.Int64("ai_models_requested", "Number of models requested over time", "tot") - census.mAIRoundtripTime = stats.Float64("ai_roundtrip_time_seconds", "AI Roundtrip time", "sec") census.mTranscodeOverallLatency = stats.Float64("transcode_overall_latency_seconds", "Transcoding latency, from source segment emerged from segmenter till all transcoded segment apeeared in manifest", "sec") census.mUploadTime = stats.Float64("upload_time_seconds", "Upload (to Orchestrator) time", "sec") @@ -347,6 +354,12 @@ func InitCensus(nodeType NodeType, version string) { census.mSegmentClassProb = stats.Float64("segment_class_prob", "SegmentClassProb", "tot") census.mSceneClassification = stats.Int64("scene_classification_done", "SceneClassificationDone", "tot") + // Metrics for AI jobs + census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot") + census.mAILatencyScore = stats.Float64("ai_latency_score", "Orchestrator AI request latency score, based on smallest pipeline unit", "") + census.mAIPricePerUnit = stats.Float64("ai_price_per_unit", "Price paid per AI pipeline unit", "") + census.mAIRequestError = stats.Int64("ai_request_errors", "AIRequestErrors", "tot") + glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) glog.Infof("Node type %s node ID %s", nodeType, NodeID) @@ -556,20 +569,6 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: append([]tag.Key{census.kProfiles, census.kTrusted, census.kVerified}, baseTags...), Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), }, - { - Name: "ai_roundtrip_time_seconds", - Measure: census.mAIRoundtripTime, - Description: "AI Roundtrip time, seconds", - TagKeys: 
append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), - Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), - }, - { - Name: "ai_models_requested", - Measure: census.mModelsRequested, - Description: "Count of Models Requested over time", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestID...), - Aggregation: view.LastValue(), - }, { Name: "transcode_overall_latency_seconds", Measure: census.mTranscodeOverallLatency, @@ -877,6 +876,36 @@ func InitCensus(nodeType NodeType, version string) { TagKeys: baseTags, Aggregation: view.Count(), }, + + // Metrics for AI jobs + { + Name: "ai_models_requested", + Measure: census.mAIModelsRequested, + Description: "Count of AI model requests over time", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestID...), + Aggregation: view.LastValue(), + }, + { + Name: "ai_latency_score", + Measure: census.mAILatencyScore, + Description: "Orchestrator AI request latency score", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), + Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), + }, + { + Name: "ai_price_per_unit", + Measure: census.mAIPricePerUnit, + Description: "Price paid per AI pipeline unit", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), + Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), + }, + { + Name: "ai_request_errors", + Measure: census.mAIRequestError, + Description: "Errors processing AI requests", + TagKeys: baseTags, + Aggregation: view.Sum(), + }, } // Register the views @@ -1378,36 +1407,6 @@ func SegmentTranscoded(ctx context.Context, nonce, seqNo uint64, sourceDur time. 
census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles, trusted, verified) } -func AiJobProcessed(ctx context.Context, Pipeline string, Model string, responseDuration time.Duration) { - - census.aiJobProcessed(Pipeline, Model, responseDuration) -} - -func RecordModelRequested(Pipeline string, Model string) { - census.recordModelRequested(Pipeline, Model) -} - -func (cen *censusMetricsCounter) aiJobProcessed(Pipeline string, Model string, responseDuration time.Duration) { - cen.lock.Lock() - defer cen.lock.Unlock() - - ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)) - if err != nil { - glog.Error("Error creating context", err) - return - } - - stats.Record(ctx, census.mAIRoundtripTime.M(responseDuration.Seconds())) -} - -func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) { - ctx, err := tag.New(cen.ctx, tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelName)) - if err != nil { - glog.Errorf("Failed to create context with tags: %v", err) - return - } - stats.Record(ctx, census.mModelsRequested.M(1)) -} func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string, trusted, verified bool) { @@ -1762,6 +1761,59 @@ func RewardCallError(sender string) { } } +// AIJobProccessed records metrics from AI jobs +func AiJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { + census.modelRequested(pipeline, model, orchInfo) + census.recordAILatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) + census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo) +} + +func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string, orchInfo *lpnet.OrchestratorInfo) { + ctx, err := tag.New(cen.ctx, tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelName), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder())) + if err != nil { + glog.Errorf("Failed to create context with tags: %v", err) + return + } + + stats.Record(ctx, census.mAIModelsRequested.M(1)) +} + +func (cen *censusMetricsCounter) recordAILatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) { + cen.lock.Lock() + defer cen.lock.Unlock() + + ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder())) + if err != nil { + glog.Error("Error creating context", err) + return + } + + stats.Record(ctx, census.mAILatencyScore.M(latencyScore)) +} + +func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) { + cen.lock.Lock() + defer cen.lock.Unlock() + + ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder())) + if err != nil { + glog.Error("Error creating context", err) + return + } + + stats.Record(ctx, census.mAIPricePerUnit.M(pricePerUnit)) +} + +// RewardCallError records an error during the AI job request +func AIRequestError(sender string) { + if err := stats.RecordWithTags(census.ctx, + []tag.Mutator{tag.Insert(census.kSender, sender)}, + census.mRewardCallError.M(1)); err != nil { + + glog.Errorf("Error recording metrics err=%q", err) + } +} + // Convert wei to gwei func wei2gwei(wei 
*big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go index d28b9d1a35..62e5b7ad39 100644 --- a/server/ai_mediaserver.go +++ b/server/ai_mediaserver.go @@ -13,7 +13,6 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" - "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-tools/drivers" middleware "github.com/oapi-codegen/nethttp-middleware" "github.com/oapi-codegen/runtime" @@ -87,7 +86,6 @@ func (ls *LivepeerServer) TextToImage() http.Handler { } clog.V(common.VERBOSE).Infof(r.Context(), "Received TextToImage request prompt=%v model_id=%v", req.Prompt, *req.ModelId) - monitor.RecordModelRequested("text-to-image", *req.ModelId) params := aiRequestParams{ node: ls.LivepeerNode, @@ -110,10 +108,6 @@ func (ls *LivepeerServer) TextToImage() http.Handler { took := time.Since(start) clog.Infof(ctx, "Processed TextToImage request prompt=%v model_id=%v took=%v", req.Prompt, *req.ModelId, took) - if monitor.Enabled { - monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, took) - } - w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) @@ -140,7 +134,6 @@ func (ls *LivepeerServer) ImageToImage() http.Handler { } clog.V(common.VERBOSE).Infof(ctx, "Received ImageToImage request imageSize=%v prompt=%v model_id=%v", req.Image.FileSize(), req.Prompt, *req.ModelId) - monitor.RecordModelRequested("image-to-image", *req.ModelId) params := aiRequestParams{ node: ls.LivepeerNode, @@ -162,9 +155,6 @@ func (ls *LivepeerServer) ImageToImage() http.Handler { took := time.Since(start) clog.V(common.VERBOSE).Infof(ctx, "Processed ImageToImage request imageSize=%v prompt=%v model_id=%v took=%v", req.Image.FileSize(), req.Prompt, *req.ModelId, took) - if monitor.Enabled { - monitor.AiJobProcessed(ctx, "image-to-image", *req.ModelId, took) - } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -198,7 +188,6 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler { } clog.V(common.VERBOSE).Infof(ctx, "Received ImageToVideo request imageSize=%v model_id=%v async=%v", req.Image.FileSize(), *req.ModelId, async) - monitor.RecordModelRequested("image-to-video", *req.ModelId) params := aiRequestParams{ node: ls.LivepeerNode, @@ -223,9 +212,7 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler { took := time.Since(start) clog.Infof(ctx, "Processed ImageToVideo request imageSize=%v model_id=%v took=%v", req.Image.FileSize(), *req.ModelId, took) - if monitor.Enabled { - monitor.AiJobProcessed(ctx, "image-to-video", *req.ModelId, took) - } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) diff --git a/server/ai_process.go b/server/ai_process.go index ed58e3106c..7865b8255d 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -19,6 +19,7 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/monitor" "github.com/livepeer/go-tools/drivers" "github.com/livepeer/lpms/stream" ) @@ -75,6 +76,9 @@ func processTextToImage(ctx context.Context, params aiRequestParams, req worker. 
func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISession, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) { client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } @@ -90,6 +94,9 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess outPixels := int64(*req.Height) * int64(*req.Width) setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) @@ -98,6 +105,9 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess resp, err := client.TextToImageWithResponse(ctx, req, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } @@ -124,6 +134,14 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess } sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + if monitor.Enabled { + pricePerUnit := 0.0 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { + pricePerUnit = float64(priceInfo.PricePerUnit) + } + monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + } + return resp.JSON200, nil } @@ -160,26 +178,41 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes var buf bytes.Buffer mw, err := worker.NewImageToImageMultipartWriter(&buf, req) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } imageRdr, err := req.Image.Reader() if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } outPixels := int64(config.Height) * int64(config.Width) setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) @@ -188,6 +221,9 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes resp, err := client.ImageToImageWithBodyWithResponse(ctx, mw.FormDataContentType(), &buf, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } @@ -210,6 +246,14 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes } sess.LatencyScore = took.Seconds() / float64(outPixels) / numImages + if monitor.Enabled { + pricePerUnit := 0.0 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { + pricePerUnit = float64(priceInfo.PricePerUnit) + } + monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + } + return resp.JSON200, nil } @@ -250,11 +294,17 @@ func 
submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes var buf bytes.Buffer mw, err := worker.NewImageToVideoMultipartWriter(&buf, req) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } @@ -271,6 +321,9 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes outPixels := int64(*req.Height) * int64(*req.Width) * frames setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } defer completeBalanceUpdate(sess.BroadcastSession, balUpdate) @@ -279,12 +332,18 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes resp, err := client.ImageToVideoWithBody(ctx, mw.FormDataContentType(), &buf, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } defer resp.Body.Close() data, err := io.ReadAll(resp.Body) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } @@ -299,12 +358,23 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes var res worker.ImageResponse if err := json.Unmarshal(data, &res); err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } // TODO: Refine this rough estimate in future iterations sess.LatencyScore = took.Seconds() / float64(outPixels) + if monitor.Enabled { + pricePerUnit := 0.0 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { + pricePerUnit = float64(priceInfo.PricePerUnit) + } + monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + } + return &res, nil } @@ -341,20 +411,32 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, var buf bytes.Buffer mw, err := worker.NewUpscaleMultipartWriter(&buf, req) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } imageRdr, err := req.Image.Reader() if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } outPixels := int64(config.Height) * int64(config.Width) @@ -369,6 +451,9 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, resp, err := client.UpscaleWithBodyWithResponse(ctx, mw.FormDataContentType(), &buf, setHeaders) took := time.Since(start) if err != nil { + if monitor.Enabled { + monitor.AIRequestError(err.Error()) + } return nil, err } @@ -385,6 +470,14 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, // TODO: Refine this rough estimate in future iterations sess.LatencyScore = took.Seconds() / float64(outPixels) + if monitor.Enabled { + pricePerUnit := 0.0 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { + pricePerUnit = 
float64(priceInfo.PricePerUnit) + } + monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + } + return resp.JSON200, nil } From 47c088fcdb2882c063cdcbc58aa36220c803cb33 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Mon, 8 Jul 2024 20:20:30 +0200 Subject: [PATCH 06/12] feat: improve AI metrics This commit improves the AI metrics so that they are easier to work with. --- monitor/census.go | 97 +++++++++++++++++++++----------------------- server/ai_process.go | 46 ++++++++++----------- 2 files changed, 70 insertions(+), 73 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 759216fdff..79cca90278 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -193,10 +193,10 @@ type ( mSceneClassification *stats.Int64Measure // Metrics for AI jobs - mAIModelsRequested *stats.Int64Measure - mAILatencyScore *stats.Float64Measure - mAIPricePerUnit *stats.Float64Measure - mAIRequestError *stats.Int64Measure + mAIModelsRequested *stats.Int64Measure + mAIRequestLatencyScore *stats.Float64Measure + mAIRequestPrice *stats.Float64Measure + mAIRequestError *stats.Int64Measure lock sync.Mutex emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo @@ -356,9 +356,9 @@ func InitCensus(nodeType NodeType, version string) { // Metrics for AI jobs census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot") - census.mAILatencyScore = stats.Float64("ai_latency_score", "Orchestrator AI request latency score, based on smallest pipeline unit", "") - census.mAIPricePerUnit = stats.Float64("ai_price_per_unit", "Price paid per AI pipeline unit", "") - census.mAIRequestError = stats.Int64("ai_request_errors", "AIRequestErrors", "tot") + census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI orchestrator request latency score, based on smallest pipeline unit", "") + census.mAIRequestPrice = stats.Float64("ai_request_price", "Price paid per AI request unit", "") + census.mAIRequestError = stats.Int64("ai_request_errors", "Errors when processing AI requests", "tot") glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) @@ -881,29 +881,29 @@ func InitCensus(nodeType NodeType, version string) { { Name: "ai_models_requested", Measure: census.mAIModelsRequested, - Description: "Count of AI model requests over time", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestID...), - Aggregation: view.LastValue(), + Description: "Number of AI models requested over time", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + Aggregation: view.Count(), }, { - Name: "ai_latency_score", - Measure: census.mAILatencyScore, - Description: "Orchestrator AI request latency score", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), - Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), + Name: "ai_request_latency_score", + Measure: census.mAIRequestLatencyScore, + Description: "AI orchestrator request latency score", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + Aggregation: view.LastValue(), }, { - Name: "ai_price_per_unit", - Measure: census.mAIPricePerUnit, - Description: "Price paid per AI pipeline 
unit", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithManifestIDAndIP...), - Aggregation: view.Distribution(0, .250, .500, .750, 1.000, 1.250, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000), + Name: "ai_request_price", + Measure: census.mAIRequestPrice, + Description: "AI request price per unit", + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + Aggregation: view.LastValue(), }, { Name: "ai_request_errors", Measure: census.mAIRequestError, - Description: "Errors processing AI requests", - TagKeys: baseTags, + Description: "Errors when processing AI requests", + TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), Aggregation: view.Sum(), }, } @@ -1006,7 +1006,6 @@ func LogDiscoveryError(ctx context.Context, uri, code string) { []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kOrchestratorURI, uri)}, census.mDiscoveryError.M(1)); err != nil { - //0530 18:08:28.399899 1767400 census.go:965] clientIP=192.168.10.155 request_id=d5303ff3 Error recording metrics err="invalid value: only ASCII characters accepted; max length must be 255 characters" clog.Errorf(ctx, "Error recording metrics err=%q", err) } } @@ -1763,53 +1762,51 @@ func RewardCallError(sender string) { // AIJobProccessed records metrics from AI jobs func AiJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { - census.modelRequested(pipeline, model, orchInfo) + census.modelRequested(pipeline, model) census.recordAILatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) - census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo) + census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit) } -func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string, orchInfo *lpnet.OrchestratorInfo) { - ctx, err := tag.New(cen.ctx, tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelName), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder())) - if err != nil { - glog.Errorf("Failed to create context with tags: %v", err) - return - } +// modelRequested records the number of requests per pipeline and model +func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string) { + cen.lock.Lock() + defer cen.lock.Unlock() - stats.Record(ctx, census.mAIModelsRequested.M(1)) + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, modelName)}, cen.mAIModelsRequested.M(1)); err != nil { + glog.Errorf("Failed to record metrics with tags: %v", err) + } } +// recordAILatencyScore records the latency score for an AI job func (cen *censusMetricsCounter) recordAILatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) { cen.lock.Lock() defer cen.lock.Unlock() - ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder())) - if err != nil { - glog.Error("Error creating context", err) - return + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder())}, + cen.mAIRequestLatencyScore.M(latencyScore)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) } - - stats.Record(ctx, 
census.mAILatencyScore.M(latencyScore)) } -func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) { +// recordAIPricePerUnit records the price per unit for an AI job +func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64) { cen.lock.Lock() defer cen.lock.Unlock() - ctx, err := tag.New(cen.ctx, tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder())) - if err != nil { - glog.Error("Error creating context", err) - return + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, + cen.mAIRequestPrice.M(pricePerUnit)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) } - - stats.Record(ctx, census.mAIPricePerUnit.M(pricePerUnit)) } -// RewardCallError records an error during the AI job request -func AIRequestError(sender string) { +// AIRequestError records an error during the AI job request +func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) { if err := stats.RecordWithTags(census.ctx, - []tag.Mutator{tag.Insert(census.kSender, sender)}, - census.mRewardCallError.M(1)); err != nil { - + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, + census.mAIRequestError.M(1)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } } diff --git a/server/ai_process.go b/server/ai_process.go index 7865b8255d..efb647e07b 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -77,7 +77,7 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -95,7 +95,7 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -106,7 +106,7 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess took := time.Since(start) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -179,7 +179,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes mw, err := worker.NewImageToImageMultipartWriter(&buf, req) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -187,7 +187,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { if monitor.Enabled { - 
monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -195,14 +195,14 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes imageRdr, err := req.Image.Reader() if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -211,7 +211,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -222,7 +222,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes took := time.Since(start) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -251,7 +251,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { pricePerUnit = float64(priceInfo.PricePerUnit) } - monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + monitor.AiJobProcessed(ctx, "image-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) } return resp.JSON200, nil @@ -295,7 +295,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes mw, err := worker.NewImageToVideoMultipartWriter(&buf, req) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -303,7 +303,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -322,7 +322,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, outPixels) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -333,7 +333,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes took := time.Since(start) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -342,7 +342,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes data, err := io.ReadAll(resp.Body) if err != nil { if monitor.Enabled { - 
monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -359,7 +359,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes var res worker.ImageResponse if err := json.Unmarshal(data, &res); err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "image-to-video", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -372,7 +372,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { pricePerUnit = float64(priceInfo.PricePerUnit) } - monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + monitor.AiJobProcessed(ctx, "image-to-video", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) } return &res, nil @@ -412,7 +412,7 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, mw, err := worker.NewUpscaleMultipartWriter(&buf, req) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -420,7 +420,7 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -428,14 +428,14 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, imageRdr, err := req.Image.Reader() if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) } return nil, err } config, _, err := image.DecodeConfig(imageRdr) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -452,7 +452,7 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, took := time.Since(start) if err != nil { if monitor.Enabled { - monitor.AIRequestError(err.Error()) + monitor.AIRequestError(err.Error(), "upscale", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -475,7 +475,7 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { pricePerUnit = float64(priceInfo.PricePerUnit) } - monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + monitor.AiJobProcessed(ctx, "upscale", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) } return resp.JSON200, nil From 59822854d996c3f7d82f8fc351e620f4f49ffc46 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Mon, 8 Jul 2024 22:33:36 +0200 Subject: [PATCH 07/12] feat(ai): log no capacity error to metrics This commit ensures that an error is logged when the Gateway could not find orchestrators for a given model and capability. 
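A detail worth calling out in the diff below: the no-capacity paths pass nil for the *lpnet.OrchestratorInfo argument, and census.go gains an explicit nil check before converting the orchestrator address. The URI tag needs no such guard because protobuf-generated getters in Go are nil-receiver safe. A self-contained sketch of that getter shape (the real type lives in go-livepeer's generated net package; this is a simplified stand-in):

```go
package main

import "fmt"

// OrchestratorInfo is a stand-in for the generated protobuf type; only the
// relevant field is shown.
type OrchestratorInfo struct {
	Transcoder string
}

// GetTranscoder mirrors a protobuf-generated getter: it checks for a nil
// receiver and returns the zero value instead of panicking.
func (m *OrchestratorInfo) GetTranscoder() string {
	if m != nil {
		return m.Transcoder
	}
	return ""
}

func main() {
	var info *OrchestratorInfo // nil, as in the no-capacity error path
	fmt.Printf("uri=%q\n", info.GetTranscoder()) // prints uri="" with no panic
}
```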
--- monitor/census.go | 7 ++++++- server/ai_process.go | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 79cca90278..a5dfae2ef7 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -1804,8 +1804,13 @@ func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model str // AIRequestError records an error during the AI job request func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) { + orchAddr := "" + if addr := orchInfo.GetAddress(); addr != nil { + orchAddr = common.BytesToAddress(addr).String() + } + if err := stats.RecordWithTags(census.ctx, - []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, orchAddr)}, census.mAIRequestError.M(1)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } diff --git a/server/ai_process.go b/server/ai_process.go index efb647e07b..40066f8e87 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -75,6 +75,7 @@ func processTextToImage(ctx context.Context, params aiRequestParams, req worker. func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISession, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) { client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient)) + if err != nil { if monitor.Enabled { monitor.AIRequestError(err.Error(), "text-to-image", *req.ModelId, sess.OrchestratorInfo) } return nil, err } @@ -526,6 +527,7 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface default: return nil, errors.New("unknown AI request type") } + capName, _ := core.CapabilityToName(cap) var resp *worker.ImageResponse @@ -536,7 +538,11 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface for { select { case <-cctx.Done(): - return nil, &ServiceUnavailableError{err: fmt.Errorf("no orchestrators available within %v timeout", processingRetryTimeout)} + err := fmt.Errorf("no orchestrators available within %v timeout", processingRetryTimeout) + if monitor.Enabled { + monitor.AIRequestError(err.Error(), capName, modelID, nil) + } + return nil, &ServiceUnavailableError{err: err} default: } @@ -563,7 +569,11 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface } if resp == nil { - return nil, &ServiceUnavailableError{err: errors.New("no orchestrators available")} + errMsg := "no orchestrators available" + if monitor.Enabled { + monitor.AIRequestError(errMsg, capName, modelID, nil) + } + return nil, &ServiceUnavailableError{err: errors.New(errMsg)} } return resp, nil From bd5b047694a8b25643f8865a436761a4f0a8c7f6 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Tue, 9 Jul 2024 12:41:41 +0200 Subject: [PATCH 08/12] feat(ai): add TicketValueSent and TicketsSent metrics This commit ensures that the `ticket_value_sent` and `tickets_sent` metrics are also created for an AI Gateway.
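The `ticket_value_sent` and `tickets_sent` views already exist for the transcoding path; this patch only starts recording to them from the AI payment path as well. A self-contained sketch (a demo with stand-in names, not code from this PR) of how one can read back such a counter with OpenCensus's view.RetrieveData, which is handy when sanity-checking new recordings in a test:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// Stand-in counter measure; the real tickets_sent view lives in census.go.
var mTickets = stats.Int64("tickets_sent_demo", "Demo tickets counter", "tot")

func main() {
	if err := view.Register(&view.View{
		Name:        "tickets_sent_demo",
		Measure:     mTickets,
		Aggregation: view.Sum(),
	}); err != nil {
		log.Fatalf("register view: %v", err)
	}

	// Record a few tickets, then read back what the view has aggregated.
	stats.Record(context.Background(), mTickets.M(3))

	rows, err := view.RetrieveData("tickets_sent_demo")
	if err != nil {
		log.Fatalf("retrieve view data: %v", err)
	}
	for _, row := range rows {
		// For a Sum aggregation, row.Data is a *view.SumData.
		fmt.Printf("tags=%v sum=%v\n", row.Tags, row.Data.(*view.SumData).Value)
	}
}
```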
--- monitor/census.go | 12 ++++++------ server/ai_process.go | 10 ++++++++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index a5dfae2ef7..cb0b8c7449 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -356,7 +356,7 @@ func InitCensus(nodeType NodeType, version string) { // Metrics for AI jobs census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot") - census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI orchestrator request latency score, based on smallest pipeline unit", "") + census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI request latency score, based on smallest pipeline unit", "") census.mAIRequestPrice = stats.Float64("ai_request_price", "Price paid per AI request unit", "") census.mAIRequestError = stats.Int64("ai_request_errors", "Errors when processing AI requests", "tot") @@ -888,7 +888,7 @@ func InitCensus(nodeType NodeType, version string) { { Name: "ai_request_latency_score", Measure: census.mAIRequestLatencyScore, - Description: "AI orchestrator request latency score", + Description: "AI request latency score", TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), Aggregation: view.LastValue(), }, @@ -896,7 +896,7 @@ func InitCensus(nodeType NodeType, version string) { Name: "ai_request_price", Measure: census.mAIRequestPrice, Description: "AI request price per unit", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...), + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), Aggregation: view.LastValue(), }, { @@ -1764,7 +1764,7 @@ func RewardCallError(sender string) { func AiJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { census.modelRequested(pipeline, model) census.recordAILatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) - census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit) + census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo) } // modelRequested records the number of requests per pipeline and model @@ -1791,12 +1791,12 @@ func (cen *censusMetricsCounter) recordAILatencyScore(Pipeline string, Model str } // recordAIPricePerUnit records the price per unit for an AI job -func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64) { +func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) { cen.lock.Lock() defer cen.lock.Unlock() if err := stats.RecordWithTags(cen.ctx, - []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder())}, cen.mAIRequestPrice.M(pricePerUnit)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } diff --git a/server/ai_process.go b/server/ai_process.go index 40066f8e87..64f167ed54 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -607,11 +607,21 @@ func prepareAIPayment(ctx context.Context, sess *AISession, outPixels int64) (wo payment, err := genPayment(ctx, sess.BroadcastSession, balUpdate.NumTickets) if err != nil { + clog.Errorf(ctx, "Could not create payment err=%q", err) + + if monitor.Enabled { + monitor.PaymentCreateError(ctx) 
+ } + return nil, nil, err } // As soon as the request is sent to the orch consider the balance update's credit as spent balUpdate.Status = CreditSpent + if monitor.Enabled { + monitor.TicketValueSent(ctx, balUpdate.NewCredit) + monitor.TicketsSent(ctx, balUpdate.NumTickets) + } setHeaders := func(_ context.Context, req *http.Request) error { req.Header.Set(segmentHeader, segCreds) From b7181d425c64095da04a5c7e277173885afc0ab5 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Tue, 9 Jul 2024 13:09:46 +0200 Subject: [PATCH 09/12] fix(ai): ensure that AI metrics have orch address label This commit ensures that the AI gateway metrics contain the orch address label. --- monitor/census.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index cb0b8c7449..30d2162e6e 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -1784,7 +1784,7 @@ func (cen *censusMetricsCounter) recordAILatencyScore(Pipeline string, Model str defer cen.lock.Unlock() if err := stats.RecordWithTags(cen.ctx, - []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder())}, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, cen.mAIRequestLatencyScore.M(latencyScore)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } @@ -1796,7 +1796,7 @@ func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model str defer cen.lock.Unlock() if err := stats.RecordWithTags(cen.ctx, - []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder())}, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, cen.mAIRequestPrice.M(pricePerUnit)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } From a3f7d537dbf23dd4ab5eb956350012cd674e74f8 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Sun, 14 Jul 2024 11:30:37 +0200 Subject: [PATCH 10/12] feat(ai): add orchestrator AI census metrics This commit introduces a suite of AI orchestrator metrics to the census module, mirroring those received by the Gateway. The newly added metrics include `ai_models_requested`, `ai_request_latency_score`, `ai_request_price`, and `ai_request_errors`, facilitating comprehensive tracking and analysis of AI request handling performance on the orchestrator side. 
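All of the census changes in this series follow the same OpenCensus pattern: define a measure and tag keys, register a view that pairs the measure with an aggregation and the tag keys it may be broken down by, then record measurements against a tagged context. A minimal, self-contained sketch of that pattern (the metric and tag names below are illustrative, not the exact go-livepeer identifiers):

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

// Illustrative measure and tag keys, mirroring mAIRequestLatencyScore,
// kPipeline, and kModelName in monitor/census.go.
var (
	mLatency   = stats.Float64("ai_request_latency_score", "AI request latency score", "")
	kPipeline  = tag.MustNewKey("pipeline")
	kModelName = tag.MustNewKey("model_name")
)

func main() {
	// A view binds the measure to an aggregation and the tag keys it can be
	// broken down by; tags recorded but not listed here are ignored by the view.
	err := view.Register(&view.View{
		Name:        "ai_request_latency_score",
		Measure:     mLatency,
		Description: "AI request latency score",
		TagKeys:     []tag.Key{kPipeline, kModelName},
		Aggregation: view.LastValue(),
	})
	if err != nil {
		log.Fatalf("error registering view err=%q", err)
	}

	// RecordWithTags derives a context with the tag mutators applied and
	// records the measurement against it, the same call the record helpers
	// in monitor/census.go make.
	if err := stats.RecordWithTags(context.Background(),
		[]tag.Mutator{tag.Insert(kPipeline, "text-to-image"), tag.Insert(kModelName, "sd-turbo")},
		mLatency.M(4.77e-7)); err != nil {
		log.Printf("error recording metrics err=%q", err)
	}
}

Because the AI views use view.LastValue(), each tag combination exposes only its most recently recorded value rather than a running aggregate, which is why the helpers can simply record on every request.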
--- monitor/census.go | 92 ++++++++++++++++++++++++++++--------- server/ai_http.go | 25 ++++++++++ server/ai_process.go | 107 ++++++++++++++++++++++++++++++------------- 3 files changed, 171 insertions(+), 53 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 30d2162e6e..4a432a8753 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -357,8 +357,8 @@ func InitCensus(nodeType NodeType, version string) { // Metrics for AI jobs census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot") census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI request latency score, based on smallest pipeline unit", "") - census.mAIRequestPrice = stats.Float64("ai_request_price", "Price paid per AI request unit", "") - census.mAIRequestError = stats.Int64("ai_request_errors", "Errors when processing AI requests", "tot") + census.mAIRequestPrice = stats.Float64("ai_request_price", "AI request price per unit, based on smallest pipeline unit", "") + census.mAIRequestError = stats.Int64("ai_request_errors", "Errors during AI request processing", "tot") glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version()) glog.Infof("Livepeer version: %s", version) @@ -380,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) { baseTagsWithEthAddr := baseTags baseTagsWithManifestIDAndEthAddr := baseTags baseTagsWithOrchInfo := baseTags + baseTagsWithGatewayInfo := baseTags if PerStreamMetrics { baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID} baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender} @@ -391,8 +392,17 @@ func InitCensus(nodeType NodeType, version string) { } baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...) + baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...) baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...) + // Add node type specific tags. 
+	baseTagsWithNodeInfo := baseTags
+	if nodeType == Orchestrator {
+		baseTagsWithNodeInfo = baseTagsWithOrchInfo
+	} else {
+		baseTagsWithNodeInfo = baseTagsWithGatewayInfo
+	}
+
 	views := []*view.View{
 		{
 			Name: "versions",
@@ -889,21 +899,21 @@ func InitCensus(nodeType NodeType, version string) {
 			Name: "ai_request_latency_score",
 			Measure: census.mAIRequestLatencyScore,
 			Description: "AI request latency score",
-			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
+			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
 			Aggregation: view.LastValue(),
 		},
 		{
 			Name: "ai_request_price",
 			Measure: census.mAIRequestPrice,
 			Description: "AI request price per unit",
-			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
+			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
 			Aggregation: view.LastValue(),
 		},
 		{
 			Name: "ai_request_errors",
 			Measure: census.mAIRequestError,
 			Description: "Errors when processing AI requests",
-			TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...),
+			TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
 			Aggregation: view.Sum(),
 		},
 	}
@@ -1760,15 +1770,8 @@ func RewardCallError(sender string) {
 	}
 }
 
-// AIJobProccessed records metrics from AI jobs
-func AiJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
-	census.modelRequested(pipeline, model)
-	census.recordAILatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
-	census.recordAIPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
-}
-
-// modelRequested records the number of requests per pipeline and model
-func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string) {
+// recordModelRequested increments request count for a specific AI model and pipeline.
+func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
 	cen.lock.Lock()
 	defer cen.lock.Unlock()
 
@@ -1778,31 +1781,38 @@ func (cen *censusMetricsCounter) modelRequested(pipeline, modelName string) {
 	}
 }
 
-// recordAILatencyScore records the latency score for an AI job
-func (cen *censusMetricsCounter) recordAILatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
+// AIRequestFinished records gateway AI job request metrics.
+func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
+	census.recordModelRequested(pipeline, model)
+	census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
+	census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
+}
+
+// recordAIRequestLatencyScore records the latency score for an AI job request.
+func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
 	cen.lock.Lock()
 	defer cen.lock.Unlock()
 
 	if err := stats.RecordWithTags(cen.ctx,
-		[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
+		[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
 		cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
 		glog.Errorf("Error recording metrics err=%q", err)
 	}
 }
 
-// recordAIPricePerUnit records the price per unit for an AI job
-func (cen *censusMetricsCounter) recordAIPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
+// recordAIRequestPricePerUnit records the price per unit for an AI job request.
+func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
 	cen.lock.Lock()
 	defer cen.lock.Unlock()
 
 	if err := stats.RecordWithTags(cen.ctx,
-		[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(census.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
+		[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())},
 		cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
 		glog.Errorf("Error recording metrics err=%q", err)
 	}
 }
 
-// AIRequestError records an error during the AI job request
+// AIRequestError logs an error in a gateway AI job request.
 func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.OrchestratorInfo) {
 	orchAddr := ""
 	if addr := orchInfo.GetAddress(); addr != nil {
@@ -1816,6 +1826,46 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet.
 	}
 }
 
+// AIJobProcessed records orchestrator AI job processing metrics.
+func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, sender string) {
+	census.recordModelRequested(pipeline, model)
+	census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore, sender)
+	census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit, sender)
+}
+
+// recordAIJobLatencyScore records the latency score for a processed AI job.
+func (cen *censusMetricsCounter) recordAIJobLatencyScore(Pipeline string, Model string, latencyScore float64, sender string) {
+	cen.lock.Lock()
+	defer cen.lock.Unlock()
+
+	if err := stats.RecordWithTags(cen.ctx,
+		[]tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kSender, sender)},
+		cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
+		glog.Errorf("Error recording metrics err=%q", err)
+	}
+}
+
+// recordAIJobPricePerUnit logs the cost per unit of a processed AI job.
+func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model string, pricePerUnit float64, sender string) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kSender, sender)}, + cen.mAIRequestPrice.M(pricePerUnit)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// AIProcessingError logs errors in orchestrator AI job processing. +func AIProcessingError(code string, Pipeline string, Model string, sender string) { + if err := stats.RecordWithTags(census.ctx, + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)}, + census.mAIRequestError.M(1)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + // Convert wei to gwei func wei2gwei(wei *big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_http.go b/server/ai_http.go index fea249ca51..fab8b8eb4a 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -14,6 +14,7 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/monitor" middleware "github.com/oapi-codegen/nethttp-middleware" "github.com/oapi-codegen/runtime" ) @@ -271,6 +272,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request start := time.Now() resp, err := submitFn(ctx) if err != nil { + if monitor.Enabled { + monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex()) + } respondWithError(w, err.Error(), http.StatusInternalServerError) return } @@ -283,6 +287,27 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request // If additional parameters that influence compute cost become configurable, then the formula should be reconsidered orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels) + if monitor.Enabled { + var latencyScore float64 + switch v := req.(type) { + case worker.TextToImageJSONRequestBody: + latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels) + case worker.ImageToImageMultipartRequestBody: + latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels) + case worker.ImageToVideoMultipartRequestBody: + latencyScore = CalculateImageToVideoLatencyScore(took, outPixels) + case worker.UpscaleMultipartRequestBody: + latencyScore = CalculateUpscaleLatencyScore(took, outPixels) + } + + var pricePerAIUnit float64 + if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 { + pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit()) + } + + monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit}, sender.Hex()) + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) diff --git a/server/ai_process.go b/server/ai_process.go index 64f167ed54..169bb5bdd7 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -9,6 +9,7 @@ import ( "fmt" "image" "io" + "math" "math/big" "net/http" "path/filepath" @@ -44,6 +45,24 @@ type aiRequestParams struct { sessManager *AISessionManager } +// CalculateTextToImageLatencyScore computes the time 
taken per pixel for a text-to-image request.
+func CalculateTextToImageLatencyScore(took time.Duration, req worker.TextToImageJSONRequestBody, outPixels int64) float64 {
+	if outPixels <= 0 {
+		return 0
+	}
+
+	numImages := float64(1)
+	if req.NumImagesPerPrompt != nil {
+		numImages = math.Max(1, float64(*req.NumImagesPerPrompt))
+	}
+	numInferenceSteps := float64(50)
+	if req.NumInferenceSteps != nil {
+		numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps))
+	}
+
+	return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
+}
+
 func processTextToImage(ctx context.Context, params aiRequestParams, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) {
 	resp, err := processAIRequest(ctx, params, req)
 	if err != nil {
@@ -125,27 +144,34 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess
 	// TODO: Refine this rough estimate in future iterations.
 	// TODO: Default values for the number of images and inference steps are currently hardcoded.
 	// These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details.
-	numImages := float64(1)
-	if req.NumImagesPerPrompt != nil {
-		numImages = float64(*req.NumImagesPerPrompt)
-	}
-	numInferenceSteps := float64(50)
-	if req.NumInferenceSteps != nil {
-		numInferenceSteps = float64(*req.NumInferenceSteps)
-	}
-	sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
+	sess.LatencyScore = CalculateTextToImageLatencyScore(took, req, outPixels)
 
 	if monitor.Enabled {
-		pricePerUnit := 0.0
-		if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil {
-			pricePerUnit = float64(priceInfo.PricePerUnit)
+		var pricePerAIUnit float64
+		if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
+			pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
 		}
-		monitor.AiJobProcessed(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo)
+
+		monitor.AIRequestFinished(ctx, "text-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
 	}
 
 	return resp.JSON200, nil
 }
 
+// CalculateImageToImageLatencyScore computes the time taken per pixel for an image-to-image request.
+func CalculateImageToImageLatencyScore(took time.Duration, req worker.ImageToImageMultipartRequestBody, outPixels int64) float64 {
+	if outPixels <= 0 {
+		return 0
+	}
+
+	numImages := float64(1)
+	if req.NumImagesPerPrompt != nil {
+		numImages = math.Max(1, float64(*req.NumImagesPerPrompt))
+	}
+
+	return took.Seconds() / float64(outPixels) / numImages
+}
+
 func processImageToImage(ctx context.Context, params aiRequestParams, req worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error) {
 	resp, err := processAIRequest(ctx, params, req)
 	if err != nil {
@@ -241,23 +267,29 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes
 	// TODO: Refine this rough estimate in future iterations.
 	// TODO: Default values for the number of images is currently hardcoded.
 	// These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details.
- numImages := float64(1) - if req.NumImagesPerPrompt != nil { - numImages = float64(*req.NumImagesPerPrompt) - } - sess.LatencyScore = took.Seconds() / float64(outPixels) / numImages + sess.LatencyScore = CalculateImageToImageLatencyScore(took, req, outPixels) if monitor.Enabled { - pricePerUnit := 0.0 - if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { - pricePerUnit = float64(priceInfo.PricePerUnit) + var pricePerAIUnit float64 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 { + pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit) } - monitor.AiJobProcessed(ctx, "image-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + + monitor.AIRequestFinished(ctx, "image-to-image", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) } return resp.JSON200, nil } +// CalculateImageToVideoLatencyScore computes the time taken per pixel for an image-to-video request. +func CalculateImageToVideoLatencyScore(took time.Duration, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + return took.Seconds() / float64(outPixels) +} + func processImageToVideo(ctx context.Context, params aiRequestParams, req worker.ImageToVideoMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -366,19 +398,29 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes } // TODO: Refine this rough estimate in future iterations - sess.LatencyScore = took.Seconds() / float64(outPixels) + sess.LatencyScore = CalculateImageToVideoLatencyScore(took, outPixels) if monitor.Enabled { - pricePerUnit := 0.0 - if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil { - pricePerUnit = float64(priceInfo.PricePerUnit) + var pricePerAIUnit float64 + if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 { + pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit) } - monitor.AiJobProcessed(ctx, "image-to-video", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo) + + monitor.AIRequestFinished(ctx, "image-to-video", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo) } return &res, nil } +// CalculateUpscaleLatencyScore computes the time taken per pixel for an upscale request. 
+func CalculateUpscaleLatencyScore(took time.Duration, outPixels int64) float64 {
+	if outPixels <= 0 {
+		return 0
+	}
+
+	return took.Seconds() / float64(outPixels)
+}
+
 func processUpscale(ctx context.Context, params aiRequestParams, req worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error) {
 	resp, err := processAIRequest(ctx, params, req)
 	if err != nil {
@@ -469,14 +511,15 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession,
 	}
 
 	// TODO: Refine this rough estimate in future iterations
-	sess.LatencyScore = took.Seconds() / float64(outPixels)
+	sess.LatencyScore = CalculateUpscaleLatencyScore(took, outPixels)
 
 	if monitor.Enabled {
-		pricePerUnit := 0.0
-		if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil {
-			pricePerUnit = float64(priceInfo.PricePerUnit)
+		var pricePerAIUnit float64
+		if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
+			pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
 		}
-		monitor.AiJobProcessed(ctx, "upscale", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerUnit}, sess.OrchestratorInfo)
+
+		monitor.AIRequestFinished(ctx, "upscale", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
 	}
 
 	return resp.JSON200, nil

From 7bfdd45333887c2acecf3793c41042826605f011 Mon Sep 17 00:00:00 2001
From: Rick Staa
Date: Wed, 17 Jul 2024 21:17:50 +0200
Subject: [PATCH 11/12] refactor: improve orchestrator metrics tags

This commit ensures that the right tags are attached to the Orchestrator AI
metrics.
---
 monitor/census.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/monitor/census.go b/monitor/census.go
index 4a432a8753..d6140ac82c 100644
--- a/monitor/census.go
+++ b/monitor/census.go
@@ -397,10 +397,12 @@ func InitCensus(nodeType NodeType, version string) {
 	// Add node type specific tags.
 	baseTagsWithNodeInfo := baseTags
+	aiRequestLatencyScoreTags := baseTags
 	if nodeType == Orchestrator {
-		baseTagsWithNodeInfo = baseTagsWithOrchInfo
-	} else {
 		baseTagsWithNodeInfo = baseTagsWithGatewayInfo
+	} else {
+		baseTagsWithNodeInfo = baseTagsWithOrchInfo
+		aiRequestLatencyScoreTags = baseTagsWithOrchInfo
 	}
 
 	views := []*view.View{
@@ -899,14 +901,14 @@ func InitCensus(nodeType NodeType, version string) {
 			Name: "ai_request_latency_score",
 			Measure: census.mAIRequestLatencyScore,
 			Description: "AI request latency score",
-			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
+			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, aiRequestLatencyScoreTags...),
 			Aggregation: view.LastValue(),
 		},
 		{
 			Name: "ai_request_price",
 			Measure: census.mAIRequestPrice,
 			Description: "AI request price per unit",
-			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
+			TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
 			Aggregation: view.LastValue(),
 		},
 		{

From e57a7c0b78bc3d634f2a1be44a40db10930a8d86 Mon Sep 17 00:00:00 2001
From: Rick Staa
Date: Thu, 18 Jul 2024 15:39:03 +0200
Subject: [PATCH 12/12] refactor(ai): improve latency score calculations

This commit ensures that no divide-by-zero errors can occur in the latency
score calculations.
---
 server/ai_process.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/server/ai_process.go b/server/ai_process.go
index e42f9ec0e9..d286582215 100644
--- a/server/ai_process.go
+++ b/server/ai_process.go
@@ -72,7 +72,7 @@ func CalculateTextToImageLatencyScore(took time.Duration, req worker.TextToImage
 	}
 	// Handle special case for SDXL-Lightning model.
 	if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") {
-		numInferenceSteps = core.ParseStepsFromModelID(req.ModelId, 8)
+		numInferenceSteps = math.Max(1, core.ParseStepsFromModelID(req.ModelId, 8))
 	}
 
 	return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
@@ -187,11 +187,11 @@ func CalculateImageToImageLatencyScore(took time.Duration, req worker.ImageToIma
 	}
 	numInferenceSteps := float64(100)
 	if req.NumInferenceSteps != nil {
-		numInferenceSteps = float64(*req.NumInferenceSteps)
+		numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps))
 	}
 	// Handle special case for SDXL-Lightning model.
 	if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") {
-		numInferenceSteps = core.ParseStepsFromModelID(req.ModelId, 8)
+		numInferenceSteps = math.Max(1, core.ParseStepsFromModelID(req.ModelId, 8))
 	}
 
 	return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
@@ -316,7 +316,7 @@ func CalculateImageToVideoLatencyScore(took time.Duration, req worker.ImageToVid
 	// These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details.
numInferenceSteps := float64(25) if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) } return took.Seconds() / float64(outPixels) / numInferenceSteps @@ -457,7 +457,7 @@ func CalculateUpscaleLatencyScore(took time.Duration, req worker.UpscaleMultipar // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. numInferenceSteps := float64(75) if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) } return took.Seconds() / float64(outPixels) / numInferenceSteps
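Taken together, the final form of the latency score is "seconds per output pixel, normalized by image count and inference steps". A standalone sketch of the guarded formula these patches converge on (the helper below is illustrative; the real calculations are the Calculate*LatencyScore functions in server/ai_process.go):

package main

import (
	"fmt"
	"math"
	"time"
)

// latencyScore returns the seconds spent per output pixel, normalized by the
// number of images and inference steps so scores stay comparable across
// request shapes. The guards mirror PATCH 12: a non-positive pixel count
// short-circuits to 0, and both divisors are clamped to at least 1 so no
// divide-by-zero can occur.
func latencyScore(took time.Duration, outPixels int64, numImages, numInferenceSteps float64) float64 {
	if outPixels <= 0 {
		return 0
	}
	numImages = math.Max(1, numImages)
	numInferenceSteps = math.Max(1, numInferenceSteps)

	return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps)
}

func main() {
	// Two 512x512 images at 20 inference steps, generated in 5s:
	// 5 / 262144 / (2 * 20) ~ 4.77e-7 seconds per pixel-step.
	fmt.Println(latencyScore(5*time.Second, 512*512, 2, 20))

	// Degenerate inputs no longer divide by zero.
	fmt.Println(latencyScore(5*time.Second, 0, 0, 0)) // 0
}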