Skip to content

Commit

Permalink
[fix][dingo-executor] Fix the problem of merging Local data and KV da…
Browse files Browse the repository at this point in the history
…ta in large transactions
  • Loading branch information
githubgxll authored and ketor committed Oct 21, 2024
1 parent cc35b88 commit 2ea60ce
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@
import io.dingodb.exec.operator.params.TxnGetByIndexParam;
import io.dingodb.exec.transaction.base.TransactionType;
import io.dingodb.exec.utils.ByteUtils;
import io.dingodb.exec.utils.TxnMergedIterator;
import io.dingodb.meta.MetaService;
import io.dingodb.meta.entity.Table;
import io.dingodb.partition.DingoPartitionServiceProvider;
import io.dingodb.partition.PartitionService;
import io.dingodb.store.api.StoreInstance;
import io.dingodb.store.api.transaction.DingoTransformedIterator;
import io.dingodb.store.api.transaction.data.Op;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -226,99 +225,7 @@ protected Iterator<KeyValue> createScanLocalIterator(
Iterator<KeyValue> kvKVIterator,
KeyValueCodec decoder
) {
KeyValue kv1 = getNextValue(localKVIterator);
if (kv1 == null) {
return DingoTransformedIterator.transform(kvKVIterator, wrap(decoder::decode)::apply);
}
KeyValue kv2 = getNextValue(kvKVIterator);

final int pos = 9;
List<KeyValue> mergedList = new ArrayList<>();
List<KeyBytes> deletedList = new ArrayList<>();

while (kv1 != null && kv2 != null) {
byte[] key1 = kv1.getKey();
byte[] key2 = kv2.getKey();
int code = key1[key1.length - 2];
if (ByteArrayUtils.lessThan(key1, key2, pos, key1.length - 2)) {
if (
(code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())
&& !deletedList.contains(new KeyBytes(key2))
) {
mergedList.add(kv1);
kv1 = getNextValue(localKVIterator);
continue;
} else if (code == Op.DELETE.getCode()) {
deletedList.add(new KeyBytes(key1));
kv1 = getNextValue(localKVIterator);
continue;
}
}
if (ByteArrayUtils.greatThan(key1, key2, pos, key1.length - 2)) {
if (
(code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())
&& !deletedList.contains(new KeyBytes(key2))
) {
mergedList.add(kv2);
kv2 = getNextValue(kvKVIterator);
continue;
}
if (code == Op.DELETE.getCode()) {
mergedList.add(kv2);
kv2 = getNextValue(kvKVIterator);
continue;
}
}
if (ByteArrayUtils.compare(key1, key2, pos, key1.length - 2) == 0) {
if (code == Op.DELETE.getCode()) {
kv1 = getNextValue(localKVIterator);
kv2 = getNextValue(kvKVIterator);
continue;
}
if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) {
mergedList.add(kv1);
kv1 = getNextValue(localKVIterator);
kv2 = getNextValue(kvKVIterator);
}
}
}
while (kv1 != null) {
if (!mergedList.contains(kv1) && (kv1.getKey()[kv1.getKey().length - 2] != Op.DELETE.getCode())) {
mergedList.add(kv1);
}
kv1 = getNextValue(localKVIterator);
}
while (kv2 != null) {
byte[] key = kv2.getKey();
if (!mergedList.contains(kv2) && !deletedList.contains(new KeyBytes(key))) {
mergedList.add(kv2);
}
kv2 = getNextValue(kvKVIterator);
}

return Iterators.transform(mergedList.iterator(), wrap(decoder::decode)::apply);
}

@AllArgsConstructor
private static class KeyBytes {
private final byte[] key;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KeyBytes keyBytes = (KeyBytes) o;
return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0;
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}
return new TxnMergedIterator(localKVIterator, kvKVIterator, decoder);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,26 @@

package io.dingodb.exec.operator;

import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import io.dingodb.codec.CodecService;
import io.dingodb.common.CommonId;
import io.dingodb.common.Coprocessor;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.profile.OperatorProfile;
import io.dingodb.common.profile.SourceProfile;
import io.dingodb.common.store.KeyValue;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.exec.Services;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.operator.data.Context;
import io.dingodb.exec.operator.params.TxnPartRangeScanParam;
import io.dingodb.exec.utils.ByteUtils;
import io.dingodb.exec.utils.TxnMergedIterator;
import io.dingodb.store.api.StoreInstance;
import io.dingodb.store.api.transaction.ProfileScanIterator;
import io.dingodb.store.api.transaction.data.Op;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static io.dingodb.common.util.NoBreakFunctions.wrap;

Expand Down Expand Up @@ -97,99 +91,9 @@ private TxnPartRangeScanOperator() {
profile.getChildren().add(profileScanIterator.getInitRpcProfile());
}
profile.setRegionId(partId.seq);

KeyValue kv1 = getNextValue(localKVIterator);
KeyValue kv2 = getNextValue(kvKVIterator);

final int pos = 9;
List<KeyValue> mergedList = new ArrayList<>();
List<KeyBytes> deletedList = new ArrayList<>();

while (kv1 != null && kv2 != null) {
byte[] key1 = kv1.getKey();
byte[] key2 = kv2.getKey();
int code = key1[key1.length - 2];
if (ByteArrayUtils.lessThan(key1, key2, pos, key1.length - 2)) {
if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) && !deletedList.contains(new KeyBytes(key2))) {
mergedList.add(kv1);
kv1 = getNextValue(localKVIterator);
continue;
} else if (code == Op.DELETE.getCode()) {
deletedList.add(new KeyBytes(key1));
kv1 = getNextValue(localKVIterator);
continue;
}
}
if (ByteArrayUtils.greatThan(key1, key2, pos, key1.length - 2)) {
if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) && !deletedList.contains(new KeyBytes(key2))) {
mergedList.add(kv2);
kv2 = getNextValue(kvKVIterator);
continue;
}
if (code == Op.DELETE.getCode()) {
// deletedList.add(new KeyBytes(key1));
// kv1 = getNextValue(localKVIterator);
mergedList.add(kv2);
kv2 = getNextValue(kvKVIterator);
continue;
}
}
if (ByteArrayUtils.compare(key1, key2, pos, key1.length - 2) == 0) {
if (code == Op.DELETE.getCode()) {
kv1 = getNextValue(localKVIterator);
kv2 = getNextValue(kvKVIterator);
continue;
}
if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) {
mergedList.add(kv1);
kv1 = getNextValue(localKVIterator);
kv2 = getNextValue(kvKVIterator);
}
}
}
while (kv1 != null) {
if (!mergedList.contains(kv1) && (kv1.getKey()[kv1.getKey().length - 2] != Op.DELETE.getCode())) {
mergedList.add(kv1);
}
kv1 = getNextValue(localKVIterator);
}
while (kv2 != null) {
byte[] key = kv2.getKey();
if (!mergedList.contains(kv2) && !deletedList.contains(new KeyBytes(key))) {
mergedList.add(kv2);
}
kv2 = getNextValue(kvKVIterator);
}
TxnMergedIterator txnMergedIterator = new TxnMergedIterator(localKVIterator, kvKVIterator, param.getCodec());
profile.end();
return Iterators.transform(mergedList.iterator(), wrap(param.getCodec()::decode)::apply);
}

private KeyValue getNextValue(Iterator<KeyValue> iterator) {
if (iterator.hasNext()) {
return iterator.next();
}
return null;
return txnMergedIterator;
}

@AllArgsConstructor
public static class KeyBytes {
private final byte[] key;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KeyBytes keyBytes = (KeyBytes) o;
return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0;
}

@Override
public int hashCode() {
return Objects.hashCode(key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,16 @@
import io.dingodb.common.CommonId;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.store.KeyValue;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.exec.Services;
import io.dingodb.exec.utils.ByteUtils;
import io.dingodb.exec.utils.TxnMergedIterator;
import io.dingodb.store.api.StoreInstance;
import io.dingodb.store.api.transaction.DingoTransformedIterator;
import io.dingodb.store.api.transaction.data.Op;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static io.dingodb.common.util.NoBreakFunctions.wrap;

Expand Down Expand Up @@ -100,103 +95,7 @@ public abstract class TxnScanOperatorBase extends ScanOperatorBase {
Iterator<KeyValue> kvKVIterator,
KeyValueCodec decoder
) {
KeyValue kv1 = getNextValue(localKVIterator);
if (kv1 == null) {
return DingoTransformedIterator.transform(kvKVIterator, wrap(decoder::decode)::apply);
}
KeyValue kv2 = getNextValue(kvKVIterator);

final int pos = 9;
List<KeyValue> mergedList = new ArrayList<>();
List<KeyBytes> deletedList = new ArrayList<>();

while (kv1 != null && kv2 != null) {
byte[] key1 = kv1.getKey();
byte[] key2 = kv2.getKey();
int code = key1[key1.length - 2];
if (code == Op.NONE.getCode()) {
kv1 = getNextValue(localKVIterator);
continue;
}
if (ByteArrayUtils.lessThan(key1, key2, pos, key1.length - 2)) {
if (
(code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())
&& !deletedList.contains(new KeyBytes(key2))
) {
mergedList.add(kv1);
kv1 = getNextValue(localKVIterator);
continue;
} else if (code == Op.DELETE.getCode()) {
deletedList.add(new KeyBytes(key1));
kv1 = getNextValue(localKVIterator);
continue;
}
}
if (ByteArrayUtils.greatThan(key1, key2, pos, key1.length - 2)) {
if (
(code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())
&& !deletedList.contains(new KeyBytes(key2))
) {
mergedList.add(kv2);
kv2 = getNextValue(kvKVIterator);
continue;
}
if (code == Op.DELETE.getCode()) {
mergedList.add(kv2);
kv2 = getNextValue(kvKVIterator);
continue;
}
}
if (ByteArrayUtils.compare(key1, key2, pos, key1.length - 2) == 0) {
if (code == Op.DELETE.getCode()) {
kv1 = getNextValue(localKVIterator);
kv2 = getNextValue(kvKVIterator);
continue;
}
if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) {
mergedList.add(kv1);
kv1 = getNextValue(localKVIterator);
kv2 = getNextValue(kvKVIterator);
}
}
}
while (kv1 != null) {
if (!mergedList.contains(kv1) && (!Op.isDelete(kv1.getKey()[kv1.getKey().length - 2]))
&& (!Op.isNone(kv1.getKey()[kv1.getKey().length - 2]))) {
mergedList.add(kv1);
}
kv1 = getNextValue(localKVIterator);
}
while (kv2 != null) {
byte[] key = kv2.getKey();
if (!mergedList.contains(kv2) && !deletedList.contains(new KeyBytes(key))) {
mergedList.add(kv2);
}
kv2 = getNextValue(kvKVIterator);
}

return DingoTransformedIterator.transform(mergedList.iterator(), wrap(decoder::decode)::apply);
return new TxnMergedIterator(localKVIterator, kvKVIterator, decoder);
}

@AllArgsConstructor
private static class KeyBytes {
private final byte[] key;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KeyBytes keyBytes = (KeyBytes) o;
return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0;
}

@Override
public int hashCode() {
return Arrays.hashCode(key);
}
}
}
Loading

0 comments on commit 2ea60ce

Please sign in to comment.