diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByIndexOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByIndexOperator.java index cdec5b8aa..c601dd384 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByIndexOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnGetByIndexOperator.java @@ -33,14 +33,13 @@ import io.dingodb.exec.operator.params.TxnGetByIndexParam; import io.dingodb.exec.transaction.base.TransactionType; import io.dingodb.exec.utils.ByteUtils; +import io.dingodb.exec.utils.TxnMergedIterator; import io.dingodb.meta.MetaService; import io.dingodb.meta.entity.Table; import io.dingodb.partition.DingoPartitionServiceProvider; import io.dingodb.partition.PartitionService; import io.dingodb.store.api.StoreInstance; -import io.dingodb.store.api.transaction.DingoTransformedIterator; import io.dingodb.store.api.transaction.data.Op; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -226,99 +225,7 @@ protected Iterator createScanLocalIterator( Iterator kvKVIterator, KeyValueCodec decoder ) { - KeyValue kv1 = getNextValue(localKVIterator); - if (kv1 == null) { - return DingoTransformedIterator.transform(kvKVIterator, wrap(decoder::decode)::apply); - } - KeyValue kv2 = getNextValue(kvKVIterator); - - final int pos = 9; - List mergedList = new ArrayList<>(); - List deletedList = new ArrayList<>(); - - while (kv1 != null && kv2 != null) { - byte[] key1 = kv1.getKey(); - byte[] key2 = kv2.getKey(); - int code = key1[key1.length - 2]; - if (ByteArrayUtils.lessThan(key1, key2, pos, key1.length - 2)) { - if ( - (code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) - && !deletedList.contains(new KeyBytes(key2)) - ) { - mergedList.add(kv1); - kv1 = getNextValue(localKVIterator); - continue; - } else if (code == Op.DELETE.getCode()) { - deletedList.add(new KeyBytes(key1)); - kv1 = getNextValue(localKVIterator); - continue; - } - } - if (ByteArrayUtils.greatThan(key1, key2, pos, key1.length - 2)) { - if ( - (code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) - && !deletedList.contains(new KeyBytes(key2)) - ) { - mergedList.add(kv2); - kv2 = getNextValue(kvKVIterator); - continue; - } - if (code == Op.DELETE.getCode()) { - mergedList.add(kv2); - kv2 = getNextValue(kvKVIterator); - continue; - } - } - if (ByteArrayUtils.compare(key1, key2, pos, key1.length - 2) == 0) { - if (code == Op.DELETE.getCode()) { - kv1 = getNextValue(localKVIterator); - kv2 = getNextValue(kvKVIterator); - continue; - } - if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) { - mergedList.add(kv1); - kv1 = getNextValue(localKVIterator); - kv2 = getNextValue(kvKVIterator); - } - } - } - while (kv1 != null) { - if (!mergedList.contains(kv1) && (kv1.getKey()[kv1.getKey().length - 2] != Op.DELETE.getCode())) { - mergedList.add(kv1); - } - kv1 = getNextValue(localKVIterator); - } - while (kv2 != null) { - byte[] key = kv2.getKey(); - if (!mergedList.contains(kv2) && !deletedList.contains(new KeyBytes(key))) { - mergedList.add(kv2); - } - kv2 = getNextValue(kvKVIterator); - } - - return Iterators.transform(mergedList.iterator(), wrap(decoder::decode)::apply); - } - - @AllArgsConstructor - private static class KeyBytes { - private final byte[] key; - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KeyBytes keyBytes = (KeyBytes) o; - return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0; - } - - @Override - public int hashCode() { - return Arrays.hashCode(key); - } + return new TxnMergedIterator(localKVIterator, kvKVIterator, decoder); } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java index 69020ec31..64dcd1208 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnPartRangeScanOperator.java @@ -16,32 +16,26 @@ package io.dingodb.exec.operator; -import com.google.common.base.Objects; import com.google.common.collect.Iterators; import io.dingodb.codec.CodecService; import io.dingodb.common.CommonId; import io.dingodb.common.Coprocessor; import io.dingodb.common.partition.RangeDistribution; -import io.dingodb.common.profile.OperatorProfile; import io.dingodb.common.profile.SourceProfile; import io.dingodb.common.store.KeyValue; -import io.dingodb.common.util.ByteArrayUtils; import io.dingodb.exec.Services; import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.operator.data.Context; import io.dingodb.exec.operator.params.TxnPartRangeScanParam; import io.dingodb.exec.utils.ByteUtils; +import io.dingodb.exec.utils.TxnMergedIterator; import io.dingodb.store.api.StoreInstance; import io.dingodb.store.api.transaction.ProfileScanIterator; import io.dingodb.store.api.transaction.data.Op; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; -import java.util.List; import static io.dingodb.common.util.NoBreakFunctions.wrap; @@ -97,99 +91,9 @@ private TxnPartRangeScanOperator() { profile.getChildren().add(profileScanIterator.getInitRpcProfile()); } profile.setRegionId(partId.seq); - - KeyValue kv1 = getNextValue(localKVIterator); - KeyValue kv2 = getNextValue(kvKVIterator); - - final int pos = 9; - List mergedList = new ArrayList<>(); - List deletedList = new ArrayList<>(); - - while (kv1 != null && kv2 != null) { - byte[] key1 = kv1.getKey(); - byte[] key2 = kv2.getKey(); - int code = key1[key1.length - 2]; - if (ByteArrayUtils.lessThan(key1, key2, pos, key1.length - 2)) { - if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) && !deletedList.contains(new KeyBytes(key2))) { - mergedList.add(kv1); - kv1 = getNextValue(localKVIterator); - continue; - } else if (code == Op.DELETE.getCode()) { - deletedList.add(new KeyBytes(key1)); - kv1 = getNextValue(localKVIterator); - continue; - } - } - if (ByteArrayUtils.greatThan(key1, key2, pos, key1.length - 2)) { - if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) && !deletedList.contains(new KeyBytes(key2))) { - mergedList.add(kv2); - kv2 = getNextValue(kvKVIterator); - continue; - } - if (code == Op.DELETE.getCode()) { -// deletedList.add(new KeyBytes(key1)); -// kv1 = getNextValue(localKVIterator); - mergedList.add(kv2); - kv2 = getNextValue(kvKVIterator); - continue; - } - } - if (ByteArrayUtils.compare(key1, key2, pos, key1.length - 2) == 0) { - if (code == Op.DELETE.getCode()) { - kv1 = getNextValue(localKVIterator); - kv2 = getNextValue(kvKVIterator); - continue; - } - if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) { - mergedList.add(kv1); - kv1 = getNextValue(localKVIterator); - kv2 = getNextValue(kvKVIterator); - } - } - } - while (kv1 != null) { - if (!mergedList.contains(kv1) && (kv1.getKey()[kv1.getKey().length - 2] != Op.DELETE.getCode())) { - mergedList.add(kv1); - } - kv1 = getNextValue(localKVIterator); - } - while (kv2 != null) { - byte[] key = kv2.getKey(); - if (!mergedList.contains(kv2) && !deletedList.contains(new KeyBytes(key))) { - mergedList.add(kv2); - } - kv2 = getNextValue(kvKVIterator); - } + TxnMergedIterator txnMergedIterator = new TxnMergedIterator(localKVIterator, kvKVIterator, param.getCodec()); profile.end(); - return Iterators.transform(mergedList.iterator(), wrap(param.getCodec()::decode)::apply); - } - - private KeyValue getNextValue(Iterator iterator) { - if (iterator.hasNext()) { - return iterator.next(); - } - return null; + return txnMergedIterator; } - @AllArgsConstructor - public static class KeyBytes { - private final byte[] key; - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KeyBytes keyBytes = (KeyBytes) o; - return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(key); - } - } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanOperatorBase.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanOperatorBase.java index 4c9366003..5a20be325 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanOperatorBase.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TxnScanOperatorBase.java @@ -22,21 +22,16 @@ import io.dingodb.common.CommonId; import io.dingodb.common.partition.RangeDistribution; import io.dingodb.common.store.KeyValue; -import io.dingodb.common.util.ByteArrayUtils; import io.dingodb.exec.Services; import io.dingodb.exec.utils.ByteUtils; +import io.dingodb.exec.utils.TxnMergedIterator; import io.dingodb.store.api.StoreInstance; -import io.dingodb.store.api.transaction.DingoTransformedIterator; import io.dingodb.store.api.transaction.data.Op; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; -import java.util.List; import static io.dingodb.common.util.NoBreakFunctions.wrap; @@ -100,103 +95,7 @@ public abstract class TxnScanOperatorBase extends ScanOperatorBase { Iterator kvKVIterator, KeyValueCodec decoder ) { - KeyValue kv1 = getNextValue(localKVIterator); - if (kv1 == null) { - return DingoTransformedIterator.transform(kvKVIterator, wrap(decoder::decode)::apply); - } - KeyValue kv2 = getNextValue(kvKVIterator); - - final int pos = 9; - List mergedList = new ArrayList<>(); - List deletedList = new ArrayList<>(); - - while (kv1 != null && kv2 != null) { - byte[] key1 = kv1.getKey(); - byte[] key2 = kv2.getKey(); - int code = key1[key1.length - 2]; - if (code == Op.NONE.getCode()) { - kv1 = getNextValue(localKVIterator); - continue; - } - if (ByteArrayUtils.lessThan(key1, key2, pos, key1.length - 2)) { - if ( - (code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) - && !deletedList.contains(new KeyBytes(key2)) - ) { - mergedList.add(kv1); - kv1 = getNextValue(localKVIterator); - continue; - } else if (code == Op.DELETE.getCode()) { - deletedList.add(new KeyBytes(key1)); - kv1 = getNextValue(localKVIterator); - continue; - } - } - if (ByteArrayUtils.greatThan(key1, key2, pos, key1.length - 2)) { - if ( - (code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) - && !deletedList.contains(new KeyBytes(key2)) - ) { - mergedList.add(kv2); - kv2 = getNextValue(kvKVIterator); - continue; - } - if (code == Op.DELETE.getCode()) { - mergedList.add(kv2); - kv2 = getNextValue(kvKVIterator); - continue; - } - } - if (ByteArrayUtils.compare(key1, key2, pos, key1.length - 2) == 0) { - if (code == Op.DELETE.getCode()) { - kv1 = getNextValue(localKVIterator); - kv2 = getNextValue(kvKVIterator); - continue; - } - if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) { - mergedList.add(kv1); - kv1 = getNextValue(localKVIterator); - kv2 = getNextValue(kvKVIterator); - } - } - } - while (kv1 != null) { - if (!mergedList.contains(kv1) && (!Op.isDelete(kv1.getKey()[kv1.getKey().length - 2])) - && (!Op.isNone(kv1.getKey()[kv1.getKey().length - 2]))) { - mergedList.add(kv1); - } - kv1 = getNextValue(localKVIterator); - } - while (kv2 != null) { - byte[] key = kv2.getKey(); - if (!mergedList.contains(kv2) && !deletedList.contains(new KeyBytes(key))) { - mergedList.add(kv2); - } - kv2 = getNextValue(kvKVIterator); - } - - return DingoTransformedIterator.transform(mergedList.iterator(), wrap(decoder::decode)::apply); + return new TxnMergedIterator(localKVIterator, kvKVIterator, decoder); } - @AllArgsConstructor - private static class KeyBytes { - private final byte[] key; - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KeyBytes keyBytes = (KeyBytes) o; - return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0; - } - - @Override - public int hashCode() { - return Arrays.hashCode(key); - } - } } diff --git a/dingo-exec/src/main/java/io/dingodb/exec/utils/TxnMergedIterator.java b/dingo-exec/src/main/java/io/dingodb/exec/utils/TxnMergedIterator.java new file mode 100644 index 000000000..fa859adc4 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/utils/TxnMergedIterator.java @@ -0,0 +1,166 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.utils; + +import com.google.common.base.Objects; +import io.dingodb.codec.KeyValueCodec; +import io.dingodb.common.store.KeyValue; +import io.dingodb.common.util.ByteArrayUtils; +import io.dingodb.store.api.transaction.data.Op; +import lombok.AllArgsConstructor; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +public class TxnMergedIterator implements Iterator { + + private final Iterator localKVIterator; + private final Iterator kvKVIterator; + private final KeyValueCodec decoder; + private KeyValue nextLocal; + private KeyValue nextKv; + private KeyValue result; + private Set deletedSet = new HashSet<>(); + + public TxnMergedIterator(Iterator localKVIterator, Iterator kvKVIterator, KeyValueCodec decoder) { + this.localKVIterator = localKVIterator; + this.kvKVIterator = kvKVIterator; + this.decoder = decoder; + nextLocal = getNextValue(localKVIterator); + nextKv = getNextValue(kvKVIterator); + } + + @Override + public boolean hasNext() { + // Compute the next element after returning the current one + computeNext(); + return result != null; + } + + @Override + public Object[] next() { + // Decode the value using the provided codec + Object[] decoded = decoder.decode(result); + return decoded; + } + + private void computeNext() { + result = null; + while (nextLocal != null && nextKv != null) { + byte[] key1 = nextLocal.getKey(); + byte[] key2 = nextKv.getKey(); + int code = key1[key1.length - 2]; + if (code == Op.NONE.getCode()) { + nextLocal = getNextValue(localKVIterator); + continue; + } + if (ByteArrayUtils.lessThan(key1, key2, 9, key1.length - 2)) { + if ( + (code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) + && !deletedSet.contains(new KeyBytes(key2)) + ) { + result = nextLocal; + nextLocal = getNextValue(localKVIterator); + break; + } else if (code == Op.DELETE.getCode()) { + deletedSet.add(new KeyBytes(key1)); + nextLocal = getNextValue(localKVIterator); + continue; + } + } + if (ByteArrayUtils.greatThan(key1, key2, 9, key1.length - 2)) { + if ( + (code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode()) + && !deletedSet.contains(new KeyBytes(key2)) + ) { + result = nextKv; + nextKv = getNextValue(kvKVIterator); + break; + } + if (code == Op.DELETE.getCode()) { + result = nextKv; + nextKv = getNextValue(kvKVIterator); + break; + } + } + if (ByteArrayUtils.compare(key1, key2, 9, key1.length - 2) == 0) { + if (code == Op.DELETE.getCode()) { + nextLocal = getNextValue(localKVIterator); + nextKv = getNextValue(kvKVIterator); + continue; + } + if ((code == Op.PUT.getCode() || code == Op.PUTIFABSENT.getCode())) { + result = nextLocal; + nextLocal = getNextValue(localKVIterator); + nextKv = getNextValue(kvKVIterator); + break; + } + } + } + if (result == null) { + if (nextLocal == null && nextKv != null) { + while (nextKv != null) { + if (nextKv.getKey()[nextKv.getKey().length - 2] != Op.DELETE.getCode()) { + result = nextKv; + nextKv = getNextValue(kvKVIterator); + break; + } else { + nextKv = getNextValue(kvKVIterator); + } + } + } else if (nextKv == null && nextLocal != null) { + while (nextLocal != null) { + if (nextLocal.getKey()[nextLocal.getKey().length - 2] != Op.DELETE.getCode()) { + result = nextLocal; + nextLocal = getNextValue(localKVIterator); + break; + } else { + nextLocal = getNextValue(localKVIterator); + } + } + } + } + + } + + private static KeyValue getNextValue(Iterator iterator) { + return iterator.hasNext() ? iterator.next() : null; + } + + @AllArgsConstructor + public static class KeyBytes { + private final byte[] key; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KeyBytes keyBytes = (KeyBytes) o; + return ByteArrayUtils.compare(this.key, keyBytes.key, 9, this.key.length - 2) == 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(key); + } + } +}