From 931e8822b3468b0671d56459ee6f8b4e768c5544 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Thu, 18 Jul 2024 15:40:47 +0200 Subject: [PATCH] feat(ai): add AI orchestrator metrics (#3097) * Add gateway metric for roundtrip ai times by model and pipeline * Rename metrics and add unique manifest * Fix name mismatch * modelsRequested not working correctly * feat: add initial POC AI gateway metrics This commit adds the initial AI gateway metrics so that they can reviewed by others. The code still need to be cleaned up and the buckets adjusted. * feat: improve AI metrics This commit improves the AI metrics so that they are easier to work with. * feat(ai): log no capacity error to metrics This commit ensures that an error is logged when the Gateway could not find orchestrators for a given model and capability. * feat(ai): add TicketValueSent and TicketsSent metrics This commit ensure that the `ticket_value_sent` abd `tickets_sent` metrics are also created for a AI Gateway. * fix(ai): ensure that AI metrics have orch address label This commit ensures that the AI gateway metrics contain the orch address label. * feat(ai): add orchestrator AI census metrics This commit introduces a suite of AI orchestrator metrics to the census module, mirroring those received by the Gateway. The newly added metrics include `ai_models_requested`, `ai_request_latency_score`, `ai_request_price`, and `ai_request_errors`, facilitating comprehensive tracking and analysis of AI request handling performance on the orchestrator side. * refactor: improve orchestrator metrics tags This commit ensures that the right tags are attached to the Orchestrator AI metrics. * refactor(ai): improve latency score calculations This commit ensures that no devide by zero errors can occur in the latency score calculations. --------- Co-authored-by: Elite Encoder --- monitor/census.go | 74 ++++++++++++++++--- server/ai_http.go | 30 ++++++++ server/ai_process.go | 164 ++++++++++++++++++++++++++++--------------- 3 files changed, 199 insertions(+), 69 deletions(-) diff --git a/monitor/census.go b/monitor/census.go index 373220122b..2f883d5e48 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -380,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) { baseTagsWithEthAddr := baseTags baseTagsWithManifestIDAndEthAddr := baseTags baseTagsWithOrchInfo := baseTags + baseTagsWithGatewayInfo := baseTags if PerStreamMetrics { baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID} baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender} @@ -391,8 +392,19 @@ func InitCensus(nodeType NodeType, version string) { } baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...) + baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...) baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...) + // Add node type specific tags. + baseTagsWithNodeInfo := baseTags + aiRequestLatencyScoreTags := baseTags + if nodeType == Orchestrator { + baseTagsWithNodeInfo = baseTagsWithGatewayInfo + } else { + baseTagsWithNodeInfo = baseTagsWithOrchInfo + aiRequestLatencyScoreTags = baseTagsWithOrchInfo + } + views := []*view.View{ { Name: "versions", @@ -889,7 +901,7 @@ func InitCensus(nodeType NodeType, version string) { Name: "ai_request_latency_score", Measure: census.mAIRequestLatencyScore, Description: "AI request latency score", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, aiRequestLatencyScoreTags...), Aggregation: view.LastValue(), }, { @@ -903,7 +915,7 @@ func InitCensus(nodeType NodeType, version string) { Name: "ai_request_errors", Measure: census.mAIRequestError, Description: "Errors when processing AI requests", - TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...), Aggregation: view.Sum(), }, } @@ -1760,13 +1772,6 @@ func RewardCallError(sender string) { } } -// AIRequestFinished records gateway AI job request metrics. -func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { - census.recordModelRequested(pipeline, model) - census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) - census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo) -} - // recordModelRequested increments request count for a specific AI model and pipeline. func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) { cen.lock.Lock() @@ -1778,6 +1783,13 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string } } +// AIRequestFinished records gateway AI job request metrics. +func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { + census.recordModelRequested(pipeline, model) + census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) + census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit) +} + // recordAIRequestLatencyScore records the latency score for a AI job request. func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) { cen.lock.Lock() @@ -1791,12 +1803,12 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Mo } // recordAIRequestPricePerUnit records the price per unit for a AI job request. -func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) { +func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64) { cen.lock.Lock() defer cen.lock.Unlock() if err := stats.RecordWithTags(cen.ctx, - []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, cen.mAIRequestPrice.M(pricePerUnit)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } @@ -1816,6 +1828,46 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet. } } +// AIJobProcessed records orchestrator AI job processing metrics. +func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) { + census.recordModelRequested(pipeline, model) + census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore) + census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit) +} + +// recordAIJobLatencyScore records the latency score for a processed AI job. +func (cen *censusMetricsCounter) recordAIJobLatencyScore(Pipeline string, Model string, latencyScore float64) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, + cen.mAIRequestLatencyScore.M(latencyScore)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// recordAIJobPricePerUnit logs the cost per unit of a processed AI job. +func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model string, pricePerUnit float64) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, + cen.mAIRequestPrice.M(pricePerUnit)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// AIProcessingError logs errors in orchestrator AI job processing. +func AIProcessingError(code string, Pipeline string, Model string, sender string) { + if err := stats.RecordWithTags(census.ctx, + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)}, + census.mAIRequestError.M(1)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + // Convert wei to gwei func wei2gwei(wei *big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_http.go b/server/ai_http.go index 4ef6eac545..eba099df35 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -14,6 +14,7 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/monitor" middleware "github.com/oapi-codegen/nethttp-middleware" "github.com/oapi-codegen/runtime" ) @@ -309,6 +310,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request start := time.Now() resp, err := submitFn(ctx) if err != nil { + if monitor.Enabled { + monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex()) + } respondWithError(w, err.Error(), http.StatusInternalServerError) return } @@ -321,6 +325,32 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request // If additional parameters that influence compute cost become configurable, then the formula should be reconsidered orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels) + if monitor.Enabled { + var latencyScore float64 + switch v := req.(type) { + case worker.TextToImageJSONRequestBody: + latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels) + case worker.ImageToImageMultipartRequestBody: + latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels) + case worker.ImageToVideoMultipartRequestBody: + latencyScore = CalculateImageToVideoLatencyScore(took, v, outPixels) + case worker.UpscaleMultipartRequestBody: + latencyScore = CalculateUpscaleLatencyScore(took, v, outPixels) + case worker.AudioToTextMultipartRequestBody: + durationSeconds, err := common.CalculateAudioDuration(v.Audio) + if err == nil { + latencyScore = CalculateAudioToTextLatencyScore(took, durationSeconds) + } + } + + var pricePerAIUnit float64 + if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 { + pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit()) + } + + monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit}) + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) diff --git a/server/ai_process.go b/server/ai_process.go index 613c294242..d286582215 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -9,6 +9,7 @@ import ( "fmt" "image" "io" + "math" "math/big" "net/http" "path/filepath" @@ -53,6 +54,30 @@ type aiRequestParams struct { sessManager *AISessionManager } +// CalculateTextToImageLatencyScore computes the time taken per pixel for an text-to-image request. +func CalculateTextToImageLatencyScore(took time.Duration, req worker.TextToImageJSONRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of images and inference steps are currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numImages := float64(1) + if req.NumImagesPerPrompt != nil { + numImages = math.Max(1, float64(*req.NumImagesPerPrompt)) + } + numInferenceSteps := float64(50) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + // Handle special case for SDXL-Lightning model. + if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { + numInferenceSteps = math.Max(1, core.ParseStepsFromModelID(req.ModelId, 8)) + } + + return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) +} + func processTextToImage(ctx context.Context, params aiRequestParams, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -134,22 +159,7 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess } // TODO: Refine this rough estimate in future iterations. - // TODO: Default values for the number of images and inference steps are currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numImages := float64(1) - if req.NumImagesPerPrompt != nil { - numImages = float64(*req.NumImagesPerPrompt) - } - numInferenceSteps := float64(50) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - // Handle special case for SDXL-Lightning model. - if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { - numInferenceSteps = core.ParseStepsFromModelID(req.ModelId, 8) - } - - sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + sess.LatencyScore = CalculateTextToImageLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -163,6 +173,30 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess return resp.JSON200, nil } +// CalculateImageToImageLatencyScore computes the time taken per pixel for an image-to-image request. +func CalculateImageToImageLatencyScore(took time.Duration, req worker.ImageToImageMultipartRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of images and inference steps are currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numImages := float64(1) + if req.NumImagesPerPrompt != nil { + numImages = math.Max(1, float64(*req.NumImagesPerPrompt)) + } + numInferenceSteps := float64(100) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + // Handle special case for SDXL-Lightning model. + if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { + numInferenceSteps = math.Max(1, core.ParseStepsFromModelID(req.ModelId, 8)) + } + + return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) +} + func processImageToImage(ctx context.Context, params aiRequestParams, req worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -258,22 +292,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes } // TODO: Refine this rough estimate in future iterations. - // TODO: Default values for the number of images and inference steps are currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numImages := float64(1) - if req.NumImagesPerPrompt != nil { - numImages = float64(*req.NumImagesPerPrompt) - } - numInferenceSteps := float64(100) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - // Handle special case for SDXL-Lightning model. - if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { - numInferenceSteps = core.ParseStepsFromModelID(req.ModelId, 8) - } - - sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + sess.LatencyScore = CalculateImageToImageLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -287,6 +306,22 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes return resp.JSON200, nil } +// CalculateImageToVideoLatencyScore computes the time taken per pixel for an image-to-video request. +func CalculateImageToVideoLatencyScore(took time.Duration, req worker.ImageToVideoMultipartRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of inference steps is currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numInferenceSteps := float64(25) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + + return took.Seconds() / float64(outPixels) / numInferenceSteps +} + func processImageToVideo(ctx context.Context, params aiRequestParams, req worker.ImageToVideoMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -398,13 +433,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes } // TODO: Refine this rough estimate in future iterations - // TODO: Default values for the number of inference steps is currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numInferenceSteps := float64(25) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - sess.LatencyScore = took.Seconds() / float64(outPixels) / numInferenceSteps + sess.LatencyScore = CalculateImageToVideoLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -418,6 +447,22 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes return &res, nil } +// CalculateUpscaleLatencyScore computes the time taken per pixel for an upscale request. +func CalculateUpscaleLatencyScore(took time.Duration, req worker.UpscaleMultipartRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of inference steps is currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numInferenceSteps := float64(75) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + + return took.Seconds() / float64(outPixels) / numInferenceSteps +} + func processUpscale(ctx context.Context, params aiRequestParams, req worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -513,13 +558,7 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, } // TODO: Refine this rough estimate in future iterations - // TODO: Default values for the number of inference steps is currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numInferenceSteps := float64(75) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - sess.LatencyScore = took.Seconds() / float64(outPixels) / numInferenceSteps + sess.LatencyScore = CalculateUpscaleLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -533,6 +572,26 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, return resp.JSON200, nil } +// CalculateAudioToTextLatencyScore computes the time taken per second of audio for an audio-to-text request. +func CalculateAudioToTextLatencyScore(took time.Duration, durationSeconds int64) float64 { + if durationSeconds <= 0 { + return 0 + } + + return took.Seconds() / float64(durationSeconds) +} + +func processAudioToText(ctx context.Context, params aiRequestParams, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) { + resp, err := processAIRequest(ctx, params, req) + if err != nil { + return nil, err + } + + txtResp := resp.(*worker.TextResponse) + + return txtResp, nil +} + func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISession, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) { var buf bytes.Buffer mw, err := worker.NewAudioToTextMultipartWriter(&buf, req) @@ -606,7 +665,7 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess } // TODO: Refine this rough estimate in future iterations - sess.LatencyScore = took.Seconds() / float64(durationSeconds) + sess.LatencyScore = CalculateAudioToTextLatencyScore(took, durationSeconds) if monitor.Enabled { var pricePerAIUnit float64 @@ -620,17 +679,6 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess return &res, nil } -func processAudioToText(ctx context.Context, params aiRequestParams, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) { - resp, err := processAIRequest(ctx, params, req) - if err != nil { - return nil, err - } - - txtResp := resp.(*worker.TextResponse) - - return txtResp, nil -} - func processAIRequest(ctx context.Context, params aiRequestParams, req interface{}) (interface{}, error) { var cap core.Capability var modelID string