Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface #43632

Merged
merged 39 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b68f0bb
GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface
zeroshade Aug 9, 2024
fa26073
linting
zeroshade Aug 9, 2024
331033f
trim trailing whitespace
zeroshade Aug 9, 2024
31117c9
clarifications from feedback
zeroshade Aug 15, 2024
7593f57
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Aug 15, 2024
e612aed
some more clarifications
zeroshade Aug 15, 2024
946c46f
trim trailing whitespace
zeroshade Aug 15, 2024
8fb66f2
Updates based on feedback, restructured and redesigned.
zeroshade Sep 5, 2024
4cb7991
add sequence diagram of async interface
zeroshade Sep 5, 2024
57a79fe
pre-commit linting
zeroshade Sep 9, 2024
5b5b4d9
documentation updates from feedback.
zeroshade Sep 12, 2024
a3d4f88
ran pre-commit for linting
zeroshade Sep 12, 2024
58c2686
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 13, 2024
1044f1a
updates from PR feedback
zeroshade Sep 13, 2024
8ed0607
updating docs from feedback
zeroshade Sep 18, 2024
a359550
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
9bed168
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
95c40f7
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
b9bddaf
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
608b790
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
dbea393
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
6e9980d
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
18efd79
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
eb6bccb
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Sep 19, 2024
b2c4486
updates from freedback
zeroshade Sep 22, 2024
278af8b
Update cpp/src/arrow/c/abi.h
zeroshade Sep 23, 2024
922b103
Update cpp/src/arrow/c/abi.h
zeroshade Sep 23, 2024
bcc8a66
Update cpp/src/arrow/c/abi.h
zeroshade Sep 23, 2024
f6d3b67
Update cpp/src/arrow/c/abi.h
zeroshade Sep 23, 2024
38b21b2
linting
zeroshade Oct 15, 2024
2ff218b
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Oct 17, 2024
d3a3327
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Oct 23, 2024
e65ef06
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Oct 25, 2024
92dcb78
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Oct 25, 2024
c6bd66b
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Oct 25, 2024
099e38e
Updates from feedback
zeroshade Oct 25, 2024
0e3574b
Update docs/source/format/CDeviceDataInterface.rst
zeroshade Oct 25, 2024
5aa29a8
move additional_metadata member.
zeroshade Nov 5, 2024
ca9dfbd
Add context for what additional_metadata might be used for.
zeroshade Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,210 @@ struct ArrowDeviceArrayStream {

#endif // ARROW_C_DEVICE_STREAM_INTERFACE

#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
# define ARROW_C_ASYNC_STREAM_INTERFACE

// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed
// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
//
// The reason for this Task approach instead of the Async interface returning
// the Array directly is to allow for more complex thread handling and reducing
// context switching and data transfers between CPU cores (e.g. from one L1/L2
// cache to another) if desired.
Comment on lines +239 to +240
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a consequence of consumer being able to maintain an upper bound on memory consumption in terms of the number record batches in memory at any point in time.

Readers might be unable to make this causal inference. I don't really have a suggestion on how to reword this, just saying that the connection might be unclear to many readers.

//
// For example, the `on_next_task` callback can be called when data is ready, while
// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This
// allows for the producer to manage the I/O on one thread which calls `on_next_task`
// and the consumer can determine when the decoding (producer logic in the `extract_data`
// callback of the task) occurs and on which thread, to avoid a CPU core transfer
// (data staying in the L2 cache).
struct ArrowAsyncTask {
// This callback should populate the ArrowDeviceArray associated with this task.
bkietz marked this conversation as resolved.
Show resolved Hide resolved
// The order of ArrowAsyncTasks provided by the producer enables a consumer to
// ensure the order of data to process.
//
// This function is expected to be synchronous, but should not perform any blocking
// I/O. Ideally it should be as cheap as possible so as to not tie up the consumer
// thread unnecessarily.
//
// Returns: 0 if successful, errno-compatible error otherwise.
//
// If a non-0 value is returned then it should be followed by a call to `on_error`
// on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly
// likely that whatever is calling this function may be entirely disconnected from
// the current control flow. Indicating an error here with a non-zero return allows
// the current flow to be aware of the error occurring, while still allowing any
// logging or error handling to still be centralized in the `on_error` callback of
// the original Async handler.
//
// Rather than a release callback, any required cleanup should be performed as part
// of the invocation of `extract_data`. Ownership of the Array is passed to the consumer
// calling this, and so it must be released separately.
//
// It is only valid to call this method exactly once.
int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);

// opaque task-specific data
void* private_data;
};

// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async
// producer and consumer. This object allows the consumer to perform backpressure and flow
// control on the asynchronous stream processing. This object must be owned by the
// producer who creates it, and thus is responsible for cleaning it up.
bkietz marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowAsyncProducer {
// A consumer must call this function to start receiving on_next_task calls.
//
// It *must* be valid to call this synchronously from within `on_next_task` or
// `on_schema`, but this function *must not* immediately call `on_next_task` so as
// to avoid recursion and reentrant callbacks.
//
// After cancel has been called, additional calls to this function must be NOPs,
// but allowed. While not cancelled, calling this function must register the
// given number of additional arrays/batches to be produced with the producer.
// The producer should only call `on_next_task` at most the registered number
// of arrays before propagating backpressure.
Comment on lines +289 to +293
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is the idea that I will call this method many times? In other words, if I want to allow up to 32 concurrent tasks I would call request(32) and then, after I complete each task I call request(1)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's definitely one way you could do it which would be valid.

Another possibility would be that you could call request(32) and you track available memory usage and only call request(N) for more when the available memory increases after calls to get_data on the tasks and completing whatever processing you wanted on the resulting batches, or some other strategy.

The idea is that backpressure is managed by the consumer signalling the producer "hey, I can handle up to N more batches of data right now".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your mermaid diagram only shows request being called once (IIRC) and it might be more helpful if there was someway to demonstrate it was part of a loop too. That being said, the diagram has a lot of info already so maybe it's ok.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be pretty easy to toss a call to request inside the branch that calls get_data, i'll add it there.

//
// Any error encountered by calling request must be propagated by calling the `on_error`
// callback of the ArrowAsyncDeviceStreamHandler.
//
// While not cancelled, any subsequent calls to `on_next_task`, `on_error` or
// `release` should be scheduled by the producer to be called later.
//
// It is invalid for a consumer to call this with a value of n <= 0, producers should
// error if given such a value.
void (*request)(struct ArrowAsyncProducer* self, int64_t n);

// This cancel callback signals a producer that it must eventually stop making calls
// to on_next_task. It must be idempotent and thread-safe. After calling cancel once,
// subsequent calls must be NOPs. This must not call any consumer-side handlers other
// than `on_error`.
//
// It is not required that calling cancel affect the producer immediately, only that it
// must eventually stop calling on_next_task and subsequently call release on the
// async handler. As such, a consumer must be prepared to receive one or more calls to
// `on_next_task` even after calling cancel if there are still requested arrays pending.
//
// Successful cancellation should *not* result in the producer calling `on_error`, it
// should finish out any remaining tasks and eventually call `release`.
//
// Any error encountered during handling a call to cancel must be reported via the
// on_error callback on the async stream handler.
void (*cancel)(struct ArrowAsyncProducer* self);

// Any additional metadata tied to a specific stream of data. This must either be NULL
// or a valid pointer to metadata which is encoded in the same way schema metadata
// would be. Non-null metadata must be valid for the lifetime of this object.
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
const char* additional_metadata;

// producer-specific opaque data.
void* private_data;
};

// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous
// style of interaction. While ArrowDeviceArrayStream provides producer
// defined callbacks, this is intended to be created by the consumer instead.
// The consumer passes this handler to the producer, which in turn uses the
// callbacks to inform the consumer of events in the stream.
pitrou marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowAsyncDeviceStreamHandler {
// Handler for receiving a schema. The passed in stream_schema must be
// released or moved by the handler (producer is giving ownership of the schema to
// the handler, but not ownership of the top level object itself).
//
// With the exception of an error occurring (on_error), this must be the first
// callback function which is called by a producer and must only be called exactly
// once. As such, the producer should provide a valid ArrowAsyncProducer instance
// so the consumer can control the flow. See the documentation on ArrowAsyncProducer
// for how it works. The ArrowAsyncProducer is owned by the producer who calls this
// function and thus the producer is responsible for cleaning it up when calling
// the release callback of this handler.
//
// If there is any additional metadata tied to this stream, it will be provided as
// a non-null value for the `additional_metadata` field of the ArrowAsyncProducer
// which will be valid at least until the release callback is called.
//
// Return value: 0 if successful, `errno`-compatible error otherwise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the producer supposed to do with an error? (FWIW, gRPC just uses void for its callbacks)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine the consumer should be able to return something to tell the producer to stop (I agree this should be explicit, though, and errno might not be the most precise tool but it might be slightly more informative than a bool or invented enum).

For argument's sake: If a non-zero value is returned, the producer must not call any other callbacks except the release callback.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but it might make more sense to just have a unified cancellation mechanism. And again, what is the producer really supposed to do with the error code? It's just going to drop it on the floor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callbacks returning an error like this is pretty common in Rust. Normally what the producer does is:

  • Abort the reader
  • Return an error "External error: the producer received error code from a call to on_schema: "

I think it's nice to be able to return an error code here. For example, maybe the consumer can't handle one of the data types reported by the schema.

Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the producer needs to keep accepting calls to the callbacks right? (e.g. returning an error here won't prevent on_next from being called once or twice while the producer is cleaning up)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the producer needs to keep accepting calls to the callbacks right? (e.g. returning an error here won't prevent on_next from being called once or twice while the producer is cleaning up)

It won't prevent it, but the docs explicitly state that if an error is returned from the callback, the producer should only call release after that and not call further callbacks. though there isn't anything to functionally prevent it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I would suggest that ADBC's async APIs should be compatible with Reactive Streams (RS) semantics in all places except for the semantics of request(n) signal, which for Arrow shouldn't be tied to the number of calls to on_next(), but rather indicate the number of rows. This is to avoid the burden of reinventing the wheel in async model aspects which reactive streams working group already thought and debated long and hard about.

Also, users of ADBC's async APIs in JVM and C# will be able to leverage some very high quality code for these platforms that implement RS semantics, such as j.u.c.SubmissionPublisher, kotlinx.coroutines.reactive, dotnet/reactive, hopefully with minimal modifications, or perhaps no modifications at all (the different semantics of request(n) could perhaps piggy-back existing libs because per RS spec, producers are allowed to send a complete signal after calling on_next fewer times than was requested).

Per RS 1.7, it's required that producer stops on error from consumer callback:

Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.

Re:

the docs explicitly state that if an error is returned from the callback, the producer should only call release after that and not call further callbacks. though there isn't anything to functionally prevent it.

Indeed, per RS 2.5, consumer MUST go out of its way even to prevent buggy concurrent use by the producer, let alone not to call any of its methods itself concurrently with the producer, or ofter the producer has returned error or completion signal.

Copy link
Member

@leventov leventov Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but it might make more sense to just have a unified cancellation mechanism. And again, what is the producer really supposed to do with the error code? It's just going to drop it on the floor.

I think it can be useful for monitoring on the middleware/transport level, if the API wraps communication with a remote server. See APPLICATION_ERROR in RSocket protocol.

//
// A producer that receives a non-zero return here should stop producing and eventually
// call release instead.
int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
struct ArrowSchema* stream_schema);

// Handler for receiving data. This is called when data is available providing an
// ArrowAsyncTask struct to signify it. The producer indicates the end of the stream
// by passing NULL as the value for the task rather than a valid pointer to a task.
// The task object is only valid for the lifetime of this function call, if a consumer
// wants to utilize it after this function returns, it must copy or move the contents
// of it to a new ArrowAsyncTask object.
//
// The `request` callback of a provided ArrowAsyncProducer must be called in order
// to start receiving calls to this handler.
//
// The metadata argument can be null or can be used by a producer
// to pass arbitrary extra information to the consumer (such as total number
// of rows, context info, or otherwise). The data should be passed using the same
// encoding as the metadata within the ArrowSchema struct itself (defined in
// the spec at
// https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata)
//
// If metadata is non-null then it only needs to exist for the lifetime of this call,
// a consumer who wants it to live after that must copy it to ensure lifetime.
//
// A producer *must not* call this concurrently from multiple different threads.
//
// A consumer must be prepared to receive one or more calls to this callback even
// after calling cancel on the corresponding ArrowAsyncProducer, as cancel does not
// guarantee it happens immediately.
//
// Return value: 0 if successful, `errno`-compatible error otherwise.
//
// If the consumer returns a non-zero return from this method, that indicates to the
// producer that it should stop propagating data as an error occurred. After receiving
// such a return, the only interaction with this object is for the producer to call
// the `release` callback.
int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
struct ArrowAsyncTask* task, const char* metadata);
zeroshade marked this conversation as resolved.
Show resolved Hide resolved

// Handler for encountering an error. The producer should call release after
// this returns to clean up any resources. The `code` passed in can be any error
// code that a producer wants, but should be errno-compatible for consistency.
//
// If the message or metadata are non-null, they will only last as long as this
// function call. The consumer would need to perform a copy of the data if it is
// necessary for them to live past the lifetime of this call.
//
// Error metadata should be encoded as with metadata in ArrowSchema, defined in
// the spec at
// https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata
//
// It is valid for this to be called by a producer with or without a preceding call
// to ArrowAsyncProducer.request.
//
// This callback must not call any methods of an ArrowAsyncProducer object.
void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code,
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
const char* message, const char* metadata);
zeroshade marked this conversation as resolved.
Show resolved Hide resolved

// Release callback to release any resources for the handler. Should always be
// called by a producer when it is done utilizing a handler. No callbacks should
// be called after this is called.
//
// It is valid for the release callback to be called by a producer with or without
// a preceding call to ArrowAsyncProducer.request.
//
// The release callback must not call any methods of an ArrowAsyncProducer object.
void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

// MUST be populated by the producer BEFORE calling any callbacks other than release.
// This provides the connection between a handler and its producer, and must exist until
// the release callback is called.
struct ArrowAsyncProducer* producer;

// Opaque handler-specific data
void* private_data;
};

#endif // ARROW_C_ASYNC_STREAM_INTERFACE

#ifdef __cplusplus
}
#endif
Loading
Loading