This issue describes a design for implementing row-level streaming in Bigslice.
Bigslice streams all of its data during processing in micro-batches. For example, a bigslice.Map operator is given a vector of values over which to apply the operation, and fills a downstream vector of processed values.
Operators like bigslice.Flatmap, which can emit multiple rows for each input row, implement internal buffering if the user-returned data exceeds the requested chunk sizes. Likewise, bigslice.Cogroup, which implements data joining, treats joined data as a single row—i.e., each row contains within it all rows that match the join criteria.
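For concreteness, here is a rough sketch of how these operators look today. The data and column layout are illustrative, but Const, Flatmap, Cogroup, and Map are part of the current bigslice API:

// A slice of (key, value) rows.
slice := bigslice.Const(4, []string{"a", "a", "b"}, []int{1, 2, 3})

// Flatmap may emit any number of rows per input row; the returned Go slice
// is materialized in memory before being re-chunked downstream.
repeated := bigslice.Flatmap(slice, func(key string, value int) []string {
	out := make([]string, value)
	for i := range out {
		out[i] = key
	}
	return out
})

// Cogroup joins rows by key; all values for a key are delivered as a single
// row, so the per-key group ([]int) must fit in memory.
grouped := bigslice.Cogroup(slice) // columns: string, []int
_, _ = repeated, grouped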
This is limiting: namely, we cannot perform joins where the number of rows for a key exceeds what can fit in memory. (This is exacerbated by the fact that Bigslice itself operates on batches of rows.) We also cannot perform flat-map operations where the result of a computation for a single row does not fit in memory.
In other words, streaming in Bigslice is only “up to” rows.
We propose to repair this by:
providing a generalized data processing function that permits streaming;
changing the underlying data chunking to allow for streaming.
Generalized streaming operator
We propose a generalized streaming operator: Each. Each is invoked for each row of a slice. Its inputs are the slice columns. The last (optional) argument of Each is an “emit” function that produces rows downstream. For example, the following acts as a combined map and filter: it filters out odd values, emitting only the even rows downstream.
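A sketch of what this might look like (Each is the proposed operator, and the emit signature here is illustrative):

// Invoked once per (key, value) row; emit may be called zero or more times.
// Odd values are dropped; even values pass through as downstream rows.
slice = bigslice.Each(slice, func(key string, value int, emit func(string, int)) {
	if value%2 == 0 {
		emit(key, value)
	}
})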
Functions passed to Each can also accept streams. These are functions that act as scanners to underlying streams of values.
For example, if we co-grouped the above slice, we could scan the stream of values for each key, emitting their sum and producing a slice of ints:
bigslice.Each(slice, func(key string, values func(*int) bool, emit func(int)) {
	var sum, datum int
	for values(&datum) {
		sum += datum
	}
	emit(sum)
})
Note that this is a streaming equivalent of
bigslice.Map(slice, func(key string, values []int) int {
	var sum int
	for _, datum := range values {
		sum += datum
	}
	return sum
})
For complex co-groups, each co-grouped slice corresponds to a single streaming reader. For example:
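A sketch of how this might look when co-grouping two slices; the slice names and column types are illustrative, and the scanner-argument convention follows the proposed Each API (one scanner per co-grouped slice):

// users:  rows of (user ID, name); visits: rows of (user ID, visit count).
joined := bigslice.Cogroup(users, visits)

// One scanner per co-grouped slice: names streams the name rows for the key,
// counts streams the visit counts. Neither group needs to fit in memory.
summed := bigslice.Each(joined, func(id string, names func(*string) bool, counts func(*int) bool, emit func(string, string, int)) {
	var name string
	for names(&name) {
		// Keep the last name seen for this user.
	}
	var n, total int
	for counts(&n) {
		total += n
	}
	emit(id, name, total)
})
_ = summed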
Streaming readers, data layout
The above API can be implemented using the current reader APIs, but the implementation would have to buffer (and perhaps spill to disk). This can be addressed by introducing nested readers: a sliceio.Reader would be amended to support sub-streams:
type Reader interface {
	// ...

	// Child returns a Reader for the sub-stream at the given index.
	Child(index int) Reader
}
The new Child method would return a reader over the indexed sub-stream. The physical layout of output files would remain chunks of rows, but each chunk would be prefixed by its stream index. Any stream reference must occur after the full sub-stream has been materialized; this allows a reader to, for example, spill a sub-stream to disk and re-read it on demand.
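To illustrate the intended layout, a hypothetical framing of the output chunks might look as follows. This struct is not part of sliceio; it only sketches the "chunks of rows, prefixed by the stream index" encoding described above:

// chunkHeader is a hypothetical per-chunk prefix in the output file format.
// Rows are still written in chunks; the prefix identifies which (sub-)stream
// the chunk belongs to, so a reader can spill a sub-stream's chunks to disk
// and re-read them when Child(index) is scanned.
type chunkHeader struct {
	StreamIndex int // 0 for the top-level stream; >0 for nested sub-streams
	NumRows     int // number of rows encoded in the chunk that follows
}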