Skip to content

Commit

Permalink
add support for splitting queries, including a custom function for do…
Browse files Browse the repository at this point in the history
…ing so
  • Loading branch information
David Coe committed Jan 11, 2024
1 parent cfe63dc commit e29804b
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 37 deletions.
88 changes: 58 additions & 30 deletions csharp/src/Client/AdbcCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,22 @@
*/

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;

namespace Apache.Arrow.Adbc.Client
{
public enum AdbcCommandType
{
Create,
Read,
Update,
Delete
}

/// <summary>
/// Creates an ADO.NET command over an Adbc statement.
/// </summary>
public sealed class AdbcCommand : DbCommand
{
private AdbcStatement adbcStatement;
private AdbcStatement _adbcStatement;
private int _timeout = 30;
private AdbcCommandType _adbcCommandType = AdbcCommandType.Read;
public QueryConfiguration _queryConfiguration;

/// <summary>
/// Overloaded. Initializes <see cref="AdbcCommand"/>.
Expand All @@ -57,9 +51,10 @@ public AdbcCommand(AdbcStatement adbcStatement, AdbcConnection adbcConnection) :
if(adbcConnection == null)
throw new ArgumentNullException(nameof(adbcConnection));

this.adbcStatement = adbcStatement;
this._adbcStatement = adbcStatement;
this.DbConnection = adbcConnection;
this.DecimalBehavior = adbcConnection.DecimalBehavior;
this._queryConfiguration = new QueryConfiguration();
}

/// <summary>
Expand All @@ -70,12 +65,12 @@ public AdbcCommand(AdbcStatement adbcStatement, AdbcConnection adbcConnection) :
public AdbcCommand(string query, AdbcConnection adbcConnection) : base()
{
if (string.IsNullOrEmpty(query))
throw new ArgumentNullException(nameof(adbcStatement));
throw new ArgumentNullException(nameof(_adbcStatement));

if (adbcConnection == null)
throw new ArgumentNullException(nameof(adbcConnection));

this.adbcStatement = adbcConnection.AdbcStatement;
this._adbcStatement = adbcConnection.AdbcStatement;
this.CommandText = query;

this.DbConnection = adbcConnection;
Expand All @@ -86,20 +81,20 @@ public AdbcCommand(string query, AdbcConnection adbcConnection) : base()
/// Gets the <see cref="AdbcStatement"/> associated with
/// this <see cref="AdbcCommand"/>.
/// </summary>
public AdbcStatement AdbcStatement => this.adbcStatement;
public AdbcStatement AdbcStatement => this._adbcStatement;

public DecimalBehavior DecimalBehavior { get; set; }

public override string CommandText
{
get => this.adbcStatement.SqlQuery;
set => this.adbcStatement.SqlQuery = value;
get => this._adbcStatement.SqlQuery;
set => this._adbcStatement.SqlQuery = value;
}

public AdbcCommandType AdbcCommandType
public QueryConfiguration QueryConfiguration
{
get => this._adbcCommandType;
set => this._adbcCommandType = value;
get => this._queryConfiguration;
set => this._queryConfiguration = value;
}

public override CommandType CommandType
Expand All @@ -108,7 +103,6 @@ public override CommandType CommandType
{
return CommandType.Text;
}

set
{
if (value != CommandType.Text)
Expand All @@ -129,8 +123,8 @@ public override int CommandTimeout
/// </summary>
public byte[] SubstraitPlan
{
get => this.adbcStatement.SubstraitPlan;
set => this.adbcStatement.SubstraitPlan = value;
get => this._adbcStatement.SubstraitPlan;
set => this._adbcStatement.SubstraitPlan = value;
}

protected override DbConnection DbConnection { get; set; }
Expand All @@ -145,7 +139,7 @@ public override int ExecuteNonQuery()
/// </summary>
public UpdateResult ExecuteUpdate()
{
return this.adbcStatement.ExecuteUpdate();
return this._adbcStatement.ExecuteUpdate();
}

/// <summary>
Expand All @@ -154,7 +148,7 @@ public UpdateResult ExecuteUpdate()
/// <returns><see cref="Result"></returns>
public QueryResult ExecuteQuery()
{
QueryResult executed = this.adbcStatement.ExecuteQuery();
QueryResult executed = this._adbcStatement.ExecuteQuery();

return executed;
}
Expand Down Expand Up @@ -186,15 +180,49 @@ protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
{
case CommandBehavior.SchemaOnly: // The schema is not known until a read happens
case CommandBehavior.Default:
if (this.AdbcCommandType == AdbcCommandType.Read)

// ADBC doesn't have very good support for multi-statements
// see https://github.com/apache/arrow-adbc/issues/1358
// so this attempts to work around that by making multiple calls
// it will return the first result set and the "RecordsAffected" for any other type of calls

if (this.QueryConfiguration != null)
{
QueryResult result = this.ExecuteQuery();
return new AdbcDataReader(this, result, this.DecimalBehavior);
QueryParser queryParser = new QueryParser(this.QueryConfiguration);
List<Query> queries = queryParser.ParseQuery(this.CommandText);

QueryResult queryResult = null;
int recordsEffected = -1;

foreach(Query q in queries)
{
if (q.Type == QueryType.Read)
{
if(queryResult == null)
{
this._adbcStatement.SqlQuery = q.Text;
queryResult = this.ExecuteQuery();
}
}
else
{
if(recordsEffected == -1)
recordsEffected++;

this._adbcStatement.SqlQuery = q.Text;
recordsEffected += this.ExecuteNonQuery();
}
}

if (queryResult != null)
return new AdbcDataReader(this, queryResult, this.DecimalBehavior, recordsEffected);
else
return new AdbcDataReader(recordsEffected);
}
else
{
UpdateResult result = this.ExecuteUpdate();
return new AdbcDataReader(result);
QueryResult result = this.ExecuteQuery();
return new AdbcDataReader(this, result, this.DecimalBehavior);
}

default:
Expand All @@ -207,7 +235,7 @@ protected override void Dispose(bool disposing)
if(disposing)
{
// TODO: ensure not in the middle of pulling
this.adbcStatement?.Dispose();
this._adbcStatement?.Dispose();
}

base.Dispose(disposing);
Expand Down
10 changes: 4 additions & 6 deletions csharp/src/Client/AdbcDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public sealed class AdbcDataReader : DbDataReader, IDbColumnSchemaGenerator
// this is only set if it's not a SELECT statement
private int recordsEffected = -1;

internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, DecimalBehavior decimalBehavior)
internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, DecimalBehavior decimalBehavior, int recordsEffected=-1)
{
if (adbcCommand == null)
throw new ArgumentNullException(nameof(adbcCommand));
Expand All @@ -62,14 +62,12 @@ internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, De

this.isClosed = false;
this.DecimalBehavior = decimalBehavior;
this.recordsEffected = recordsEffected;
}

internal AdbcDataReader(UpdateResult updateResult)
internal AdbcDataReader(int recordsEffected)
{
if (updateResult == null)
throw new ArgumentNullException(nameof(updateResult));

this.recordsEffected = Convert.ToInt32(updateResult.AffectedRows);
this.recordsEffected = recordsEffected;
}

public override object this[int ordinal] => GetValue(ordinal);
Expand Down
18 changes: 18 additions & 0 deletions csharp/src/Client/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
Loading

0 comments on commit e29804b

Please sign in to comment.