From 141bddc9bfe9f9ac1389c78256973b18b82881e6 Mon Sep 17 00:00:00 2001 From: dmitrybugakov Date: Sat, 23 Sep 2023 16:08:09 +0200 Subject: [PATCH] [FEATURE]: Add Support ProgressListener in ClickHouseStatement --- .../jdbc/statement/ClickHouseStatement.java | 20 ++++++++--- .../housepower/protocol/ProgressResponse.java | 15 +++++++- .../listener/DefaultProgressListener.java | 36 +++++++++++++++++++ .../protocol/listener/ProgressListener.java | 21 +++++++++++ .../stream/ClickHouseQueryResult.java | 17 +++++++-- 5 files changed, 102 insertions(+), 7 deletions(-) create mode 100644 clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/DefaultProgressListener.java create mode 100644 clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/ProgressListener.java diff --git a/clickhouse-native-jdbc/src/main/java/com/github/housepower/jdbc/statement/ClickHouseStatement.java b/clickhouse-native-jdbc/src/main/java/com/github/housepower/jdbc/statement/ClickHouseStatement.java index a47bef70..2be870d4 100644 --- a/clickhouse-native-jdbc/src/main/java/com/github/housepower/jdbc/statement/ClickHouseStatement.java +++ b/clickhouse-native-jdbc/src/main/java/com/github/housepower/jdbc/statement/ClickHouseStatement.java @@ -14,18 +14,20 @@ package com.github.housepower.jdbc.statement; -import com.github.housepower.jdbc.ClickHouseConnection; -import com.github.housepower.jdbc.ClickHouseResultSet; import com.github.housepower.client.NativeContext; import com.github.housepower.data.Block; +import com.github.housepower.jdbc.ClickHouseConnection; +import com.github.housepower.jdbc.ClickHouseResultSet; +import com.github.housepower.jdbc.wrapper.SQLStatement; import com.github.housepower.log.Logger; import com.github.housepower.log.LoggerFactory; import com.github.housepower.misc.ExceptionUtil; import com.github.housepower.misc.Validate; -import com.github.housepower.stream.QueryResult; +import com.github.housepower.protocol.listener.ProgressListener; import com.github.housepower.settings.ClickHouseConfig; import com.github.housepower.settings.SettingKey; -import com.github.housepower.jdbc.wrapper.SQLStatement; +import com.github.housepower.stream.ClickHouseQueryResult; +import com.github.housepower.stream.QueryResult; import com.github.housepower.stream.ValuesNativeInputFormat; import java.sql.Connection; @@ -48,6 +50,7 @@ public class ClickHouseStatement implements SQLStatement { protected Block block; protected final ClickHouseConnection connection; protected final NativeContext nativeContext; + private ProgressListener progressListener; private ClickHouseConfig cfg; private long maxRows; @@ -91,6 +94,11 @@ public int executeUpdate(String query) throws SQLException { } updateCount = -1; QueryResult result = connection.sendQueryRequest(query, cfg); + + if (result instanceof ClickHouseQueryResult) { + ((ClickHouseQueryResult) result).setProgressListener(this.progressListener); + } + lastResultSet = new ClickHouseResultSet(this, cfg, db, table, result.header(), result.data()); return 0; }); @@ -162,6 +170,10 @@ public void setQueryTimeout(int seconds) { this.cfg = cfg.withQueryTimeout(Duration.ofSeconds(seconds)); } + public void setProgressListener(ProgressListener listener) { + this.progressListener = listener; + } + @Override public void setFetchDirection(int direction) throws SQLException { } diff --git a/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/ProgressResponse.java b/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/ProgressResponse.java index 5288a9b6..631fafd7 100644 --- a/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/ProgressResponse.java +++ b/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/ProgressResponse.java @@ -21,7 +21,11 @@ public class ProgressResponse implements Response { public static ProgressResponse readFrom(BinaryDeserializer deserializer) throws IOException { - return new ProgressResponse(deserializer.readVarInt(), deserializer.readVarInt(), deserializer.readVarInt()); + return new ProgressResponse( + deserializer.readVarInt(), + deserializer.readVarInt(), + deserializer.readVarInt() + ); } private final long newRows; @@ -50,4 +54,13 @@ public long newBytes() { public long newTotalRows() { return newTotalRows; } + + @Override + public String toString() { + return "ProgressResponse {" + + "newRows=" + newRows + + ", newBytes=" + newBytes + + ", newTotalRows=" + newTotalRows + + '}'; + } } diff --git a/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/DefaultProgressListener.java b/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/DefaultProgressListener.java new file mode 100644 index 00000000..f51f412e --- /dev/null +++ b/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/DefaultProgressListener.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.housepower.protocol.listener; + +import com.github.housepower.log.Logger; +import com.github.housepower.log.LoggerFactory; +import com.github.housepower.protocol.ProgressResponse; + +public class DefaultProgressListener implements ProgressListener { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultProgressListener.class); + + private DefaultProgressListener() { + } + + public static DefaultProgressListener create() { + return new DefaultProgressListener(); + } + + @Override + public void onProgress(ProgressResponse progressResponse) { + LOG.info("DefaultProgressListener: ".concat(progressResponse.toString())); + } +} diff --git a/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/ProgressListener.java b/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/ProgressListener.java new file mode 100644 index 00000000..09823793 --- /dev/null +++ b/clickhouse-native-jdbc/src/main/java/com/github/housepower/protocol/listener/ProgressListener.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.housepower.protocol.listener; + +import com.github.housepower.protocol.ProgressResponse; + +public interface ProgressListener { + void onProgress(ProgressResponse progressResponse); +} diff --git a/clickhouse-native-jdbc/src/main/java/com/github/housepower/stream/ClickHouseQueryResult.java b/clickhouse-native-jdbc/src/main/java/com/github/housepower/stream/ClickHouseQueryResult.java index 44b96c65..82f8f4c3 100644 --- a/clickhouse-native-jdbc/src/main/java/com/github/housepower/stream/ClickHouseQueryResult.java +++ b/clickhouse-native-jdbc/src/main/java/com/github/housepower/stream/ClickHouseQueryResult.java @@ -21,15 +21,16 @@ import com.github.housepower.protocol.EOFStreamResponse; import com.github.housepower.protocol.ProgressResponse; import com.github.housepower.protocol.Response; +import com.github.housepower.protocol.listener.ProgressListener; import java.sql.SQLException; -public class ClickHouseQueryResult implements QueryResult { +public class ClickHouseQueryResult implements QueryResult { private final CheckedSupplier responseSupplier; + private ProgressListener progressListener; private Block header; private boolean atEnd; - // Progress // Totals // Extremes // ProfileInfo @@ -39,6 +40,15 @@ public ClickHouseQueryResult(CheckedSupplier responseSup this.responseSupplier = responseSupplier; } + public ClickHouseQueryResult(CheckedSupplier responseSupplier, ProgressListener progressListener) { + this.progressListener = progressListener; + this.responseSupplier = responseSupplier; + } + + public void setProgressListener(ProgressListener progressListener) { + this.progressListener = progressListener; + } + @Override public Block header() throws SQLException { ensureHeaderConsumed(); @@ -97,6 +107,9 @@ private DataResponse consumeDataResponse() throws SQLException { } else if (response instanceof EOFStreamResponse || response == null) { atEnd = true; } else if (response instanceof ProgressResponse) { + if (progressListener != null) { + progressListener.onProgress((ProgressResponse) response); + } readRows += ((ProgressResponse) response).newRows(); readBytes += ((ProgressResponse) response).newBytes(); }