From f4b119c627e1c7bc99cacd624de959daa832ffeb Mon Sep 17 00:00:00 2001 From: Curt Hagenlocher Date: Thu, 25 Apr 2024 12:26:45 -0700 Subject: [PATCH] Missed ExecutePartitions on importer --- .../C/CAdbcDriverImporter.cs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs index 4a453dbe4d..51b5c0d0a4 100644 --- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs +++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs @@ -357,6 +357,21 @@ public override unsafe UpdateResult ExecuteUpdate() return new UpdateResult(rows); } } + + public override unsafe PartitionedResult ExecutePartitioned() + { + using (CallHelper caller = new CallHelper()) + { + if (SqlQuery != null) + { + // TODO: Consider moving this to the setter + caller.Call(_nativeDriver.StatementSetSqlQuery, ref _nativeStatement, SqlQuery); + } + + caller.Call(_nativeDriver.StatementExecutePartitions, ref _nativeStatement, out PartitionedResult result); + return result; + } + } } /// @@ -719,6 +734,64 @@ public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, CArrowArr } #endif +#if NET5_0_OR_GREATER + public unsafe void Call(delegate* unmanaged fn, ref CAdbcStatement nativeStatement, out PartitionedResult result) +#else + public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, out PartitionedResult result) +#endif + { + fixed (CAdbcStatement* stmt = &nativeStatement) + fixed (CAdbcError* e = &_error) + { + CAdbcPartitions* nativePartitions = null; + CArrowSchema* nativeSchema = null; + long rowsAffected = 0; + try + { + nativePartitions = (CAdbcPartitions*)Marshal.AllocHGlobal(sizeof(CAdbcPartitions)); + nativeSchema = CArrowSchema.Create(); + +#if NET5_0_OR_GREATER + TranslateCode(fn(stmt, nativeSchema, nativePartitions, &rowsAffected, e)); +#else + TranslateCode(Marshal.GetDelegateForFunctionPointer(fn)(stmt, nativeSchema, nativePartitions, &rowsAffected, e)); +#endif + + PartitionDescriptor[] partitions = new PartitionDescriptor[nativePartitions->num_partitions]; + for (int i = 0; i < partitions.Length; i++) + { + partitions[i] = new PartitionDescriptor(MarshalExtensions.MarshalBuffer(nativePartitions->partitions[i], checked((int)nativePartitions->partition_lengths[i]))); + } + + result = new PartitionedResult( + CArrowSchemaImporter.ImportSchema(nativeSchema), + rowsAffected, + partitions); + } + finally + { + if (nativePartitions->release != null) + { +#if NET5_0_OR_GREATER + nativePartitions->release(nativePartitions); +#else + Marshal.GetDelegateForFunctionPointer(nativePartitions->release)(nativePartitions); +#endif + } + + if (nativePartitions != null) + { + Marshal.FreeHGlobal((IntPtr)nativePartitions); + } + + if (nativeSchema != null) + { + CArrowSchema.Free(nativeSchema); + } + } + } + } + #if NET5_0_OR_GREATER public unsafe void Call(delegate* unmanaged fn, ref CAdbcConnection nativeconnection, string catalog, string dbSchema, string tableName, CArrowSchema* nativeSchema) {