Skip to content

Commit

Permalink
Missed ExecutePartitions on importer
Browse files Browse the repository at this point in the history
  • Loading branch information
CurtHagenlocher committed Apr 25, 2024
1 parent 4d2167b commit f4b119c
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,21 @@ public override unsafe UpdateResult ExecuteUpdate()
return new UpdateResult(rows);
}
}

public override unsafe PartitionedResult ExecutePartitioned()
{
using (CallHelper caller = new CallHelper())
{
if (SqlQuery != null)
{
// TODO: Consider moving this to the setter
caller.Call(_nativeDriver.StatementSetSqlQuery, ref _nativeStatement, SqlQuery);
}

caller.Call(_nativeDriver.StatementExecutePartitions, ref _nativeStatement, out PartitionedResult result);
return result;
}
}
}

/// <summary>
Expand Down Expand Up @@ -719,6 +734,64 @@ public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, CArrowArr
}
#endif

#if NET5_0_OR_GREATER
public unsafe void Call(delegate* unmanaged<CAdbcStatement*, CArrowSchema*, CAdbcPartitions*, long*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcStatement nativeStatement, out PartitionedResult result)
#else
public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, out PartitionedResult result)
#endif
{
fixed (CAdbcStatement* stmt = &nativeStatement)
fixed (CAdbcError* e = &_error)
{
CAdbcPartitions* nativePartitions = null;
CArrowSchema* nativeSchema = null;
long rowsAffected = 0;
try
{
nativePartitions = (CAdbcPartitions*)Marshal.AllocHGlobal(sizeof(CAdbcPartitions));
nativeSchema = CArrowSchema.Create();

#if NET5_0_OR_GREATER
TranslateCode(fn(stmt, nativeSchema, nativePartitions, &rowsAffected, e));
#else
TranslateCode(Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.StatementExecutePartitions>(fn)(stmt, nativeSchema, nativePartitions, &rowsAffected, e));
#endif

PartitionDescriptor[] partitions = new PartitionDescriptor[nativePartitions->num_partitions];
for (int i = 0; i < partitions.Length; i++)
{
partitions[i] = new PartitionDescriptor(MarshalExtensions.MarshalBuffer(nativePartitions->partitions[i], checked((int)nativePartitions->partition_lengths[i])));
}

result = new PartitionedResult(
CArrowSchemaImporter.ImportSchema(nativeSchema),
rowsAffected,
partitions);
}
finally
{
if (nativePartitions->release != null)
{
#if NET5_0_OR_GREATER
nativePartitions->release(nativePartitions);
#else
Marshal.GetDelegateForFunctionPointer<CAdbcDriverExporter.PartitionsRelease>(nativePartitions->release)(nativePartitions);
#endif
}

if (nativePartitions != null)
{
Marshal.FreeHGlobal((IntPtr)nativePartitions);
}

if (nativeSchema != null)
{
CArrowSchema.Free(nativeSchema);
}
}
}
}

#if NET5_0_OR_GREATER
public unsafe void Call(delegate* unmanaged<CAdbcConnection*, byte*, byte*, byte*, CArrowSchema*, CAdbcError*, AdbcStatusCode> fn, ref CAdbcConnection nativeconnection, string catalog, string dbSchema, string tableName, CArrowSchema* nativeSchema)
{
Expand Down

0 comments on commit f4b119c

Please sign in to comment.