Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytepropeller][flyteadmin] Streaming Decks V2 #6053

Merged
merged 34 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
54aa165
add tests from Yi-Cheng
Future-Outlier Nov 27, 2024
9ed6b6e
helped by Kevin and Yi-Cheng
Future-Outlier Nov 27, 2024
4b4f6bd
lint
Future-Outlier Nov 27, 2024
dd774cb
nit
Future-Outlier Nov 28, 2024
0bb8e91
add comments
Future-Outlier Dec 13, 2024
25fea29
add comments and better solution for backward compativle
Future-Outlier Dec 17, 2024
4e24e91
better comments
Future-Outlier Dec 17, 2024
8d1d0e4
DeckStatus
Future-Outlier Dec 18, 2024
31853bb
rename GetDeckStatus
Future-Outlier Dec 18, 2024
4068043
comments
Future-Outlier Dec 18, 2024
65b6efe
lint
Future-Outlier Jan 2, 2025
137579f
fix
Future-Outlier Jan 9, 2025
04f7fbc
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 9, 2025
aa56d64
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 13, 2025
a16851f
use BoolValue as IDL, suggested by Eduardo
Future-Outlier Jan 13, 2025
7314455
change commennts
Future-Outlier Jan 13, 2025
19498f5
update
Future-Outlier Jan 13, 2025
74f595f
fix
Future-Outlier Jan 13, 2025
3bd3336
fix
Future-Outlier Jan 14, 2025
f6d8493
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 14, 2025
4b56e52
fix
Future-Outlier Jan 14, 2025
db4b19e
remove unused ogic
Future-Outlier Jan 14, 2025
2737251
Update flyteidl/protos/flyteidl/core/tasks.proto
Future-Outlier Jan 16, 2025
564dc5f
Update flyteidl/protos/flyteidl/core/tasks.proto
Future-Outlier Jan 16, 2025
69ba94e
Merge remote-tracking branch 'origin' into streaming-deck-v2
eapolinario Jan 16, 2025
c992eae
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 17, 2025
0b91b5c
Update by Kevin's advice
Future-Outlier Jan 17, 2025
1d18265
update
Future-Outlier Jan 17, 2025
96500c1
update
Future-Outlier Jan 22, 2025
dd9dbaa
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 23, 2025
f51ff8c
RemoveDeckURIIfDeckNotExists
Future-Outlier Jan 23, 2025
bd5e682
update
Future-Outlier Jan 23, 2025
561a43c
nit suggestion by Eduardo
Future-Outlier Jan 24, 2025
a33ba09
update
Future-Outlier Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.GetEvent().GetDeckUri()
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{
const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow"

const testInputURI = "fake://bucket/inputs.pb"
const DeckURI = "fake://bucket/deck.html"
eapolinario marked this conversation as resolved.
Show resolved Hide resolved

var testInputs = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand All @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) {
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: startedAtProto,
DeckUri: DeckURI,
},
}
nodeExecutionModel := models.NodeExecution{}
Expand All @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt)
assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt()))
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputURI(t *testing.T) {
Expand All @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
OutputUri: outputURI,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
Expand All @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, outputURI, closure.GetOutputUri())
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputData(t *testing.T) {
Expand Down
80 changes: 61 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,43 @@
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

p.execInfo.OutputInfo.DeckURI = deckURI
}

// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
return nil
}

Check warning on line 92 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L90-L92

Added lines #L90 - L92 were not covered by tests

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

Check warning on line 98 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L96-L98

Added lines #L96 - L98 were not covered by tests

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
}

return nil
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {

Check warning on line 107 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L107

Added line #L107 was not covered by tests
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})

Check warning on line 110 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L110

Added line #L110 was not covered by tests
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -144,10 +177,13 @@
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}

Check warning on line 184 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L182-L184

Added lines #L182 - L184 were not covered by tests
} else {
p.execInfo.OutputInfo.OutputURI = outputPath
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -171,7 +207,7 @@
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// The plugin interface available especially for testing.
Expand Down Expand Up @@ -464,8 +500,19 @@
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
pluginTrns.AddDeckURI(ctx, tCtx)

switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this do a head call on the deck URI for every task that succeeds? Two thoughts here:
(1) does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
(2) this is incurring a 20-30ms performance degredation to every task execution

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will take a look tmr, thank you!!!

Copy link
Member Author

@Future-Outlier Future-Outlier Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this do a head call on the deck URI for every task that succeeds?

yes it will do a head call by RemoteFileOutputReader

func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
md, err := r.store.Head(ctx, r.outPath.GetDeckPath())
if err != nil {
return false, err
}
return md.Exists(), nil
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you know the performance degradation?
did you use grafana or other performance tools?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?

flyteadmin will set the deckURI in the execution metadata to nil if the propeller removes it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to just to close the loop on this, we're still making this HEAD call only when the node reaches the terminal phase.

if err != nil {
return pluginTrns, err
}

Check warning on line 515 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L514-L515

Added lines #L514 - L515 were not covered by tests
// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand Down Expand Up @@ -501,25 +548,20 @@
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final
// phase(PhaseFailure).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}

Check warning on line 564 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L559-L564

Added lines #L559 - L564 were not covered by tests
pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
Expand Down
Loading