From 4d2167becd2fed63bef5ad51480d8b2ee5bba983 Mon Sep 17 00:00:00 2001 From: Curt Hagenlocher Date: Thu, 25 Apr 2024 09:59:29 -0700 Subject: [PATCH] Implement remaining functions in 1.0 spec. Closes #1221 --- csharp/src/Apache.Arrow.Adbc/AdbcStatement.cs | 18 +- .../C/CAdbcDriverExporter.cs | 179 +++++++++++++++--- .../C/CAdbcDriverImporter.cs | 149 +++++++++++++-- ...ns.netstandard.cs => MarshalExtensions.cs} | 29 ++- .../Apache.Arrow.Adbc/PartitionDescriptor.cs | 2 + 5 files changed, 332 insertions(+), 45 deletions(-) rename csharp/src/Apache.Arrow.Adbc/Extensions/{MarshalExtensions.netstandard.cs => MarshalExtensions.cs} (79%) diff --git a/csharp/src/Apache.Arrow.Adbc/AdbcStatement.cs b/csharp/src/Apache.Arrow.Adbc/AdbcStatement.cs index 91486be475..66994a1322 100644 --- a/csharp/src/Apache.Arrow.Adbc/AdbcStatement.cs +++ b/csharp/src/Apache.Arrow.Adbc/AdbcStatement.cs @@ -44,15 +44,29 @@ public AdbcStatement() /// public virtual byte[] SubstraitPlan { - get { throw new NotImplementedException(); } - set { throw new NotImplementedException(); } + get { throw AdbcException.NotImplemented("Statement does not support SubstraitPlan"); } + set { throw AdbcException.NotImplemented("Statement does not support SubstraitPlan"); } } + /// + /// Binds this statement to a to provide parameter values or bulk data ingestion. + /// + /// the RecordBatch to bind + /// the schema of the RecordBatch public virtual void Bind(RecordBatch batch, Schema schema) { throw AdbcException.NotImplemented("Statement does not support Bind"); } + /// + /// Binds this statement to an to provide parameter values or bulk data ingestion. + /// + /// + public virtual void BindStream(IArrowArrayStream stream) + { + throw AdbcException.NotImplemented("Statement does not support BindStream"); + } + /// /// Executes the statement and returns a tuple containing the number /// of records and the . diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs index 04fea40a05..9241e97019 100644 --- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs +++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs @@ -19,13 +19,10 @@ using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; +using Apache.Arrow.Adbc.Extensions; using Apache.Arrow.C; using Apache.Arrow.Ipc; -#if NETSTANDARD -using Apache.Arrow.Adbc.Extensions; -#endif - namespace Apache.Arrow.Adbc.C { public class CAdbcDriverExporter @@ -38,6 +35,7 @@ public class CAdbcDriverExporter #if NET5_0_OR_GREATER private static unsafe delegate* unmanaged ReleaseErrorPtr => (delegate* unmanaged)s_releaseError.Pointer; private static unsafe delegate* unmanaged ReleaseDriverPtr => &ReleaseDriver; + private static unsafe delegate* unmanaged ReleasePartitionsPtr => &ReleasePartitions; private static unsafe delegate* unmanaged DatabaseInitPtr => &InitDatabase; private static unsafe delegate* unmanaged DatabaseReleasePtr => &ReleaseDatabase; @@ -55,16 +53,23 @@ public class CAdbcDriverExporter private static unsafe delegate* unmanaged ConnectionSetOptionPtr => &SetConnectionOption; private static unsafe delegate* unmanaged StatementBindPtr => &BindStatement; + private static unsafe delegate* unmanaged StatementBindStreamPtr => &BindStreamStatement; private static unsafe delegate* unmanaged StatementExecuteQueryPtr => &ExecuteStatementQuery; + private static unsafe delegate* unmanaged StatementExecutePartitionsPtr => &ExecuteStatementPartitions; private static unsafe delegate* unmanaged StatementNewPtr => &NewStatement; private static unsafe delegate* unmanaged StatementReleasePtr => &ReleaseStatement; private static unsafe delegate* unmanaged StatementPreparePtr => &PrepareStatement; private static unsafe delegate* unmanaged StatementSetSqlQueryPtr => &SetStatementSqlQuery; + private static unsafe delegate* unmanaged StatementSetSubstraitPlanPtr => &SetStatementSubstraitPlan; + private static unsafe delegate* unmanaged StatementGetParameterSchemaPtr => &GetStatementParameterSchema; #else private static IntPtr ReleaseErrorPtr => s_releaseError.Pointer; internal unsafe delegate AdbcStatusCode DriverRelease(CAdbcDriver* driver, CAdbcError* error); private static unsafe readonly NativeDelegate s_releaseDriver = new NativeDelegate(ReleaseDriver); private static IntPtr ReleaseDriverPtr => s_releaseDriver.Pointer; + internal unsafe delegate void PartitionsRelease(CAdbcPartitions* partitions); + private static unsafe readonly NativeDelegate s_releasePartitions = new NativeDelegate(ReleasePartitions); + private static IntPtr ReleasePartitionsPtr => s_releasePartitions.Pointer; private static unsafe readonly NativeDelegate s_databaseInit = new NativeDelegate(InitDatabase); private static IntPtr DatabaseInitPtr => s_databaseInit.Pointer; @@ -102,12 +107,18 @@ public class CAdbcDriverExporter private static unsafe readonly NativeDelegate s_connectionSetOption = new NativeDelegate(SetConnectionOption); private static IntPtr ConnectionSetOptionPtr => s_connectionSetOption.Pointer; - private unsafe delegate AdbcStatusCode StatementBind(CAdbcStatement* statement, CArrowArray* array, CArrowSchema* schema, CAdbcError* error); + internal unsafe delegate AdbcStatusCode StatementBind(CAdbcStatement* statement, CArrowArray* array, CArrowSchema* schema, CAdbcError* error); private static unsafe readonly NativeDelegate s_statementBind = new NativeDelegate(BindStatement); private static IntPtr StatementBindPtr => s_statementBind.Pointer; + internal unsafe delegate AdbcStatusCode StatementBindStream(CAdbcStatement* statement, CArrowArrayStream* stream, CAdbcError* error); + private static unsafe readonly NativeDelegate s_statementBindStream = new NativeDelegate(BindStreamStatement); + private static IntPtr StatementBindStreamPtr => s_statementBindStream.Pointer; internal unsafe delegate AdbcStatusCode StatementExecuteQuery(CAdbcStatement* statement, CArrowArrayStream* stream, long* rows, CAdbcError* error); private static unsafe readonly NativeDelegate s_statementExecuteQuery = new NativeDelegate(ExecuteStatementQuery); private static IntPtr StatementExecuteQueryPtr = s_statementExecuteQuery.Pointer; + internal unsafe delegate AdbcStatusCode StatementExecutePartitions(CAdbcStatement* statement, CArrowSchema* schema, CAdbcPartitions* partitions, long* rows, CAdbcError* error); + private static unsafe readonly NativeDelegate s_statementExecutePartitions = new NativeDelegate(ExecuteStatementPartitions); + private static IntPtr StatementExecutePartitionsPtr = s_statementExecutePartitions.Pointer; internal unsafe delegate AdbcStatusCode StatementNew(CAdbcConnection* connection, CAdbcStatement* statement, CAdbcError* error); private static unsafe readonly NativeDelegate s_statementNew = new NativeDelegate(NewStatement); private static IntPtr StatementNewPtr => s_statementNew.Pointer; @@ -119,17 +130,14 @@ public class CAdbcDriverExporter internal unsafe delegate AdbcStatusCode StatementSetSqlQuery(CAdbcStatement* statement, byte* text, CAdbcError* error); private static unsafe readonly NativeDelegate s_statementSetSqlQuery = new NativeDelegate(SetStatementSqlQuery); private static IntPtr StatementSetSqlQueryPtr = s_statementSetSqlQuery.Pointer; + internal unsafe delegate AdbcStatusCode StatementSetSubstraitPlan(CAdbcStatement* statement, byte* plan, int length, CAdbcError* error); + private static unsafe readonly NativeDelegate s_statementSetSubstraitPlan = new NativeDelegate(SetStatementSubstraitPlan); + private static IntPtr StatementSetSubstraitPlanPtr = s_statementSetSubstraitPlan.Pointer; + internal unsafe delegate AdbcStatusCode StatementGetParameterSchema(CAdbcStatement* statement, CArrowSchema* schema, CAdbcError* error); + private static unsafe readonly NativeDelegate s_statementGetParameterSchema = new NativeDelegate(GetStatementParameterSchema); + private static IntPtr StatementGetParameterSchemaPtr = s_statementGetParameterSchema.Pointer; #endif - /* - * Not yet implemented - - unsafe delegate AdbcStatusCode StatementBindStream(CAdbcStatement* statement, CArrowArrayStream* stream, CAdbcError* error); - unsafe delegate AdbcStatusCode StatementExecutePartitions(CAdbcStatement* statement, CArrowSchema* schema, CAdbcPartitions* partitions, long* rows_affected, CAdbcError* error); - unsafe delegate AdbcStatusCode StatementGetParameterSchema(CAdbcStatement* statement, CArrowSchema* schema, CAdbcError* error); - unsafe delegate AdbcStatusCode StatementSetSubstraitPlan(CAdbcStatement statement, byte* plan, int length, CAdbcError error); - */ - public unsafe static AdbcStatusCode AdbcDriverInit(int version, CAdbcDriver* nativeDriver, CAdbcError* error, AdbcDriver driver) { DriverStub stub = new DriverStub(driver); @@ -142,7 +150,6 @@ public unsafe static AdbcStatusCode AdbcDriverInit(int version, CAdbcDriver* nat nativeDriver->DatabaseSetOption = DatabaseSetOptionPtr; nativeDriver->DatabaseRelease = DatabaseReleasePtr; - // TODO: This should probably only set the pointers for the functionality actually supported by this particular driver nativeDriver->ConnectionCommit = ConnectionCommitPtr; nativeDriver->ConnectionGetInfo = ConnectionGetInfoPtr; nativeDriver->ConnectionGetObjects = ConnectionGetObjectsPtr; @@ -156,15 +163,15 @@ public unsafe static AdbcStatusCode AdbcDriverInit(int version, CAdbcDriver* nat nativeDriver->ConnectionRollback = ConnectionRollbackPtr; nativeDriver->StatementBind = StatementBindPtr; - // nativeDriver->StatementBindStream = StatementBindStreamPtr; + nativeDriver->StatementBindStream = StatementBindStreamPtr; nativeDriver->StatementExecuteQuery = StatementExecuteQueryPtr; - // nativeDriver->StatementExecutePartitions = StatementExecutePartitionsPtr; - // nativeDriver->StatementGetParameterSchema = StatementGetParameterSchemaPtr; + nativeDriver->StatementExecutePartitions = StatementExecutePartitionsPtr; + nativeDriver->StatementGetParameterSchema = StatementGetParameterSchemaPtr; nativeDriver->StatementNew = StatementNewPtr; nativeDriver->StatementPrepare = StatementPreparePtr; nativeDriver->StatementRelease = StatementReleasePtr; nativeDriver->StatementSetSqlQuery = StatementSetSqlQueryPtr; - // nativeDriver->StatementSetSubstraitPlan = StatementSetSubstraitPlanPtr; + nativeDriver->StatementSetSubstraitPlan = StatementSetSubstraitPlanPtr; return 0; } @@ -181,12 +188,7 @@ private unsafe static AdbcStatusCode SetError(CAdbcError* error, Exception excep { ReleaseError(error); -#if NETSTANDARD error->message = (byte*)MarshalExtensions.StringToCoTaskMemUTF8(exception.Message); -#else - error->message = (byte*)Marshal.StringToCoTaskMemUTF8(exception.Message); -#endif - error->sqlstate0 = (byte)0; error->sqlstate1 = (byte)0; error->sqlstate2 = (byte)0; @@ -249,6 +251,37 @@ private unsafe static AdbcStatusCode ReleaseDriver(CAdbcDriver* nativeDriver, CA } } +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static void ReleasePartitions(CAdbcPartitions* partitions) + { + if (partitions != null) + { + if (partitions->partitions != null) + { + for (int i = 0; i < partitions->num_partitions; i++) + { + byte* partition = partitions->partitions[i]; + if (partition != null) + { + Marshal.FreeHGlobal((IntPtr)partition); + partitions->partitions[i] = null; + } + } + Marshal.FreeHGlobal((IntPtr)partitions->partitions); + partitions->partitions = null; + } + if (partitions->partition_lengths != null) + { + Marshal.FreeHGlobal((IntPtr)partitions->partition_lengths); + partitions->partition_lengths = null; + } + + partitions->release = default; + } + } + #if NET5_0_OR_GREATER [UnmanagedCallersOnly] #endif @@ -512,6 +545,46 @@ private unsafe static AdbcStatusCode SetStatementSqlQuery(CAdbcStatement* native } } +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static AdbcStatusCode SetStatementSubstraitPlan(CAdbcStatement* nativeStatement, byte* plan, int length, CAdbcError* error) + { + try + { + GCHandle gch = GCHandle.FromIntPtr((IntPtr)nativeStatement->private_data); + AdbcStatement stub = (AdbcStatement)gch.Target; + + stub.SubstraitPlan = MarshalExtensions.MarshalBuffer(plan, length); + + return AdbcStatusCode.Success; + } + catch (Exception e) + { + return SetError(error, e); + } + } + +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static AdbcStatusCode GetStatementParameterSchema(CAdbcStatement* nativeStatement, CArrowSchema* schema, CAdbcError* error) + { + try + { + GCHandle gch = GCHandle.FromIntPtr((IntPtr)nativeStatement->private_data); + AdbcStatement stub = (AdbcStatement)gch.Target; + + CArrowSchemaExporter.ExportSchema(stub.GetParameterSchema(), schema); + + return AdbcStatusCode.Success; + } + catch (Exception e) + { + return SetError(error, e); + } + } + #if NET5_0_OR_GREATER [UnmanagedCallersOnly] #endif @@ -533,6 +606,26 @@ private unsafe static AdbcStatusCode BindStatement(CAdbcStatement* nativeStateme } } +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static AdbcStatusCode BindStreamStatement(CAdbcStatement* nativeStatement, CArrowArrayStream* stream, CAdbcError* error) + { + try + { + GCHandle gch = GCHandle.FromIntPtr((IntPtr)nativeStatement->private_data); + AdbcStatement stub = (AdbcStatement)gch.Target; + + IArrowArrayStream arrayStream = CArrowArrayStreamImporter.ImportArrayStream(stream); + stub.BindStream(arrayStream); + return AdbcStatusCode.Success; + } + catch (Exception e) + { + return SetError(error, e); + } + } + #if NET5_0_OR_GREATER [UnmanagedCallersOnly] #endif @@ -557,6 +650,44 @@ private unsafe static AdbcStatusCode ExecuteStatementQuery(CAdbcStatement* nativ } } +#if NET5_0_OR_GREATER + [UnmanagedCallersOnly] +#endif + private unsafe static AdbcStatusCode ExecuteStatementPartitions(CAdbcStatement* nativeStatement, CArrowSchema* schema, CAdbcPartitions* partitions, long* rows, CAdbcError* error) + { + try + { + GCHandle gch = GCHandle.FromIntPtr((IntPtr)nativeStatement->private_data); + AdbcStatement stub = (AdbcStatement)gch.Target; + var result = stub.ExecutePartitioned(); + if (rows != null) + { + *rows = result.AffectedRows; + } + + partitions->release = ReleasePartitionsPtr; + partitions->num_partitions = result.PartitionDescriptors.Count; + partitions->partitions = (byte**)Marshal.AllocHGlobal(IntPtr.Size * result.PartitionDescriptors.Count); + partitions->partition_lengths = (nuint*)Marshal.AllocHGlobal(IntPtr.Size * result.PartitionDescriptors.Count); + for (int i = 0; i < partitions->num_partitions; i++) + { + ReadOnlySpan partition = result.PartitionDescriptors[i].Descriptor; + partitions->partition_lengths[i] = (nuint)partition.Length; + partitions->partitions[i] = (byte*)Marshal.AllocHGlobal(partition.Length); + fixed (void* descriptor = partition) + { + Buffer.MemoryCopy(descriptor, partitions->partitions[i], partition.Length, partition.Length); + } + } + + return AdbcStatusCode.Success; + } + catch (Exception e) + { + return SetError(error, e); + } + } + #if NET5_0_OR_GREATER [UnmanagedCallersOnly] #endif diff --git a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs index 0624acdcb5..4a453dbe4d 100644 --- a/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs +++ b/csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs @@ -17,14 +17,13 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; -using System.Linq; using System.Runtime.InteropServices; using Apache.Arrow.Adbc.Extensions; using Apache.Arrow.C; using Apache.Arrow.Ipc; - namespace Apache.Arrow.Adbc.C { internal delegate AdbcStatusCode AdbcDriverInit(int version, ref CAdbcDriver driver, ref CAdbcError error); @@ -284,6 +283,7 @@ sealed class AdbcStatementNative : AdbcStatement { private CAdbcDriver _nativeDriver; private CAdbcStatement _nativeStatement; + private byte[] _substraitPlan; public AdbcStatementNative(CAdbcDriver nativeDriver, CAdbcStatement nativeStatement) { @@ -291,13 +291,46 @@ public AdbcStatementNative(CAdbcDriver nativeDriver, CAdbcStatement nativeStatem _nativeStatement = nativeStatement; } + public unsafe override byte[] SubstraitPlan + { + get => _substraitPlan; + set + { + using (CallHelper caller = new CallHelper()) + { + caller.Call(_nativeDriver.StatementSetSubstraitPlan, ref _nativeStatement, value); + _substraitPlan = value; + } + } + } + + public unsafe override void Bind(RecordBatch batch, Schema schema) + { + using (CallHelper caller = new CallHelper()) + { + caller.Call(_nativeDriver.StatementBind, ref _nativeStatement, batch, schema); + } + } + + public unsafe override void BindStream(IArrowArrayStream stream) + { + using (CallHelper caller = new CallHelper()) + { + caller.Call(_nativeDriver.StatementBindStream, ref _nativeStatement, stream); + } + } + public unsafe override QueryResult ExecuteQuery() { CArrowArrayStream* nativeArrayStream = CArrowArrayStream.Create(); using (CallHelper caller = new CallHelper()) { - caller.Call(_nativeDriver.StatementSetSqlQuery, ref _nativeStatement, SqlQuery); + if (SqlQuery != null) + { + // TODO: Consider moving this to the setter + caller.Call(_nativeDriver.StatementSetSqlQuery, ref _nativeStatement, SqlQuery); + } long rows = 0; @@ -311,7 +344,11 @@ public override unsafe UpdateResult ExecuteUpdate() { using (CallHelper caller = new CallHelper()) { - caller.Call(_nativeDriver.StatementSetSqlQuery, ref _nativeStatement, SqlQuery); + if (SqlQuery != null) + { + // TODO: Consider moving this to the setter + caller.Call(_nativeDriver.StatementSetSqlQuery, ref _nativeStatement, SqlQuery); + } long rows = 0; @@ -331,11 +368,7 @@ private struct Utf8Helper : IDisposable public Utf8Helper(string s) { -#if NETSTANDARD _s = MarshalExtensions.StringToCoTaskMemUTF8(s); -#else - _s = Marshal.StringToCoTaskMemUTF8(s); -#endif } public static implicit operator IntPtr(Utf8Helper s) { return s._s; } @@ -345,9 +378,12 @@ public Utf8Helper(string s) /// /// Assists with delegate calls and handling error codes /// - private struct CallHelper : IDisposable + private unsafe struct CallHelper : IDisposable { private CAdbcError _error; + private CArrowArray* _array; + private CArrowSchema* _schema; + private CArrowArrayStream* _arrayStream; public unsafe void Call(AdbcDriverInit init, int version, ref CAdbcDriver driver) { @@ -538,6 +574,57 @@ public unsafe void Call(IntPtr fn, ref CAdbcConnection nativeConnection, ref CAd } #endif +#if NET5_0_OR_GREATER + public unsafe void Call(delegate* unmanaged fn, ref CAdbcStatement nativeStatement, RecordBatch batch, Schema schema) +#else + public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, RecordBatch batch, Schema schema) +#endif + { + fixed (CAdbcStatement* stmt = &nativeStatement) + fixed (CAdbcError* e = &_error) + { + Debug.Assert(_array == null); + _array = CArrowArray.Create(); + CArrowArrayExporter.ExportRecordBatch(batch, _array); + + Debug.Assert(_schema == null); + _schema = CArrowSchema.Create(); + CArrowSchemaExporter.ExportSchema(schema, _schema); + +#if NET5_0_OR_GREATER + TranslateCode(fn(stmt, _array, _schema, e)); +#else + TranslateCode(Marshal.GetDelegateForFunctionPointer(fn)(stmt, _array, _schema, e)); +#endif + + _array = null; + _schema = null; + } + } + +#if NET5_0_OR_GREATER + public unsafe void Call(delegate* unmanaged fn, ref CAdbcStatement nativeStatement, IArrowArrayStream stream) +#else + public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, IArrowArrayStream stream) +#endif + { + fixed (CAdbcStatement* stmt = &nativeStatement) + fixed (CAdbcError* e = &_error) + { + Debug.Assert(_arrayStream == null); + _arrayStream = CArrowArrayStream.Create(); + CArrowArrayStreamExporter.ExportArrayStream(stream, _arrayStream); + +#if NET5_0_OR_GREATER + TranslateCode(fn(stmt, _arrayStream, e)); +#else + TranslateCode(Marshal.GetDelegateForFunctionPointer(fn)(stmt, _arrayStream, e)); +#endif + + _arrayStream = null; + } + } + #if NET5_0_OR_GREATER public unsafe void Call(delegate* unmanaged fn, ref CAdbcStatement nativeStatement) { @@ -588,6 +675,28 @@ public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, string sq } #endif +#if NET5_0_OR_GREATER + public unsafe void Call(delegate* unmanaged fn, ref CAdbcStatement nativeStatement, byte[] substraitPlan) + { + fixed (CAdbcStatement* stmt = &nativeStatement) + fixed (byte* plan = substraitPlan) + fixed (CAdbcError* e = &_error) + { + TranslateCode(fn(stmt, plan, substraitPlan.Length, e)); + } + } +#else + public unsafe void Call(IntPtr fn, ref CAdbcStatement nativeStatement, byte[] substraitPlan) + { + fixed (CAdbcStatement* stmt = &nativeStatement) + fixed (byte* plan = substraitPlan) + fixed (CAdbcError* e = &_error) + { + TranslateCode(Marshal.GetDelegateForFunctionPointer(fn)(stmt, plan, substraitPlan.Length, e)); + } + } +#endif + #if NET5_0_OR_GREATER public unsafe void Call(delegate* unmanaged fn, ref CAdbcStatement nativeStatement, CArrowArrayStream* arrowStream, ref long nRows) { @@ -686,6 +795,22 @@ public unsafe void Dispose() _error.release = default; } } + + if (_array != null) + { + CArrowArray.Free(_array); + _array = null; + } + if (_schema != null) + { + CArrowSchema.Free(_schema); + _schema = null; + } + if (_arrayStream != null) + { + CArrowArrayStream.Free(_arrayStream); + _arrayStream = null; + } } #if NET5_0_OR_GREATER @@ -707,7 +832,7 @@ public unsafe void Call(IntPtr ptr, ref CAdbcConnection connection, IReadOnlyLis fixed (CAdbcConnection* cn = &connection) fixed (CAdbcError* e = &_error) { - Span span = infoCodes.ToArray().AsSpan(); + Span span = infoCodes.AsSpan(); fixed (AdbcInfoCode* spanPtr = span) { TranslateCode(Marshal.GetDelegateForFunctionPointer(ptr)(cn, (int*)spanPtr, infoCodes.Count, stream, e)); @@ -736,11 +861,7 @@ public unsafe void Call(IntPtr fn, ref CAdbcConnection connection, int depth, st for (int i = 0; i < table_types.Count; i++) { string tableType = table_types[i]; -#if NETSTANDARD bTable_type[i] = (byte*)MarshalExtensions.StringToCoTaskMemUTF8(tableType); -#else - bTable_type[i] = (byte*)Marshal.StringToCoTaskMemUTF8(tableType); -#endif } try diff --git a/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.netstandard.cs b/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs similarity index 79% rename from csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.netstandard.cs rename to csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs index 56039785fe..ce0b50b25d 100644 --- a/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.netstandard.cs +++ b/csharp/src/Apache.Arrow.Adbc/Extensions/MarshalExtensions.cs @@ -15,8 +15,6 @@ * limitations under the License. */ -#if NETSTANDARD - using System; using System.Runtime.InteropServices; using System.Text; @@ -25,6 +23,7 @@ namespace Apache.Arrow.Adbc.Extensions { public static class MarshalExtensions { +#if NETSTANDARD public static unsafe string PtrToStringUTF8(IntPtr intPtr) { if (intPtr == IntPtr.Zero) @@ -60,7 +59,7 @@ public static unsafe IntPtr StringToCoTaskMemUTF8(string s) int nb = Encoding.UTF8.GetMaxByteCount(s.Length); - IntPtr pMem = Marshal.AllocCoTaskMem(nb + 1); + IntPtr pMem = Marshal.AllocHGlobal(nb + 1); int nbWritten; byte* pbMem = (byte*)pMem; @@ -74,7 +73,27 @@ public static unsafe IntPtr StringToCoTaskMemUTF8(string s) return pMem; } +#else + public static IntPtr StringToCoTaskMemUTF8(string s) + { + return Marshal.StringToCoTaskMemUTF8(s); + } +#endif + + public static unsafe byte[] MarshalBuffer(void* ptr, int size) + { + if (ptr == null) + { + return null; + } + + byte[] bytes = new byte[size]; + fixed (byte* firstByte = bytes) + { + Buffer.MemoryCopy(ptr, firstByte, size, size); + } + + return bytes; + } } } - -#endif diff --git a/csharp/src/Apache.Arrow.Adbc/PartitionDescriptor.cs b/csharp/src/Apache.Arrow.Adbc/PartitionDescriptor.cs index 16c10755fd..5f62da35cb 100644 --- a/csharp/src/Apache.Arrow.Adbc/PartitionDescriptor.cs +++ b/csharp/src/Apache.Arrow.Adbc/PartitionDescriptor.cs @@ -32,6 +32,8 @@ public PartitionDescriptor(byte[] descriptor) _descriptor = descriptor; } + public ReadOnlySpan Descriptor => _descriptor; + public override bool Equals(object obj) { PartitionDescriptor? other = obj as PartitionDescriptor?;