Binding Behavior Updates #1219
-
Thinking through some of the implications of this:
-
I like the idea of adding an explicit
-
Meta-comment: while we discuss the proposal in the comments, would you please edit the proposal with updates and outcomes so that it stays evergreen?
A few observations:
-
Given the addition of
-
I have updated the original write-up per our discussion here and other VC discussions:
There are a couple of areas IMO that need further definition and discussion:
-
A couple of additional updates, based on further discussion:
-
A few more updates per separate discussions:
-
High-level issue for tracking the work discussed here: #1276
-
Tracking issue for implementation: #1276
At a high level, these changes support situations where a way to easily "reset" a binding for a capture or materialization is needed. An example for captures is a captured table undergoing an alteration that changes the types & values of a column, such that the table needs to be re-backfilled in order to capture the altered values. For materializations, an example is a field type changing in an incompatible way due to widening from schema inference, e.g. `{ type: integer }` -> `{ type: [integer, object] }`,
requiring a new column type & re-materialization of the current table for the materialization of that collection.

The new capabilities the changes described here will enable are:

- Re-starting an individual capture binding from scratch with a single publication.
- Re-starting an individual materialization binding into the same destination table, re-creating that table as needed.
- Re-starting a derivation transform from the beginning of its source collection.
A materialization can currently re-materialize a collection from the beginning into a new destination table, and that capability will remain unchanged.
Note: The word "table" is commonly used when describing the source of data for a capture binding and the destination for data of a materialization binding, but these changes are not strictly limited to table-based systems. For example, this would also apply to an ElasticSearch materialization that uses an "index" instead of a table, or a Kafka capture that reads from "topics".
There will be additional scope and design needed to expose these capabilities to the user, e.g. via the UI, that is not discussed in detail here.
Current Behavior
It is technically possible today to reset a capture binding for ~most captures by removing the binding from the spec, publishing, re-adding the binding, and publishing again. Each connector must implement logic to identify bindings that have been removed by comparing the spec sent to the connector with a previously persisted checkpoint, and then remove the state for the removed binding. Not all captures implement this feature and there is not strict consistency in precisely how it is implemented. It is not implemented by any materialization connectors, and there isn't a straightforward way for it to be implemented for materializations.
Some relevant details of how connectors manage state: a capture's driver checkpoint is effectively a map of `<table> -> <state>`, where `<table>` is some unique identifier for the source within the captured system that corresponds to the `ResourcePath`, for example `namespace.tablename`, or maybe just `tablename`. Captures have full control over their state persisted in these checkpoints.
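For illustration, a current-style driver checkpoint might look something like this sketch; the state values (`cursor`, `backfillComplete`) are hypothetical, since each connector chooses its own state shape:

```json
{
  "mySchema.myTable": { "cursor": "0/1A2B3C4" },
  "mySchema.otherTable": { "backfillComplete": true }
}
```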
Proposed Changes
Capture Checkpoints
These changes are largely motivated by the desire to make resetting a binding easier (click a single button to reset), and to enforce consistency in how it works across all captures. Since it is technically possible for a capture connector to reset a binding by examining its previous driver checkpoint and the provided spec, these changes aren't absolutely necessary. But it is difficult to require every capture connector to always implement this in a consistent way.
Change 1: Re-starting a capture binding should be possible with a single publication of an updated spec. There must be a way to convey the desire for the connector to forget everything it knew about the binding previously and start from scratch on a go-forward basis. A distinct `backfillVersion` property of the binding's specification that is incorporated into the key for the binding in the capture's state could accomplish this: to re-capture the binding, change the binding's `backfillVersion` and publish the spec. The `backfillVersion` will be a monotonically increasing integer value.

Change 2: The Flow runtime should prune state checkpoints so that each capture connector doesn't have to. This will ensure consistency across all capture connectors, including those implementing the native Flow protocol written in Go or some other language, as well as connectors built to other protocols through an appropriate translation layer (ATF, etc... maybe). For this to work, there needs to be a common structure to a capture checkpoint that can be used by both the runtime and the capture connector.
A binding key is the proposed mechanism for this: it will be formed by joining the resource path of the binding with the `backfillVersion` of the binding to form a unique identifier for the binding, so changing the `backfillVersion` will force a fresh start of the binding. As a sketch, the checkpoint might be a serialized JSON object that includes a key of `bindingStateV1`, which is itself an object having keys like `<resourcePath>/<backfillVersion>`. As part of sending the `Open` message to the connector, the runtime could then inspect the last committed checkpoint and remove any keys in `bindingStateV1` that aren't part of the current specification. The checkpoint could contain other keys/values as the connector requires, but the `bindingStateV1` property is specifically for automatically pruned binding states.

The binding key will be added to the connector protocol and computed by the runtime. It will be a string to allow for flexibility in how it is provided.
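As a rough sketch of that pruning step (function and variable names here are hypothetical, not the actual runtime implementation):

```go
package main

import "encoding/json"

// pruneBindingStates drops entries of bindingStateV1 whose binding key is no
// longer part of the current specification. A sketch only: reading the last
// committed checkpoint and sending the Open message are omitted.
func pruneBindingStates(checkpoint map[string]json.RawMessage, activeBindingKeys map[string]bool) error {
	raw, ok := checkpoint["bindingStateV1"]
	if !ok {
		return nil // the connector hasn't opted in to runtime-managed pruning
	}
	var states map[string]json.RawMessage
	if err := json.Unmarshal(raw, &states); err != nil {
		return err
	}
	for key := range states {
		// A key disappears when its binding is removed, or when a changed
		// backfillVersion produces a different binding key.
		if !activeBindingKeys[key] {
			delete(states, key)
		}
	}
	pruned, err := json.Marshal(states)
	if err != nil {
		return err
	}
	checkpoint["bindingStateV1"] = pruned
	return nil
}
```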
A snippet from an example specification with the new `backfillVersion` field is shown below:
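This is a sketch assuming a Postgres-like capture; the `resource` fields (`namespace`, `stream`) and names are hypothetical and vary by connector:

```yaml
bindings:
  - resource:
      namespace: mySchema
      stream: myTable
    backfillVersion: 1
    target: acmeCo/myTable
```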
The driver checkpoint for a capture that is fully opted in to runtime-managed pruning could look like this:
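Again a sketch; the state under each binding key (`cursor` here) and the contents of `otherStuff` are hypothetical, being whatever the connector chooses to persist:

```json
{
  "bindingStateV1": {
    "mySchema.myTable/1": { "cursor": "0/1A2B3C4" }
  },
  "otherStuff": { "connectorSpecific": true }
}
```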
For the capture to access the state of `mySchema.myTable`, it would use the `bindingKey` from the runtime (`mySchema.myTable/1`, composed of the resource path and `backfillVersion`) and get the state from `bindingStateV1`. Note the other top-level property `"otherStuff"`, which could be anything, even including binding state if the capture has not opted in to runtime-managed state pruning.
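A connector-side sketch of that lookup (the struct and function names are hypothetical):

```go
package main

import "encoding/json"

// driverCheckpoint models only the runtime-managed portion of the checkpoint;
// other top-level properties (like "otherStuff") are ignored by this lookup.
type driverCheckpoint struct {
	BindingStateV1 map[string]json.RawMessage `json:"bindingStateV1"`
}

// bindingState returns the persisted state for a binding key such as
// "mySchema.myTable/1", or nil if there is no prior state and the binding
// should start from scratch.
func bindingState(checkpointJSON []byte, bindingKey string) (json.RawMessage, error) {
	var cp driverCheckpoint
	if err := json.Unmarshal(checkpointJSON, &cp); err != nil {
		return nil, err
	}
	return cp.BindingStateV1[bindingKey], nil
}
```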
Materialization Checkpoints
Change 3: Materialization bindings will also have a `backfillVersion` field to allow for re-starting a binding from the same collection to the same table. Removing a binding from a materialization should cause the state to be pruned from the checkpoint. This should also happen in the runtime, especially since materializations are oblivious to the contents of their checkpoints. The `backfillVersion` and `bindingKey` fields should be added to the materialization part of the connector protocol as well, so that a materialization connector knows if it needs to re-initialize a table based on a change in the `backfillVersion` of the binding (see below).

Management of Materialized Tables
To properly re-start a materialization of a binding, the table in the endpoint needs to be truncated, and its columns probably need to be changed to allow for different types. Effectively this will entail a re-creation of the table, by dropping & re-creating, or a `CREATE OR REPLACE TABLE ...` type of operation. This is tricky for at least a couple of reasons: 1) What happens with existing shard(s) of the materialization when their target table is dropped out from under them while they are running? For example, how can a race be prevented where a prior materialization shard commits data to a newly created table, registering that progress on a to-be-reset checkpoint update? And 2) How do we avoid inadvertently dropping a table pre-created by a user that they don't want dropped?

Change 4: When applying materialization changes, first disable materialization shards and wait for their primary loop to exit prior to applying the changes, and then re-enable the shards after the changes have been applied. This is to ensure (as best we can) that there are no concurrently running shards of the materialization that will sneak in a write to any re-created tables that result from the publication. It also ensures that the materialization's transactions won't hold locks on the tables to be altered that would block the actions apply needs to take. This can actually already be an issue, such as the case of a running materialization locking a table for minutes+ and blocking a table alteration to add a column.
Where possible, materializations using transactional endpoints should increment their fence while applying these changes. The disabling of shards will happen in the `activate` command.

Change 5: Re-create materialized tables (destructive action) only under the following conditions:
- The `Apply` request includes bindings that produce `Unsatisfiable` constraints. By extension, this means that the control plane will need some work done to handle the case of a `Validate` response that has `Unsatisfiable` constraints, but still wanting to go through with the `Apply` anyway.
- The `Apply` request includes bindings with a different `backfillVersion` than prior bindings materializing to the same table.

Re-creating tables only under these circumstances means that just removing a binding from a materialization will not drop any tables, which sort-of addresses the concern about inadvertently dropping a user table that they didn't want dropped.
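A minimal sketch of that decision logic, with hypothetical names (the surrounding protocol plumbing that surfaces constraints and previously-applied specs is omitted):

```go
// shouldRecreateTable mirrors the two conditions of Change 5: the table is
// re-created only for an Unsatisfiable constraint, or for a backfillVersion
// that differs from the one previously applied for the same table.
func shouldRecreateTable(hasUnsatisfiableConstraint bool, specBackfillVersion, appliedBackfillVersion int) bool {
	return hasUnsatisfiableConstraint || specBackfillVersion != appliedBackfillVersion
}
```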
Derivations
Derivations do already have a `name` for `transforms` which could be changed to re-derive from a source collection in a pinch, but for consistency with captures & materializations the `backfillVersion` property will be added to the specification for derivations as well. Admittedly I am less familiar with how derivations use checkpoints and state, so this is a little vague at the moment: derivations will use the `backfillVersion` property of their specs (to be added) and the resulting `bindingKey` in a similar way as captures & materializations. A change in the `backfillVersion` will indicate that the derivation should start over from the beginning of its source collection, and forget any state that it had previously persisted for that binding.

Migration
Captures will be able to opt in to the new form of checkpoints by emitting driver checkpoints with state in the `bindingStateV1` object, keyed with the `bindingKey`. There will be a transition period where captures must understand states in both forms. They should endeavor to convert to the new form by emitting an updated, converted checkpoint on startup, if needed. There is an edge case where, if a capture had previously been disabled for a long time and has the `backfillVersion` changed as part of publishing it to be re-enabled for the first time, it won't actually restart the capture for that binding from scratch. This could easily be remedied by changing the `backfillVersion` again to re-capture the binding.

Materializations currently have checkpoints structured like this:
"some/journal/pivot=00;materialize/some/materialization/resource%2Fpath":{"read_through": <...>}
The `backfillVersion` field will change the `materialize/{materialization}/{encoded-resource-path}` part of this into something like `materialize/some/materialization/resource%2Fpath%2F1` (the trailing `1` is the `backfillVersion`). To enable a non-breaking transition, an omitted or 0 value for `backfillVersion` will not be appended to the resource path in general, for both materialization checkpoints and the binding keys provided to both captures and materializations.
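As a sketch of that zero-value rule (hypothetical names; how resource path components are actually joined or encoded into the key is an assumption here):

```go
package main

import (
	"fmt"
	"strings"
)

// bindingKey joins a resource path with a backfillVersion, omitting an
// omitted/0 version so that existing checkpoints keep their current keys.
func bindingKey(resourcePath []string, backfillVersion int) string {
	key := strings.Join(resourcePath, ".") // e.g. "mySchema.myTable"; the separator is an assumption
	if backfillVersion == 0 {
		return key // non-breaking: no suffix for an omitted or 0 value
	}
	return fmt.Sprintf("%s/%d", key, backfillVersion)
}
```

For example, `bindingKey([]string{"mySchema", "myTable"}, 1)` yields `mySchema.myTable/1`, matching the capture example above, while a `backfillVersion` of 0 yields the unsuffixed `mySchema.myTable`.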