From 6610c26ce2013e959ab7f6019da4518b545a148a Mon Sep 17 00:00:00 2001 From: guoxu <13910754971@163.com> Date: Thu, 17 Oct 2024 14:21:50 +0800 Subject: [PATCH] [feat][dingo-executor] Table dingo_trx supports showing transaction infos at remote nodes. --- .../io/dingodb/driver/TransactionService.java | 24 +++++++++- .../exec/operator/InfoSchemaScanOperator.java | 47 ++++++++++++++++++- .../transaction/base/BaseTransaction.java | 10 ++++ .../exec/transaction/base/ITransaction.java | 4 ++ .../transaction/impl/NoneTransaction.java | 4 +- .../io/dingodb/server/executor/Starter.java | 2 + .../main/resources/information-dingoTrx.json | 24 ++++++++++ 7 files changed, 110 insertions(+), 5 deletions(-) diff --git a/dingo-driver/host/src/main/java/io/dingodb/driver/TransactionService.java b/dingo-driver/host/src/main/java/io/dingodb/driver/TransactionService.java index 56b3ca0e43..f9c7c68dcd 100644 --- a/dingo-driver/host/src/main/java/io/dingodb/driver/TransactionService.java +++ b/dingo-driver/host/src/main/java/io/dingodb/driver/TransactionService.java @@ -115,7 +115,7 @@ public Iterator getTxnInfo() { long jobId = dc.getMdlLockJobMap().keySet().stream().findFirst().orElse(0L); ITransaction transaction = dc.getTransaction(); List sqlList = dc.getTransaction().getSqlList(); - Object[] res = new Object[14]; + Object[] res = new Object[17]; //Get transaction id as string. res[0] = transaction.getTxnId().toString(); @@ -170,7 +170,7 @@ public Iterator getTxnInfo() { if (transaction.getPrimaryKeyLock() == null) { res[12] = ""; } else { - StringBuilder hexString = new StringBuilder(); + StringBuffer hexString = new StringBuffer(); hexString.append("0X"); for (byte b : transaction.getPrimaryKeyLock()) { hexString.append(String.format("%02X", b)); @@ -187,6 +187,26 @@ public Iterator getTxnInfo() { } catch (UnsupportedOperationException e) { res[13] = ""; } + + //Get txn cancel status. + res[14] = String.valueOf(transaction.getCancelStatus()); + + //Get cross channel status. + res[15] = String.valueOf(transaction.getIsCrossNode()); + + //Get cross channel infos. + if(transaction.getIsCrossNode()) { + StringBuffer stringBuf = new StringBuffer(); + transaction.getChannelMap().forEach( + (k,v ) -> { + stringBuf.append(k.toString()).append('-').append(v.channelId()).append(';'); + } + ); + res[16] = stringBuf.toString(); + } else { + res[16] = ""; + } + return res; }) .iterator(); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/InfoSchemaScanOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/InfoSchemaScanOperator.java index 5cd531eb8c..7c80c74371 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/InfoSchemaScanOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/InfoSchemaScanOperator.java @@ -18,6 +18,7 @@ import io.dingodb.common.profile.StmtSummaryMap; import io.dingodb.common.log.LogUtils; +import io.dingodb.common.Location; import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.operator.params.InfoSchemaScanParam; import io.dingodb.meta.DdlService; @@ -30,6 +31,10 @@ import io.dingodb.transaction.api.TransactionService; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; +import io.dingodb.net.api.ApiRegistry; +import io.dingodb.cluster.ClusterService; +import io.dingodb.common.annotation.ApiDeclaration; +import io.dingodb.common.config.DingoConfiguration; import java.sql.Timestamp; import java.util.ArrayList; @@ -377,8 +382,46 @@ private static Iterator getMdlView() { return TransactionService.getDefault().getMdlInfo(); } - private static Iterator getTxnInfo() { - return TransactionService.getDefault().getTxnInfo(); + /** + * Api to get remote txn informations. + */ + public interface Api { + @ApiDeclaration + default List txnInfos() { + return new ArrayList<>(); + } + + @ApiDeclaration + default List getTxnInfos() { + List results = new ArrayList<>(); + Iterator iterator = TransactionService.getDefault().getTxnInfo(); + while(iterator.hasNext()) { + results.add(iterator.next()); + } + return results; + } } + /** + * The function is triggered by selecting dingo_trx table to fetch cluster transaction infos. + * @return The transaction informations in cluster. + */ + private static Iterator getTxnInfo() { + List result = new ArrayList<>(); + + //get remote txn infos. + ClusterService.getDefault().getComputingLocations().stream() + .filter($ -> !$.equals(DingoConfiguration.location())) + .map($ -> ApiRegistry.getDefault().proxy(InfoSchemaScanOperator.Api.class, $)) + .map(InfoSchemaScanOperator.Api::getTxnInfos) + .forEach(result::addAll); + + //get local txn infos. + Iterator iterator = TransactionService.getDefault().getTxnInfo(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + + return result.stream().iterator(); + } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/BaseTransaction.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/BaseTransaction.java index ded0aeaac3..8b54602b56 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/BaseTransaction.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/BaseTransaction.java @@ -229,6 +229,16 @@ public void cancel() { LogUtils.debug(log, "{} The current {} cancel is set to true", txnId, transactionOf()); } + @Override + public boolean getCancelStatus() { + return cancel.get(); + } + + @Override + public boolean getIsCrossNode() { + return isCrossNode; + } + @Override public synchronized void close(JobManager jobManager) { MdcUtils.setTxnId(txnId.toString()); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/ITransaction.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/ITransaction.java index 7d416c755b..c1fd96094e 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/ITransaction.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/base/ITransaction.java @@ -72,6 +72,10 @@ public interface ITransaction { void cancel(); + boolean getCancelStatus(); + + boolean getIsCrossNode(); + void addSql(String sql); List getSqlList(); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/transaction/impl/NoneTransaction.java b/dingo-exec/src/main/java/io/dingodb/exec/transaction/impl/NoneTransaction.java index 76ee987d6c..f0dfcdbb20 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/transaction/impl/NoneTransaction.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/transaction/impl/NoneTransaction.java @@ -101,6 +101,8 @@ public long getJobSeqId() { throw new UnsupportedOperationException(); } + @Override + public boolean getCancelStatus() { throw new UnsupportedOperationException(); } @Override public void setIs(InfoSchema is) { @@ -122,5 +124,5 @@ public synchronized void rollback(JobManager jobManager) { } @Override - public boolean onePcStage() {throw new UnsupportedOperationException();} + public boolean onePcStage() { throw new UnsupportedOperationException(); } } diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/Starter.java b/dingo-executor/src/main/java/io/dingodb/server/executor/Starter.java index f92371e312..68c35cc138 100644 --- a/dingo-executor/src/main/java/io/dingodb/server/executor/Starter.java +++ b/dingo-executor/src/main/java/io/dingodb/server/executor/Starter.java @@ -30,6 +30,7 @@ import io.dingodb.common.util.Utils; import io.dingodb.driver.mysql.SessionVariableChangeWatcher; import io.dingodb.exec.Services; +import io.dingodb.exec.operator.InfoSchemaScanOperator; import io.dingodb.meta.InfoSchemaService; import io.dingodb.net.MysqlNetService; import io.dingodb.net.MysqlNetServiceProvider; @@ -122,6 +123,7 @@ public void exec(JCommander commander) throws Exception { AutoIncrementService.INSTANCE.resetAutoIncrement(); ApiRegistry.getDefault().register(ShowLocksExecutor.Api.class, new ShowLocksExecutor.Api() { }); + ApiRegistry.getDefault().register(InfoSchemaScanOperator.Api.class, new InfoSchemaScanOperator.Api() { }); SafePointUpdateTask.run(); diff --git a/dingo-executor/src/main/resources/information-dingoTrx.json b/dingo-executor/src/main/resources/information-dingoTrx.json index 792ae4f096..4e0ee8ecc8 100644 --- a/dingo-executor/src/main/resources/information-dingoTrx.json +++ b/dingo-executor/src/main/resources/information-dingoTrx.json @@ -109,5 +109,29 @@ "scale": -2147483648, "primary": -1, "nullable": true + }, + { + "name": "is_canceled", + "type": "varchar", + "precision": 32, + "scale": -2147483648, + "primary": -1, + "nullable": true + }, + { + "name": "is_cross_channel", + "type": "varchar", + "precision": 32, + "scale": -2147483648, + "primary": -1, + "nullable": true + }, + { + "name": "channel_infos", + "type": "varchar", + "precision": 256, + "scale": -2147483648, + "primary": -1, + "nullable": true } ]