Skip to content

Commit

Permalink
[feat][dingo-executor] Table dingo_trx supports showing transaction i…
Browse files Browse the repository at this point in the history
…nfos at remote nodes.
  • Loading branch information
nokiaMS authored and githubgxll committed Oct 17, 2024
1 parent 6a10fd9 commit 6610c26
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Iterator<Object[]> getTxnInfo() {
long jobId = dc.getMdlLockJobMap().keySet().stream().findFirst().orElse(0L);
ITransaction transaction = dc.getTransaction();
List<String> sqlList = dc.getTransaction().getSqlList();
Object[] res = new Object[14];
Object[] res = new Object[17];

//Get transaction id as string.
res[0] = transaction.getTxnId().toString();
Expand Down Expand Up @@ -170,7 +170,7 @@ public Iterator<Object[]> getTxnInfo() {
if (transaction.getPrimaryKeyLock() == null) {
res[12] = "";
} else {
StringBuilder hexString = new StringBuilder();
StringBuffer hexString = new StringBuffer();
hexString.append("0X");
for (byte b : transaction.getPrimaryKeyLock()) {
hexString.append(String.format("%02X", b));
Expand All @@ -187,6 +187,26 @@ public Iterator<Object[]> getTxnInfo() {
} catch (UnsupportedOperationException e) {
res[13] = "";
}

//Get txn cancel status.
res[14] = String.valueOf(transaction.getCancelStatus());

//Get cross channel status.
res[15] = String.valueOf(transaction.getIsCrossNode());

//Get cross channel infos.
if(transaction.getIsCrossNode()) {
StringBuffer stringBuf = new StringBuffer();
transaction.getChannelMap().forEach(
(k,v ) -> {
stringBuf.append(k.toString()).append('-').append(v.channelId()).append(';');
}
);
res[16] = stringBuf.toString();
} else {
res[16] = "";
}

return res;
})
.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.dingodb.common.profile.StmtSummaryMap;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.Location;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.operator.params.InfoSchemaScanParam;
import io.dingodb.meta.DdlService;
Expand All @@ -30,6 +31,10 @@
import io.dingodb.transaction.api.TransactionService;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import io.dingodb.net.api.ApiRegistry;
import io.dingodb.cluster.ClusterService;
import io.dingodb.common.annotation.ApiDeclaration;
import io.dingodb.common.config.DingoConfiguration;

import java.sql.Timestamp;
import java.util.ArrayList;
Expand Down Expand Up @@ -377,8 +382,46 @@ private static Iterator<Object[]> getMdlView() {
return TransactionService.getDefault().getMdlInfo();
}

private static Iterator<Object[]> getTxnInfo() {
return TransactionService.getDefault().getTxnInfo();
/**
* Api to get remote txn informations.
*/
public interface Api {
@ApiDeclaration
default List<Object[]> txnInfos() {
return new ArrayList<>();
}

@ApiDeclaration
default List<Object[]> getTxnInfos() {
List<Object[]> results = new ArrayList<>();
Iterator<Object[]> iterator = TransactionService.getDefault().getTxnInfo();
while(iterator.hasNext()) {
results.add(iterator.next());
}
return results;
}
}

/**
* The function is triggered by selecting dingo_trx table to fetch cluster transaction infos.
* @return The transaction informations in cluster.
*/
private static Iterator<Object[]> getTxnInfo() {
List<Object[]> result = new ArrayList<>();

//get remote txn infos.
ClusterService.getDefault().getComputingLocations().stream()
.filter($ -> !$.equals(DingoConfiguration.location()))
.map($ -> ApiRegistry.getDefault().proxy(InfoSchemaScanOperator.Api.class, $))
.map(InfoSchemaScanOperator.Api::getTxnInfos)
.forEach(result::addAll);

//get local txn infos.
Iterator<Object[]> iterator = TransactionService.getDefault().getTxnInfo();
while (iterator.hasNext()) {
result.add(iterator.next());
}

return result.stream().iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,16 @@ public void cancel() {
LogUtils.debug(log, "{} The current {} cancel is set to true", txnId, transactionOf());
}

@Override
public boolean getCancelStatus() {
return cancel.get();
}

@Override
public boolean getIsCrossNode() {
return isCrossNode;
}

@Override
public synchronized void close(JobManager jobManager) {
MdcUtils.setTxnId(txnId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public interface ITransaction {

void cancel();

boolean getCancelStatus();

boolean getIsCrossNode();

void addSql(String sql);

List<String> getSqlList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public long getJobSeqId() {
throw new UnsupportedOperationException();
}

@Override
public boolean getCancelStatus() { throw new UnsupportedOperationException(); }
@Override
public void setIs(InfoSchema is) {

Expand All @@ -122,5 +124,5 @@ public synchronized void rollback(JobManager jobManager) {
}

@Override
public boolean onePcStage() {throw new UnsupportedOperationException();}
public boolean onePcStage() { throw new UnsupportedOperationException(); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.dingodb.common.util.Utils;
import io.dingodb.driver.mysql.SessionVariableChangeWatcher;
import io.dingodb.exec.Services;
import io.dingodb.exec.operator.InfoSchemaScanOperator;
import io.dingodb.meta.InfoSchemaService;
import io.dingodb.net.MysqlNetService;
import io.dingodb.net.MysqlNetServiceProvider;
Expand Down Expand Up @@ -122,6 +123,7 @@ public void exec(JCommander commander) throws Exception {
AutoIncrementService.INSTANCE.resetAutoIncrement();

ApiRegistry.getDefault().register(ShowLocksExecutor.Api.class, new ShowLocksExecutor.Api() { });
ApiRegistry.getDefault().register(InfoSchemaScanOperator.Api.class, new InfoSchemaScanOperator.Api() { });

SafePointUpdateTask.run();

Expand Down
24 changes: 24 additions & 0 deletions dingo-executor/src/main/resources/information-dingoTrx.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,29 @@
"scale": -2147483648,
"primary": -1,
"nullable": true
},
{
"name": "is_canceled",
"type": "varchar",
"precision": 32,
"scale": -2147483648,
"primary": -1,
"nullable": true
},
{
"name": "is_cross_channel",
"type": "varchar",
"precision": 32,
"scale": -2147483648,
"primary": -1,
"nullable": true
},
{
"name": "channel_infos",
"type": "varchar",
"precision": 256,
"scale": -2147483648,
"primary": -1,
"nullable": true
}
]

0 comments on commit 6610c26

Please sign in to comment.