Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ai): add AI orchestrator metrics #3097

Merged
merged 13 commits into from
Jul 18, 2024
Merged
157 changes: 157 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type (
kOrchestratorURI tag.Key
kOrchestratorAddress tag.Key
kFVErrorType tag.Key
kPipeline tag.Key
kModelName tag.Key
mSegmentSourceAppeared *stats.Int64Measure
mSegmentEmerged *stats.Int64Measure
mSegmentEmergedUnprocessed *stats.Int64Measure
Expand Down Expand Up @@ -190,6 +192,12 @@ type (
mSegmentClassProb *stats.Float64Measure
mSceneClassification *stats.Int64Measure

// Metrics for AI jobs
mAIModelsRequested *stats.Int64Measure
mAIRequestLatencyScore *stats.Float64Measure
mAIRequestPrice *stats.Float64Measure
mAIRequestError *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
success map[uint64]*segmentsAverager
Expand Down Expand Up @@ -217,6 +225,11 @@ type (
removedAt time.Time
tries map[uint64]tryData // seqNo:try
}

AIJobInfo struct {
LatencyScore float64
PricePerUnit float64
}
)

// Exporter Prometheus exporter that handles `/metrics` endpoint
Expand Down Expand Up @@ -254,6 +267,8 @@ func InitCensus(nodeType NodeType, version string) {
census.kOrchestratorAddress = tag.MustNewKey("orchestrator_address")
census.kFVErrorType = tag.MustNewKey("fverror_type")
census.kSegClassName = tag.MustNewKey("seg_class_name")
census.kModelName = tag.MustNewKey("model_name")
census.kPipeline = tag.MustNewKey("pipeline")
census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID))
if err != nil {
glog.Exit("Error creating context", err)
Expand Down Expand Up @@ -339,6 +354,12 @@ func InitCensus(nodeType NodeType, version string) {
census.mSegmentClassProb = stats.Float64("segment_class_prob", "SegmentClassProb", "tot")
census.mSceneClassification = stats.Int64("scene_classification_done", "SceneClassificationDone", "tot")

// Metrics for AI jobs
census.mAIModelsRequested = stats.Int64("ai_models_requested", "Number of AI models requested over time", "tot")
census.mAIRequestLatencyScore = stats.Float64("ai_request_latency_score", "AI request latency score, based on smallest pipeline unit", "")
census.mAIRequestPrice = stats.Float64("ai_request_price", "AI request price per unit, based on smallest pipeline unit", "")
census.mAIRequestError = stats.Int64("ai_request_errors", "Errors during AI request processing", "tot")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
glog.Infof("Node type %s node ID %s", nodeType, NodeID)
Expand All @@ -359,6 +380,7 @@ func InitCensus(nodeType NodeType, version string) {
baseTagsWithEthAddr := baseTags
baseTagsWithManifestIDAndEthAddr := baseTags
baseTagsWithOrchInfo := baseTags
baseTagsWithGatewayInfo := baseTags
if PerStreamMetrics {
baseTagsWithManifestID = []tag.Key{census.kNodeID, census.kNodeType, census.kManifestID}
baseTagsWithEthAddr = []tag.Key{census.kNodeID, census.kNodeType, census.kSender}
Expand All @@ -370,8 +392,17 @@ func InitCensus(nodeType NodeType, version string) {
}
baseTagsWithManifestIDAndOrchInfo := baseTagsWithManifestID
baseTagsWithOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTags...)
baseTagsWithGatewayInfo = append([]tag.Key{census.kSender}, baseTags...)
baseTagsWithManifestIDAndOrchInfo = append([]tag.Key{census.kOrchestratorURI, census.kOrchestratorAddress}, baseTagsWithManifestID...)

// Add node type specific tags.
baseTagsWithNodeInfo := baseTags
if nodeType == Orchestrator {
baseTagsWithNodeInfo = baseTagsWithOrchInfo
} else {
baseTagsWithNodeInfo = baseTagsWithGatewayInfo
}

views := []*view.View{
{
Name: "versions",
Expand Down Expand Up @@ -855,6 +886,36 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTags,
Aggregation: view.Count(),
},

// Metrics for AI jobs
{
Name: "ai_models_requested",
Measure: census.mAIModelsRequested,
Description: "Number of AI models requested over time",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTags...),
Aggregation: view.Count(),
},
{
Name: "ai_request_latency_score",
Measure: census.mAIRequestLatencyScore,
Description: "AI request latency score",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eliteprox, @ad-astra-video do you think listing this per gateway label makes sense?

Aggregation: view.LastValue(),
},
{
Name: "ai_request_price",
Measure: census.mAIRequestPrice,
Description: "AI request price per unit",
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.LastValue(),
},
{
Name: "ai_request_errors",
Measure: census.mAIRequestError,
Description: "Errors when processing AI requests",
TagKeys: append([]tag.Key{census.kErrorCode, census.kPipeline, census.kModelName}, baseTagsWithNodeInfo...),
Aggregation: view.Sum(),
},
}

// Register the views
Expand Down Expand Up @@ -1709,6 +1770,102 @@ func RewardCallError(sender string) {
}
}

// recordModelRequested increments the request count for a specific AI model
// and pipeline. It is called on both the gateway and orchestrator paths
// (see AIRequestFinished and AIJobProcessed).
func (cen *censusMetricsCounter) recordModelRequested(pipeline, modelName string) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{tag.Insert(cen.kPipeline, pipeline), tag.Insert(cen.kModelName, modelName)},
		cen.mAIModelsRequested.M(1)); err != nil {
		// Keep the error format consistent with the other AI metric recorders.
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// AIRequestFinished records gateway AI job request metrics: the per-model
// request count, the request latency score, and the price per unit, all
// tagged with the serving orchestrator's info.
//
// ctx is currently unused; it is kept for signature consistency with the
// other public metrics entry points.
func AIRequestFinished(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, orchInfo *lpnet.OrchestratorInfo) {
	census.recordModelRequested(pipeline, model)
	census.recordAIRequestLatencyScore(pipeline, model, jobInfo.LatencyScore, orchInfo)
	census.recordAIRequestPricePerUnit(pipeline, model, jobInfo.PricePerUnit, orchInfo)
}

// recordAIRequestLatencyScore records the latency score for an AI job
// request, tagged with the pipeline, model, and the orchestrator's URI and
// Ethereum address.
func (cen *censusMetricsCounter) recordAIRequestLatencyScore(pipeline string, model string, latencyScore float64, orchInfo *lpnet.OrchestratorInfo) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{
			tag.Insert(cen.kPipeline, pipeline),
			tag.Insert(cen.kModelName, model),
			tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()),
			tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String()),
		},
		cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// recordAIRequestPricePerUnit records the price per unit for an AI job
// request, tagged with the pipeline, model, and the orchestrator's URI and
// Ethereum address.
func (cen *censusMetricsCounter) recordAIRequestPricePerUnit(pipeline string, model string, pricePerUnit float64, orchInfo *lpnet.OrchestratorInfo) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{
			tag.Insert(cen.kPipeline, pipeline),
			tag.Insert(cen.kModelName, model),
			tag.Insert(cen.kOrchestratorURI, orchInfo.GetTranscoder()),
			tag.Insert(cen.kOrchestratorAddress, common.BytesToAddress(orchInfo.GetAddress()).String()),
		},
		cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// AIRequestError records an error in a gateway AI job request, tagged with
// the error code, pipeline, model, and the orchestrator's URI and address.
func AIRequestError(code string, pipeline string, model string, orchInfo *lpnet.OrchestratorInfo) {
	// A nil address would otherwise be rendered as the zero address by
	// BytesToAddress; report an empty string so it stays distinguishable.
	orchAddr := ""
	if addr := orchInfo.GetAddress(); addr != nil {
		orchAddr = common.BytesToAddress(addr).String()
	}

	if err := stats.RecordWithTags(census.ctx,
		[]tag.Mutator{
			tag.Insert(census.kErrorCode, code),
			tag.Insert(census.kPipeline, pipeline),
			tag.Insert(census.kModelName, model),
			tag.Insert(census.kOrchestratorURI, orchInfo.GetTranscoder()),
			tag.Insert(census.kOrchestratorAddress, orchAddr),
		},
		census.mAIRequestError.M(1)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// AIJobProcessed records orchestrator-side metrics for a processed AI job:
// the per-model request count, the job latency score, and the price per
// unit, all tagged with the paying sender.
//
// ctx is currently unused; it is kept for signature consistency with the
// other public metrics entry points.
func AIJobProcessed(ctx context.Context, pipeline string, model string, jobInfo AIJobInfo, sender string) {
	census.recordModelRequested(pipeline, model)
	census.recordAIJobLatencyScore(pipeline, model, jobInfo.LatencyScore, sender)
	census.recordAIJobPricePerUnit(pipeline, model, jobInfo.PricePerUnit, sender)
}

// recordAIJobLatencyScore records the latency score for a processed AI job,
// tagged with the pipeline, model, and sender. It shares the
// mAIRequestLatencyScore measure with the gateway-side recorder; the tag set
// (sender vs. orchestrator info) distinguishes the node role.
func (cen *censusMetricsCounter) recordAIJobLatencyScore(pipeline string, model string, latencyScore float64, sender string) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{
			tag.Insert(cen.kPipeline, pipeline),
			tag.Insert(cen.kModelName, model),
			tag.Insert(cen.kSender, sender),
		},
		cen.mAIRequestLatencyScore.M(latencyScore)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// recordAIJobPricePerUnit records the price per unit of a processed AI job,
// tagged with the pipeline, model, and sender. It shares the mAIRequestPrice
// measure with the gateway-side recorder; the tag set distinguishes the
// node role.
func (cen *censusMetricsCounter) recordAIJobPricePerUnit(pipeline string, model string, pricePerUnit float64, sender string) {
	cen.lock.Lock()
	defer cen.lock.Unlock()

	if err := stats.RecordWithTags(cen.ctx,
		[]tag.Mutator{
			tag.Insert(cen.kPipeline, pipeline),
			tag.Insert(cen.kModelName, model),
			tag.Insert(cen.kSender, sender),
		},
		cen.mAIRequestPrice.M(pricePerUnit)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// AIProcessingError records an error in orchestrator AI job processing,
// tagged with the error code, pipeline, model, and sender.
func AIProcessingError(code string, pipeline string, model string, sender string) {
	if err := stats.RecordWithTags(census.ctx,
		[]tag.Mutator{
			tag.Insert(census.kErrorCode, code),
			tag.Insert(census.kPipeline, pipeline),
			tag.Insert(census.kModelName, model),
			tag.Insert(census.kSender, sender),
		},
		census.mAIRequestError.M(1)); err != nil {
		glog.Errorf("Error recording metrics err=%q", err)
	}
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
25 changes: 25 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/monitor"
middleware "github.com/oapi-codegen/nethttp-middleware"
"github.com/oapi-codegen/runtime"
)
Expand Down Expand Up @@ -271,6 +272,9 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
start := time.Now()
resp, err := submitFn(ctx)
if err != nil {
if monitor.Enabled {
monitor.AIProcessingError(err.Error(), pipeline, modelID, sender.Hex())
}
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -283,6 +287,27 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
// If additional parameters that influence compute cost become configurable, then the formula should be reconsidered
orch.DebitFees(sender, manifestID, payment.GetExpectedPrice(), outPixels)

if monitor.Enabled {
var latencyScore float64
switch v := req.(type) {
case worker.TextToImageJSONRequestBody:
latencyScore = CalculateTextToImageLatencyScore(took, v, outPixels)
case worker.ImageToImageMultipartRequestBody:
latencyScore = CalculateImageToImageLatencyScore(took, v, outPixels)
case worker.ImageToVideoMultipartRequestBody:
latencyScore = CalculateImageToVideoLatencyScore(took, outPixels)
case worker.UpscaleMultipartRequestBody:
latencyScore = CalculateUpscaleLatencyScore(took, outPixels)
}

var pricePerAIUnit float64
if priceInfo := payment.GetExpectedPrice(); priceInfo != nil && priceInfo.GetPixelsPerUnit() != 0 {
pricePerAIUnit = float64(priceInfo.GetPricePerUnit()) / float64(priceInfo.GetPixelsPerUnit())
}

monitor.AIJobProcessed(ctx, pipeline, modelID, monitor.AIJobInfo{LatencyScore: latencyScore, PricePerUnit: pricePerAIUnit}, sender.Hex())
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(resp)
Expand Down
Loading
Loading