
Optimized spill file format #14078

Open
3 tasks
Tracked by #14077
alamb opened this issue Jan 10, 2025 · 9 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jan 10, 2025

Is your feature request related to a problem or challenge?

DataFusion spills data to local disk for processing datasets that do not fit in available memory, as illustrated in this comment:

/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
/// a total order. Depending on the input size and memory manager
/// configuration, writes intermediate results to disk ("spills")
/// using Arrow IPC format.
///
/// # Algorithm
///
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager there is sufficient space to
/// buffer the batch in memory 2.1 if memory sufficient, buffer
/// batch in memory, go to 1.
///
/// 2.2 if no more memory is available, sort all buffered batches and
/// spill to file. buffer the next batch in memory, go to 1.
///
/// 3. when input is exhausted, merge all in memory batches and spills
/// to get a total order.
///
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
/// ┌─────┐
/// │ 2 │
/// │ 3 │
/// │ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │ 4 │
/// │ 2 │ │
/// └─────┘ ▼
/// ┌─────┐
/// │ 1 │ In memory
/// │ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output
/// │ 1 │
/// └─────┘ ▲
/// ... │
///
/// ┌─────┐ │
/// │ 4 │
/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// └─────┘
///
/// in_mem_batches
///
/// ```
///
/// # When data does not fit in available memory
///
/// When memory is exhausted, data is first sorted and written to one
/// or more spill files on disk:
///
/// ```text
/// ┌─────┐ .─────────────────.
/// │ 2 │ ( )
/// │ 3 │ │`─────────────────'│
/// │ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │
/// │ 4 │ │ │ │ 1 │░ │
/// │ 2 │ │ │... │░ │
/// └─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │
/// ┌─────┐ │ └────┘░ 1 │░ │
/// │ 1 │ In memory │ ░░░░░░ │ ░░ │
/// │ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
/// │ 1 │ and write to file │ │ ░░ │
/// └─────┘ │ 4 │░ │
/// ... ▲ │ └░─░─░░ │
/// │ │ ░░░░░░ │
/// ┌─────┐ │.─────────────────.│
/// │ 4 │ │ ( )
/// │ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────'
/// └─────┘
///
/// in_mem_batches spills
/// (file on disk in Arrow
/// IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
/// .─────────────────.
/// ( )
/// │`─────────────────'│
/// │ ┌────┐ │
/// │ │ 1 │░ │
/// │ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
/// │ │ 4 │░ ┌────┐ │ │
/// │ └────┘░ │ 1 │░ │ ▼
/// │ ░░░░░░ │ │░ │
/// │ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output
/// │ │ │░ │
/// │ │ 4 │░ │ ▲
/// │ └────┘░ │ │
/// │ ░░░░░░ │
/// │.─────────────────.│ │
/// ( )
/// `─────────────────' │
/// spills
/// │
///
/// │
///
/// ┌─────┐ │
/// │ 1 │
/// │ 4 │─ ─ ─ ─ │
/// └─────┘ │
/// ... In memory
/// └ ─ ─ ─▶ sort/merge
/// ┌─────┐
/// │ 4 │ ▲
/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// └─────┘
///
/// in_mem_batches
/// ```
struct ExternalSorter {

Here is the code that handles spilling in sort and hash aggregates.

The current version of DataFusion spills data to disk using the Arrow IPC format, which is correct and was easy to get working, but comes with non-trivial overhead, as @andygrove found in Comet:

Some of this overhead comes from the validation applied to Arrow IPC files (which in general may come from untrusted sources); that validation is unnecessary when the data was known to be valid at the time DataFusion wrote it.

Describe the solution you'd like

As part of improving DataFusion's performance on larger-than-memory datasets, I would like to consider adding a new optimized serialization format for use in spill files. This is similar to how we have added optimized (non-Arrow) in-memory storage formats for intermediate results in hash aggregation and elsewhere.

At a high level this would look like:

  • Add a benchmark for spilling files (maybe write/read data to/from a spill file of various sizes)
  • Add an optimized Reader/Writer
  • Update the SortExec and GroupByHashExec to use this new Reader/Writer
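The first bullet could begin as a simple wall-clock microbenchmark before graduating to criterion. A stdlib-only sketch of the measurement shape (file name and sizes are made up for illustration; a real benchmark would write actual RecordBatches through the spill writer):

```rust
// Sketch of a spill microbenchmark: time writing and re-reading
// spill-sized payloads of various sizes.
use std::fs::File;
use std::io::{Read, Write};
use std::time::Instant;

fn main() -> std::io::Result<()> {
    for size_mb in [1usize, 4, 16] {
        let payload = vec![0u8; size_mb * 1024 * 1024];
        let path = std::env::temp_dir().join("spill_bench.bin");

        let start = Instant::now();
        File::create(&path)?.write_all(&payload)?;
        let write = start.elapsed();

        let start = Instant::now();
        let mut buf = Vec::with_capacity(payload.len());
        File::open(&path)?.read_to_end(&mut buf)?;
        let read = start.elapsed();

        println!("{size_mb} MiB: write {write:?}, read {read:?}");
        std::fs::remove_file(&path)?;
    }
    Ok(())
}
```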

Describe alternatives you've considered

Add a customized Reader/Writer

@andygrove has a PR that adds a customized BatchReader to Comet, which seems to offer significant performance improvements for shuffle reading/writing:

We could potentially upstream this code into DataFusion

Optimize the Arrow IPC reader

Another option would be to continue using the Arrow IPC format, but disable validation:

@totoroyyb actually has a recent PR to arrow-rs proposing this approach:

Additional context

No response

@alamb
Contributor Author

alamb commented Jan 10, 2025

As a data point, @totoroyyb reports a 100x faster reading of Arrow IPC data without validation on apache/arrow-rs#6933

@2010YOUY01
Contributor

Although we're currently spilling column-wise record batches, I think this will change to row-wise batches in the future. It would be better to also benchmark and optimize spilling the Arrow Row format as part of this issue.

The reason is that the spilling operation involves sorting, spilling sorted runs, and reading those runs back for merging. Both sorting and merging benefit from the row format, and the current implementation performs several unnecessary conversions between row and column formats, which can be inefficient.
The preferred approach would be:

  1. Convert to row format and sort
  2. Maintain the row format until the final output (I believe Sort will benefit more, because it does two phases of merging, while Aggregate only does one)
    This is tracked by Use Row Format in SortExec #7053

@tustvold
Contributor

tustvold commented Jan 11, 2025

Spilling the row format makes some sense to me, although I suspect IPC will outperform it, presuming a fast enough disk.

I feel I ought to point out though that in order for it to be sound to read a file without validation, DF needs to be sure nobody else could have written/modified it. This may be possible on Unix OSes using some shenanigans with unlinked file descriptors, but I suspect isn't generally possible.

I feel I also ought to point out that the mmap use case is slightly different, as the data is effectively in memory already; the performance benefit of skipping validation may be lessened when there are other overheads, e.g. reading the data from disk.
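The "unlinked file descriptor" idea mentioned above can be sketched with the standard library on Unix: once the path is unlinked, no other process can open the file by name, though its data stays alive until the handle is dropped (Linux's `O_TMPFILE` is a stronger variant of the same trick). A minimal sketch:

```rust
// Unix-only sketch: create a spill file, then immediately unlink its
// path while keeping the descriptor open.
use std::fs::{self, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("spill-unlinked.tmp");
    let mut file = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(&path)?;
    fs::remove_file(&path)?; // unlink: the name is gone, the fd remains

    file.write_all(b"spilled bytes")?;
    file.seek(SeekFrom::Start(0))?;
    let mut buf = String::new();
    file.read_to_string(&mut buf)?;
    assert_eq!(buf, "spilled bytes");
    assert!(!path.exists()); // nobody else can open this file by name
    Ok(())
}
```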

@alamb
Contributor Author

alamb commented Jan 11, 2025

I feel I ought to point out though that in order for it to be sound to read a file without validation, DF needs to be sure nobody else could have written/modified it.

In my opinion, DataFusion itself should not go to any significant trouble to protect against the threat model of someone having enough control over the local file system to make arbitrary changes to spill files. If an adversary already has that level of access, the extra safety gained by validating spill files is pretty small.

@alamb
Contributor Author

alamb commented Jan 11, 2025

@2010YOUY01 the idea of avoiding Row->Column->Row conversions is a (very) good one

@tustvold
Contributor

tustvold commented Jan 11, 2025

DataFusion itself should not go to any significant trouble / effort to protect against the threat model of someone having enough control over the local file system to make arbitrary changes to spill files

Agreed; the more reasonable scenario would be a bug in DF causing it to trample its own files, maybe some sort of concurrency bug where it opens the spill file before it finishes writing it. To be clear, I am not making a judgement on whether DF should care about this; as a starting point, someone would need to collect concrete benchmarks of what portion of the cost of spilling actually is validation. I am merely stating that what is being proposed is unsound.

@Omega359
Contributor

It would be nice to have the option (likely not enabled by default) for the spill files to be compressed. It's almost trivial I think with the current implementation.

@andygrove
Member

It would be nice to have the option (likely not enabled by default) for the spill files to be compressed. It's almost trivial I think with the current implementation.

We support this in Comet with the Arrow IPC Writer and it is trivial. We support LZ4, Snappy, and ZSTD.

@alamb
Contributor Author

alamb commented Jan 12, 2025

FYI I am working with @totoroyyb on the arrow IPC work, in case anyone is interested or has time to help:
