-
Notifications
You must be signed in to change notification settings - Fork 683
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flytepropeller][flyteadmin] Streaming Decks V2 #6053
Conversation
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: Yi Cheng <[email protected]> Co-authored-by: pingsutw <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #6053 +/- ##
=======================================
Coverage 37.06% 37.07%
=======================================
Files 1318 1318
Lines 132644 132707 +63
=======================================
+ Hits 49167 49202 +35
- Misses 79228 79250 +22
- Partials 4249 4255 +6
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Signed-off-by: Future-Outlier <[email protected]>
switch pluginTrns.pInfo.Phase() { | ||
case pluginCore.PhaseSuccess: | ||
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). | ||
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this do a head
call on the deck URI for every task that succeeds? Two thoughts here:
(1) does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
(2) this is incurring a 20-30ms performance degredation to every task execution
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will take a look tmr, thank you!!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this do a head call on the deck URI for every task that succeeds?
yes it will do a head
call by RemoteFileOutputReader
flyte/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
Lines 306 to 313 in b3330ba
func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) { | |
md, err := r.store.Head(ctx, r.outPath.GetDeckPath()) | |
if err != nil { | |
return false, err | |
} | |
return md.Exists(), nil | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do you know the performance degradation?
did you use grafana or other performance tools?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does the flyteadmin merge algorithm then remove the deckURI from the execution metadata?
flyteadmin will set the deckURI
in the execution metadata
to nil
if the propeller removes it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @hamersaw
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to just to close the loop on this, we're still making this HEAD call only when the node reaches the terminal phase.
How to test it?
flytectl demo start --image futureoutlier/sandbox:deck-1205-1138 --force cd flytekit
gh pr checkout 2779
from flytekit import ImageSpec, task, workflow
from flytekit.deck import Deck
flytekit_hash = "473ae1119af6f86c26c0790dee0affa3eb29be64"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"
# Define custom image for the task
custom_image = ImageSpec(packages=[flytekit],
apt_packages=["git"],
registry="localhost:30000",
env={"FLYTE_SDK_LOGGING_LEVEL": 10},
)
@task(enable_deck=True, container_image=custom_image)
def t_deck():
import time
"""
1st deck only show timeline deck
2nd will show
"""
for i in range(5):
Deck.publish()
# # raise Exception("This is an exception")
time.sleep(3)
@workflow
def wf():
t_deck()
if __name__ == "__main__":
from flytekit.clis.sdk_in_container import pyflyte
from click.testing import CliRunner
import os
runner = CliRunner()
path = os.path.realpath(__file__)
# result = runner.invoke(pyflyte.main,
# ["run", path, "wf"])
# print("Local Execution: ", result.output)
result = runner.invoke(pyflyte.main,
["run", "--remote", path,"wf"])
# "--remote"
print("Remote Execution: ", result.output) |
Mind adding screenshots for the rendered deck and refresh to the PR description? |
Yes no problem |
its provided! |
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #f3ef5eActionable Suggestions - 1
Additional Suggestions - 1
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
// - We relied on a HEAD request to check if the deck file exists, then added the URI to the event. | ||
// | ||
// After (new behavior): | ||
// - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment is no longer correct right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes super nice catch
@@ -380,6 +430,27 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics | |||
return t.taskMetricsMap[metricNameKey], nil | |||
} | |||
|
|||
func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { | |||
// FLYTE_ENABLE_DECK is used when flytekit > 1.14.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update this comment
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run Status
|
// GetDeckStatus determines whether a task generates a deck based on its execution context. | ||
// | ||
// This function ensures backward compatibility with older Flytekit versions using the following logic: | ||
// 1. For Flytekit > 1.14.3, the task template's metadata includes the `generates_deck` flag: | ||
// - If `generates_deck` is set to true, it indicates that the task generates a deck, and DeckEnabled is returned. | ||
// 2. If `generates_deck` is set to false or is not set (likely from older Flytekit versions): | ||
// - DeckUnknown is returned as a placeholder status. | ||
// - In terminal states, a HEAD request can be made to check if the deck file exists. | ||
// | ||
// In future implementations, a `DeckDisabled` status could be introduced for better performance optimization: | ||
// - This would eliminate the need for a HEAD request in the final phase. | ||
// - However, the tradeoff is that a new field would need to be added to FlyteIDL to support this behavior. | ||
|
||
template, err := tCtx.tr.Read(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better comments!
cc @wild-endeavor
Streaming Decks #!/usr/bin/env bash
set -ex
ARCH="$(uname -m)"
case ${ARCH} in
x86_64|amd64)
IMAGE_ARCH=amd64
;;
aarch64|arm64)
IMAGE_ARCH=arm64
;;
*)
>&2 echo "ERROR: Unsupported architecture: ${ARCH}"
exit 1
;;
esac
FLYTECONSOLE_IMAGE="localhost:30000/flyteconsole:1216-2134"
IMAGE_DIGEST="$(docker manifest inspect --verbose localhost:30000/flyteconsole:1216-2134 | \
jq --arg IMAGE_ARCH "${IMAGE_ARCH}" --raw-output \
'.[] | select(.Descriptor.platform.architecture == $IMAGE_ARCH) | .Descriptor.digest')"
# Short circuit if we already have the correct distribution
[ -f cmd/single/dist/.digest ] && grep -Fxq ${IMAGE_DIGEST} cmd/single/dist/.digest && exit 0
# Create container from desired image
CONTAINER_ID=$(docker create localhost:30000/flyteconsole:1216-2134)
trap 'docker rm -f ${CONTAINER_ID}' EXIT
# Copy distribution
rm -rf cmd/single/dist
docker cp ${CONTAINER_ID}:/app cmd/single/dist
printf '%q' ${IMAGE_DIGEST} > cmd/single/dist/.digest
|
04f7fbc
to
1204c0e
Compare
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #daa4f1Actionable Suggestions - 0Review Details
|
Hi, @eapolinario @pingsutw @wild-endeavor Now all of these cases will be considered.
|
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #dc455eActionable Suggestions - 5
Review Details
|
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() { | ||
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil { | ||
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err) | ||
} | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider checking for errors from RemoveDeckURIIfDeckNotExists
before proceeding with the task completion. The current implementation only logs the error but continues execution which could lead to inconsistent state.
Code suggestion
Check the AI-generated fix before applying
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() { | |
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil { | |
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err) | |
} | |
} | |
}() | |
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() { | |
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil { | |
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err) | |
// Return error to allow proper handling at higher levels | |
return pluginTrns, err | |
} | |
} | |
} | |
}() |
Code Review Run #dc455e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
defer func() { | ||
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() { | ||
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil { | ||
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err) | ||
} | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider moving the deck URI cleanup logic to a separate function for better code organization and reusability. The deferred function could be simplified by extracting the logic into a named function.
Code suggestion
Check the AI-generated fix before applying
defer func() { | |
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() { | |
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil { | |
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err) | |
} | |
} | |
}() | |
defer cleanupDeckURI(ctx, tCtx, deckStatus, pluginTrns) | |
func cleanupDeckURI(ctx context.Context, tCtx *taskExecutionContext, deckStatus DeckStatus, pluginTrns *pluginRequestedTransition) { | |
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() { | |
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil { | |
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err) | |
} | |
} | |
} |
Code Review Run #dc455e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
exists, err := reader.DeckExists(ctx) | ||
if err != nil { | ||
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) | ||
p.execInfo.OutputInfo.DeckURI = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider initializing OutputInfo
if it's nil
before attempting to set DeckURI
to avoid potential nil pointer dereference in RemoveDeckURIIfDeckNotExists
Code suggestion
Check the AI-generated fix before applying
@@ -102,2 +102,5 @@
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
+ if p.execInfo.OutputInfo == nil {
+ p.execInfo.OutputInfo = &handler.OutputInfo{}
+ }
p.execInfo.OutputInfo.DeckURI = nil
Code Review Run #dc455e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Future-Outlier , does this suggestion make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: Future-Outlier <[email protected]>
4c6e488
to
96500c1
Compare
Code Review Agent Run #b10317Actionable Suggestions - 0Review Details
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, just left one minor question, but won't block the change on it.
exists, err := reader.DeckExists(ctx) | ||
if err != nil { | ||
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) | ||
p.execInfo.OutputInfo.DeckURI = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Future-Outlier , does this suggestion make sense?
switch pluginTrns.pInfo.Phase() { | ||
case pluginCore.PhaseSuccess: | ||
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). | ||
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to just to close the loop on this, we're still making this HEAD call only when the node reaches the terminal phase.
yes I should remove this line, thank you! |
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run #c2d4d4Actionable Suggestions - 0Review Details
|
if p.execInfo.OutputInfo != nil { | ||
p.execInfo.OutputInfo.DeckURI = nil | ||
} | ||
p.execInfo.OutputInfo.DeckURI = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is superfluous since in the case of OutputInfo
being nil
by definition DeckUri
is going to be nil.
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Code Review Agent Run Status
|
Tracking issue
#5574
Why are the changes needed?
To enhance user visibility into Flyte Decks at different stages of workflow execution (running, failing, and succeeding), enabling better debugging and analysis.
Summary
What changes were proposed in this pull request?
2025/01/14 update
BoolValue
, it can return 3 values, which can represent 3 statusnil -> DeckUnknown
true -> DeckEnabled
false -> DeckDisabled
Concept:
NodeExecutionEvent
, and send it to admin.flyte/flytepropeller/pkg/controller/nodes/executor.go
Lines 1251 to 1261 in b3330ba
Life Cycle:
use new flytekit > 1.14.0
summary:
HEAD
request to be called. (save resource)details:
DeckURI
when the task is running ifFLYTE_ENABLE_DECK=true
in the task template.DeckURI
to node info, and turn it toNodeExecutionEvent
to flyte admin.DeckURI
toClosure
flyte/flyteadmin/dataproxy/service.go
Lines 175 to 189 in b3330ba
node Closure
, it will not show theFlyte Deck
button.old flytekit <= 1.14.0
summary:
details:
HEAD
request to know if the Deck URI exists or not.if exist, then put it to the node info.
How was this patch tested?
python code:
Setup process
single binary.
flyte: this branch
flytekit: flyteorg/flytekit#2779
flyteconsole: flyteorg/flyteconsole#890
Screenshots
flytekit branch:
flyteorg/flytekit#2779
NEW FLYTEKIT, NO DECK, RUNNING With Deck, SUCCEED, and FAILED
OSS-STREAMING-DECK-small.mov
OLD FLYTEKIT, NO DECK, RUNNING With Deck, SUCCEED, and FAILED
OSS-STREAMING-DECK-OLD-FLYTEKIT-small.mov
Check all the applicable boxes
Related PRs
follow up questions
Abort
phase for the streaming deck?should we support
EPhaseAbort
in this file?https://github.com/flyteorg/flyte/blob/b3330ba4430538f91ae9fc7d868a29a2e96db8bd/flytepropeller/pkg/controller/nodes/handler/transition_info.go
Summary by Bito
This PR enhances Flyte's functionality through multiple improvements: (1) Decks V2 streaming with tri-state support and refined protocol buffer definitions, including improved deck URI handling and terminal state management, (2) Service discovery updates with k8s:// scheme implementation, (3) Security configurations with shared secrets, and (4) Enhanced type system handling including dataclass metadata improvements and OutputInfo safety measures. Changes span TypeScript, Go, JavaScript, Python, and Rust codebases.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5