Skip to content

Commit

Permalink
implement driver
Browse files Browse the repository at this point in the history
  • Loading branch information
paleolimbot committed Oct 15, 2024
1 parent 5048f06 commit 0eb7a56
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
3 changes: 2 additions & 1 deletion docs/source/cpp/recipe_driver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ target_link_libraries(adbc_driver_framework PRIVATE nanoarrow::nanoarrow)
# is the init function?
add_library(driver_example SHARED driver_example.cc)
target_include_directories(driver_example PRIVATE ../../../../c ../../../../c/include)
target_link_libraries(driver_example PRIVATE adbc_driver_framework nanoarrow::nanoarrow)
target_link_libraries(driver_example PRIVATE adbc_driver_framework
nanoarrow::nanoarrow_ipc)

# TODO: Do we want to have this as part of the example? We could make the validation library
# available but I am not sure it is ready to promise that kind of stability (e.g., C++
Expand Down
58 changes: 46 additions & 12 deletions docs/source/cpp/recipe_driver/driver_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

#include "driver_example.h"

#include <cstdio>
#include <string>

#include "driver/framework/connection.h"
Expand Down Expand Up @@ -219,33 +220,66 @@ class DriverExampleStatement : public adbc::driver::Statement<DriverExampleState
public:
[[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";

// Get information from the connection and/or store a reference if needed.
Status InitImpl(void* parent) {
auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
uri_ = connection.uri();
return Base::InitImpl(parent);
}

Status SetSqlQueryImpl(std::string_view query) {
return adbc::driver::status::NotImplemented("SetSqlQuery");
}
// Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
// using the target table as the filename.
Result<int64_t> ExecuteIngestImpl(IngestState& state) {
std::string directory = uri_.substr(strlen("file://"));
std::string filename = directory + "/" + *state.target_table;

Status BindStreamImpl(ArrowArrayStream* stream) {
return adbc::driver::status::NotImplemented("BindStream");
}
nanoarrow::ipc::UniqueOutputStream output_stream;
FILE* c_file = std::fopen(filename.c_str(), "wb");
UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
/*close_on_release*/ true));

nanoarrow::ipc::UniqueWriter writer;
UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));

Status GetParameterSchemaImpl(struct ArrowSchema* schema) {
return adbc::driver::status::NotImplemented("GetParameterSchema");
ArrowError nanoarrow_error;
ArrowErrorInit(&nanoarrow_error);
UNWRAP_NANOARROW(nanoarrow_error, Internal,
ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
&nanoarrow_error));

return -1;
}

Status PrepareImpl() { return adbc::driver::status::NotImplemented("Prepare"); }
// Our implementation of query execution is to accept a simple query in the form
// SELECT * FROM (the filename).
Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
std::string prefix("SELECT * FROM ");
if (state.query.find(prefix) != 0) {
return adbc::driver::status::InvalidArgument(
"[example] Query must be in the form 'SELECT * FROM filename'");
}

std::string directory = uri_.substr(strlen("file://"));
std::string filename = directory + "/" + state.query.substr(prefix.size());

nanoarrow::ipc::UniqueInputStream input_stream;
FILE* c_file = std::fopen(filename.c_str(), "rb");
UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
/*close_on_release*/ true));

UNWRAP_ERRNO(Internal,
ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
return -1;
}

Result<int64_t> ExecuteQueryImpl(ArrowArrayStream* stream) {
return adbc::driver::status::NotImplemented("ExecuteQuery");
// This path is taken when the user calls Prepare() first.
Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
QueryState query_state{state.query};
return ExecuteQueryImpl(query_state, stream);
}

private:
std::string uri_;
nanoarrow::UniqueArrayStream bind_;
};

} // namespace
Expand Down

0 comments on commit 0eb7a56

Please sign in to comment.