diff --git a/monitor/census.go b/monitor/census.go index 373220122b..2f883d5e48 100644 --- a/monitor/census.go +++ b/monitor/census.go @@ -380,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) { baseTagsWithEthAddr := baseTags baseTagsWithManifestIDAndEthAddr := baseTags baseTagsWithOrchInfo := baseTags + baseTagsWithGatewayInfo := baseTags if PerStreamMetrics { baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID} baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender} @@ -391,8 +392,19 @@ func InitCensus(nodeType NodeType, version string) { } baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...) + baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...) baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...) + // Add node type specific tags. + baseTagsWithNodeInfo := baseTags + aiRequestLatencyScoreTags := baseTags + if nodeType == Orchestrator { + baseTagsWithNodeInfo = baseTagsWithGatewayInfo + } else { + baseTagsWithNodeInfo = baseTagsWithOrchInfo + aiRequestLatencyScoreTags = baseTagsWithOrchInfo + } + views := []*view.View{ { Name: "versions", @@ -889,7 +901,7 @@ func InitCensus(nodeType NodeType, version string) { Name: "ai_request_latency_score", Measure: census.mAIRequestLatencyScore, Description: "AI request latency score", - TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, aiRequestLatencyScoreTags...), Aggregation: view.LastValue(), }, { @@ -903,7 +915,7 @@ func InitCensus(nodeType NodeType, version string) { Name: "ai_request_errors", Measure: census.mAIRequestError, Description: "Errors when processing AI requests", - TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithOrchInfo...), + TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...), Aggregation: view.Sum(), }, } @@ -1760,13 +1772,6 @@ func RewardCallError(sender string) { } } -// AIRequestFinished records gateway AI job request metrics. -func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { - census.recordModelRequested(pipeline, model) - census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) - census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo) -} - // recordModelRequested increments request count for a specific AI model and pipeline. func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) { cen.lock.Lock() @@ -1778,6 +1783,13 @@ func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string } } +// AIRequestFinished records gateway AI job request metrics. +func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) { + census.recordModelRequested(pipeline, model) + census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo) + census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit) +} + // recordAIRequestLatencyScore records the latency score for a AI job request. func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) { cen.lock.Lock() @@ -1791,12 +1803,12 @@ func (cen *censusMetricsCounter) recordAIRequestLatencyScore(Pipeline string, Mo } // recordAIRequestPricePerUnit records the price per unit for a AI job request. -func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) { +func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(Pipeline string, Model string, pricePerUnit float64) { cen.lock.Lock() defer cen.lock.Unlock() if err := stats.RecordWithTags(cen.ctx, - []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model), tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()), tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String())}, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, cen.mAIRequestPrice.M(pricePerUnit)); err != nil { glog.Errorf("Error recording metrics err=%q", err) } @@ -1816,6 +1828,46 @@ func AIRequestError(code string, Pipeline string, Model string, orchInfo *lpnet. } } +// AIJobProcessed records orchestrator AI job processing metrics. +func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo) { + census.recordModelRequested(pipeline, model) + census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore) + census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit) +} + +// recordAIJobLatencyScore records the latency score for a processed AI job. +func (cen *censusMetricsCounter) recordAIJobLatencyScore(Pipeline string, Model string, latencyScore float64) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, + cen.mAIRequestLatencyScore.M(latencyScore)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// recordAIJobPricePerUnit logs the cost per unit of a processed AI job. +func (cen *censusMetricsCounter) recordAIJobPricePerUnit(Pipeline string, Model string, pricePerUnit float64) { + cen.lock.Lock() + defer cen.lock.Unlock() + + if err := stats.RecordWithTags(cen.ctx, + []tag.Mutator{tag.Insert(cen.kPipeline, Pipeline), tag.Insert(cen.kModelName, Model)}, + cen.mAIRequestPrice.M(pricePerUnit)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + +// AIProcessingError logs errors in orchestrator AI job processing. +func AIProcessingError(code string, Pipeline string, Model string, sender string) { + if err := stats.RecordWithTags(census.ctx, + []tag.Mutator{tag.Insert(census.kErrorCode, code), tag.Insert(census.kPipeline, Pipeline), tag.Insert(census.kModelName, Model), tag.Insert(census.kSender, sender)}, + census.mAIRequestError.M(1)); err != nil { + glog.Errorf("Error recording metrics err=%q", err) + } +} + // Convert wei to gwei func wei2gwei(wei *big.Int) float64 { gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64() diff --git a/server/ai_http.go b/server/ai_http.go index 4ef6eac545..eba099df35 100644 --- a/server/ai_http.go +++ b/server/ai_http.go @@ -14,6 +14,7 @@ import ( "github.com/livepeer/go-livepeer/clog" "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" + "github.com/livepeer/go-livepeer/monitor" middleware "github.com/oapi-codegen/nethttp-middleware" "github.com/oapi-codegen/runtime" ) @@ -309,6 +310,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request start := time.Now() resp, err := submitFn(ctx) if err != nil { + if monitor.Enabled { + monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex()) + } respondWithError(w, err.Error(), http.StatusInternalServerError) return } @@ -321,6 +325,32 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request // If additional parameters that influence compute cost become configurable, then the formula should be reconsidered orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels) + if monitor.Enabled { + var latencyScore float64 + switch v := req.(type) { + case worker.TextToImageJSONRequestBody: + latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels) + case worker.ImageToImageMultipartRequestBody: + latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels) + case worker.ImageToVideoMultipartRequestBody: + latencyScore = CalculateImageToVideoLatencyScore(took, v, outPixels) + case worker.UpscaleMultipartRequestBody: + latencyScore = CalculateUpscaleLatencyScore(took, v, outPixels) + case worker.AudioToTextMultipartRequestBody: + durationSeconds, err := common.CalculateAudioDuration(v.Audio) + if err == nil { + latencyScore = CalculateAudioToTextLatencyScore(took, durationSeconds) + } + } + + var pricePerAIUnit float64 + if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 { + pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit()) + } + + monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit}) + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) diff --git a/server/ai_process.go b/server/ai_process.go index 613c294242..d286582215 100644 --- a/server/ai_process.go +++ b/server/ai_process.go @@ -9,6 +9,7 @@ import ( "fmt" "image" "io" + "math" "math/big" "net/http" "path/filepath" @@ -53,6 +54,30 @@ type aiRequestParams struct { sessManager *AISessionManager } +// CalculateTextToImageLatencyScore computes the time taken per pixel for an text-to-image request. +func CalculateTextToImageLatencyScore(took time.Duration, req worker.TextToImageJSONRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of images and inference steps are currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numImages := float64(1) + if req.NumImagesPerPrompt != nil { + numImages = math.Max(1, float64(*req.NumImagesPerPrompt)) + } + numInferenceSteps := float64(50) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + // Handle special case for SDXL-Lightning model. + if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { + numInferenceSteps = math.Max(1, core.ParseStepsFromModelID(req.ModelId, 8)) + } + + return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) +} + func processTextToImage(ctx context.Context, params aiRequestParams, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -134,22 +159,7 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess } // TODO: Refine this rough estimate in future iterations. - // TODO: Default values for the number of images and inference steps are currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numImages := float64(1) - if req.NumImagesPerPrompt != nil { - numImages = float64(*req.NumImagesPerPrompt) - } - numInferenceSteps := float64(50) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - // Handle special case for SDXL-Lightning model. - if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { - numInferenceSteps = core.ParseStepsFromModelID(req.ModelId, 8) - } - - sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + sess.LatencyScore = CalculateTextToImageLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -163,6 +173,30 @@ func submitTextToImage(ctx context.Context, params aiRequestParams, sess *AISess return resp.JSON200, nil } +// CalculateImageToImageLatencyScore computes the time taken per pixel for an image-to-image request. +func CalculateImageToImageLatencyScore(took time.Duration, req worker.ImageToImageMultipartRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of images and inference steps are currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numImages := float64(1) + if req.NumImagesPerPrompt != nil { + numImages = math.Max(1, float64(*req.NumImagesPerPrompt)) + } + numInferenceSteps := float64(100) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + // Handle special case for SDXL-Lightning model. + if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { + numInferenceSteps = math.Max(1, core.ParseStepsFromModelID(req.ModelId, 8)) + } + + return took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) +} + func processImageToImage(ctx context.Context, params aiRequestParams, req worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -258,22 +292,7 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes } // TODO: Refine this rough estimate in future iterations. - // TODO: Default values for the number of images and inference steps are currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numImages := float64(1) - if req.NumImagesPerPrompt != nil { - numImages = float64(*req.NumImagesPerPrompt) - } - numInferenceSteps := float64(100) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - // Handle special case for SDXL-Lightning model. - if strings.HasPrefix(*req.ModelId, "ByteDance/SDXL-Lightning") { - numInferenceSteps = core.ParseStepsFromModelID(req.ModelId, 8) - } - - sess.LatencyScore = took.Seconds() / float64(outPixels) / (numImages * numInferenceSteps) + sess.LatencyScore = CalculateImageToImageLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -287,6 +306,22 @@ func submitImageToImage(ctx context.Context, params aiRequestParams, sess *AISes return resp.JSON200, nil } +// CalculateImageToVideoLatencyScore computes the time taken per pixel for an image-to-video request. +func CalculateImageToVideoLatencyScore(took time.Duration, req worker.ImageToVideoMultipartRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of inference steps is currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numInferenceSteps := float64(25) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + + return took.Seconds() / float64(outPixels) / numInferenceSteps +} + func processImageToVideo(ctx context.Context, params aiRequestParams, req worker.ImageToVideoMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -398,13 +433,7 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes } // TODO: Refine this rough estimate in future iterations - // TODO: Default values for the number of inference steps is currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numInferenceSteps := float64(25) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - sess.LatencyScore = took.Seconds() / float64(outPixels) / numInferenceSteps + sess.LatencyScore = CalculateImageToVideoLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -418,6 +447,22 @@ func submitImageToVideo(ctx context.Context, params aiRequestParams, sess *AISes return &res, nil } +// CalculateUpscaleLatencyScore computes the time taken per pixel for an upscale request. +func CalculateUpscaleLatencyScore(took time.Duration, req worker.UpscaleMultipartRequestBody, outPixels int64) float64 { + if outPixels <= 0 { + return 0 + } + + // TODO: Default values for the number of inference steps is currently hardcoded. + // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. + numInferenceSteps := float64(75) + if req.NumInferenceSteps != nil { + numInferenceSteps = math.Max(1, float64(*req.NumInferenceSteps)) + } + + return took.Seconds() / float64(outPixels) / numInferenceSteps +} + func processUpscale(ctx context.Context, params aiRequestParams, req worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error) { resp, err := processAIRequest(ctx, params, req) if err != nil { @@ -513,13 +558,7 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, } // TODO: Refine this rough estimate in future iterations - // TODO: Default values for the number of inference steps is currently hardcoded. - // These should be managed by the nethttpmiddleware. Refer to issue LIV-412 for more details. - numInferenceSteps := float64(75) - if req.NumInferenceSteps != nil { - numInferenceSteps = float64(*req.NumInferenceSteps) - } - sess.LatencyScore = took.Seconds() / float64(outPixels) / numInferenceSteps + sess.LatencyScore = CalculateUpscaleLatencyScore(took, req, outPixels) if monitor.Enabled { var pricePerAIUnit float64 @@ -533,6 +572,26 @@ func submitUpscale(ctx context.Context, params aiRequestParams, sess *AISession, return resp.JSON200, nil } +// CalculateAudioToTextLatencyScore computes the time taken per second of audio for an audio-to-text request. +func CalculateAudioToTextLatencyScore(took time.Duration, durationSeconds int64) float64 { + if durationSeconds <= 0 { + return 0 + } + + return took.Seconds() / float64(durationSeconds) +} + +func processAudioToText(ctx context.Context, params aiRequestParams, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) { + resp, err := processAIRequest(ctx, params, req) + if err != nil { + return nil, err + } + + txtResp := resp.(*worker.TextResponse) + + return txtResp, nil +} + func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISession, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) { var buf bytes.Buffer mw, err := worker.NewAudioToTextMultipartWriter(&buf, req) @@ -606,7 +665,7 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess } // TODO: Refine this rough estimate in future iterations - sess.LatencyScore = took.Seconds() / float64(durationSeconds) + sess.LatencyScore = CalculateAudioToTextLatencyScore(took, durationSeconds) if monitor.Enabled { var pricePerAIUnit float64 @@ -620,17 +679,6 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess return &res, nil } -func processAudioToText(ctx context.Context, params aiRequestParams, req worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error) { - resp, err := processAIRequest(ctx, params, req) - if err != nil { - return nil, err - } - - txtResp := resp.(*worker.TextResponse) - - return txtResp, nil -} - func processAIRequest(ctx context.Context, params aiRequestParams, req interface{}) (interface{}, error) { var cap core.Capability var modelID string