c: define async version of ArrowArrayStream #811

Open
lidavidm opened this issue Jun 16, 2023 · 20 comments

@lidavidm
Member

We should make sure ArrowAsyncStream also includes the ArrowDeviceStream changes.

The async version of the ADBC API could be the only device-aware one. (The sync version would have to play ABI shenanigans, or duplicate every call...)

@CurtHagenlocher
Contributor

CurtHagenlocher commented Apr 16, 2024

Presumably we want more than ArrayStream to be async. In principle, any API that might require a database round trip should support it, though in practice that might be unwieldy.

Of course, every language has different idioms and capabilities when it comes to async and maybe it's not realistic to expose this in the C API and expect to be able to interact with it from all languages. The simplest thing is a callback which says "I'm done" and both C# and Rust can be made to work with that. I have no idea about Go. Java might need to spawn a thread to make that work (but with green threads on the horizon that could be okay).

The "straw person" would be something like -- and this is really rough --

struct ArrowAsyncInfo {
    void* caller_data;
    void* private_data; // belongs to the driver, nullptr if not currently "active"
    void (*complete)(struct ArrowAsyncInfo*, AdbcStatusCode);
};

AdbcStatusCode AdbcCancel(struct ArrowAsyncInfo *asyncInfo);

AdbcStatusCode AdbcConnectionGetTableSchemaAsync(
    struct AdbcConnection* connection,
    const char* catalog,
    const char* db_schema,
    const char* table_name,
    struct ArrowSchema* schema,
    struct AdbcError* error,
    struct ArrowAsyncInfo *asyncInfo);

The caller populates ArrowAsyncInfo with caller_data and the completion handler, and initializes private_data to nullptr. It then calls AdbcConnectionGetTableSchemaAsync, which can return an error, success indicating synchronous completion, or success indicating initiation of an asynchronous operation. In the latter case, it also populates the private_data field.

When the operation succeeds or fails, the driver cleans up private_data before calling the completion function with the status of the operation.

The consumer may call AdbcCancel in order to terminate the operation. This should either fail or return asynchronously, and cancellation isn't complete until the completion function is called with a cancelled status. Note that an operation may still succeed after it has been cancelled due to timing issues.

The driver may have to play games with sentinel values in the private_data field and interlocked exchanges in order to make concurrency work out with cancellation.
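A minimal single-threaded sketch of the driver side of this straw person, assuming hypothetical `DriverStart` and `DriverFinish` functions that stand in for `AdbcConnectionGetTableSchemaAsync` and its background work, plus local stand-ins for the `adbc.h` status codes. The cancel-versus-finish race is settled by a compare-and-swap on a driver-owned state word that `private_data` points to. Not calling `complete` on synchronous completion is also an assumption. A real multi-threaded driver would additionally have to guard reads of `private_data` itself, which is where the sentinel-value games come in.

```c
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Stand-ins for the adbc.h definitions (values chosen for this sketch). */
typedef uint8_t AdbcStatusCode;
#define ADBC_STATUS_OK 0
#define ADBC_STATUS_INVALID_STATE 6
#define ADBC_STATUS_INTERNAL 9
#define ADBC_STATUS_CANCELLED 11

struct ArrowAsyncInfo {
  void* caller_data;
  void* private_data; /* belongs to the driver, NULL if not currently "active" */
  void (*complete)(struct ArrowAsyncInfo*, AdbcStatusCode);
};

/* Hypothetical per-operation driver state; `state` is the interlocked field. */
enum { OP_PENDING, OP_CANCEL_REQUESTED, OP_DONE };
struct PendingOp {
  atomic_int state;
};

/* Hypothetical entry point. finish_now != 0 models synchronous completion:
 * return OK and leave private_data NULL (this sketch then never calls complete). */
AdbcStatusCode DriverStart(struct ArrowAsyncInfo* info, int finish_now) {
  if (info->private_data != NULL) return ADBC_STATUS_INVALID_STATE;
  if (finish_now) return ADBC_STATUS_OK;
  struct PendingOp* op = malloc(sizeof *op);
  if (op == NULL) return ADBC_STATUS_INTERNAL;
  atomic_init(&op->state, OP_PENDING);
  info->private_data = op; /* OK + non-NULL private_data: completion comes later */
  return ADBC_STATUS_OK;
}

/* Requests cancellation; it is only complete once `complete` reports CANCELLED. */
AdbcStatusCode AdbcCancel(struct ArrowAsyncInfo* info) {
  struct PendingOp* op = info->private_data;
  if (op == NULL) return ADBC_STATUS_INVALID_STATE; /* nothing in flight */
  int expected = OP_PENDING;
  /* Has no effect if the operation already moved to OP_DONE. */
  atomic_compare_exchange_strong(&op->state, &expected, OP_CANCEL_REQUESTED);
  return ADBC_STATUS_OK;
}

/* Hypothetical: invoked by the driver when the background work ends. */
void DriverFinish(struct ArrowAsyncInfo* info, AdbcStatusCode work_status) {
  struct PendingOp* op = info->private_data;
  if (op == NULL) return; /* no operation in flight */
  int expected = OP_PENDING;
  AdbcStatusCode status = work_status;
  if (!atomic_compare_exchange_strong(&op->state, &expected, OP_DONE)) {
    status = ADBC_STATUS_CANCELLED; /* a cancel request got there first */
  }
  free(op);
  info->private_data = NULL; /* clean up before calling complete */
  info->complete(info, status);
}
```

Whichever compare-and-swap wins decides the reported status, which is how an operation the consumer tried to cancel can still report success.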

@lidavidm
Member Author

Yup, this issue was just to sketch out the basic C Data Interface changes. Though, given the hoops we had to jump through to try to propagate ADBC error metadata through that boundary, I wonder if that's worth it, and whether we should instead use ADBC interfaces entirely (with ADBC error handling).

@lidavidm
Member Author

Thanks for the sketch! I was imagining something like that, but this is actually concrete.

CC @zeroshade

@zeroshade
Member

Thanks for the sketch, @CurtHagenlocher! I plan on starting to tackle this more formally in the next couple of weeks.

@CurtHagenlocher
Contributor

CurtHagenlocher commented Apr 30, 2024

I've been starting to play around with this, and the management of the data structures is a bit annoying. It also isn't very consistent with other parts of the Arrow C API where the receiver of a call is free to move structures around in memory as long as it preserves the bits. Here's an alternative whose memory management is more like the synchronous API:

struct ArrowAsyncInfo {
    void* caller_data;
    void* private_data;
    void (*release)(struct ArrowAsyncInfo*);
};

AdbcStatusCode AdbcConnectionGetTableSchemaAsync(
    struct AdbcConnection* connection,
    const char* catalog,
    const char* db_schema,
    const char* table_name,
    struct ArrowAsyncInfo* asyncInfo,
    struct AdbcError* syncError,
    void (*complete)(struct ArrowAsyncInfo*, AdbcStatusCode, struct ArrowSchema*, struct AdbcError*));

In this variation, the caller allocates (and is free to move) ArrowAsyncInfo and populates caller_data. The receiver fills the private_data and release fields and makes a copy for itself. On completion, the receiver calls the complete callback with its copy of ArrowAsyncInfo and its own buffers for the result (which can be nullptr on error) and the result status. It's the original caller's responsibility to clean these up at an appropriate time by invoking the corresponding release. The original caller then signals that it will no longer call AdbcCancel by calling release on ArrowAsyncInfo.

The primary drawback of this variation is that it doesn't cleanly support synchronous completion of the call. On the other hand, there was nothing in the initial sketch which would have prevented the completion callback from happening before the original async call returned, and that fact is a bit more obvious with this approach.

Edited to add a synchronous error to the function signature so that e.g. parameter validation could return an error immediately instead of having to go through the callback.
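A sketch of the ownership rules in this variation, assuming a hypothetical driver (`DriverGetTableSchemaAsync`, `DriverRunPending`) that runs its "async" work from a one-slot queue so the example stays deterministic, plus trimmed stand-ins for `adbc.h`. The driver keeps its own copy of `ArrowAsyncInfo`, so the caller may move its copy; the reference count that lets `release` happen before or after completion is an assumption, not part of the proposal.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Trimmed stand-ins for adbc.h (values chosen for this sketch). */
typedef uint8_t AdbcStatusCode;
#define ADBC_STATUS_OK 0
#define ADBC_STATUS_INVALID_ARGUMENT 5
#define ADBC_STATUS_INTERNAL 9
struct AdbcError { const char* message; };

/* Arrow C Data Interface schema layout. */
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

struct ArrowAsyncInfo {
  void* caller_data;
  void* private_data;
  void (*release)(struct ArrowAsyncInfo*);
};

typedef void (*CompleteFn)(struct ArrowAsyncInfo*, AdbcStatusCode,
                           struct ArrowSchema*, struct AdbcError*);

/* Hypothetical driver state shared by the caller's copy and the driver's copy. */
struct OpState {
  int refs; /* one for the caller's release(), one for the driver */
  CompleteFn complete;
  struct ArrowAsyncInfo driver_copy;
};
static struct OpState* g_pending; /* hypothetical one-slot work queue */

static void DropRef(struct OpState* st) {
  if (--st->refs == 0) free(st);
}

/* The caller promises not to cancel this operation any more. */
static void ReleaseInfo(struct ArrowAsyncInfo* info) {
  struct OpState* st = info->private_data;
  info->private_data = NULL;
  info->release = NULL;
  DropRef(st);
}

static void ReleaseStaticSchema(struct ArrowSchema* s) { s->release = NULL; }

/* Hypothetical async GetTableSchema; parameter validation fails synchronously. */
AdbcStatusCode DriverGetTableSchemaAsync(const char* table_name,
                                         struct ArrowAsyncInfo* info,
                                         struct AdbcError* sync_error,
                                         CompleteFn complete) {
  if (table_name == NULL || complete == NULL) {
    sync_error->message = "table_name and complete are required";
    return ADBC_STATUS_INVALID_ARGUMENT;
  }
  struct OpState* st = malloc(sizeof *st);
  if (st == NULL) {
    sync_error->message = "out of memory";
    return ADBC_STATUS_INTERNAL;
  }
  st->refs = 2;
  st->complete = complete;
  info->private_data = st;
  info->release = ReleaseInfo;
  st->driver_copy = *info; /* driver keeps its own bits; caller may move *info */
  g_pending = st;
  return ADBC_STATUS_OK;
}

/* Hypothetical worker step: finish the queued operation via the callback. */
void DriverRunPending(void) {
  struct OpState* st = g_pending;
  if (st == NULL) return;
  g_pending = NULL;
  struct ArrowSchema schema = {0};
  schema.format = "+s";
  schema.release = ReleaseStaticSchema;
  /* The callee must move the schema out before returning: it lives on this stack. */
  st->complete(&st->driver_copy, ADBC_STATUS_OK, &schema, NULL);
  DropRef(st); /* the driver is done with its copy */
}
```

Parameter validation failures come back through `sync_error`; everything else arrives through `complete`.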

@zeroshade
Member

zeroshade commented May 21, 2024

@CurtHagenlocher @lidavidm What do you two think about the following idea:

struct AsyncArrowStream {
    int (*on_schema)(struct AsyncArrowStream* self, struct ArrowSchema* out,
            AdbcStatusCode status, struct AdbcError* error);
    int (*on_next)(struct AsyncArrowStream* self, struct ArrowDeviceArray* out,
            AdbcStatusCode status, struct AdbcError* error);

    void (*release)(struct AsyncArrowStream* self);
    void* private_data;
};

Which would be used like:

AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
                struct AsyncArrowStream* stream_handler,
                int64_t* rows_affected, struct AdbcError* sync_error);

The caller would populate the AsyncArrowStream's callbacks. on_schema would be called as soon as the schema is available, with calls to on_next as each record batch is available. Semantically:

  • private_data should be populated by the caller with any contextual information that is needed by the async callbacks.
  • sync_error is populated if a synchronous error happens before any asynchronous operations have begun.
  • If an error is encountered asynchronously trying to get the schema, then the status code and error are populated to call on_schema with a nullptr for the ArrowSchema. on_next will not be called in this scenario.
  • rows_affected should be populated if available before the call to on_schema.
  • If an error is encountered retrieving data, on_next is called with the error and status code and nullptr for the ArrowDeviceArray.
  • To signal the end of the stream, on_next is called with ADBC_STATUS_OK and a nullptr for the ArrowDeviceArray.
  • The async callbacks return int rather than void so that the callbacks can indicate whether an error was encountered on their end and that the producer should cancel/stop calling callback methods.

The following rules should be observed by drivers:

  • The async callback for one call should complete before the next async callback is called, avoiding potential race conditions on a single result stream.
  • Once the last callback in a stream (on_schema or on_next) completes, the producer should call release.

This would work for all of the cases that work with ArrowArrayStreams. For the other scenarios (ExecuteUpdate, GetTableSchema, etc.), something closer to what @CurtHagenlocher suggested with an ArrowAsyncInfo-type object might be more useful?
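A sketch of a producer honoring these rules, assuming local stand-ins for `adbc.h` and the Arrow C Data / C Device Interface struct layouts. `ProducerRun` is hypothetical, its batches are placeholders with only `length` and `release` filled in, and it assumes the consumer takes ownership of whatever schema or array it is handed.

```c
#include <stddef.h>
#include <stdint.h>

/* Trimmed stand-ins for adbc.h. */
typedef uint8_t AdbcStatusCode;
#define ADBC_STATUS_OK 0
struct AdbcError { const char* message; };

/* Arrow C Data Interface and C Device Interface layouts. */
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};
struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};
typedef int32_t ArrowDeviceType;
#define ARROW_DEVICE_CPU 1
struct ArrowDeviceArray {
  struct ArrowArray array;
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;
  int64_t reserved[3];
};

struct AsyncArrowStream {
  int (*on_schema)(struct AsyncArrowStream* self, struct ArrowSchema* out,
                   AdbcStatusCode status, struct AdbcError* error);
  int (*on_next)(struct AsyncArrowStream* self, struct ArrowDeviceArray* out,
                 AdbcStatusCode status, struct AdbcError* error);
  void (*release)(struct AsyncArrowStream* self);
  void* private_data;
};

static void ReleaseSchema(struct ArrowSchema* s) { s->release = NULL; }
static void ReleaseArray(struct ArrowArray* a) { a->release = NULL; }

/* Hypothetical producer: one callback at a time, a nonzero return stops
 * delivery, and release is always the producer's final call. */
void ProducerRun(struct AsyncArrowStream* h, int64_t n_batches) {
  struct ArrowSchema schema = {0};
  schema.format = "+s";
  schema.release = ReleaseSchema;
  if (h->on_schema(h, &schema, ADBC_STATUS_OK, NULL) == 0) {
    int64_t i = 0;
    for (; i < n_batches; i++) {
      struct ArrowDeviceArray batch = {{0}}; /* placeholder batch */
      batch.array.length = i + 1;
      batch.array.release = ReleaseArray;
      batch.device_type = ARROW_DEVICE_CPU;
      if (h->on_next(h, &batch, ADBC_STATUS_OK, NULL) != 0) break; /* consumer said stop */
    }
    if (i == n_batches) h->on_next(h, NULL, ADBC_STATUS_OK, NULL); /* end of stream */
  }
  h->release(h);
}
```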

@lidavidm
Member Author

sync_error is populated if a synchronous error happens before any asynchronous operations have begun.

IMO this gets annoying, as you have to handle every error in two locations

the async callbacks return int rather than void so that the callbacks can indicate whether an error was encountered on their end and that the producer should cancel/stop calling callback methods.

int being basically bool here?

Other questions

  • For cancellation, we'll continue to use the existing method?
  • How will this integrate with the rest of the Arrow ecosystem? It sounds like we'll have to convert everything back to synchronous interfaces in the end...

@CurtHagenlocher
Contributor

I gather that ArrowDeviceArray has different allocation semantics than ArrowArray? I guess I need to take a closer look at it.

Any contract around releasing the ArrowDeviceArrays and ArrowSchema? What if the consumer hasn't released them yet when the producer releases the AsyncArrowStream?

rows_affected should be populated if available before the call to on_schema

I think this would be a bit of a pain to manage. Why not add rows_affected to the on_schema call?

IMO this gets annoying, as you have to handle every error in two locations

What if the receiver e.g. can't allocate memory for an async task? I don't see how you get out of having to check the error in two locations in general, and it feels nicer for things like parameter validation failures to fail immediately.

How will this integrate with the rest of the Arrow ecosystem?

Realistically, I think we'd need to push this API into Arrow itself :/.

@paleolimbot
Member

paleolimbot commented May 22, 2024

A minor modification of Matt's proposal for the sake of argument:

struct ArrowArrayStreamHandler {
    // For parity with the device array stream?
    int32_t device_type;

    // return codes would be errno, but no need to pass a message since it would just be
    // passed to the handler anyway via on_error. If an error code is returned here the
    // producer must pass it to on_error()
    int (*on_schema)(struct ArrowArrayStreamHandler* self, struct ArrowSchema* out);

    // Always called at least once with a released array to indicate the end, except if on_error
    // is called, in which case on_error would be the last call.
    int (*on_next)(struct ArrowArrayStreamHandler* self, struct ArrowDeviceArray* out);

    // For R I'd need this so that I can call some promise$reject() method, and I think it's needed
    // Metadata here would be identical to ArrowSchema::metadata but the handler method
    // would need to copy it (and message) if it needs to live after the handler function returns.
    void (*on_error)(struct ArrowArrayStreamHandler* self, int code, const char* message, const char* metadata);
    void (*release)(struct ArrowArrayStreamHandler* self);
    void* private_data;
};

...which has a few features:

  • Could theoretically be part of the Arrow C Stream for ABI stability (e.g., I could make an interface for an IPC writer from this and wire it up directly to an ADBC call from another Python package)
  • One place to handle errors (either from producer or consumer)

...and drawbacks:

  • No ADBC-specific things like errors or rows_affected
  • Probably many things I haven't considered 🙂
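One reading of these semantics as a producer, passing both producer-side failures and nonzero consumer returns to `on_error`, which is then followed only by `release`. `HandlerProducerRun` is hypothetical, the error codes come from POSIX `<errno.h>`, and treating `device_type` as a request the producer may refuse with `ENOTSUP` is an assumption (what that field means is still an open question in this thread).

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Arrow C Data Interface and C Device Interface layouts. */
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};
struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};
typedef int32_t ArrowDeviceType;
#define ARROW_DEVICE_CPU 1
struct ArrowDeviceArray {
  struct ArrowArray array;
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;
  int64_t reserved[3];
};

struct ArrowArrayStreamHandler {
  int32_t device_type;
  int (*on_schema)(struct ArrowArrayStreamHandler* self, struct ArrowSchema* out);
  int (*on_next)(struct ArrowArrayStreamHandler* self, struct ArrowDeviceArray* out);
  void (*on_error)(struct ArrowArrayStreamHandler* self, int code, const char* message,
                   const char* metadata);
  void (*release)(struct ArrowArrayStreamHandler* self);
  void* private_data;
};

static void ReleaseSchema(struct ArrowSchema* s) { s->release = NULL; }
static void ReleaseArray(struct ArrowArray* a) { a->release = NULL; }

/* Hypothetical producer; fail_at >= 0 simulates an I/O error before that batch. */
void HandlerProducerRun(struct ArrowArrayStreamHandler* h, int64_t n_batches,
                        int64_t fail_at) {
  struct ArrowSchema schema = {0};
  struct ArrowDeviceArray batch = {{0}};
  const char* msg = "consumer returned an error";
  int code = 0;
  if (h->device_type != ARROW_DEVICE_CPU) { /* assumption: device_type is a request */
    code = ENOTSUP;
    msg = "this producer only has CPU data";
    goto fail;
  }
  schema.format = "+s";
  schema.release = ReleaseSchema;
  if ((code = h->on_schema(h, &schema)) != 0) goto fail;
  for (int64_t i = 0; i < n_batches; i++) {
    if (i == fail_at) {
      code = EIO;
      msg = "simulated read failure";
      goto fail;
    }
    batch = (struct ArrowDeviceArray){{0}};
    batch.array.length = 1;
    batch.array.release = ReleaseArray;
    batch.device_type = ARROW_DEVICE_CPU;
    if ((code = h->on_next(h, &batch)) != 0) goto fail;
  }
  batch = (struct ArrowDeviceArray){{0}}; /* release == NULL: released array marks the end */
  if ((code = h->on_next(h, &batch)) != 0) goto fail;
  h->release(h);
  return;
fail:
  h->on_error(h, code, msg, NULL); /* last callback before release */
  h->release(h);
}
```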

@lidavidm
Member Author

I think this would be a bit of a pain to manage. Why not add rows_affected to the on_schema call?

+1

What if the receiver e.g. can't allocate memory for an async task? I don't see how you get out of having to check the error in two locations in general, and it feels nicer for things like parameter validation failures to fail immediately.

Hmm. For instance, in Rust, is it typical to have Result<Future<T>>, or just Future<T>?

re:

Could theoretically be part of the Arrow C Stream for ABI stability (e.g., I could make an interface for an IPC writer from this and wire it up directly to an ADBC call from another Python package)

Maybe we could add a void* argument for extensions? e.g. on_schema could have an argument that can be cast to an ADBC type containing the row count. Or we could do what we're already doing, and have ADBC expose top-level getters that take in this case an AsyncArrowStream and return ADBC-specific information (error details, row count, etc)

@CurtHagenlocher
Contributor

For instance, in Rust, is it typical to have Result<Future<T>>, or just Future<T>?

I'm at best an enthusiastic dabbler in Rust, but I think you'd generally have Future<Result<T>>. The thing is, you still have some thing you can look at or wait on or poll -- whereas with this low-level C API, you don't have anything if something has gone unexpectedly wrong.

@zeroshade
Member

int being basically bool here?

Like @paleolimbot said, it would be errno like the C Data Interface callbacks

For cancellation, we'll continue to use the existing method?

Yup. The callback can then be called using ADBC_STATUS_CANCELLED

I gather that ArrowDeviceArray has different allocation semantics than ArrowArray? I guess I need to take a closer look at it.

ArrowDeviceArray is a struct containing an ArrowArray along with a device_type, device_id, and a pointer that can be non-null if the data is on a device that requires synchronization (such as a GPU).
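Because an `ArrowDeviceArray` embeds an ordinary `ArrowArray`, moving one works the same way as moving a plain `ArrowArray`: copy the bits and mark the source released. The struct layouts below follow the Arrow C Data and C Device Interface headers; `MoveCpuArray` is a hypothetical helper that only handles CPU-resident data and never waits on `sync_event`.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

typedef int32_t ArrowDeviceType;
#define ARROW_DEVICE_CPU 1
#define ARROW_DEVICE_CUDA 2

struct ArrowDeviceArray {
  struct ArrowArray array; /* same ownership rules as a standalone ArrowArray */
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;        /* may be non-NULL for devices that need synchronization */
  int64_t reserved[3];
};

/* Hypothetical consumer helper: takes ownership of the ArrowArray inside a
 * CPU-resident ArrowDeviceArray. Returns 0 on success or an errno value. */
int MoveCpuArray(struct ArrowDeviceArray* src, struct ArrowArray* dst) {
  if (src->array.release == NULL) return EINVAL; /* already released */
  if (src->device_type != ARROW_DEVICE_CPU) return ENOTSUP; /* out of scope here */
  *dst = src->array;         /* move: copy the bits ... */
  src->array.release = NULL; /* ... and mark the source released */
  return 0;
}
```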

Any contract around releasing the ArrowDeviceArrays and ArrowSchema? What if the consumer hasn't released them yet when the producer releases the AsyncArrowStream?

ArrowDeviceArray and ArrowSchema have their own release callbacks. A producer calling the release callback on the AsyncArrowStream shouldn't have any effect on the ability to release the ArrowDeviceArray or ArrowSchema. Their lifetimes should be tied to their own release callbacks.
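A sketch of that independence from the consumer side, assuming a trimmed, hypothetical `StreamHandler` with only `on_schema` and `release`, and a hypothetical producer helper `MakeSchema`. The consumer moves the schema out in its callback, so releasing the handler leaves the schema untouched until the schema's own `release` runs.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Arrow C Data Interface schema layout. */
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

/* Producer-side release: the format string is owned through private_data. */
static void ReleaseHeapSchema(struct ArrowSchema* s) {
  free(s->private_data);
  s->private_data = NULL;
  s->release = NULL; /* marks the schema as released */
}

/* Hypothetical producer helper: an int64 ("l") schema with heap-owned storage. */
int MakeSchema(struct ArrowSchema* out) {
  char* fmt = malloc(2);
  if (fmt == NULL) return ENOMEM;
  memcpy(fmt, "l", 2);
  memset(out, 0, sizeof *out);
  out->format = fmt;
  out->private_data = fmt;
  out->release = ReleaseHeapSchema;
  return 0;
}

/* Hypothetical, trimmed handler with only the members this sketch needs. */
struct StreamHandler {
  int (*on_schema)(struct StreamHandler* self, struct ArrowSchema* out);
  void (*release)(struct StreamHandler* self);
  void* private_data;
};

/* Consumer state: the schema is moved into storage the consumer controls. */
struct Keeper {
  struct ArrowSchema kept;
  int handler_released;
};

static int KeepSchema(struct StreamHandler* self, struct ArrowSchema* out) {
  struct Keeper* k = self->private_data;
  k->kept = *out;      /* move: copy the bits ... */
  out->release = NULL; /* ... and mark the source released */
  return 0;
}

static void ReleaseHandler(struct StreamHandler* self) {
  /* Does not touch k->kept: the schema's lifetime follows kept.release. */
  ((struct Keeper*)self->private_data)->handler_released = 1;
}
```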

A minor modification of Matt's proposal for the sake of argument: ...

Is the inclusion of a device type there for the caller to tell the producer what device it wants the results to be allocated on? If the producer can't comply with that, does it then call on_error?

In your suggested interface, are the semantics for on_error that the producer calls it in either of the following scenarios:

  • Producer encountered an error asynchronously and thus calls on_error instead of calling on_schema or on_next
  • Consumer returned a non-zero errno from on_schema or on_next, so producer calls on_error with the returned error code

Are there other scenarios I'm missing?

I think this would be a bit of a pain to manage. Why not add rows_affected to the on_schema call?
+1

That works. I'm fine with that. Though if we're pushing this upstream into the C Data Interface ABI directly, we'd have to go the route @lidavidm suggested of a void* for extensions. I personally am not a big fan of making a top-level getter that we would pass the ArrowArrayStream to in order to retrieve the row count, etc.

// Metadata here would be identical to ArrowSchema::metadata but the handler method
// would need to copy it (and message) if it needs to live after the handler function returns

The ADBC Error struct also contains other information such as an optional opaque binary blob for metadata values and such, which would not be able to be passed through this interface. So we'd probably need to expand this on_error to also have a void* or pass a char* + length, or some other combination to allow passing binary pieces of error details.

I agree that it might be easier in general if we push this upstream to the Arrow C ABI, but there are also benefits to being able to use the ADBC specific objects in the async interface too. If we do push upstream with this, would we want Async versions of both ArrowArrayStream AND ArrowDeviceArrayStream?

@lidavidm
Member Author

The thing is, you still have some thing you can look at or wait on or poll -- whereas with this low-level C API, you don't have anything if something has gone unexpectedly wrong.

But the struct is caller-allocated, right? So the driver can immediately invoke on_error. I'm also thinking about other async APIs; gRPC's callback interface, for instance, also only has a single error path. UCX does let you check for errors 'immediately', but because it's poll-driven rather than callback-driven, that's effectively also a single error path.

On the other hand I agree it would be a better developer experience, and probably the caller already has to handle errors for other things, so I suppose I'm not entirely against it, so long as it's stipulated that it should really only be used for immediate validation errors (e.g. an IO error should basically never be returned through the immediate error).

@lidavidm
Member Author

If we do push upstream with this, would we want Async versions of both ArrowArrayStream AND ArrowDeviceArrayStream?

I vote no.

@CurtHagenlocher
Contributor

CurtHagenlocher commented May 22, 2024

But the struct is caller-allocated, right? So the driver can immediately invoke on_error.

I suspect I'm subconsciously shying away from that possibility because I expect it to increase the risk of accidental deadlocks. But writing concurrent code is hard, and if the contract allows it then the implementations had better act accordingly.

If the async stream is not part of the Arrow C ABI, then there's no reason to conform to it at all. We could just unroll it into multiple callbacks on the primary function. Starting from my previous strawperson:

struct ArrowAsyncInfo {
    void* caller_data;
    void* private_data;
    void (*release)(struct ArrowAsyncInfo*);
};

AdbcStatusCode AdbcStatementExecuteQueryAsync(
    struct AdbcStatement* statement,
    struct ArrowAsyncInfo* asyncInfo,
    void (*on_schema)(struct ArrowAsyncInfo*, AdbcStatusCode, long*, struct ArrowSchema*, struct AdbcError*),
    void (*on_next)(struct ArrowAsyncInfo*, AdbcStatusCode, struct ArrowArray*, struct AdbcError*)
);

@lidavidm
Member Author

int being basically bool here?

Like @paleolimbot said, it would be errno like the C Data Interface callbacks

Hmm, what's the purpose though? If you return an error to the driver...it just turns around and returns it back to you?

@lidavidm
Member Author

(I mean purpose vs just a plain bool, or gRPC's approach where you explicitly call cancel)

@CurtHagenlocher
Contributor

We could just unroll it into multiple callbacks on the primary function.

This admittedly gets quite awkward when trying to bind an array stream as a parameter, so ... probably not.

@paleolimbot
Member

If you return an error to the driver...it just turns around and returns it back to you?

One could also just state that if the consumer returns something non-zero that no other callbacks will be called (I don't have strong feelings about any of this). Having it be errno (or adbc status) just minimizes the number of types of things that get returned (makes it easier to reuse the return not OK macros).

The ADBC Error struct also contains other information such as an optional opaque binary blob for metadata values

I believe that the schema->metadata encoding schema is capable of encoding the same blob key/value pairing as the ADBC errors. It would be a bit of a pain to pack/unpack it, of course, but any Arrow implementation supporting the C data interface probably can already do it.
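For reference, the `ArrowSchema.metadata` layout is a native-endian `int32` pair count followed, for each pair, by an `int32` key length, the key bytes, an `int32` value length and the value bytes. Because values are length-prefixed rather than NUL-terminated, a binary blob fits. The sketch below packs and reads back such pairs; `EncodeMetadata` and `DecodeMetadataPair` are hypothetical helpers, and the keys in the usage are made up.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical encoder for the ArrowSchema.metadata layout. Keys are
 * NUL-terminated strings; values are arbitrary bytes with explicit lengths.
 * Returns a malloc'd buffer (caller frees) or NULL on allocation failure. */
char* EncodeMetadata(int32_t n, const char* const* keys, const char* const* values,
                     const int32_t* value_lens, size_t* out_size) {
  size_t size = sizeof(int32_t);
  for (int32_t i = 0; i < n; i++)
    size += 2 * sizeof(int32_t) + strlen(keys[i]) + (size_t)value_lens[i];
  char* buf = malloc(size);
  if (buf == NULL) return NULL;
  char* p = buf;
  memcpy(p, &n, sizeof n); /* memcpy avoids unaligned int32 stores */
  p += sizeof n;
  for (int32_t i = 0; i < n; i++) {
    int32_t klen = (int32_t)strlen(keys[i]);
    memcpy(p, &klen, sizeof klen);
    p += sizeof klen;
    memcpy(p, keys[i], (size_t)klen);
    p += klen;
    memcpy(p, &value_lens[i], sizeof(int32_t));
    p += sizeof(int32_t);
    memcpy(p, values[i], (size_t)value_lens[i]);
    p += value_lens[i];
  }
  *out_size = size;
  return buf;
}

/* Hypothetical decoder: locates pair `index`; returned pointers alias `buf`.
 * Returns 0 on success, -1 if the index is out of range. */
int DecodeMetadataPair(const char* buf, int32_t index, const char** key, int32_t* key_len,
                       const char** value, int32_t* value_len) {
  int32_t n;
  memcpy(&n, buf, sizeof n);
  if (index < 0 || index >= n) return -1;
  const char* p = buf + sizeof n;
  for (int32_t i = 0;; i++) {
    memcpy(key_len, p, sizeof *key_len);
    p += sizeof *key_len;
    *key = p;
    p += *key_len;
    memcpy(value_len, p, sizeof *value_len);
    p += sizeof *value_len;
    *value = p;
    p += *value_len;
    if (i == index) return 0;
  }
}
```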

@zeroshade
Member

One could also just state that if the consumer returns something non-zero that no other callbacks will be called (I don't have strong feelings about any of this).

This was my original intention. Returning non-zero indicates that no other callbacks should be called, only release. I don't think we need to have the producer turn around and call on_error with the code you returned. I do agree that using errno or AdbcStatusCode minimizes the number of types of things. Though we obviously can't use AdbcStatusCode if we're going to upstream this to the Arrow C Data Interface.

(I mean purpose vs just a plain bool, or gRPC's approach where you explicitly call cancel)

Personally, I like putting more of the burden on the producer than the consumer. I'd prefer that the producer react to a non-zero return value by cancelling and then calling release than requiring the consumer to explicitly call cancel. That's just my personal opinion though, @CurtHagenlocher might have a better opinion on this given that he's likely to be an actual user of this API.

I believe that the schema->metadata encoding schema is capable of encoding the same blob key/value pairing as the ADBC errors. It would be a bit of a pain to pack/unpack it, of course, but any Arrow implementation supporting the C data interface probably can already do it.

That's a good point. That could work pretty well then.

This admittedly gets quite awkward when trying to bind an array stream as a parameter, so ... probably not.

Are we intending that the binding should be asynchronous too? I was thinking that the binding API would still be synchronous and binding a normal ArrowArrayStream since it would still be the driver itself calling the callbacks.

It seems like we're approaching a small consensus on this, so if everyone is okay with it, I'll take the suggestions we've all discussed so far and try to come up with a potential proposal to upstream to the Arrow repo. Does anyone object, or think there's more we need to discuss before I try?

zeroshade added a commit to apache/arrow that referenced this issue Nov 6, 2024
…3632)

### Rationale for this change
See apache/arrow-adbc#811 and #43631

### What changes are included in this PR?
Definition of `ArrowAsyncDeviceStreamHandler` and addition of it to the docs.

I've sent an [email to the mailing list](https://lists.apache.org/thread/yfokmfkrmmp7tqvq0m3rshcvloq278cq) to start a discussion on this topic, so this may change over time due to those discussions.

* GitHub Issue: #43631

Lead-authored-by: Matt Topol
Co-authored-by: Felipe Oliveira Carvalho
Co-authored-by: Sutou Kouhei
Co-authored-by: Raúl Cumplido
Co-authored-by: Dane Pitkin
Co-authored-by: Antoine Pitrou
Co-authored-by: David Li
Co-authored-by: Ian Cook
Signed-off-by: Matt Topol