Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix insert/load with timestamp of Long.MIN_VALUE and LONG.MAX_VALUE #14491

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
import org.apache.iotdb.rpc.StatementExecutionException;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.Tablet.ColumnCategory;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.v4.ITsFileWriter;
import org.apache.tsfile.write.v4.TsFileWriterBuilder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -43,6 +47,8 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.ArrayList;
Expand Down Expand Up @@ -1464,4 +1470,124 @@ public void insertRelationalRowWithAutoCastTest()
session.executeNonQueryStatement("SET CONFIGURATION \"enable_partial_insert\"=\"true\"");
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertMinMaxTimeTest() throws IoTDBConnectionException, StatementExecutionException {
try {
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION timestamp_precision_check_enabled='false'");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("USE db1");
session.executeNonQueryStatement("CREATE TABLE test_insert_min_max (id1 ID, s1 INT32)");

session.executeNonQueryStatement(
String.format(
"INSERT INTO test_insert_min_max(time, id1, s1) VALUES (%d, 'd1', 1)",
Long.MIN_VALUE));
session.executeNonQueryStatement(
String.format(
"INSERT INTO test_insert_min_max(time, id1, s1) VALUES (%d, 'd1', 1)",
Long.MAX_VALUE));

SessionDataSet dataSet = session.executeQueryStatement("SELECT * FROM test_insert_min_max");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getFields().get(0).getLongV());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());

session.executeNonQueryStatement("FLUSH");
dataSet = session.executeQueryStatement("SELECT * FROM test_insert_min_max");
record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getFields().get(0).getLongV());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
}
} finally {
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION timestamp_precision_check_enabled='true'");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void loadMinMaxTimeAlignedTest()
throws IoTDBConnectionException,
StatementExecutionException,
IOException,
WriteProcessException {
File file = new File("target", "test.tsfile");
TableSchema tableSchema =
new TableSchema(
"load_min_max",
Arrays.asList("id1", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.INT32),
Arrays.asList(ColumnCategory.ID, ColumnCategory.MEASUREMENT));

try (ITsFileWriter writer =
new TsFileWriterBuilder().file(file).tableSchema(tableSchema).build()) {
Tablet tablet =
new Tablet(
Arrays.asList("id1", "s1"), Arrays.asList(TSDataType.STRING, TSDataType.INT32));
tablet.addTimestamp(0, Long.MIN_VALUE);
tablet.addTimestamp(1, Long.MAX_VALUE);
tablet.addValue(0, 0, "d1");
tablet.addValue(1, 0, "d1");
tablet.addValue(0, 1, 1);
tablet.addValue(1, 1, 1);
writer.write(tablet);
}

try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
session.executeNonQueryStatement("USE db1");
try {
session.executeNonQueryStatement(
"SET CONFIGURATION timestamp_precision_check_enabled='false'");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("LOAD \'" + file.getAbsolutePath() + "\'");

SessionDataSet dataSet = session.executeQueryStatement("SELECT * FROM load_min_max");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getFields().get(0).getLongV());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getFields().get(0).getLongV());
assertFalse(dataSet.hasNext());
} finally {
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION timestamp_precision_check_enabled='false'");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
file.delete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
Expand All @@ -55,6 +60,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -72,6 +79,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings({"ThrowFromFinallyBlock", "ResultOfMethodCallIgnored"})
@RunWith(IoTDBTestRunner.class)
public class IoTDBSessionSimpleIT {

Expand Down Expand Up @@ -336,7 +344,7 @@ public void insertByObjAndNotInferTypeTest() {
expected.add(TSDataType.TEXT.name());

Set<String> actual = new HashSet<>();
SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.**");
SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.sg1.**");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids conflicts with other tests.

while (dataSet.hasNext()) {
actual.add(dataSet.next().getFields().get(3).getStringValue());
}
Expand Down Expand Up @@ -748,7 +756,7 @@ public void createWrongTimeSeriesTest() {
LOGGER.error("", e);
}

final SessionDataSet dataSet = session.executeQueryStatement("SHOW TIMESERIES");
final SessionDataSet dataSet = session.executeQueryStatement("SHOW TIMESERIES root.sg.**");
assertFalse(dataSet.hasNext());

session.deleteStorageGroup(storageGroup);
Expand Down Expand Up @@ -1859,4 +1867,166 @@ public void convertRecordsToTabletsTest() {
e.printStackTrace();
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertMinMaxTimeTest() throws IoTDBConnectionException, StatementExecutionException {
try {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}

session.executeNonQueryStatement(
String.format(
"INSERT INTO root.testInsertMinMax.d1(timestamp, s1) VALUES (%d, 1)",
Long.MIN_VALUE));
session.executeNonQueryStatement(
String.format(
"INSERT INTO root.testInsertMinMax.d1(timestamp, s1) VALUES (%d, 1)",
Long.MAX_VALUE));

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testInsertMinMax.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());

session.executeNonQueryStatement("FLUSH");
dataSet = session.executeQueryStatement("SELECT * FROM root.testInsertMinMax.d1");
record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
}
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void loadMinMaxTimeNonAlignedTest()
throws IoTDBConnectionException,
StatementExecutionException,
IOException,
WriteProcessException {
File file = new File("target", "test.tsfile");
try (TsFileWriter writer = new TsFileWriter(file)) {
IDeviceID deviceID = Factory.DEFAULT_FACTORY.create("root.testLoadMinMax.d1");
writer.registerTimeseries(deviceID, new MeasurementSchema("s1", TSDataType.INT32));
TSRecord record = new TSRecord(deviceID, Long.MIN_VALUE);
record.addPoint("s1", 1);
writer.writeRecord(record);
record.setTime(Long.MAX_VALUE);
writer.writeRecord(record);
}

try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("LOAD \"" + file.getAbsolutePath() + "\"");

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testLoadMinMax.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
file.delete();
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void loadMinMaxTimeAlignedTest()
throws IoTDBConnectionException,
StatementExecutionException,
IOException,
WriteProcessException {
File file = new File("target", "test.tsfile");
try (TsFileWriter writer = new TsFileWriter(file)) {
IDeviceID deviceID = Factory.DEFAULT_FACTORY.create("root.testLoadMinMaxAligned.d1");
writer.registerAlignedTimeseries(
deviceID, Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
TSRecord record = new TSRecord(deviceID, Long.MIN_VALUE);
record.addPoint("s1", 1);
writer.writeRecord(record);
record.setTime(Long.MAX_VALUE);
writer.writeRecord(record);
}

try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("LOAD \"" + file.getAbsolutePath() + "\"");

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testLoadMinMaxAligned.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
file.delete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2886,6 +2886,15 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
BinaryAllocator.getInstance().close(true);
}

commonDescriptor
.getConfig()
.setTimestampPrecisionCheckEnabled(
Boolean.parseBoolean(
properties.getProperty(
"timestamp_precision_check_enabled",
ConfigurationFileUtils.getConfigurationDefaultValue(
"timestamp_precision_check_enabled"))));

conf.setEnablePartialInsert(
Boolean.parseBoolean(
Optional.ofNullable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,11 @@ public static Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> getTimePart
result.add(timePartitionSlot);
// next init
timePartitionSlot = new TTimePartitionSlot(endTime);
endTime = endTime + TimePartitionUtils.getTimePartitionInterval();
// beware of overflow
endTime =
endTime + TimePartitionUtils.getTimePartitionInterval() > endTime
? endTime + TimePartitionUtils.getTimePartitionInterval()
: Long.MAX_VALUE;
} else {
index++;
if (index < size) {
Expand Down
Loading
Loading