-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(csharp/src/Drivers): introduce drivers for Apache systems built on Thrift #1710
Changes from 91 commits
b479cd0
0116e21
e14559f
f4899bf
0cc6ee6
2757e5c
e5af9f6
ac9187f
fb7035a
45fd0c7
9084a13
666d012
6652330
df4fdec
56ac604
964d279
96448f7
d2fb852
3a3a224
b927c31
20e81e5
8f0803f
d0e8158
4313cc2
5701c04
ea46162
b34ff8a
f7ca693
1949057
84b39b6
590b920
6d566b3
9579fdb
4051089
6b46336
08743eb
af278ce
d5bdb76
609238a
9ae24cf
5038c9c
0551db7
6212c7e
58f0246
3aa6320
752638d
0b1bbdf
df422aa
9373f97
36eaba2
c419b85
d74a392
4de2e1b
d34c5ba
ad70304
9e2f5a0
295694c
2e4dbd1
515c4c2
7fda295
20f6059
dce233a
865cdb8
6dca25c
995e1d0
c0101ba
d5fb190
9101a16
10ca62f
23e824f
c511876
843896b
8dd0749
6009b4c
436b189
698665a
302fc98
79d5cd9
9d11fdc
60de1c0
cc985a3
82e5dab
19e8c8f
cbc524d
f2cdd8e
15b7dae
1618429
8eda11f
c6c5407
f3d97f6
f39a597
2eb1142
a118de1
7fa0a16
0e7aa50
58d2d23
8373c96
226343a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,6 +65,7 @@ python/doc/ | |
# Egg metadata | ||
*.egg-info | ||
|
||
.vs/ | ||
.vscode | ||
.idea/ | ||
.pytest_cache/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<TargetFrameworks>net472;net6.0</TargetFrameworks> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="ApacheThrift" Version="0.19.0" /> | ||
</ItemGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\..\Apache.Arrow.Adbc\Apache.Arrow.Adbc.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Apache.Arrow.Ipc; | ||
using Apache.Hive.Service.Rpc.Thrift; | ||
using Thrift.Protocol; | ||
using Thrift.Transport; | ||
|
||
namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2 | ||
{ | ||
public abstract class HiveServer2Connection : AdbcConnection | ||
{ | ||
// Driver identification string; not referenced by the code visible in this class.
private const string userAgent = "AdbcExperimental/0.0";

/// <summary>Handle of the server-side operation that is polled and fetched from.</summary>
protected TOperationHandle operationHandle;

/// <summary>Connection properties supplied at construction; null when the default constructor is used.</summary>
protected IReadOnlyDictionary<string, string> properties;

/// <summary>Thrift transport taken from the protocol created in <c>Open</c>.</summary>
internal TTransport transport;

/// <summary>Thrift client for the CLI service; created in <c>Open</c>.</summary>
internal TCLIService.Client client;

/// <summary>Session handle returned by the server when the session is opened.</summary>
internal TSessionHandle sessionHandle;

/// <summary>Creates a connection with no properties.</summary>
internal HiveServer2Connection()
    : this(null)
{
}

/// <summary>Creates a connection that keeps a reference to the given properties.</summary>
/// <param name="properties">Connection properties; stored as-is without validation.</param>
internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
{
    this.properties = properties;
}
|
||
/// <summary>
/// Creates the Thrift protocol and client, then opens a server session and
/// stores the resulting session handle.
/// </summary>
/// <exception cref="HiveServer2Exception">
/// The server answered the open-session request with an error status.
/// </exception>
public void Open()
{
    TProtocol protocol = CreateProtocol();
    this.transport = protocol.Transport;
    this.client = new TCLIService.Client(protocol);

    // Blocks on the asynchronous Thrift call; transport-level failures surface
    // wrapped in an AggregateException, exactly as before.
    var response = this.client.OpenSession(CreateSessionRequest()).Result;

    // Fail fast instead of keeping a possibly-null session handle that would
    // only break on a later request.
    if (response.Status.StatusCode == TStatusCode.ERROR_STATUS)
    {
        throw new HiveServer2Exception(response.Status.ErrorMessage);
    }

    this.sessionHandle = response.SessionHandle;
}

/// <summary>Creates the Thrift protocol (and underlying transport) used by <see cref="Open"/>.</summary>
protected abstract TProtocol CreateProtocol();

/// <summary>Builds the open-session request sent by <see cref="Open"/>.</summary>
protected abstract TOpenSessionReq CreateSessionRequest();
|
||
/// <summary>
/// Returns column metadata matching the given patterns as an Arrow stream.
/// </summary>
/// <param name="depth">Requested depth; only <see cref="GetObjectsDepth.All"/> is implemented.</param>
/// <param name="catalogPattern">Catalog name passed to the server's GetColumns request.</param>
/// <param name="dbSchemaPattern">Schema name passed to the server's GetColumns request.</param>
/// <param name="tableNamePattern">Table name passed to the server's GetColumns request.</param>
/// <param name="tableTypes">Currently unused.</param>
/// <param name="columnNamePattern">Column name filter; omitted from the request when null or empty.</param>
/// <returns>A stream that fetches the result batches of the GetColumns operation.</returns>
/// <exception cref="NotImplementedException"><paramref name="depth"/> is not <see cref="GetObjectsDepth.All"/>.</exception>
/// <exception cref="HiveServer2Exception">The server reported an error status for the request.</exception>
public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string catalogPattern, string dbSchemaPattern, string tableNamePattern, List<string> tableTypes, string columnNamePattern)
{
    // Shallower depths never issued a request, so polling and fetching would
    // run against a null or stale operation handle. Reject them explicitly.
    if (depth != GetObjectsDepth.All)
    {
        throw new NotImplementedException("GetObjects is only implemented for GetObjectsDepth.All.");
    }

    TGetColumnsReq columnsReq = new TGetColumnsReq(this.sessionHandle);
    columnsReq.CatalogName = catalogPattern;
    columnsReq.SchemaName = dbSchemaPattern;
    columnsReq.TableName = tableNamePattern;

    if (!string.IsNullOrEmpty(columnNamePattern))
    {
        columnsReq.ColumnName = columnNamePattern;
    }

    var columnsResponse = this.client.GetColumns(columnsReq).Result;
    if (columnsResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
    {
        throw new HiveServer2Exception(columnsResponse.Status.ErrorMessage);
    }

    this.operationHandle = columnsResponse.OperationHandle;

    // Wait for the operation to leave the pending/running states before
    // asking for its result-set schema.
    PollForResponse();

    Schema schema = GetSchema();

    return new GetObjectsReader(this, schema);
}
|
||
/// <summary>Not implemented; always throws.</summary>
/// <param name="codes">Ignored.</param>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetInfo(List<int> codes) =>
    throw new NotImplementedException();
|
||
/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetTableTypes() =>
    throw new NotImplementedException();
|
||
protected void PollForResponse() | ||
{ | ||
TGetOperationStatusResp statusResponse = null; | ||
do | ||
{ | ||
if (statusResponse != null) { Thread.Sleep(500); } | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a good reminder for me that we really need a more async-friendly API, ideally a cross-process one. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you mean at the ADBC API definition level? CC @zeroshade who has been thinking about this. It's something I would like to tackle, but there are questions about compatibility and what happens to the sync API afterwards and if we also want to try to 'fix' other things at the same time. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I put some thoughts into #811 which is the only existing issue we have that sort of tracks async. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @CurtHagenlocher - I have some private WIP to use |
||
TGetOperationStatusReq request = new TGetOperationStatusReq(this.operationHandle); | ||
statusResponse = this.client.GetOperationStatus(request).Result; | ||
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE); | ||
} | ||
|
||
|
||
/// <summary>
/// Closes the server session, then closes the transport and disposes the
/// Thrift client. Does nothing when no client exists.
/// </summary>
/// <remarks>
/// Errors from closing the session still propagate to the caller, but the
/// transport and client are released and the fields cleared regardless, so a
/// failed close no longer leaks the connection.
/// </remarks>
public override void Dispose()
{
    if (this.client == null)
    {
        return;
    }

    try
    {
        TCloseSessionReq closeRequest = new TCloseSessionReq(this.sessionHandle);
        this.client.CloseSession(closeRequest).Wait();
    }
    finally
    {
        try
        {
            // The transport can be null if the client was assigned without Open().
            this.transport?.Close();
            this.client.Dispose();
        }
        finally
        {
            // Clear the fields so a second Dispose() call is a no-op.
            this.transport = null;
            this.client = null;
        }
    }
}
|
||
/// <summary>
/// Requests the result-set metadata for the current operation handle and
/// converts the returned schema to an Arrow <see cref="Schema"/>.
/// </summary>
/// <returns>The Arrow schema produced by <c>SchemaParser.GetArrowSchema</c>.</returns>
protected Schema GetSchema()
{
    var metadataRequest = new TGetResultSetMetadataReq(this.operationHandle);

    // Blocks until the asynchronous Thrift call completes.
    TGetResultSetMetadataResp metadata = this.client.GetResultSetMetadata(metadataRequest).Result;

    return SchemaParser.GetArrowSchema(metadata.Schema);
}
|
||
/// <summary>
/// Arrow stream over the results of the connection's current operation.
/// Fetches result pages from the server on demand and yields the record
/// batches they contain.
/// </summary>
private sealed class GetObjectsReader : IArrowArrayStream
{
    // Maximum row count requested from the server per fetch call.
    private const int FetchBatchSize = 50000;

    // Set to null once the server reports that no more rows are available.
    private HiveServer2Connection connection;
    private Schema schema;

    // Batches from the most recent fetch, and the position of the next one to read.
    private List<TSparkArrowBatch> batches;
    private int index;

    // Reader over the batch currently being consumed; null between batches.
    private IArrowReader reader;

    public GetObjectsReader(HiveServer2Connection connection, Schema schema)
    {
        this.connection = connection;
        this.schema = schema;
    }

    /// <summary>Schema supplied at construction.</summary>
    public Schema Schema => this.schema;

    /// <summary>
    /// Returns the next record batch, or null when the current batches are
    /// exhausted and the server has reported no more rows.
    /// </summary>
    public async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
    {
        for (;;)
        {
            if (this.reader == null)
            {
                bool hasPendingBatch = this.batches != null && this.index < this.batches.Count;
                if (!hasPendingBatch)
                {
                    // Current page is exhausted: reset and fetch another, if allowed.
                    this.batches = null;
                    this.index = 0;

                    if (this.connection == null)
                    {
                        return null;
                    }

                    var fetchRequest = new TFetchResultsReq(this.connection.operationHandle, TFetchOrientation.FETCH_NEXT, FetchBatchSize);
                    TFetchResultsResp fetched = await this.connection.client.FetchResults(fetchRequest, cancellationToken);
                    this.batches = fetched.Results.ArrowBatches;

                    if (!fetched.HasMoreRows)
                    {
                        // Last page: dropping the connection stops further fetches.
                        this.connection = null;
                    }

                    continue;
                }

                // Open a reader over the next batch of the current page.
                TSparkArrowBatch pending = this.batches[this.index++];
                this.reader = new ArrowStreamReader(new ChunkStream(this.schema, pending.Batch));
            }

            RecordBatch batch = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
            if (batch != null)
            {
                return batch;
            }

            // This reader is drained; move on to the next batch or page.
            this.reader = null;
        }
    }

    /// <summary>No resources are released here.</summary>
    public void Dispose()
    {
    }
}
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
using System; | ||
|
||
namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2 | ||
{ | ||
/// <summary>
/// Exception type for the HiveServer2-based drivers. Carries an optional SQL
/// state and native error code that are set through fluent internal setters.
/// </summary>
public class HiveServer2Exception : AdbcException
{
    private string _sqlState;
    private int _nativeError;

    public HiveServer2Exception()
    {
    }

    public HiveServer2Exception(string message)
        : base(message)
    {
    }

    public HiveServer2Exception(string message, AdbcStatusCode statusCode)
        : base(message, statusCode)
    {
    }

    public HiveServer2Exception(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    public HiveServer2Exception(string message, AdbcStatusCode statusCode, Exception innerException)
        : base(message, statusCode, innerException)
    {
    }

    /// <summary>SQL state assigned via <see cref="SetSqlState"/>; null until set.</summary>
    public override string SqlState => _sqlState;

    /// <summary>Native error code assigned via <see cref="SetNativeError"/>; 0 until set.</summary>
    public override int NativeError => _nativeError;

    /// <summary>Stores the SQL state and returns this instance so calls can be chained.</summary>
    internal HiveServer2Exception SetSqlState(string sqlState)
    {
        _sqlState = sqlState;
        return this;
    }

    /// <summary>Stores the native error code and returns this instance so calls can be chained.</summary>
    internal HiveServer2Exception SetNativeError(int nativeError)
    {
        _nativeError = nativeError;
        return this;
    }
}
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to be quite incomplete. Consider removing it from this PR and submitting separately once it's complete and tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't remove it: HiveServer2 is the base class that Spark and Impala build on. I will add details to the README.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean the body, or at least the parts of the body that aren't implemented. It could always throw a NotImplementedException for when e.g. depth != GetObjectsDepth.All.
This comment was marked as outdated.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I removed the implementation here. It was not working and not useful as base functionality.