diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h index db051fff5ff05..9dc142bd080df 100644 --- a/cpp/src/arrow/c/abi.h +++ b/cpp/src/arrow/c/abi.h @@ -228,6 +228,212 @@ struct ArrowDeviceArrayStream { #endif // ARROW_C_DEVICE_STREAM_INTERFACE +#ifndef ARROW_C_ASYNC_STREAM_INTERFACE +# define ARROW_C_ASYNC_STREAM_INTERFACE + +// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed +// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler. +// +// The reason for this Task approach instead of the Async interface returning +// the Array directly is to allow for more complex thread handling and reducing +// context switching and data transfers between CPU cores (e.g. from one L1/L2 +// cache to another) if desired. +// +// For example, the `on_next_task` callback can be called when data is ready, while +// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This +// allows for the producer to manage the I/O on one thread which calls `on_next_task` +// and the consumer can determine when the decoding (producer logic in the `extract_data` +// callback of the task) occurs and on which thread, to avoid a CPU core transfer +// (data staying in the L2 cache). +struct ArrowAsyncTask { + // This callback should populate the ArrowDeviceArray associated with this task. + // The order of ArrowAsyncTasks provided by the producer enables a consumer to + // ensure the order of data to process. + // + // This function is expected to be synchronous, but should not perform any blocking + // I/O. Ideally it should be as cheap as possible so as to not tie up the consumer + // thread unnecessarily. + // + // Returns: 0 if successful, errno-compatible error otherwise. + // + // If a non-0 value is returned then it should be followed by a call to `on_error` + // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly + // likely that whatever is calling this function may be entirely disconnected from + // the current control flow. Indicating an error here with a non-zero return allows + // the current flow to be aware of the error occurring, while still allowing any + // logging or error handling to still be centralized in the `on_error` callback of + // the original Async handler. + // + // Rather than a release callback, any required cleanup should be performed as part + // of the invocation of `extract_data`. Ownership of the Array is passed to the consumer + // calling this, and so it must be released separately. + // + // It is only valid to call this method exactly once. + int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out); + + // opaque task-specific data + void* private_data; +}; + +// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async +// producer and consumer. This object allows the consumer to perform backpressure and flow +// control on the asynchronous stream processing. This object must be owned by the +// producer who creates it, and thus is responsible for cleaning it up. +struct ArrowAsyncProducer { + // A consumer must call this function to start receiving on_next_task calls. + // + // It *must* be valid to call this synchronously from within `on_next_task` or + // `on_schema`, but this function *must not* immediately call `on_next_task` so as + // to avoid recursion and reentrant callbacks. + // + // After cancel has been called, additional calls to this function must be NOPs, + // but allowed. While not cancelled, calling this function must register the + // given number of additional arrays/batches to be produced with the producer. + // The producer should only call `on_next_task` at most the registered number + // of arrays before propagating backpressure. + // + // Any error encountered by calling request must be propagated by calling the `on_error` + // callback of the ArrowAsyncDeviceStreamHandler. + // + // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or + // `release` should be scheduled by the producer to be called later. + // + // It is invalid for a consumer to call this with a value of n <= 0, producers should + // error if given such a value. + void (*request)(struct ArrowAsyncProducer* self, int64_t n); + + // This cancel callback signals a producer that it must eventually stop making calls + // to on_next_task. It must be idempotent and thread-safe. After calling cancel once, + // subsequent calls must be NOPs. This must not call any consumer-side handlers other + // than `on_error`. + // + // It is not required that calling cancel affect the producer immediately, only that it + // must eventually stop calling on_next_task and subsequently call release on the + // async handler. As such, a consumer must be prepared to receive one or more calls to + // `on_next_task` even after calling cancel if there are still requested arrays pending. + // + // Successful cancellation should *not* result in the producer calling `on_error`, it + // should finish out any remaining tasks and eventually call `release`. + // + // Any error encountered during handling a call to cancel must be reported via the + // on_error callback on the async stream handler. + void (*cancel)(struct ArrowAsyncProducer* self); + + // Any additional metadata tied to a specific stream of data. This must either be NULL + // or a valid pointer to metadata which is encoded in the same way schema metadata + // would be. Non-null metadata must be valid for the lifetime of this object. As an + // example a producer could use this to provide the total number of rows and/or batches + // in the stream if known. + const char* additional_metadata; + + // producer-specific opaque data. + void* private_data; +}; + +// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous +// style of interaction. While ArrowDeviceArrayStream provides producer +// defined callbacks, this is intended to be created by the consumer instead. +// The consumer passes this handler to the producer, which in turn uses the +// callbacks to inform the consumer of events in the stream. +struct ArrowAsyncDeviceStreamHandler { + // Handler for receiving a schema. The passed in stream_schema must be + // released or moved by the handler (producer is giving ownership of the schema to + // the handler, but not ownership of the top level object itself). + // + // With the exception of an error occurring (on_error), this must be the first + // callback function which is called by a producer and must only be called exactly + // once. As such, the producer should provide a valid ArrowAsyncProducer instance + // so the consumer can control the flow. See the documentation on ArrowAsyncProducer + // for how it works. The ArrowAsyncProducer is owned by the producer who calls this + // function and thus the producer is responsible for cleaning it up when calling + // the release callback of this handler. + // + // If there is any additional metadata tied to this stream, it will be provided as + // a non-null value for the `additional_metadata` field of the ArrowAsyncProducer + // which will be valid at least until the release callback is called. + // + // Return value: 0 if successful, `errno`-compatible error otherwise + // + // A producer that receives a non-zero return here should stop producing and eventually + // call release instead. + int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, + struct ArrowSchema* stream_schema); + + // Handler for receiving data. This is called when data is available providing an + // ArrowAsyncTask struct to signify it. The producer indicates the end of the stream + // by passing NULL as the value for the task rather than a valid pointer to a task. + // The task object is only valid for the lifetime of this function call, if a consumer + // wants to utilize it after this function returns, it must copy or move the contents + // of it to a new ArrowAsyncTask object. + // + // The `request` callback of a provided ArrowAsyncProducer must be called in order + // to start receiving calls to this handler. + // + // The metadata argument can be null or can be used by a producer + // to pass arbitrary extra information to the consumer (such as total number + // of rows, context info, or otherwise). The data should be passed using the same + // encoding as the metadata within the ArrowSchema struct itself (defined in + // the spec at + // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata) + // + // If metadata is non-null then it only needs to exist for the lifetime of this call, + // a consumer who wants it to live after that must copy it to ensure lifetime. + // + // A producer *must not* call this concurrently from multiple different threads. + // + // A consumer must be prepared to receive one or more calls to this callback even + // after calling cancel on the corresponding ArrowAsyncProducer, as cancel does not + // guarantee it happens immediately. + // + // Return value: 0 if successful, `errno`-compatible error otherwise. + // + // If the consumer returns a non-zero return from this method, that indicates to the + // producer that it should stop propagating data as an error occurred. After receiving + // such a return, the only interaction with this object is for the producer to call + // the `release` callback. + int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self, + struct ArrowAsyncTask* task, const char* metadata); + + // Handler for encountering an error. The producer should call release after + // this returns to clean up any resources. The `code` passed in can be any error + // code that a producer wants, but should be errno-compatible for consistency. + // + // If the message or metadata are non-null, they will only last as long as this + // function call. The consumer would need to perform a copy of the data if it is + // necessary for them to live past the lifetime of this call. + // + // Error metadata should be encoded as with metadata in ArrowSchema, defined in + // the spec at + // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata + // + // It is valid for this to be called by a producer with or without a preceding call + // to ArrowAsyncProducer.request. + // + // This callback must not call any methods of an ArrowAsyncProducer object. + void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code, + const char* message, const char* metadata); + + // Release callback to release any resources for the handler. Should always be + // called by a producer when it is done utilizing a handler. No callbacks should + // be called after this is called. + // + // It is valid for the release callback to be called by a producer with or without + // a preceding call to ArrowAsyncProducer.request. + // + // The release callback must not call any methods of an ArrowAsyncProducer object. + void (*release)(struct ArrowAsyncDeviceStreamHandler* self); + + // MUST be populated by the producer BEFORE calling any callbacks other than release. + // This provides the connection between a handler and its producer, and must exist until + // the release callback is called. + struct ArrowAsyncProducer* producer; + + // Opaque handler-specific data + void* private_data; +}; + +#endif // ARROW_C_ASYNC_STREAM_INTERFACE + #ifdef __cplusplus } #endif diff --git a/docs/source/format/CDeviceDataInterface.rst b/docs/source/format/CDeviceDataInterface.rst index 59433bae47e27..fbb2012c3059b 100644 --- a/docs/source/format/CDeviceDataInterface.rst +++ b/docs/source/format/CDeviceDataInterface.rst @@ -506,6 +506,8 @@ could be used for any device: arr->array.release(&arr->array); } +.. _c-device-stream-interface: + Device Stream Interface ======================= @@ -650,6 +652,367 @@ The stream source is not assumed to be thread-safe. Consumers wanting to call ``get_next`` from several threads should ensure those calls are serialized. +Async Device Stream Interface +============================= + +.. warning:: + + Experimental: The Async C Device Stream interface is experimental in its current + form. Based on feedback and usage the protocol definition may change until + it is fully standardized. + +The :ref:`C stream interface ` provides a synchronous +API centered around the consumer calling the producer functions to retrieve +the next record batch. For concurrent communication between producer and consumer, +the ``ArrowAsyncDeviceStreamHandler`` can be used. This interface is non-opinionated +and may fit into different concurrent communication models. + +Semantics +--------- + +Rather than the producer providing a structure of callbacks for a consumer to +call and retrieve records, the Async interface is a structure allocated and populated by the consumer. +The consumer allocated struct provides handler callbacks for the producer to call +when the schema and chunks of data are available. + +In addition to the ``ArrowAsyncDeviceStreamHandler``, there are also two additional +structs used for the full data flow: ``ArrowAsyncTask`` and ``ArrowAsyncProducer``. + +Structure Definition +-------------------- + +The C device async stream interface consists of three ``struct`` definitions: + +.. code-block:: c + + #ifndef ARROW_C_ASYNC_STREAM_INTERFACE + #define ARROW_C_ASYNC_STREAM_INTERFACE + + struct ArrowAsyncTask { + int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out); + + void* private_data; + }; + + struct ArrowAsyncProducer { + void (*request)(struct ArrowAsyncProducer* self, int64_t n); + void (*cancel)(struct ArrowAsyncProducer* self); + + void (*release)(struct ArrowAsyncProducer* self); + const char* additional_metadata; + void* private_data; + }; + + struct ArrowAsyncDeviceStreamHandler { + // consumer-specific handlers + int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, + struct ArrowSchema* stream_schema); + int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self, + struct ArrowAsyncTask* task, const char* metadata); + void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, + int code, const char* message, const char* metadata); + + // release callback + void (*release)(struct ArrowAsyncDeviceStreamHandler* self); + + // must be populated before calling any callbacks + struct ArrowAsyncProducer* producer; + + // opaque handler-specific data + void* private_data; + }; + + #endif // ARROW_C_ASYNC_STREAM_INTERFACE + +.. note:: + The canonical guard ``ARROW_C_ASYNC_STREAM_INTERFACE`` is meant to avoid + duplicate definitions if two projects copy the C async stream interface + definitions into their own headers, and a third-party project includes + from these two projects. It is therefore important that this guard is kept + exactly as-is when these definitions are copied. + +The ArrowAsyncDeviceStreamHandler structure +''''''''''''''''''''''''''''''''''''''''''' + +The structure has the following fields: + +.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*) + + *Mandatory.* Handler for receiving the schema of the stream. All incoming records should + match the provided schema. If successful, the function should return 0, otherwise + it should return an ``errno``-compatible error code. + + If there is any extra contextual information that the producer wants to provide, it can set + :c:member:`ArrowAsyncProducer.additional_metadata` to a non-NULL value. This is encoded in the + same format as :c:member:`ArrowSchema.metadata`. The lifetime of this metadata, if not ``NULL``, + should be tied to the lifetime of the ``ArrowAsyncProducer`` object. + + Unless the ``on_error`` handler is called, this will always get called exactly once and will be + the first method called on this object. As such the producer *MUST* populate the ``ArrowAsyncProducer`` + member before calling this function to allow the consumer to apply back-pressure and control the flow of data. + The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up *after* + calling the release callback on the ``ArrowAsyncDeviceStreamHandler``. + + A producer that receives a non-zero result here must not subsequently call anything other than + the release callback on this object. + +.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*) + + *Mandatory.* Handler to be called when a new record is available for processing. The + schema for each record should be the same as the schema that ``on_schema`` was called with. + If successfully handled, the function should return 0, otherwise it should return an + ``errno``-compatible error code. + + Rather than passing the record itself it receives an ``ArrowAsyncTask`` instead to facilitate + better consumer-focused thread control as far as receiving the data. A call to this function + simply indicates that data is available via the provided task. + + The producer signals the end of the stream by passing ``NULL`` for the ``ArrowAsyncTask`` + pointer instead of a valid address. This task object is only valid during the lifetime of + this function call. If the consumer wants to use the task beyond the scope of this method, it + must copy or move its contents to a new ArrowAsyncTask object. + + The ``const char*`` parameter exists for producers to provide any extra contextual information + they want. This is encoded in the same format as :c:member:`ArrowSchema.metadata`. If not ``NULL``, + the lifetime is only the scope of the call to this function. A consumer who wants to maintain + the additional metadata beyond the lifetime of this call *MUST* copy the value themselves. + + A producer *MUST NOT* call this concurrently from multiple threads. + + The :c:member:`ArrowAsyncProducer.request` callback must be called to start receiving calls to this + handler. + +.. c:member:: void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler, int, const char*, const char*) + + *Mandatory.* Handler to be called when an error is encountered by the producer. After calling + this, the ``release`` callback will be called as the last call on this struct. The parameters + are an ``errno``-compatible error code and an optional error message and metadata. + + If the message and metadata are not ``NULL``, their lifetime is only valid during the scope + of this call. A consumer who wants to maintain these values past the return of this function + *MUST* copy the values themselves. + + If the metadata parameter is not ``NULL``, to provide key-value error metadata, then it should + be encoded identically to the way that metadata is encoded in :c:member:`ArrowSchema.metadata`. + + It is valid for this to be called by a producer with or without a preceding call to + :c:member:`ArrowAsyncProducer.request`. This callback *MUST NOT* call any methods of an + ``ArrowAsyncProducer`` object. + +.. c:member:: void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*) + + *Mandatory.* A pointer to a consumer-provided release callback for the handler. + + It is valid for this to be called by a producer with or without a preceding call to + :c:member:`ArrowAsyncProducer.request`. This must not call any methods of an ``ArrowAsyncProducer`` + object. + +.. c:member:: struct ArrowAsyncProducer ArrowAsyncDeviceStreamHandler.producer + + *Mandatory.* The producer object that the consumer will use to request additional data or cancel. + + This object *MUST* be populated by the producer before calling the :c:member:`ArrowAsyncDeviceStreamHandler.on_schema` + callback. The producer maintains ownership of this object and must clean it up *after* calling + the release callback on the ``ArrowAsyncDeviceStreamHandler``. + + The consumer *CANNOT* assume that this is valid until the ``on_schema`` callback is called. + +.. c:member:: void* ArrowAsyncDeviceStreamHandler.private_data + + *Optional.* An opaque pointer to consumer-provided private data. + + Producers *MUST NOT* process this member. Lifetime of this member is handled by + the consumer, and especially by the release callback. + +The ArrowAsyncTask structure +'''''''''''''''''''''''''''' + +The purpose of using a Task object rather than passing the array directly to the ``on_next`` +callback is to allow for more complex and efficient thread handling. Utilizing a Task +object allows for a producer to separate the "decoding" logic from the I/O, enabling a +consumer to avoid transferring data between CPU cores (e.g. from one L1/L2 cache to another). + +This producer-provided structure has the following fields: + +.. c:member:: int (*ArrowArrayTask.extract_data)(struct ArrowArrayTask*, struct ArrowDeviceArray*) + + *Mandatory.* A callback to populate the provided ``ArrowDeviceArray`` with the available data. + The order of ``ArrowAsyncTasks`` provided by the producer enables a consumer to know the order of + the data to process. If the consumer does not care about the data that is owned by this task, + it must still call ``extract_data`` so that the producer can perform any required cleanup. ``NULL`` + should be passed as the device array pointer to indicate that the consumer doesn't want the + actual data, letting the task perform necessary cleanup. + + If a non-zero value is returned from this, it should be followed only by the producer calling + the ``on_error`` callback of the ``ArrowAsyncDeviceStreamHandler``. Because calling this method + is likely to be separate from the current control flow, returning a non-zero value to signal + an error occuring allows the current thread to decide handle the case accordingly, while still + allowing all error logging and handling to be centralized in the + :c:member:`ArrowAsyncDeviceStreamHandler.on_error` callback. + + Rather than having a separate release callback, any required cleanup should be performed as part + of the invocation of this callback. Ownership of the Array is given to the pointer passed in as + a parameter, and this array must be released separately. + + It is only valid to call this method exactly once. + +.. c:member:: void* ArrowArrayTask.private_data + + *Optional.* An opaque pointer to producer-provided private data. + + Consumers *MUST NOT* process this member. Lifetime of this member is handled by + the producer who created this object, and should be cleaned up if necessary during + the call to :c:member:`ArrowArrayTask.extract_data`. + +The ArrowAsyncProducer structure +'''''''''''''''''''''''''''''''' + +This producer-provided and managed object has the following fields: + +.. c:member:: void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, uint64_t) + + *Mandatory.* This function must be called by a consumer to start receiving calls to + :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task`. It *MUST* be valid to call + this synchronously from within :c:member:`ArrowAsyncDeviceStreamHandler.on_next_task` + or :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`. As a result, this function + *MUST NOT* synchronously call ``on_next_task`` or ``on_error`` to avoid recursive + and reentrant callbacks. + + After ``cancel`` is called, additional calls to this function must be a NOP, but allowed. + + While not cancelled, calling this function registers the given number of additional + arrays/batches to be produced by the producer. A producer should only call + the appropriate ``on_next_task`` callback up to a maximum of the total sum of calls to + this method before propagating back-pressure / waiting. + + Any error encountered by calling request must be propagated by calling the ``on_error`` + callback of the ``ArrowAsyncDeviceStreamHandler``. + + It is invalid to call this function with a value of ``n`` that is ``<= 0``. Producers should + error (e.g. call ``on_error``) if receiving such a value for ``n``. + +.. c:member:: void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*) + + *Mandatory.* This function signals to the producer that it must *eventually* stop calling + ``on_next_task``. Calls to ``cancel`` must be idempotent and thread-safe. After calling + it once, subsequent calls *MUST* be a NOP. This *MUST NOT* call any consumer-side handlers + other than ``on_error``. + + It is not required that calling ``cancel`` affect the producer *immediately*, only that it + must eventually stop calling ``on_next_task`` and then subsequently call ``release`` + on the async handler object. As such, a consumer *MUST* be prepared to receive one or more + calls to ``on_next_task`` or ``on_error`` even after calling ``cancel`` if there are still + requested arrays pending. + + Successful cancelling *MUST NOT* result in a producer calling + :c:member:`ArrowAsyncDeviceStreamHandler.on_error`, instead it should finish out any remaining + tasks (calling ``on_next_task`` accordingly) and eventually just call ``release``. + + Any error encountered during handling a call to cancel must be reported via the ``on_error`` + callback on the async stream handler. + +.. c:member:: const char* ArrowAsyncProducer.additional_metadata + + *Optional.* An additional metadata string to provide any extra context to the consumer. This *MUST* + either be ``NULL`` or a valid string that is encoded in the same way as :c:member:`ArrowSchema.metadata`. + As an example, a producer could utilize this metadata to provide the total number of rows and/or batches + in the stream if known. + + If not ``NULL`` it *MUST* be valid for at least the lifetime of this object. + +.. c:member:: void* ArrowAsyncProducer.private_data + + *Optional.* An opaque pointer to producer-provided specific data. + + Consumers *MUST NOT* process this member, the lifetime is owned by the producer + that constructed this object. + +Error Handling +'''''''''''''' + +Unlike the regular C Stream interface, the Async interface allows for errors to flow in +both directions. As a result, error handling can be slightly more complex. Thus this spec +designates the following rules: + +* If the producer encounters an error during processing, it should call the ``on_error`` + callback, and then call ``release`` after it returns. + +* If ``on_schema`` or ``on_next_task`` returns a non-zero integer value, the producer *should not* + call the ``on_error`` callback, but instead should eventually call ``release`` at some point + before or after any logging or processing of the error code. + +Result lifetimes +'''''''''''''''' + +The ``ArrowSchema`` passed to the ``on_schema`` callback must be released independently, +with the object itself needing to be moved to a consumer owned ``ArrowSchema`` object. The +``ArrowSchema*`` passed as a parameter to the callback *MUST NOT* be stored and kept. + +The ``ArrowAsyncTask`` object provided to ``on_next_task`` is owned by the producer and +will be cleaned up during the invocation of calling ``extract_data`` on it. If the consumer +doesn't care about the data, it should pass ``NULL`` instead of a valid ``ArrowDeviceArray*``. + +The ``const char*`` error ``message`` and ``metadata`` which are passed to ``on_error`` +are only valid within the scope of the ``on_error`` function itself. They must be copied +if it is necessary for them to exist after it returns. + +Stream Handler Lifetime +''''''''''''''''''''''' + +Lifetime of the async stream handler is managed using a release callback with similar +usage as in :ref:`C data interface `. + +ArrowAsyncProducer Lifetime +''''''''''''''''''''''''''' + +The lifetime of the ``ArrowAsyncProducer`` is owned by the producer itself and should +be managed by it. It *MUST* be populated before calling any methods other than ``release`` +and *MUST* remain valid at least until just before calling ``release`` on the stream handler object. + +Thread safety +''''''''''''' + +All handler functions on the ``ArrowAsyncDeviceStreamHandler`` should only be called in a +serialized manner, but are not guaranteed to be called from the same thread every time. A +producer should wait for handler callbacks to return before calling the next handler callback, +and before calling the ``release`` callback. + +Back-pressure is managed by the consumer making calls to :c:member:`ArrowAsyncProducer.request` +to indicate how many arrays it is ready to receive. + +The ``ArrowAsyncDeviceStreamHandler`` object should be able to handle callbacks as soon as +it is passed to the producer, any initialization should be performed before it is provided. + +Possible Sequence Diagram +------------------------- + +.. mermaid:: + + sequenceDiagram + Consumer->>+Producer: ArrowAsyncDeviceStreamHandler* + Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*) + Consumer->>Producer: ArrowAsyncProducer->request(n) + + par + loop up to n times + Producer-->>Consumer: on_next_task(ArrowAsyncTask*) + end + and for each task + Consumer-->>Producer: ArrowAsyncTask.extract_data(...) + Consumer-->>Producer: ArrowAsyncProducer->request(1) + end + + break Optionally + Consumer->>-Producer: ArrowAsyncProducer->cancel() + end + + loop possible remaining + Producer-->>Consumer: on_next_task(ArrowAsyncTask*) + end + + Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release() + + Interoperability with other interchange formats ===============================================