Skip to content

Commit

Permalink
fix: add AIModels currency config field and fix pixelsPerUnit handling
Browse files Browse the repository at this point in the history
This commit adds a new `currency` field to the `AIModelConfig` to specify the
currency. Additionally, it improves the AI startup code in `starter.go` to
correctly handle parsing of this currency while ensuring compatibility with
'offchain' mode. Further improvements to the AI startup code are deferred to
avoid conflicts with existing pull requests.
  • Loading branch information
rickstaa committed Aug 2, 2024
1 parent 8036856 commit 4d54872
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 49 deletions.
96 changes: 64 additions & 32 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,16 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

// Get base pixels per unit.
pixelsPerUnitBase, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnitBase.IsInt() {
panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit))
}
if !ok || pixelsPerUnitBase.Sign() <= 0 {
// Can't divide by 0
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit))
}

if *cfg.AIModels != "" {
configs, err := core.ParseAIModelConfigs(*cfg.AIModels)
if err != nil {
Expand All @@ -1105,30 +1115,28 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
for _, config := range configs {
modelConstraint := &core.ModelConstraint{Warm: config.Warm}

// Set price per unit base info.
pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnit.IsInt() {
panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit))
}
if pixelsPerUnit.Sign() <= 0 {
// Can't divide by 0
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit))
}
pricePerUnit, currency, err := parsePricePerUnit(config.PricePerUnit.String())
if err != nil {
panic(fmt.Errorf("'pricePerUnit' value specified for model '%v' in pipeline '%v' must be a valid integer with an optional currency, provided %v", config.ModelID, config.Pipeline, config.PricePerUnit))
} else if pricePerUnit.Sign() < 0 {
panic(fmt.Errorf("'pricePerUnit' value specified for model '%v' in pipeline '%v' must be >= 0, provided %v", config.ModelID, config.Pipeline, config.PricePerUnit))
}
pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit)
var autoPrice *core.AutoConvertedPrice
if *cfg.Network == "offchain" {
autoPrice = core.NewFixedPrice(pricePerPixel)
} else {
autoPrice, err = core.NewAutoConvertedPrice(currency, pricePerPixel, nil)
}
if err != nil {
panic(fmt.Errorf("error converting price: %v", err))
if *cfg.Network != "offchain" {
pixelsPerUnit := config.PixelsPerUnit.Rat
if config.PixelsPerUnit.Rat == nil {
pixelsPerUnit = pixelsPerUnitBase
} else {
if !pixelsPerUnit.IsInt() || pixelsPerUnit.Sign() <= 0 {
panic(fmt.Errorf("'pixelsPerUnit' value specified for model '%v' in pipeline '%v' must be a valid positive integer, provided %v", config.ModelID, config.Pipeline, config.PixelsPerUnit))
}
}
pricePerUnit := config.PricePerUnit.Rat
if err != nil {
panic(fmt.Errorf("'pricePerUnit' value specified for model '%v' in pipeline '%v' must be a valid integer with an optional currency, provided %v", config.ModelID, config.Pipeline, config.PricePerUnit))
} else if pricePerUnit.Sign() < 0 {
panic(fmt.Errorf("'pricePerUnit' value specified for model '%v' in pipeline '%v' must be >= 0, provided %v", config.ModelID, config.Pipeline, config.PricePerUnit))
}
pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit)

autoPrice, err = core.NewAutoConvertedPrice(config.Currency, pricePerPixel, nil)
if err != nil {
panic(fmt.Errorf("error converting price: %v", err))
}
}

// If the config contains a URL we call Warm() anyway because AIWorker will just register
Expand Down Expand Up @@ -1158,7 +1166,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

capabilityConstraints[core.Capability_TextToImage].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_TextToImage, config.ModelID, autoPrice)
if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_TextToImage, config.ModelID, autoPrice)
}
case "image-to-image":
_, ok := capabilityConstraints[core.Capability_ImageToImage]
if !ok {
Expand All @@ -1170,7 +1180,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

capabilityConstraints[core.Capability_ImageToImage].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_ImageToImage, config.ModelID, autoPrice)
if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_ImageToImage, config.ModelID, autoPrice)
}
case "image-to-video":
_, ok := capabilityConstraints[core.Capability_ImageToVideo]
if !ok {
Expand All @@ -1182,7 +1194,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

capabilityConstraints[core.Capability_ImageToVideo].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_ImageToVideo, config.ModelID, autoPrice)
if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_ImageToVideo, config.ModelID, autoPrice)
}
case "upscale":
_, ok := capabilityConstraints[core.Capability_Upscale]
if !ok {
Expand All @@ -1194,7 +1208,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

capabilityConstraints[core.Capability_Upscale].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_Upscale, config.ModelID, autoPrice)
if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_Upscale, config.ModelID, autoPrice)
}
case "audio-to-text":
_, ok := capabilityConstraints[core.Capability_AudioToText]
if !ok {
Expand All @@ -1206,14 +1222,20 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

capabilityConstraints[core.Capability_AudioToText].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_AudioToText, config.ModelID, autoPrice)
if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_AudioToText, config.ModelID, autoPrice)
}
}

if len(aiCaps) > 0 {
capability := aiCaps[len(aiCaps)-1]
price := n.GetBasePriceForCap("default", capability, config.ModelID)
pricePerUnit := price.Num().Int64() / price.Denom().Int64()
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %d wei per compute unit", config.Pipeline, capability, config.ModelID, pricePerUnit)
if *cfg.Network != "offchain" {
pricePerUnit := price.Num().Int64() / price.Denom().Int64()
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s at price %d wei per compute unit", config.Pipeline, capability, config.ModelID, pricePerUnit)
} else {
glog.V(6).Infof("Capability %s (ID: %v) advertised with model constraint %s", config.Pipeline, capability, config.ModelID)
}
}
}
} else {
Expand Down Expand Up @@ -1735,11 +1757,21 @@ func getGatewayPrices(gatewayPrices string) []GatewayPrice {

prices := make([]GatewayPrice, len(allGateways))
for i, p := range allGateways {
pixelsPerUnit, ok := new(big.Rat).SetString(string(p.PixelsPerUnit))

Check failure on line 1760 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-arm64

cannot convert p.PixelsPerUnit (variable of type "github.com/livepeer/go-livepeer/core".JSONRat) to type string
if !ok {
glog.Errorf("Pixels per unit could not be parsed for gateway %v. must be a valid number, provided %s", p.EthAddress, p.PixelsPerUnit)
continue
}
pricePerUnit, ok := new(big.Rat).SetString(string(p.PricePerUnit))

Check failure on line 1765 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-arm64

cannot convert p.PricePerUnit (variable of type "github.com/livepeer/go-livepeer/core".JSONRat) to type string
if !ok {
glog.Errorf("Price per unit could not be parsed for gateway %v. must be a valid number, provided %s", p.EthAddress, p.PricePerUnit)
continue
}
prices[i] = GatewayPrice{
EthAddress: p.EthAddress,
Currency: p.Currency,
PricePerUnit: p.PricePerUnit.Rat,
PixelsPerUnit: p.PixelsPerUnit.Rat,
PricePerUnit: pricePerUnit,
PixelsPerUnit: pixelsPerUnit,
}
}

Expand Down
18 changes: 1 addition & 17 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,10 @@ type AIModelConfig struct {
Warm bool `json:"warm,omitempty"`
PricePerUnit JSONRat `json:"price_per_unit,omitempty"`
PixelsPerUnit JSONRat `json:"pixels_per_unit,omitempty"`
Currency string `json:"currency,omitempty"`
OptimizationFlags worker.OptimizationFlags `json:"optimization_flags,omitempty"`
}

func (config *AIModelConfig) UnmarshalJSON(data []byte) error {
// Custom type to avoid recursive calls to UnmarshalJSON
type AIModelConfigAlias AIModelConfig
// Set default values for fields
defaultConfig := &AIModelConfigAlias{
PixelsPerUnit: JSONRat{new(big.Rat).SetInt64(1)},
}

if err := json.Unmarshal(data, defaultConfig); err != nil {
return err
}

*config = AIModelConfig(*defaultConfig)

return nil
}

func ParseAIModelConfigs(config string) ([]AIModelConfig, error) {
var configs []AIModelConfig

Expand Down

0 comments on commit 4d54872

Please sign in to comment.