Meta: Great write-up. Good length and content clarity. For multi-collection derivations, the situation changes in an interesting way if you suppose that programs will read documents from stdin which span multiple collections, identified by
For example, a lot of the heavy lifting that Differential Dataflow does is building and maintaining in-memory indices which are aware of processed timestamp horizons, and applying those indices to propagate outcomes through the dataflow graph. Or, I might have an internal SQLite database with various indices into which I chuck portions of some documents I read, and which I query on reading other kinds of documents. Or I might keep a simple Python map, or use a specialized library like boost, and so on. (This said, I do think the idea of registers as a query-able table is interesting and worthy of future exploration).
If two programs are processing documents across two collections and communicating through a shared index, I don't know how we could feasibly make any guarantees about the relative ordering of processed documents across those programs, or how we could ever make that processing repeatable. Conversely, Flow can drive a single program with documents spanning multiple collections, using a timestamp-based ordering. I think we can even make this ordering fully deterministic and repeatable, improving upon our "mostly" repeatable status today. Continuing with the bikes / stations example, it would mean that separate derivations of that point-in-time join would produce the same result no matter when they're run.
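In unix terms, the rough analogy would be merging the collections into a single timestamp-ordered stream before the program ever reads stdin. A minimal sketch, assuming each line is prefixed with a sortable timestamp (the file names are invented):

```bash
# Deterministically merge two already-sorted streams into one ordered stream;
# --stable breaks timestamp ties by input order, keeping the merge repeatable.
sort --merge --stable --key=1,1 stations.jsonl bikes.jsonl
```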
I've been thinking more about the "unix philosophy" and what it might look like realized in a future
Flow runtime. My understanding of the unix philosophy is roughly "composable programs that each do one
thing (and do it well)". Others may have somewhat different understandings, but to me the most
important concept there is composability. Other aspects of it, like "everything is a file", are in
service of composability.
Say you have a list of words and you want to turn it into a sorted set of words. The unix philosophy
would probably guide you toward writing two programs instead of one, and indeed

```bash
cat words | sort | uniq
```

is probably what you'd use. You can decompose your sorted-set program into smaller programs that each
read from stdin and write to stdout, and compose them into a pipeline that's easier to read, write,
and maintain. Ain't that grand.
If we look at an example that's more relevant to Flow, though, we very quickly find ourselves in muddy
waters. Let's take the example of joining two datasets with a point-in-time left join. How would you
do that in unix? Let's first make the example a little more concrete. You have two source datasets,
`stations` and `bikes`. On each document in `bikes` there's an optional `currentStationId` field,
which is used to join each station document to 0 or more bike documents.
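For illustration, documents in the two collections might look like this (the exact shapes are invented for the example):

```
stations: {"id": "s1", "name": "5th & Main"}
bikes:    {"id": "b7", "currentStationId": "s1"}
```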
To really ground out the example, let's look at how a database would handle this join.
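The schema and query below are one plausible encoding of the example; the table and column names are assumptions, not anything Flow prescribes.

```bash
sqlite3 :memory: <<'SQL'
CREATE TABLE stations (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE bikes (id INTEGER PRIMARY KEY, current_station INTEGER);
EXPLAIN QUERY PLAN
SELECT stations.id, bikes.id
FROM stations JOIN bikes ON bikes.current_station = stations.id;
SQL
# Typical output (exact wording varies by SQLite version):
#   SCAN stations
#   SEARCH bikes USING AUTOMATIC COVERING INDEX (current_station=?)
```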
SQLite is going to create a secondary index of `bikes` on `current_station`. The `AUTOMATIC` means
that the query planner will create this index automatically, since we don't already have one. (Neat!)
It will scan the `stations` table, and for each row it'll use that secondary index to look up all the
bikes for that station. Of course, there's also an implicit primary index on `bikes`. SQLite doesn't
repeat the entire bike record within each entry in the secondary index; rather, each secondary index
entry refers to the key of the record in the primary index. The point of this isn't to explain RDBMS
internals to you (I'm sure you already knew this). The point is to set a standard for efficiency:
whatever we build should be as efficient as this. It also provides some clues about the low-level
programs we'd need in order to compose high-level operations in Flow. If we followed the RDBMS
approach, we'd need to compose both primary and secondary indices in order to implement our join.
Can we make programs like `primary-index` and `secondary-index` and compose them to make a unixy
join? Unlike the `sort | uniq` example, the unixy factoring for the join isn't so obvious. Do you use
named pipes? Bash job control? A basic bash pipeline has only a single input (stdin to the first
program) and a single output (stdout of the last program), but we want to join two collections!
The typical unix answer is to have the programs share state. This might look something like the
following hand-wavy example.
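Every program name and flag below is invented for illustration:

```bash
# Two separate pipelines, one per source collection, sharing state through
# index files under ./state/.

# Pipeline 1: index each station by its id as a side effect, then look up any
# bikes already indexed for that station and emit the joined documents.
read-collection stations \
  | primary-index --state ./state/stations --key /id \
  | join-lookup --state ./state/bikes-by-station --on /id

# Pipeline 2: maintain a secondary index of bikes on currentStationId, then
# look up the matching station and emit the joined documents.
read-collection bikes \
  | secondary-index --state ./state/bikes-by-station --on /currentStationId \
  | join-lookup --state ./state/stations --on /currentStationId
```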
This is a very slapdash example, so please don't read too much into the details. In particular, this
example uses files to represent the state, but that's not the only mechanism for sharing state. The
important thing is that indices are updated as a side effect of document processing, and that any
other program in the derivation could somehow query that state. Another important thing is that this
is two separate pipelines, one for each source collection. I think that's important because it frees
each program from the responsibility of filtering or routing documents from different collections.
In the current Flow runtime, this particular join example can actually be expressed very succinctly
as two lambdas: an update lambda that updates register state, and a publish lambda that reads that
register state and emits the joined representation. That demonstrates the potential of derivations
composed of separate programs which share state, and it's conceptually very similar to the unixy
example above, since the update and publish lambdas are separate and only the state is shared. But
the current design also has a number of limitations that make it unwieldy or inefficient for some
types of joins. Fully reactive many-to-many joins, for example, require lots of duplication of
documents under the existing register design.
A major missing piece is that registers have no way to refer to a value. This means, for example,
that there's no way to create secondary indices without duplicating the documents. That's a
solvable problem in a future derivation runtime. An exciting idea I want to explore is that we can
allow registers to model much richer state, without duplication of data and without sacrificing the
benefits of our current register design. This in turn would let us create a set of reusable programs
for things like indexing and lookups.
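To make the duplication problem concrete, here's a hypothetical register value for a secondary index of bikes on `currentStationId`, first as it must be modeled today, and then as it could be modeled if registers could hold references (all field names are made up):

```
Today (each matched bike document is copied into the register):
  {"bikes": [{"id": "b7", "currentStationId": "s1", ...entire bike doc...}]}

With references (the register holds only keys into a primary index):
  {"bikeKeys": ["b7", "b9", "b12"]}
```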
The goal is to end up with a system that allows derivations to be expressed as compositions of small
programs that each "do one thing and do it well". My main assertion is that these programs need to
be able to express things like different types of indexing and lookups. If that sounds really vague,
it's because I'm trying perhaps a bit too hard not to over-specify the requirements in an effort to
avoid biasing our approach. To be clear, though, I see this as somewhat orthogonal to the question
of how we persist the state itself. VM snapshotting is by no means incompatible with these ideas,
and may actually help out considerably.
I think this is a good time to stop pedaling and make sure everyone is still along for the ride.
I was considerably more aggressive in paring down the word count here, so please also feel free to
give feedback on the writing style and clarity.