[CELEBORN-1846] Fix the StreamHandler usage in fetching chunk when task attempt is odd #3079

Open: wants to merge 2 commits into base: main

onebox-li
Contributor

What changes were proposed in this pull request?

The streams opened in the streamCreatorPool thread pool are all based on the primary locations. When the task attempt number is odd, the task starts fetching chunks from the replica location first, so the wrong streamHandler is used to fetch data. To keep the logic simple, we now always start fetching from the primary location. When switching to the peer, we close the stream and use a null streamHandler to fetch from the peer.
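
The control flow described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the actual Celeborn code: `PrimaryFirstFetchSketch`, `Location`, `StreamHandler`, and `fetch` are hypothetical names, and the method only records what a fetch would do (start at the primary with its pre-opened handler; on failure, close that stream and go to the peer with a null handler).

```java
import java.util.ArrayList;
import java.util.List;

public class PrimaryFirstFetchSketch {
  // Hypothetical stand-ins; the real Celeborn classes carry far more state.
  static final class Location {
    final String name;
    final Location peer; // null when the partition has no replica
    Location(String name, Location peer) { this.name = name; this.peer = peer; }
    boolean hasPeer() { return peer != null; }
  }

  static final class StreamHandler {
    final String openedOn; // name of the location the stream was opened on
    StreamHandler(String openedOn) { this.openedOn = openedOn; }
  }

  /** Records the actions a fetch would take, so the order is easy to inspect. */
  static List<String> fetch(Location primary, StreamHandler preOpened, boolean primaryFails) {
    List<String> log = new ArrayList<>();
    // Always start from the primary: the pre-opened handler belongs to it.
    log.add("fetch " + primary.name + " with handler for " + preOpened.openedOn);
    if (primaryFails && primary.hasPeer()) {
      // Switching to the peer: release the primary stream and pass no handler,
      // so the reader has to open a fresh stream on the peer itself.
      log.add("close stream on " + preOpened.openedOn);
      StreamHandler peerHandler = null;
      log.add("fetch " + primary.peer.name + " with handler " + peerHandler);
    }
    return log;
  }

  public static void main(String[] args) {
    Location replica = new Location("replica", null);
    Location primary = new Location("primary", replica);
    fetch(primary, new StreamHandler("primary"), true).forEach(System.out::println);
  }
}
```

The point of the sketch is that the handler opened on the primary is never paired with the peer, regardless of the attempt number.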

Why are the changes needed?

To avoid tasks being slowed down by NPEs, and to avoid potential data problems.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manual test.

@RexXiong
Contributor

Nice catch...

if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && location.hasPeer()) {
  location = location.getPeer();
  logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
}
Exception lastException = null;
Contributor

IMO, it would be better to keep this: switching to the peer based on the attemptNumber may avoid a PartitionLocation that failed previously.

Contributor Author

If there is a problem with the primary location, then it is likely that it has already been changed to the peer in the last task attempt but still failed. In this case, it is not so relevant which one to use to start fetching in the new task attempt.
If there is no problem with the primary location and the task is retried due to other problems, this situation is even less relevant.
So I think we could always fetch chunk by starting from the primary location. WDYT?

Contributor

Maybe we should open streams for both primary and replica locations?

Contributor

> If there is a problem with the primary location, then it is likely that it has already been changed to the peer in the last task attempt but still failed. In this case, it is not so relevant which one to use to start fetching in the new task attempt. If there is no problem with the primary location and the task is retried due to other problems, this situation is even less relevant. So I think we could always fetch chunk by starting from the primary location. WDYT?

Sounds reasonable. Also, changing the location to the peer here would make pbStreamHandler and location inconsistent in createReader, which may cause issues in some shuffle scenarios.
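
The inconsistency mentioned here can be illustrated with a small sketch. It assumes, as the PR description states, that stream handlers are pre-opened for primary locations only. The class and method names (`HandlerMismatchSketch`, `startLocationOld`, `startLocationNew`, `isConsistent`) are hypothetical, locations are reduced to plain strings, and the parity rule is copied from the snippet under review.

```java
public class HandlerMismatchSketch {
  /** Old rule from the reviewed snippet: odd first attempts start on the peer. */
  static String startLocationOld(int attemptNumber, int fetchChunkRetryCnt, boolean hasPeer) {
    boolean usePeer = fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && hasPeer;
    return usePeer ? "replica" : "primary";
  }

  /** Rule proposed in this PR: always start on the primary. */
  static String startLocationNew(int attemptNumber, int fetchChunkRetryCnt, boolean hasPeer) {
    return "primary";
  }

  /** A pre-opened handler is only valid on the location that opened it. */
  static boolean isConsistent(String handlerOpenedOn, String fetchLocation) {
    return handlerOpenedOn.equals(fetchLocation);
  }

  public static void main(String[] args) {
    String handlerOpenedOn = "primary"; // assumption: streams are pre-opened on primaries only
    for (int attempt = 0; attempt < 4; attempt++) {
      System.out.println("attempt " + attempt
          + " old=" + isConsistent(handlerOpenedOn, startLocationOld(attempt, 0, true))
          + " new=" + isConsistent(handlerOpenedOn, startLocationNew(attempt, 0, true)));
    }
  }
}
```

Under these assumptions, the old rule pairs the primary's handler with the replica on attempts 1 and 3, while the new rule never does.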

Contributor Author

> Maybe we should open streams for both primary and replica locations?

This would be a bit wasteful because most tasks need not change to replica location if the cluster is stable.

Contributor

> > Maybe we should open streams for both primary and replica locations?
>
> This would be a bit wasteful because most tasks need not change to replica location if the cluster is stable.

Sounds reasonable.

    clientFactory.createClient(location.getHost(), location.getFetchPort());
TransportMessage bufferStreamEnd =
    new TransportMessage(
        MessageType.BUFFER_STREAM_END,
Contributor

Why do we need to send BUFFER_STREAM_END to the replica location?

Contributor Author

Thanks, fixed.

Contributor

I wonder what will happen if we send a BUFFER_STREAM_END message to a Celeborn worker that did not open the stream, because we may reach this point when the location is excluded, without having opened a stream first.
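
One way to sidestep this question would be to track whether a stream was actually opened and only then send the end-of-stream message. The sketch below is an assumption about how such a guard could look, not what the PR does: `StreamCloser`, `markOpened`, and `closeIfOpened` are hypothetical names, workers are plain strings, and the `sent` list stands in for the network call.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StreamCloser {
  private final Set<String> openedOn = new HashSet<>(); // workers with an open stream
  final List<String> sent = new ArrayList<>();          // stand-in for messages on the wire

  /** Remembers that this client opened a stream on the given worker. */
  void markOpened(String worker) {
    openedOn.add(worker);
  }

  /** Sends BUFFER_STREAM_END only if this client opened a stream on the worker. */
  boolean closeIfOpened(String worker) {
    if (!openedOn.remove(worker)) {
      // For example, the location was excluded before any stream was opened.
      return false;
    }
    sent.add("BUFFER_STREAM_END -> " + worker);
    return true;
  }

  public static void main(String[] args) {
    StreamCloser closer = new StreamCloser();
    closer.markOpened("worker-a");
    System.out.println(closer.closeIfOpened("worker-a"));
    System.out.println(closer.closeIfOpened("worker-b"));
    System.out.println(closer.sent);
  }
}
```

With a guard like this, a worker that never saw an open request never receives an end message, and a second close for the same worker is a no-op.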

Contributor

RexXiong left a comment

LGTM


4 participants