Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connectors-v2-Paimon] Support Date&Time Data Type To Be Flink Int #6076

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -279,7 +281,10 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
objects[i] = rowData.getBinary(i);
break;
case DATE:
objects[i] = rowData.getTimestamp(i, 3).toLocalDateTime().toLocalDate();
objects[i] = DateTimeUtils.toLocalDate(rowData.getInt(i));
break;
case TIME:
objects[i] = DateTimeUtils.toLocalTime(rowData.getInt(i));
break;
case TIMESTAMP:
// Now SeaTunnel not supported assigned the timezone for timestamp,
Expand Down Expand Up @@ -381,9 +386,13 @@ public static InternalRow convert(
break;
case DATE:
LocalDate date = (LocalDate) seaTunnelRow.getField(i);
LocalTime time = LocalTime.of(0, 0, 0);
binaryWriter.writeTimestamp(
i, Timestamp.fromLocalDateTime(date.atTime(time)), 3);
String dateStr = date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be more convenient to directly obtain the number of days from LocalDate?

binaryWriter.writeInt(i, DateTimeUtils.parseDate(dateStr));
break;
case TIME:
LocalTime time = (LocalTime) seaTunnelRow.getField(i);
String timeStr = time.format(DateTimeFormatter.ofPattern("HH:mm:ss"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to support milliseconds?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to DATA

binaryWriter.writeInt(i, DateTimeUtils.parseTime(timeStr));
break;
case TIMESTAMP:
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
Expand Down Expand Up @@ -139,6 +140,8 @@ public DataType visit(SeaTunnelDataType<?> dataType) {
return DataTypes.BOOLEAN();
case DATE:
return DataTypes.DATE();
case TIME:
return DataTypes.TIME();
case TIMESTAMP:
return DataTypes.TIMESTAMP(6);
case MAP:
Expand Down Expand Up @@ -241,10 +244,14 @@ public SeaTunnelDataType<?> visit(DoubleType doubleType) {

@Override
public SeaTunnelDataType<?> visit(DateType dateType) {
// TODO the data type in flink is int, so it should be converted to LocalDate
return LocalTimeType.LOCAL_DATE_TYPE;
}

@Override
public SeaTunnelDataType<?> visit(TimeType timeType) {
return LocalTimeType.LOCAL_TIME_TYPE;
}

@Override
public SeaTunnelDataType<?> visit(TimestampType timestampType) {
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.data.serializer.InternalArraySerializer;
import org.apache.paimon.data.serializer.InternalMapSerializer;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -48,6 +49,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -76,6 +78,7 @@ public void before() {
"c_bytes",
"c_boolean",
"c_date",
"c_time",
"c_timestamp",
"c_map",
"c_array"
Expand All @@ -92,6 +95,7 @@ public void before() {
PrimitiveByteArrayType.INSTANCE,
BasicType.BOOLEAN_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
Expand All @@ -107,11 +111,12 @@ public void before() {
byte[] bytes = new byte[] {1, 2, 3, 4};
boolean booleanValue = false;
LocalDate date = LocalDate.of(1996, 3, 16);
LocalTime time = LocalTime.of(4, 16, 20);
LocalDateTime timestamp = LocalDateTime.of(1996, 3, 16, 4, 16, 20);
Map<String, String> map = new HashMap<>();
map.put("name", "paimon");
String[] strings = new String[] {"paimon", "seatunnel"};
Object[] objects = new Object[14];
Object[] objects = new Object[15];
objects[0] = tinyint;
objects[1] = smallint;
objects[2] = intNum;
Expand All @@ -123,11 +128,12 @@ public void before() {
objects[8] = bytes;
objects[9] = booleanValue;
objects[10] = date;
objects[11] = timestamp;
objects[12] = map;
objects[13] = strings;
objects[11] = time;
objects[12] = timestamp;
objects[13] = map;
objects[14] = strings;
seaTunnelRow = new SeaTunnelRow(objects);
BinaryRow binaryRow = new BinaryRow(14);
BinaryRow binaryRow = new BinaryRow(15);
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
binaryRowWriter.writeByte(0, tinyint);
binaryRowWriter.writeShort(1, smallint);
Expand All @@ -139,9 +145,12 @@ public void before() {
binaryRowWriter.writeString(7, BinaryString.fromString(string));
binaryRowWriter.writeBinary(8, bytes);
binaryRowWriter.writeBoolean(9, booleanValue);
binaryRowWriter.writeTimestamp(
10, Timestamp.fromLocalDateTime(LocalDateTime.of(date, LocalTime.of(0, 0, 0))), 3);
binaryRowWriter.writeTimestamp(11, Timestamp.fromLocalDateTime(timestamp), 6);
binaryRowWriter.writeInt(
10,
DateTimeUtils.parseDate(date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))));
binaryRowWriter.writeInt(
11, DateTimeUtils.parseTime(time.format(DateTimeFormatter.ofPattern("HH:mm:ss"))));
binaryRowWriter.writeTimestamp(12, Timestamp.fromLocalDateTime(timestamp), 6);
BinaryArray binaryArray = new BinaryArray();
BinaryArrayWriter binaryArrayWriter =
new BinaryArrayWriter(
Expand All @@ -158,7 +167,7 @@ public void before() {
binaryArrayWriter1.complete();
BinaryMap binaryMap = BinaryMap.valueOf(binaryArray, binaryArray1);
binaryRowWriter.writeMap(
12, binaryMap, new InternalMapSerializer(DataTypes.STRING(), DataTypes.STRING()));
13, binaryMap, new InternalMapSerializer(DataTypes.STRING(), DataTypes.STRING()));
BinaryArray binaryArray2 = new BinaryArray();
BinaryArrayWriter binaryArrayWriter2 =
new BinaryArrayWriter(
Expand All @@ -169,7 +178,7 @@ public void before() {
binaryArrayWriter2.writeString(1, BinaryString.fromString("seatunnel"));
binaryArrayWriter2.complete();
binaryRowWriter.writeArray(
13, binaryArray2, new InternalArraySerializer(DataTypes.STRING()));
14, binaryArray2, new InternalArraySerializer(DataTypes.STRING()));
internalRow = binaryRow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void before() {
"c_bytes",
"c_boolean",
"c_date",
"c_time",
"c_timestamp",
"c_map",
"c_array"
Expand All @@ -72,6 +73,7 @@ public void before() {
PrimitiveByteArrayType.INSTANCE,
BasicType.BOOLEAN_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
Expand All @@ -89,10 +91,11 @@ public void before() {
new DataField(8, "c_bytes", DataTypes.BYTES()),
new DataField(9, "c_boolean", DataTypes.BOOLEAN()),
new DataField(10, "c_date", DataTypes.DATE()),
new DataField(11, "c_timestamp", DataTypes.TIMESTAMP(6)),
new DataField(11, "c_time", DataTypes.TIME()),
new DataField(12, "c_timestamp", DataTypes.TIMESTAMP(6)),
new DataField(
12, "c_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new DataField(13, "c_array", DataTypes.ARRAY(DataTypes.STRING())));
13, "c_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
new DataField(14, "c_array", DataTypes.ARRAY(DataTypes.STRING())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ env {
source {
FakeSource {
row.num = 100000
SimonChou12138 marked this conversation as resolved.
Show resolved Hide resolved
split.num = 3
schema = {
fields {
c_map = "map<string, string>"
Expand All @@ -45,6 +46,7 @@ source {
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_time = time
c_timestamp = timestamp
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@
}, {
"id" : 12,
"name" : "c_date",
"type" : "TIMESTAMP(3)"
"type" : "DATE"
}, {
"id" : 13,
"name" : "c_time",
"type" : "TIME"
}, {
"id" : 14,
"name" : "c_timestamp",
"type" : "TIMESTAMP(6)"
} ],
"highestFieldId" : 13,
"highestFieldId" : 14,
"partitionKeys" : [ ],
"primaryKeys" : [ ],
"options" : { }
Expand Down