
GH-43631: [C][Format] Add ArrowAsyncDeviceStreamHandler interface #43632

Merged (39 commits) on Nov 6, 2024

Conversation

@zeroshade (Member) commented Aug 9, 2024

Rationale for this change

See apache/arrow-adbc#811 and #43631

What changes are included in this PR?

Definition of ArrowAsyncDeviceStreamHandler and addition of it to the docs.

I've sent an email to the mailing list to start a discussion on this topic, so this may change over time due to those discussions.

github-actions bot commented Aug 9, 2024

⚠️ GitHub issue #43631 has been automatically assigned in GitHub to PR creator.

@lidavidm (Member) left a comment

Do we want to handle cancellation here? That is a common concern for async APIs, and leaving it implementation-defined may limit the usefulness here.

// released or moved by the handler (producer is giving ownership of it to
// the handler).
//
// The `extension_param` argument can be null or can be used by a producer
Member

How should it be freed/what is the lifetime + ownership? (Especially in the case where the consumer may not understand the format of the extension and won't know how to free it?)

Member

I am lukewarm on a vague extension parameter here...maybe metadata or a JSON string if we really need additional key/value information outside the scope of the schema's metadata?

Member

I read this as "the contents of stream_schema are only valid during the call. Copy off any information that you need. The ArrowSchema will be deleted by the producer at some point after the call completes." Or, to put it another way "the consumer shall not call release on the ArrowSchema"

Member Author

I guess I should reword it a bit, then. The intent was that ownership of the ArrowSchema is transferred from the producer to the consumer, so the consumer MUST call release on the ArrowSchema unless it keeps it.

Member Author

How should it be freed/what is the lifetime + ownership? (Especially in the case where the consumer may not understand the format of the extension and won't know how to free it?)

The intent was that the extension_param is entirely owned by the producer; the consumer shouldn't need to care about or manage the lifetime of the extension at all.

Member

Maybe it could be const void* then?

Member

The fact that it's void* implies that there's out-of-band communication between the producer and consumer. For example:

  • the consumer is a GUI and the producer is a dataset library
  • the consumer initiates an interaction by passing
    • a query string
    • an ArrowAsyncDeviceStreamHandler
    • ExtensionParamKind::TOTAL_NUMBER_OF_ROWS

Since the producer received that last argument, it will ensure that extension_param points to an int64_t (or fail, or leave it null to indicate that capability is not available, or ...)
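To make the objection concrete, here is a minimal sketch of that hypothetical out-of-band contract (all names here are invented for illustration, not part of the proposed ABI): the consumer and producer have agreed, outside the Arrow C ABI, that a non-null `extension_param` points to an `int64_t` total-row count. Nothing in the ABI records or enforces this, which is exactly the coupling being criticized.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical consumer helper: only valid because of an out-of-band
   agreement that extension_param, when non-null, is an int64_t row count. */
static int64_t read_total_rows(void* extension_param) {
  if (extension_param == NULL) {
    return -1; /* producer signaled the capability is not available */
  }
  return *(int64_t*)extension_param;
}
```

A consumer that receives a `void*` from a producer it doesn't share this agreement with can do nothing safe with it, which is the argument for replacing the parameter with metadata or removing it.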

Since this scenario already assumes the producer and consumer can communicate what the extension parameter should be without recourse to the arrow C ABI, it seems odd to give them another more restrictive channel to communicate non-arrow objects.

I think this parameter should either be removed as out of scope or replaced with something accessible to consumers without out-of-band typeinfo (as @paleolimbot suggested, metadata would work)

// to pass arbitrary extra information to the consumer (such as total number
// of rows, context info, or otherwise).
//
// Return value: 0 if successful, `errno`-compatible error otherwise
Member

What is the producer supposed to do with an error? (FWIW, gRPC just uses void for its callbacks)

Member

I imagine the consumer should be able to return something to tell the producer to stop (I agree this should be explicit, though, and errno might not be the most precise tool but it might be slightly more informative than a bool or invented enum).

For argument's sake: If a non-zero value is returned, the producer must not call any other callbacks except the release callback.
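A toy single-threaded sketch of that proposed rule (handler and callback names here are simplified stand-ins, not the abi.h definitions): on the first non-zero return the producer stops invoking callbacks and calls only release.

```c
#include <assert.h>

/* Simplified stand-in for the async handler, for illustration only. */
struct DemoHandler {
  int (*on_next)(struct DemoHandler* self, int batch_id);
  void (*release)(struct DemoHandler* self);
  int batches_seen;
  int released;
};

static int consumer_on_next(struct DemoHandler* self, int batch_id) {
  self->batches_seen++;
  /* Consumer signals "stop" after three batches with an errno-compatible
     non-zero code (125 is ECANCELED on Linux; shown literally here). */
  return (batch_id >= 2) ? 125 : 0;
}

static void consumer_release(struct DemoHandler* self) { self->released = 1; }

/* Producer loop honoring the rule: after a non-zero return, no callback
   other than release may be invoked. */
static int run_producer(struct DemoHandler* h, int total_batches) {
  for (int i = 0; i < total_batches; i++) {
    if (h->on_next(h, i) != 0) break;
  }
  h->release(h);
  return h->batches_seen;
}
```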

Member

Right, but it might make more sense to just have a unified cancellation mechanism. And again, what is the producer really supposed to do with the error code? It's just going to drop it on the floor.

Member

Callbacks returning an error like this is pretty common in Rust. Normally what the producer does is:

  • Abort the reader
  • Return an error like "External error: the producer received an error code from a call to on_schema: "

I think it's nice to be able to return an error code here. For example, maybe the consumer can't handle one of the data types reported by the schema.

Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the producer needs to keep accepting calls to the callbacks right? (e.g. returning an error here won't prevent on_next from being called once or twice while the producer is cleaning up)

Member Author

Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the producer needs to keep accepting calls to the callbacks right? (e.g. returning an error here won't prevent on_next from being called once or twice while the producer is cleaning up)

It won't prevent it, but the docs explicitly state that if an error is returned from the callback, the producer should call only release after that and no further callbacks. There isn't anything to functionally prevent it, though.

Member

In general, I would suggest that ADBC's async APIs should be compatible with Reactive Streams (RS) semantics everywhere except for the semantics of the request(n) signal, which for Arrow shouldn't be tied to the number of calls to on_next() but should instead indicate the number of rows. This avoids the burden of reinventing the wheel in the aspects of the async model that the Reactive Streams working group has already thought about and debated long and hard.

Also, users of ADBC's async APIs in JVM and C# will be able to leverage some very high quality code for these platforms that implement RS semantics, such as j.u.c.SubmissionPublisher, kotlinx.coroutines.reactive, dotnet/reactive, hopefully with minimal modifications, or perhaps no modifications at all (the different semantics of request(n) could perhaps piggy-back existing libs because per RS spec, producers are allowed to send a complete signal after calling on_next fewer times than was requested).

Per RS 1.7, the producer is required to stop after an error from a consumer callback:

Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.

Re:

the docs explicitly state that if an error is returned from the callback, the producer should only call release after that and not call further callbacks. though there isn't anything to functionally prevent it.

Indeed, per RS 2.5, the consumer MUST go out of its way even to guard against buggy concurrent use by the producer, let alone refrain from calling any of its own methods concurrently with the producer, or after the producer has returned an error or completion signal.

@leventov (Member) commented Aug 19, 2024

Right, but it might make more sense to just have a unified cancellation mechanism. And again, what is the producer really supposed to do with the error code? It's just going to drop it on the floor.

I think it can be useful for monitoring on the middleware/transport level, if the API wraps communication with a remote server. See APPLICATION_ERROR in RSocket protocol.

github-actions bot added the "awaiting changes" label and removed the "awaiting committer review" label on Aug 10, 2024
@paleolimbot (Member) left a comment

Thank you for putting this together! Just a few initial comments!

Do we want to handle cancellation here?

I think that with the given proposal, if the producer sees/wants a cancellation it can call on_error() with ECANCELED; if the consumer sees or wants a cancellation, it can signal it by returning ECANCELED?

It is also worth evaluating all of the shoulds here to see if any of them can be promoted to musts to give better guarantees.

// released or moved by the handler (producer is giving ownership of it to
// the handler).
//
// The `extension_param` argument can be null or can be used by a producer
Member

I am lukewarm on a vague extension parameter here...maybe metadata or a JSON string if we really need additional key/value information outside the scope of the schema's metadata?

// to pass arbitrary extra information to the consumer (such as total number
// of rows, context info, or otherwise).
//
// Return value: 0 if successful, `errno`-compatible error otherwise
Member

I imagine the consumer should be able to return something to tell the producer to stop (I agree this should be explicit, though, and errno might not be the most precise tool but it might be slightly more informative than a bool or invented enum).

For argument's sake: If a non-zero value is returned, the producer must not call any other callbacks except the release callback.

@lidavidm (Member) commented Aug 10, 2024

Do we want to handle cancellation here?

I think that with the given proposal, if the producer sees/wants a cancellation it can call on_error() with ECANCELED; if the consumer sees or wants a cancellation, it can signal it by returning ECANCELED?

You may also want to cancel outside of the consumer's callback, though.

@westonpace (Member) left a comment

I like this!

Do we want to handle cancellation here? That is a common concern for async APIs and leaving it to be implementation-defined may limit the usefulness here

I'm not sure that cancellation would be a part of ArrowAsyncDeviceStreamHandler. I do agree it is important. Additionally, any kind of push-based API like this should also have a method for signalling backpressure. For example, I could imagine the API is something like this:

struct ScanHandle {
  // Cancels an on-going scan, the producer is free to ignore this if
  // cancellation is not possible
  void (*cancel)(struct ScanHandle* self);
  // Release the scan handle.  If this is called before the scan is complete then
  // the behavior is undefined
  void (*release)(struct ScanHandle* self);
  // Request that the producer pauses.  There may still be calls to `on_next` and
  // `on_error` after this method is called (indeed, the producer may be unable to
  // pause).
  void (*pause)(struct ScanHandle* self);
  // Request that the producer resumes production after a pause.
  void (*resume)(struct ScanHandle* self);
  // Opaque handler-specific data
  void* private_data;
};

// Start a scan, data (including any errors) will be provided to `callback`.  An
// implementation-defined handler will be returned that can be used to cancel
// or inspect the scan.
struct ScanHandle* start_scan(struct ArrowAsyncDeviceStreamHandler* callback, struct ScanOptions* scan_options);

// released or moved by the handler (producer is giving ownership of it to
// the handler).
//
// The `extension_param` argument can be null or can be used by a producer
Member

I read this as "the contents of stream_schema are only valid during the call. Copy off any information that you need. The ArrowSchema will be deleted by the producer at some point after the call completes." Or, to put it another way "the consumer shall not call release on the ArrowSchema"

// to pass arbitrary extra information to the consumer (such as total number
// of rows, context info, or otherwise).
//
// Return value: 0 if successful, `errno`-compatible error otherwise
Member

Callbacks returning an error like this is pretty common in Rust. Normally what the producer does is:

  • Abort the reader
  • Return an error like "External error: the producer received an error code from a call to on_schema: "

I think it's nice to be able to return an error code here. For example, maybe the consumer can't handle one of the data types reported by the schema.

Either way though, if an error is returned or if the consumer explicitly cancels in some way, I assume the producer needs to keep accepting calls to the callbacks right? (e.g. returning an error here won't prevent on_next from being called once or twice while the producer is cleaning up)

Comment on lines 252 to 262
// Handler for receiving an array/record batch. Always called at least once
// unless an error is encountered (which would result in calling on_error).
// An empty/released array is passed to indicate the end of the stream if no
// errors have been encountered.
//
// The `extension_param` argument can be null or can be used by a producer
// to pass arbitrary extra information to the consumer.
//
// Return value: 0 if successful, `errno`-compatible error otherwise.
int (*on_next)(struct ArrowAsyncDeviceStreamHandler* self,
               struct ArrowDeviceArray* next, void* extension_param);
Member

You're explicit about ownership elsewhere so it might be nice to just add a statement "after this call the consumer is responsible for releasing the array provided by next"

Comment on lines 279 to 424
// Release callback to release any resources for the handler. Should always be
// called by a producer when it is done utilizing a handler. No callbacks should
// be called after this is called.
void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
Member

What should the consumer do if there are calls to on_error or on_next that are still in progress when this is called?

Member Author

The spec document currently states that the producer must wait until a callback returns before calling the next callback. The intent is that a consumer shouldn't have to handle concurrent calls like that, and backpressure is managed by the callbacks themselves (producer won't call the next callback until the current one returns).
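That serialization contract can be sketched in a few lines (a toy single-threaded model with invented names, not the abi.h interface): because every callback returns before the next one begins, the consumer observes a strict sequence and needs no locking, and the producer's progress is naturally throttled by how long the consumer takes to return.

```c
#include <assert.h>

/* Hypothetical handler that records the order in which callbacks fire:
   1 = schema, 2 = batch, 3 = release. */
struct SerialHandler {
  int events[8];
  int n_events;
};

static int on_schema(struct SerialHandler* h) { h->events[h->n_events++] = 1; return 0; }
static int on_next(struct SerialHandler* h)   { h->events[h->n_events++] = 2; return 0; }
static void release(struct SerialHandler* h)  { h->events[h->n_events++] = 3; }

/* Producer driver: each call returns before the next begins, so calls can
   never overlap from the consumer's point of view. */
static void run_stream(struct SerialHandler* h, int n_batches) {
  if (on_schema(h) != 0) { release(h); return; }
  for (int i = 0; i < n_batches; i++) {
    if (on_next(h) != 0) break;
  }
  release(h);
}
```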

@westonpace (Member) commented Aug 10, 2024

I've been coding Rust too long now, and as a result I must at least offer, for your consideration, a pull-based asynchronous stream (which is convenient in that it is nearly identical to the synchronous version and doesn't require special backpressure signals). It also avoids any sort of re-entrancy (the currently proposed approach has re-entrant methods on the callback structure). Cancellation is handled by releasing the ArrowAsyncDeviceArrayStream before it is finished (if canceled while a pending call is in progress, the Waker needs to stay alive until wake is called, and the producer still needs to call wake).

If parallelism is desired by the consumer then the consumer should launch a new thread task to call get_next as soon as the previous call completes (while the old thread processes the data returned by the previous call to get_next).

struct Waker {
  // Signal to the producer that more data is available, the consumer shall release any resources
  // associated with the waker after this method is called.  The producer shall not call any other
  // methods after calling this method.
  //
  // The producer must always call this method even if an error is encountered (in which case the
  // error will be reported on the following call to `get_next`).
  void (*wake)(struct Waker* waker);
  void* private_data;
};

struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If EWOULDBLOCK is returned then the producer is not ready to generate more data.  If the
  // producer returns this value then the producer takes ownership of `waker` and is responsible
  // for calling `wake` when more data is available.  If any other value is returned then the producer
  // shall ignore `waker` and never call any methods on it.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, Waker* waker, struct ArrowDeviceArray* out);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};
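The EWOULDBLOCK/waker handshake in this proposal can be exercised with a toy single-threaded model (all names below are simplified stand-ins for illustration, not the proposed structs): the producer returns EWOULDBLOCK and stashes the waker, then fires it when data later becomes available.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

struct DemoWaker {
  void (*wake)(struct DemoWaker* self);
  int woken;
};

static void demo_wake(struct DemoWaker* self) { self->woken = 1; }

struct DemoStream {
  int ready;                 /* is the next value available yet? */
  int next_value;
  struct DemoWaker* pending; /* waker owned by the producer after EWOULDBLOCK */
};

static int demo_get_next(struct DemoStream* s, struct DemoWaker* w, int* out) {
  if (!s->ready) {
    s->pending = w;          /* producer takes ownership of the waker */
    return EWOULDBLOCK;
  }
  *out = s->next_value;
  return 0;
}

/* Producer side: data has arrived, so signal the stashed waker. */
static void demo_data_arrived(struct DemoStream* s, int value) {
  s->ready = 1;
  s->next_value = value;
  if (s->pending) s->pending->wake(s->pending);
}
```

In a real consumer the wake call would reschedule a suspended task rather than set a flag; the flag just makes the ownership handoff visible.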

Or, just for fun, an epoll-styled approach, which has the same pull-based advantages and doesn't require a waker but is less friendly towards coroutine-style asynchronicity.

struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If EWOULDBLOCK is returned then the producer is not ready to generate more data.  The
  // consumer should call `wait`.  `out` will not be valid.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, struct ArrowDeviceArray* out);

  // Blocks until data is available
  void (*wait)(struct ArrowAsyncDeviceArrayStream* self, int timeout);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};
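The epoll-styled variant can be modeled the same way (again with invented stand-in names): instead of handing the producer a waker, the consumer blocks in wait() and then retries get_next.

```c
#include <assert.h>
#include <errno.h>

struct PollStream {
  int ready;
  int next_value;
};

static int poll_get_next(struct PollStream* s, int* out) {
  if (!s->ready) return EWOULDBLOCK; /* `out` is not valid in this case */
  *out = s->next_value;
  s->ready = 0;
  return 0;
}

/* Stand-in for the blocking wait; a real producer would block on a condition
   variable or file descriptor until data arrives (here we just mark a value
   ready so the sketch is self-contained). */
static void poll_wait(struct PollStream* s, int timeout_ms) {
  (void)timeout_ms;
  s->ready = 1;
  s->next_value = 7;
}
```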

cpp/src/arrow/c/abi.h (outdated review thread, resolved)
@westonpace (Member)

Ah, one more potential approach. This one is actually my favorite for a file(s) scanner. It has all the advantages of a pull-based approach, plus it puts the consumer in complete control of deciding how much decode parallelism there should be (by calling get_next_task before the previous call to get_next_task has completed). On the other hand, it can be more difficult if the producer does not have any control over the parallelism or doesn't know the size of the stream ahead of time (e.g. maybe it is receiving data from a TCP stream):

struct Waker {
  // Signal to the producer that more data is available, the consumer shall release any resources
  // associated with the waker after this method is called.  The producer shall not call any other
  // methods after calling this method.
  //
  // The producer must always call this method even if an error is encountered (in which case the
  // error will be reported on the following call to `get_next`).
  void (*wake)(struct Waker* waker);
  void* private_data;
};

struct ArrowArrayTask {
  // Callback to get the array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If EWOULDBLOCK is returned then the array is not ready.  If the
  // producer returns this value then the producer takes ownership of `waker` and is responsible
  // for calling `wake` when more data is available.  If any other value is returned then the producer
  // shall ignore `waker` and never call any methods on it.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next)(struct ArrowArrayTask* self, Waker* waker, struct ArrowDeviceArray* out);
  void (*release)(struct ArrowArrayTask* self);
  void* private_data;
};

struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array task
  // (if no error and the task is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // The consumer is allowed to call this method again even if the tasks returned by a previous
  // call have not completed.  However, the consumer shall not call this re-entrantly.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next_task)(struct ArrowAsyncDeviceArrayStream* self, struct ArrowArrayTask* task);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};

@zeroshade (Member Author)

@westonpace your last example confuses me a bit, particularly because the entire purpose of this is to create a push-based approach for async handling, rather than a pull-based approach.

Below are my questions:

struct Waker {
  // Signal to the producer that more data is available, the consumer shall release any resources
  // associated with the waker after this method is called.  The producer shall not call any other
  // methods after calling this method.
  //
  // The producer must always call this method even if an error is encountered (in which case the
  // error will be reported on the following call to `get_next`).
  void (*wake)(struct Waker* waker);
  void* private_data;
};

I don't understand signalling to the producer that data is available; the producer knows when data is available, right? It's producing the data in the first place. Isn't the point that the producer needs to signal the consumer that more data is available? The Waker would need to be consumer-created, with the producer calling wake when data is available. Was your comment just a typo, or am I missing something?

struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array task
  // (if no error and the task is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // The consumer is allowed to call this method again even if the tasks returned by a previous
  // call have not completed.  However, the consumer shall not call this re-entrantly.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next_task)(struct ArrowAsyncDeviceArrayStream* self, struct ArrowArrayTask* task);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};

Why the abstraction of a task in this case? Couldn't we remove the task abstraction and just put the Waker here? It feels like the get_next_task abstraction here is just a way to re-invert the direction of the calls. The consumer calls get_next_task, which turns around and calls task->get_next? At that point, it feels like we could bypass the idea of needing ArrowAsyncDeviceArrayStream to be constructed by the consumer and instead build the entire API around the waker?

Also in the case of this being able to handle concurrency, the get_last_error function becomes mostly meaningless, right? Depending on when it is called, you end up with a race condition as far as what the error might be.

Finally, If a consumer can call get_next_task before the previous one has completed, what should happen in the following scenario:

  • There is only one record in the stream, it is delayed
  • Consumer calls get_next_task 5 times in a loop to get the next 5 tasks

Do all 5 calls from the consumer wait until the record comes in? Does each one provide its own task object which then turns around and waits until the record comes in to call wake? The producer then needs to manage the order the calls came in, along with the order that the records come in for the calls to get_next. This seems like a lot of added complexity without much benefit.

All in all, I'm not completely sold on the idea of this re-inversion of the paradigm. It honestly feels sorta hacky, but that might just be me.

@paleolimbot @lidavidm thoughts on @westonpace's suggestion here?

@zeroshade (Member Author)

@westonpace I completely forgot to address the other suggestion, where you did put the waker directly into the interface, lol

struct Waker {
  // Signal to the producer that more data is available, the consumer shall release any resources
  // associated with the waker after this method is called.  The producer shall not call any other
  // methods after calling this method.
  //
  // The producer must always call this method even if an error is encountered (in which case the
  // error will be reported on the following call to `get_next`).
  void (*wake)(struct Waker* waker);
  void* private_data;
};

struct ArrowAsyncDeviceArrayStream {
  // Callback to get the next array
  // (if no error and the array is released, the stream has ended)
  //
  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
  //
  // If EWOULDBLOCK is returned then the producer is not ready to generate more data.  If the
  // producer returns this value then the producer takes ownership of `waker` and is responsible
  // for calling `wake` when more data is available.  If any other value is returned then the producer
  // shall ignore `waker` and never call any methods on it.
  //
  // If successful, the ArrowDeviceArray must be released independently from the stream.
  int (*get_next)(struct ArrowAsyncDeviceArrayStream* self, Waker* waker, struct ArrowDeviceArray* out);

  // The rest is identical to `ArrowDeviceArrayStream`
  ArrowDeviceType device_type;
  const char* (*get_last_error)(struct ArrowAsyncDeviceArrayStream* self);
  void (*release)(struct ArrowAsyncDeviceArrayStream* self);
  void* private_data;
};

I guess I still have the same questions as before though. Isn't the waker provided by the consumer for the producer to signal to the consumer that data is available? not the other way around?

The original idea and semantics here were that the ArrowAsyncDeviceArrayStream is constructed by the consumer and given to the producer, given the comments you've put here it seems like this is the other way around now? The producer still provides the struct, and the consumer calls get_next providing the waker. Do I have that right? What happens if get_next is called again before wake is called on the waker? Are we defining that get_next must not be called until wake is called? The semantics are a bit confusing to me here as far as what you're proposing.

@westonpace (Member) commented Aug 12, 2024

@westonpace your last example confuses me a bit, particularly because the entire purpose of this is to create a push-based approach for async handling, rather than a pull-based approach.

I read the purpose as "create an async interface". There are both push-based and pull-based asynchronous interfaces. Is there a reason you specifically want a push-based interface?

I don't understand signalling to the producer that data is available, the producer knows when data is available right? it's producing the data in the first place. Isn't the point that the producer needs to signal the consumer that more data is available? Waker would need to be consumer created, with the producer calling wake when data is available. Was your comment just a typo, or am I missing something?

Typo. The waker is for the producer to signal the consumer that data is available.

Also in the case of this being able to handle concurrency, the get_last_error function becomes mostly meaningless, right? Depending on when it is called, you end up with a race condition as far as what the error might be.

Good point. The error handling would have to move to the task.

All in all, i'm not completely sold on the idea of this re-inversion of the paradigm. It honestly feels sorta hacky, but that might just be me.

I'm basically modeling this on the current Lance file reader. The file reader knows (from the metadata) how many tasks there will be. get_next_task is a synchronous method that creates an asynchronous task. Therefore:

There is only one record in the stream, it is delayed
Consumer calls get_next_task 5 times in a loop to get the next 5 tasks

This would never happen. The first call to get_next_task returns the first task. The second call returns NULL because the producer knows all tasks are done. If the producer didn't know ahead of time how many tasks there are, then calls 2-5 would all just return NULL anyway.

The original idea and semantics here were that the ArrowAsyncDeviceArrayStream is constructed by the consumer and given to the producer, given the comments you've put here it seems like this is the other way around now? The producer still provides the struct, and the consumer calls get_next providing the waker. Do I have that right? What happens if get_next is called again before wake is called on the waker? Are we defining that get_next must not be called until wake is called? The semantics are a bit confusing to me here as far as what you're proposing.

Yeah, sorry for the typo, but you got it. The producer creates the ArrowAsyncDeviceArrayStream. The consumer creates the Waker and passes it into the call to get_next.

What happens if get_next is called again before wake is called on the waker?

There are ways this could work but I don't see any advantage in allowing it so it'd be easier to say that must not happen.

Are we defining that get_next must not be called until wake is called?

Sure, if this simplifies things there is no harm in defining that.

The semantics are a bit confusing to me here as far as what you're proposing.

This is based on Rust's asynchronous model which I've found to work well. The consumer in this case is running some kind of event-loop based coroutine asynchronous logic.

while True:
  for task in ready_tasks:
    task.run()

When calling an asynchronous method the logic looks something like this (taking many liberties here)

cur_task = get_current_task()
waker = lambda: tasks.make_task_active(cur_task.id)
res = call_async_method(..., waker=waker)
if res == EWOULDBLOCK:
  return yield_current_task() # makes the task inactive
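The event-loop pseudocode above can be made concrete as a small, runnable, single-threaded sketch. Here `EWOULDBLOCK`, `AsyncSource`, and the active-task queue are illustrative stand-ins for the Rust-style polling model being described, not a real API:

```python
import queue

EWOULDBLOCK = object()

class AsyncSource:
    """Producer side: hand out data if available, otherwise remember the
    consumer-created waker and report EWOULDBLOCK."""
    def __init__(self, batches):
        self._batches = list(batches)
        self._available = False
        self._waker = None

    def get_next(self, waker):
        if not self._available:
            self._waker = waker   # producer stores the consumer's waker
            return EWOULDBLOCK
        self._available = False
        return self._batches.pop(0)

    def io_complete(self):
        # Producer: data just arrived; wake the parked consumer task.
        self._available = True
        if self._waker is not None:
            self._waker()

active = queue.Queue()            # consumer's queue of runnable tasks
source = AsyncSource(["batch-0"])
results = []

def consume_task():
    res = source.get_next(waker=lambda: active.put(consume_task))
    if res is EWOULDBLOCK:
        return                    # yield: task is inactive until woken
    results.append(res)

active.put(consume_task)
active.get()()                    # first poll: not ready, waker registered
source.io_complete()              # I/O finishes; waker re-activates the task
active.get()()                    # second poll: succeeds
print(results)                    # ['batch-0']
```

The key property is that the producer never runs consumer logic: it only calls the waker, and the consumer's own loop decides when to re-poll.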

Resetting the conversation

My goals here are currently based on the approach I'm taking with the lance file reader where I go pretty far out of my way to avoid data being transferred from one core to another (e.g. from one L1/L2 cache to another). It may be that the complexity of achieving these goals is not worth the benefit. Currently the approach I'm using is the task based approach described above:

  • Consumer calls get_next_task (synchronous, single-threaded)
    • Producer figures out what I/O is needed to fulfill the task
    • Producer schedules the I/O
    • Producer creates a task to decode the data and returns it
  • Consumer checks if task is ready (gives producer a Waker), if not, it adds it to the inactive task list.
  • I/O finishes on producer. Producer marks task ready (by signaling on the Waker).
  • Once consumer has a free thread it assigns it the ready task. Consumer runs task, decodes data (the decode logic is producer logic), and then processes the decoded data (consumer logic) all without a CPU core transfer (data stays in L2 cache).

So my main goal here is to have some way so that the handoff from producer to consumer can happen without forcing a CPU core transfer.
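The handoff described in the steps above can be simulated in a minimal Python sketch. The names here (`Producer.get_next_task`, `DecodeTask`, the waker callback) are assumptions for illustration, not part of any proposed C ABI:

```python
import threading
import queue

class DecodeTask:
    """Producer-created task: I/O completes asynchronously, but the
    decode runs later on whichever consumer thread polls the task."""

    def __init__(self):
        self._lock = threading.Lock()
        self._io_done = False
        self._waker = None

    def complete_io(self):
        # Called on the producer's I/O thread when the bytes arrive.
        with self._lock:
            self._io_done = True
            waker = self._waker
        if waker is not None:
            waker()  # signal the consumer: this task is now ready

    def poll(self, waker):
        # Consumer side: decoded data if ready, else park with the waker.
        with self._lock:
            if not self._io_done:
                self._waker = waker
                return None
        # Producer decode logic, executed on the consumer's thread so the
        # decoded data stays in that core's cache.
        return "decoded batch"

class Producer:
    def __init__(self, n_tasks):
        self._remaining = n_tasks

    def get_next_task(self):
        # Synchronous: figure out the I/O, schedule it, return a task.
        if self._remaining == 0:
            return None  # stream exhausted
        self._remaining -= 1
        task = DecodeTask()
        threading.Timer(0.01, task.complete_io).start()  # simulated I/O
        return task

# Consumer: pull tasks, park the ones that are not ready, then run each
# ready task (decode + downstream processing) without a core transfer.
producer = Producer(n_tasks=3)
ready = queue.Queue()
results, pending = [], 0
while (task := producer.get_next_task()) is not None:
    out = task.poll(waker=lambda t=task: ready.put(t))
    if out is not None:
        results.append(out)
    else:
        pending += 1
for _ in range(pending):
    results.append(ready.get().poll(waker=None))
print(results)  # ['decoded batch', 'decoded batch', 'decoded batch']
```

Note that `complete_io` does only the cheap wake-up on the producer's I/O thread; the decode itself always runs on the consumer thread that polls.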

As described, the intent was that a consumer doesn't need to handle concurrency by default because a producer should only call on_next after the previous call returns. But in the scenario you describe, where the processing inside of on_next might be lengthy, I think it's fine for the consumer to decide whether to create a new thread task (if it wants to handle things in parallel) or do the work inside of on_next (potentially tying up the producer, but that's essentially just backpressure).

This doesn't meet my goal because, by this point, the producer has already done the decoding. If the consumer decides to make a new thread task it is incurring the cost of a CPU core transfer.

@lidavidm

Before I dive into the details of the proposal (thanks Weston for all the feedback!) I'll note that gRPC has both models:

  • a lower-level pull based model where the consumer creates a reactor and pushes operations/polls for completion (so more like the epoll model, less like the Rust Waker model), which for a long time was the only async API
  • a callback-based model that wraps the pull-based model, which was added later for ease of use.

It might be worthwhile to consider similar wrapping in our case.

Also, it might be worth considering what exactly is easy/efficient to implement for different use cases. I think ADBC drivers written in Go, for instance, would probably not adapt to the pull based model well, since Go doesn't really want or need to give you that much control, but ones written in Rust or C++ would adapt much better.

@paleolimbot

Echoing all the thanks to Weston for the detailed response!

I wonder if it is worth clarifying the goals and non-goals of this proposal. In my mind, this is about rectifying two very different ways engines/APIs operate (push vs. pull). I don't have much experience on the performance side, but in the development time/lines-of-code side, trying to make a producer that expects to push its output interact with a consumer that wants to pull is expensive (the reverse is also true). This gets more and more complicated the more times this mismatch is encountered in a pipeline.

I worry that in the quest for the best possible performance we lose any development time/lines-of-code advantage that a simpler approach might have enabled! I also worry that an ABI that becomes too opinionated about how a scanner should be implemented will still not be able to express other ("non optimal"?) scanners that, for historical reasons (or because we were wrong about what an optimal scanner looks like), don't work that way. I still think that something like the original proposal (with clear, if imperfect, expectations about what can or should happen in the callbacks) is a missing piece (if not the missing piece).

@lidavidm

I'm basically modeling this on the current lance file reader. The file reader knows (from the metadata) how many tasks there will be. get_next_task is a synchronous method that creates a task which is an asynchronous task.

I think outside of file readers, we're never going to know how many tasks there will be up front so I'm not sure if we want to choose an interface that bakes in this assumption.

This gets more and more complicated the more times this mismatch is encountered in a pipeline.

Yes, I think we're going to have to end up with a lowest common denominator style approach. And overall I feel like the callback based approach is most likely to be the most natural between languages.

For the pull based stream here: #43632 (comment)

  • Doesn't this still mean the consumer can block the producer's thread by accident (by doing processing inside wake)?

For the task based approach here: #43632 (comment)

  • Is the reason for a separate task to help optimize cache usage? (Basically because the control flow is on the consumer's side, so it is the consumer thread that eventually calls get_next and can then immediately do processing without having to transfer threads to avoid blocking the producer)
  • Would it be sufficient for that use case if we had a callback approach that produced a task instead of directly producing an array?
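As a thought experiment, the "callback approach that produced a task" hybrid might look like the sketch below: the producer still pushes via on_next, but what it pushes is a decode task rather than a finished array. The `Handler`/`on_next` shapes here are illustrative, not the proposed C structs:

```python
import threading
import queue

class Handler:
    """Consumer-provided handler; on_next receives tasks, not arrays."""
    def __init__(self):
        self.ready = queue.Queue()

    def on_next(self, task):
        # Cheap enqueue only: the decode inside `task` runs later on a
        # consumer thread, so decoded data never crosses CPU cores.
        self.ready.put(task)

def producer(handler, n):
    # Producer thread: does the I/O, then hands off *undecoded* work.
    for i in range(n):
        raw = f"raw-bytes-{i}"
        handler.on_next(lambda raw=raw: f"decoded({raw})")
    handler.on_next(None)  # end of stream

handler = Handler()
threading.Thread(target=producer, args=(handler, 2)).start()
results = []
while (task := handler.ready.get()) is not None:
    results.append(task())  # decode runs here, on the consumer's thread
print(results)  # ['decoded(raw-bytes-0)', 'decoded(raw-bytes-1)']
```

This keeps the push-style control flow (the producer never blocks on consumer processing) while still deferring the decode to the consumer's core, which is the cache-locality goal discussed above.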

@ianmcook

@github-actions crossbow submit preview-docs


Revision: 0e3574b

Submitted crossbow builds: ursacomputing/crossbow @ actions-e929f1681c

Task: preview-docs (status: GitHub Actions)

@zeroshade zeroshade requested a review from bkietz November 5, 2024 15:10
@zeroshade zeroshade merged commit 40b2fca into apache:main Nov 6, 2024
39 of 40 checks passed

After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit 40b2fca.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 30 possible false positives for unstable benchmarks that are known to sometimes produce them.

@bkietz commented Nov 6, 2024

🎉

zeroshade added a commit that referenced this pull request Nov 11, 2024
)

### Rationale for this change
Building on #43632 which created the Async C Data Structures, this adds functions to `bridge.h`/`bridge.cc` to implement helpers for managing the Async C Data interfaces 

### What changes are included in this PR?
Two functions added to bridge.h:

1. `CreateAsyncDeviceStreamHandler` populates a `ArrowAsyncDeviceStreamHandler` and an `Executor` to provide a future that resolves to an `AsyncRecordBatchGenerator` to produce record batches as they are pushed asynchronously. The `ArrowAsyncDeviceStreamHandler` can then be passed to any asynchronous producer.
2. `ExportAsyncRecordBatchReader` takes a record batch generator and a schema, along with an `ArrowAsyncDeviceStreamHandler` to use for calling the callbacks to push data as it is available from the generator. 

### Are these changes tested?
Unit tests are added (currently only one test, more tests to be added)

### Are there any user-facing changes?
No

* GitHub Issue: #43631

Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: David Li <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
zeroshade added a commit to apache/arrow-go that referenced this pull request Nov 12, 2024
This adds a basic implementation of helpers for managing an
ArrowAsyncDeviceStreamHandler for using the Async Arrow C Device
interface. The corresponding C++ helper implementation can be found at
apache/arrow#44495 with the discussion on the
actual C structures located at
apache/arrow#43632.

---------

Co-authored-by: Sutou Kouhei <[email protected]>