Skip to content

Commit

Permalink
Merge pull request #1192 from zebrunner/develop
Browse files Browse the repository at this point in the history
[draft] 3.1.2
  • Loading branch information
dmtgrinevich authored Oct 11, 2024
2 parents 7988cfb + f8f2cd5 commit bfe2532
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 16 deletions.
2 changes: 1 addition & 1 deletion build/.env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
E3S_VERSION=3.1.1
E3S_VERSION=3.1.2
5 changes: 5 additions & 0 deletions cmd/scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func stopLostTasks(ctx context.Context, svc *ecs.ECS, wg *sync.WaitGroup) {
if time.Since(*task.StartedAt) <= config.Conf.ServiceStartupTimeout {
continue
}

if task.Group != nil && *task.Group == "service:linux-exporter" {
continue
}

taskId := strings.Split(*task.TaskArn, "/")[2]
l := log.WithField(config.TaskIdKey, taskId)
l.Warn("Unrecognized task detected! Aborting")
Expand Down
49 changes: 38 additions & 11 deletions environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,32 @@ func (env *ExecutionEnvironment) ContainerOverrides() []*ecs.ContainerOverride {
cpu := container.Cpu()
memory := container.Memory()
override := ecs.ContainerOverride{
Name: &container.Name,
Cpu: &cpu,
Memory: &memory,
Command: aws.StringSlice(container.Command),
Name: &container.Name,
Cpu: &cpu,
Memory: &memory,
}

if strings.ToLower(env.Capabilities.PlatformName.ToPrimitive()) != envtype.WINDOWS.String() {
override.MemoryReservation = &memory
}

env := []*ecs.KeyValuePair{}
for k, v := range container.Env {
// need to declare local variables to provide as pointer later
key := k
value := v
env = append(env, &ecs.KeyValuePair{Name: &key, Value: &value})
// For generic environments, env vars and the command are supplied when the task definition is
// registered rather than through container overrides, because the override payload is size-limited:
// InvalidParameterException: Container Overrides length must be at most 8192.
// whereas the size limit for registering a task definition is much higher:
// ClientException: Actual length: '117374'. Max allowed length is '65536' bytes.
// This is possible because a new generic task definition is always registered before the task starts.
if env.Type != envtype.GENERIC {
override.Command = aws.StringSlice(container.Command)
env := []*ecs.KeyValuePair{}
for k, v := range container.Env {
// need to declare local variables to provide as pointer later
key := k
value := v
env = append(env, &ecs.KeyValuePair{Name: &key, Value: &value})
}
override.Environment = env
}
override.Environment = env

overrides = append(overrides, &override)
}
Expand Down Expand Up @@ -182,6 +190,25 @@ func (env *ExecutionEnvironment) ContainerDefinitions() []*ecs.ContainerDefiniti
}
containerDefinition.PortMappings = portMappings

// For generic environments, env vars and the command are supplied when the task definition is
// registered rather than through container overrides, because the override payload is size-limited:
// InvalidParameterException: Container Overrides length must be at most 8192.
// whereas the size limit for registering a task definition is much higher:
// ClientException: Actual length: '117374'. Max allowed length is '65536' bytes.
// This is possible because a new generic task definition is always registered before the task starts.
if env.Type == envtype.GENERIC {
env := []*ecs.KeyValuePair{}
for k, v := range c.Env {
// need to declare local variables to provide as pointer later
key := k
value := v
env = append(env, &ecs.KeyValuePair{Name: &key, Value: &value})
}

containerDefinition.Environment = env
containerDefinition.Command = aws.StringSlice(c.Command)
}

definitions = append(definitions, &containerDefinition)
}

Expand Down
1 change: 1 addition & 0 deletions starter/taskRegistrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func registerTask(ctx context.Context, env *environment.ExecutionEnvironment, ro
},
CapacityProviderStrategy: []*ecs.CapacityProviderStrategyItem{{CapacityProvider: &env.CapacityProvider}},
}

l.WithField("runTaskInput", runTaskInput).Trace("Res runTaskInput")

// TODO: explicitly narrow the set of errors we wait on to well-known reasons only, e.g. RESOURCE:CPU
Expand Down
22 changes: 22 additions & 0 deletions utils/retries.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"net/http"
"reflect"
"runtime"
"strings"
Expand Down Expand Up @@ -41,3 +42,24 @@ func RetryThrottling[T, R interface{}](executeFunc func(T) (R, error)) func(T) (
return result, err
}
}

// Sender is the minimal contract needed to dispatch an HTTP request.
// *http.Client satisfies it, and tests can substitute a stub.
type Sender interface {
	Do(*http.Request) (*http.Response, error)
}

// RetryOnSendFailure wraps sendFn.Do in a function that re-sends the request
// when Do returns an error.
//
// The request is attempted at most retryCount times in total, and always at
// least once (a retryCount below 1 is treated as 1, so the returned function
// never yields a nil response together with a nil error). retryDelay is waited
// between consecutive attempts only, never after the last one.
//
// Only transport-level failures (a non-nil error from Do) trigger a retry;
// a response with a non-2xx status code is returned to the caller as is.
//
// Before every re-send the request body is rewound through req.GetBody when
// it is available, so that the payload is transmitted again instead of an
// already drained reader. If the body cannot be rewound the retries stop and
// the last send error is returned.
func RetryOnSendFailure(sendFn Sender, retryCount int, retryDelay time.Duration) func(*http.Request) (*http.Response, error) {
	attempts := retryCount
	if attempts < 1 {
		attempts = 1
	}

	return func(req *http.Request) (*http.Response, error) {
		var (
			res *http.Response
			err error
		)

		for attempt := 0; attempt < attempts; attempt++ {
			if attempt > 0 {
				time.Sleep(retryDelay)

				// The previous attempt may have consumed (and closed) the body;
				// obtain a fresh copy so the retry carries the full payload.
				if req != nil && req.GetBody != nil {
					body, bodyErr := req.GetBody()
					if bodyErr != nil {
						// Re-sending a broken body is pointless; report the
						// original send failure.
						break
					}
					req.Body = body
				}
			}

			res, err = sendFn.Do(req)
			if err == nil {
				return res, nil
			}
		}

		return res, err
	}
}
10 changes: 6 additions & 4 deletions zebrunner/zebrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (

"github.com/zebrunner/esg/cachemaps/mapper"
"github.com/zebrunner/esg/config"
"github.com/zebrunner/esg/utils"
)

const (
USAGE_API_PATH = "/api/engine-utilization/v1/engine-usages"
ABORT_API_PATH = "/api/reporting/v1/launches/uuid"
USAGE_API_PATH = "/api/credits/v1/engine-usages"
ABORT_API_PATH = "/api/reporting/v1/launches/uuid"
// TODO: delete as soon as all on-prem tenants have been upgraded
OLD_ABORT_API_PATH = "/api/reporting/api/project-test-runs/abort"
RETRY_COUNT = 3
)

func TrackResourcesUsage(cachedTask *mapper.Mapper, task *ecs.Task) {
Expand Down Expand Up @@ -139,7 +141,7 @@ func TrackResourcesUsage(cachedTask *mapper.Mapper, task *ecs.Task) {
req.Header.Add("Content-Type", "application/json")
l.Trace("req: ", req)

resp, err := http.DefaultClient.Do(req)
resp, err := utils.RetryOnSendFailure(http.DefaultClient, RETRY_COUNT, time.Millisecond*250)(req)
if err != nil {
l.WithError(err).Error("Failed to send request")
return
Expand Down Expand Up @@ -208,7 +210,7 @@ func AbortLaunch(routerUUID, workspace, launchUUID, reason string) {

l.Trace("req: ", req)

resp, err := http.DefaultClient.Do(req)
resp, err := utils.RetryOnSendFailure(http.DefaultClient, RETRY_COUNT, time.Millisecond*250)(req)
if err != nil {
l.WithError(err).Error("Failed to send request")
return
Expand Down

0 comments on commit bfe2532

Please sign in to comment.