[indexer-alt] Fix first_checkpoint option #20079
3af3e24 to 3dc9d24
Thanks @lxfind, I had a slightly different plan in mind for addressing this issue, one that should touch the pipeline logic less. Let me know what you think:
Concurrent Pipelines
For concurrent pipelines, when we add the pipeline, we should check for a gap between an optional first_checkpoint and the loaded watermark at that point.
- This allows us to quote the specific pipeline that pulled the watermark lower than first_checkpoint.
- We should also honour the --skip-watermark flag in these cases. I.e. it shouldn't matter to a concurrent pipeline that there is a potential gap if --skip-watermark has been provided, and we should mention that flag in the error message as a potential way to make progress.
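The check described above could look roughly like the following. This is a minimal sketch, not the code in the PR: the function name check_concurrent_gap, its parameters, and the use of u64 and String (rather than the crate's own watermark and error types) are all assumptions made for illustration.

```rust
/// Registration-time gap check for a concurrent pipeline (illustrative only).
///
/// `watermark_hi` stands in for the loaded watermark's `checkpoint_hi_inclusive`;
/// `None` means the pipeline has no watermark yet. All names here are hypothetical.
fn check_concurrent_gap(
    pipeline: &str,
    first_checkpoint: Option<u64>,
    watermark_hi: Option<u64>,
    skip_watermark: bool,
) -> Result<(), String> {
    // With --skip-watermark the pipeline neither relies on nor updates the
    // watermark, so a potential gap is not its concern.
    if skip_watermark {
        return Ok(());
    }

    match (first_checkpoint, watermark_hi) {
        // A gap exists only if we would start past the checkpoint that
        // immediately follows the watermark.
        (Some(first), Some(hi)) if first > hi.saturating_add(1) => Err(format!(
            "For pipeline {pipeline}, first checkpoint {first} is ahead of watermark {hi}. \
             This could create gaps in the data; pass --skip-watermark to run anyway."
        )),
        // No override, no watermark yet, or no gap: nothing to complain about.
        _ => Ok(()),
    }
}
```

Because the check runs per pipeline at registration, the error can name the pipeline whose watermark is behind, and it points at --skip-watermark as the way to proceed when the operator knows no real gap is being introduced.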
Sequential Pipelines
For sequential pipelines, it's more tricky. Today sequential pipelines:
- Guarantee that each update only runs once on the table, because the updates themselves are not idempotent.
- Guarantee that the watermark is only updated if all the updates have been applied up to and including that watermark, since genesis.
- Require that there are no gaps in ingestion/processing -- this requirement is implicit, the pipeline will stall and eventually it will start producing warnings, but there are ways we could have detected this issue statically.
To me, the key thing is that this pipeline deals with non-idempotent updates. For pipelines related to objects (which are all the sequential pipelines we have today) it's also important that we track all updates since genesis, but I'm going to argue that this is secondary (you could reasonably have sequential pipelines that don't care about starting from genesis -- this is even true for the object pipelines if we start from a formal snapshot).
IIUC, the current solution enforces the "no gaps" property, but it weakens the other two properties:
- Updates could be run multiple times, because the pipeline was made to restart at some earlier checkpoint. For certain updates (let's say the pipeline's update is applying a delta of some kind) this will just corrupt the data straight away, for other updates it is not so bad (I think all the updates we want to do today are in this second category), but it still...
- ...means you could read a watermark but the data in the table is not an accurate reflection of the data at that watermark, because the pipeline is replaying some updates.
Instead, we should keep the properties that each update runs exactly once and that the watermark is moved atomically, consistently and durably with the associated updates, and weaken the property that we must start from genesis.
We can do this by allowing next_checkpoint to take the value of a non-zero first_checkpoint in case there is no watermark, but complaining early (in a similar fashion to the solution for concurrent pipelines, which we can check when we add the pipeline) if we detect that there would be a gap introduced.
It would also be useful to track where a pipeline started from (so that we could know if the objects pipeline was not started from genesis). I think we can address that by using the reader_lo field in the watermarks table to track that -- if we are starting a pipeline, and we have been given a first checkpoint and there is no initial watermark, we can set reader_lo to first_checkpoint before we start.
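To make the proposed start-up rule for sequential pipelines concrete, here is a minimal sketch. The Watermark and StartState structs and the sequential_start function are hypothetical stand-ins (only the field names checkpoint_hi_inclusive and reader_lo come from the discussion), and resuming from the watermark when first_checkpoint is behind it is one reading of the "exactly once" requirement, not something the thread spells out.

```rust
/// Hypothetical, simplified view of a row in the watermarks table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Watermark {
    checkpoint_hi_inclusive: u64,
    reader_lo: u64,
}

/// Hypothetical result: where the pipeline starts and what reader_lo records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct StartState {
    next_checkpoint: u64,
    reader_lo: u64,
}

fn sequential_start(
    first_checkpoint: Option<u64>,
    watermark: Option<Watermark>,
) -> Result<StartState, String> {
    match watermark {
        // Existing watermark: always resume right after it, so no update is
        // ever replayed. Only complain if the override would skip checkpoints.
        Some(w) => {
            let next = w.checkpoint_hi_inclusive + 1;
            if let Some(first) = first_checkpoint {
                if first > next {
                    return Err(format!(
                        "first checkpoint {first} is ahead of the next expected \
                         checkpoint {next}; this would create a gap"
                    ));
                }
            }
            Ok(StartState { next_checkpoint: next, reader_lo: w.reader_lo })
        }
        // No watermark: start at first_checkpoint (or genesis) and record the
        // starting point in reader_lo so it is visible later.
        None => {
            let start = first_checkpoint.unwrap_or(0);
            Ok(StartState { next_checkpoint: start, reader_lo: start })
        }
    }
}
```

With this shape, a non-zero reader_lo on a pipeline that has never been pruned signals that it was not started from genesis.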
crates/sui-indexer-alt/src/lib.rs
Outdated
if first_checkpoint > self.first_checkpoint_from_watermark {
    return Err(anyhow::anyhow!(
        "First checkpoint {} is larger than the expected first checkpoint from watermark {}.\
        This will create gaps in the data.",
        first_checkpoint,
        self.first_checkpoint_from_watermark
    ));
}
nit, this is an ensure! call in disguise:
- if first_checkpoint > self.first_checkpoint_from_watermark {
-     return Err(anyhow::anyhow!(
-         "First checkpoint {} is larger than the expected first checkpoint from watermark {}.\
-         This will create gaps in the data.",
-         first_checkpoint,
-         self.first_checkpoint_from_watermark
-     ));
- }
+ ensure!(
+     first_checkpoint <= self.first_checkpoint_from_watermark,
+     "First checkpoint {first_checkpoint} is larger than the expected first checkpoint from watermark {}. \
+     This will create gaps in the data",
+     self.first_checkpoint_from_watermark
+ );
If we ever do start from a formal snapshot, we should also update the watermark when we do so, such that the watermark is at the right place. So I don't quite see a valid reason that we would ever want a gap here. Also, for the concurrent pipeline, in what scenario would you want to allow the pipeline to run with a gap but skip the watermark update?
You don't ever want a gap in the sense of "you processed rows for checkpoints up to
In that case, we should always put an initial watermark when we initialize the index, right?
Well in that case there should be no gap at all because we would be updating older data entries.
I don't see how to pull this off. So you start the new indexer at some future checkpoint with --skip-watermark, then?
Yes, that ends up happening as part of my suggested change, in an off-by-one fashion. Inside the sequential pipeline,
I don't think I understood your question fully earlier -- if the question is why you would want to leave a gap in checkpoints processed with a concurrent pipeline, I agree with you, there isn't a good reason to do that, except at the front (between genesis and some starting checkpoint). Nevertheless, the committing logic should not prevent us from doing that because we may know we are not introducing a gap even if the pipeline does not, and the examples I gave were primarily related to that: Situations where we needed to run the pipeline without it being able to refer to or update the watermark, but we wanted to run it anyway on that range of checkpoints.
The scenario I had in mind is one where we introduce a change to an existing table (e.g. new column). To backfill the new column, we would need to run the new indexer from some past initial checkpoint to some recent checkpoint without updating the watermark, while the old indexer is mainly running the show. Once we see that the backfill instance of the indexer has caught up, we could hand over to the new indexer in production as well -- i.e. it could control the watermark.
3dc9d24 to ad32ded
This is great, thanks @lxfind. As it is, this change doesn't include the part where the pipelines initialise their next_checkpoint to first_checkpoint (instead of 0) when the watermark does not exist. Is that to come in a follow-up? (That would be fine, I think this PR is good as it is).
crates/sui-indexer-alt/src/lib.rs
Outdated
if let (Some(watermark), Some(first_checkpoint)) = (watermark, self.first_checkpoint) {
    ensure!(
        first_checkpoint as i64 <= watermark.checkpoint_hi_inclusive + 1,
        "For pipeline {}, first checkpoint override {} is too far ahead than watermark {}. This could create gaps in the data.",
- "For pipeline {}, first checkpoint override {} is too far ahead than watermark {}. This could create gaps in the data.",
+ "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. This could create gaps in the data.",
Added TODO. Will follow up in a separate PR.
217f678 to ff0a8f9
Description
The "first_checkpoint" option is a bit broken.
If first_checkpoint is larger than watermark + 1, a sequential pipeline simply loses liveness, because it always expects watermark + 1 as the next checkpoint; a concurrent pipeline will never be able to update its watermark, for a similar reason.
This PR adds a check when registering the pipeline to simply not allow that to happen.
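The liveness problem can be reproduced with a toy model of a sequential committer. This is only a sketch under simplifying assumptions: drain_contiguous and the BTreeMap of pending row counts are hypothetical, not the indexer's actual commit loop.

```rust
use std::collections::BTreeMap;

/// Commit pending checkpoints in order, starting at `next`, stopping at the
/// first missing checkpoint. Returns the new `next` and the committed values.
/// (Toy model: the map value is just a row count for that checkpoint.)
fn drain_contiguous(mut next: u64, pending: &mut BTreeMap<u64, u32>) -> (u64, Vec<u32>) {
    let mut committed = Vec::new();
    // A sequential pipeline only ever applies exactly the checkpoint it
    // expects next; anything further ahead stays buffered.
    while let Some(rows) = pending.remove(&next) {
        committed.push(rows);
        next += 1;
    }
    (next, committed)
}
```

If the watermark is 50 but ingestion starts at checkpoint 100, calling drain_contiguous(51, ...) commits nothing and leaves every buffered checkpoint pending, no matter how many more arrive. Rejecting that configuration at registration turns a silent stall into an immediate error.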
If first_checkpoint is smaller than watermark + 1, the intention must be to backfill.
However, the sequential pipeline ignores any data that is below the watermark.
This PR fixes that by allowing data to be committed even when it is below the watermark.
Test plan
CI.
Probably need to add tests too.
Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.