Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Transform-V2][SQL] Support case when clause for SQL Transform plugin (#5013) #5014

Closed
wants to merge 15 commits into from

Conversation

javalover123
Copy link
Contributor

Purpose of this pull request

Check list

EricJoy2048
EricJoy2048 previously approved these changes Jul 6, 2023
@EricJoy2048
Copy link
Member

Please add e2e for this update.

Copy link
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add document and test case for this pr.

@javalover123
Copy link
Contributor Author

OK, added. apache/seatunnel-website#253

@EricJoy2048
Copy link
Member

OK, added. apache/seatunnel-website#253

I am sorry, I didn't express myself clearly. You need update the document https://github.com/apache/seatunnel/blob/dev/docs/en/transform-v2/sql-functions.md

@EricJoy2048
Copy link
Member

Hi, @rewerma , Can you help to review this pr?

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Jul 27, 2023

there's a bug,If it is bigint, the case when cannot be matched
image
image

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Jul 27, 2023

@javalover123 there's a bug

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Jul 27, 2023

besides int,tinyint smallint bigint can't nornal match

@javalover123
Copy link
Contributor Author

@wu-a-ge Updated, but I haven't run e2e test already.
5f5f623

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Jul 30, 2023

@wu-a-ge Updated, but I haven't run e2e test already. 5f5f623

another bug,my job config :



env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    result_table_name = "fake"
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "bigint"
      }
    }
    rows = [
      {fields = ["javalover123",6], kind = INSERT}
    ]
  }
}

transform {
  sql {
    source_table_name="fake"
    result_table_name="result_0"
    query="select name,case age when 6 then 1 else age end as age from fake"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    source_table_name="result_0"
    database="seatunnel"
    table="test"
    generate_sink_sql="true"
  }
}

exception:

2023-07-30 23:38:38,172 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-709778] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@64ec1e6
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:209) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[classes/:?]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613) [classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_281]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_281]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_281]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_281]
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:145) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:45) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:31) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:204) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:169) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:88) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:159) ~[classes/:?]
	... 16 more

@javalover123

@javalover123
Copy link
Contributor Author

javalover123 commented Jul 31, 2023

Thanks, can you give some help/advice? @wu-a-ge

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Jul 31, 2023

Thanks, can you give some help/advice? @wu-a-ge

Thanks for your contribution. My current workaround is to directly adjust the toExternal method in the AbstractJdbcRowConverter class. All casts are parsed using the parse method that wraps the type, but I personally believe that it is still necessary to determine the data type to solve the problem of different types of conversions
image

@javalover123
Copy link
Contributor Author

Update, please check, thanks. @wu-a-ge
In another way, Number.*Value() method performance should be better when run with JDK 9 or above.
image

                case TINYINT:
                    statement.setByte(
                            statementIndex, ((Number) row.getField(fieldIndex)).byteValue());
                    break;
                case SMALLINT:
                    statement.setShort(
                            statementIndex, ((Number) row.getField(fieldIndex)).shortValue());
                    break;
                case INT:
                    statement.setInt(
                            statementIndex, ((Number) row.getField(fieldIndex)).intValue());
                    break;
                case BIGINT:
                    statement.setLong(
                            statementIndex, ((Number) row.getField(fieldIndex)).longValue());
                    break;
                case FLOAT:
                    statement.setFloat(
                            statementIndex, ((Number) row.getField(fieldIndex)).floatValue());
                    break;
                case DOUBLE:
                    statement.setDouble(
                            statementIndex, ((Number) row.getField(fieldIndex)).doubleValue());

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Aug 1, 2023

Update, please check, thanks. @wu-a-ge In another way, Number.*Value() method performance should be better when run with JDK 9 or above. image

                case TINYINT:
                    statement.setByte(
                            statementIndex, ((Number) row.getField(fieldIndex)).byteValue());
                    break;
                case SMALLINT:
                    statement.setShort(
                            statementIndex, ((Number) row.getField(fieldIndex)).shortValue());
                    break;
                case INT:
                    statement.setInt(
                            statementIndex, ((Number) row.getField(fieldIndex)).intValue());
                    break;
                case BIGINT:
                    statement.setLong(
                            statementIndex, ((Number) row.getField(fieldIndex)).longValue());
                    break;
                case FLOAT:
                    statement.setFloat(
                            statementIndex, ((Number) row.getField(fieldIndex)).floatValue());
                    break;
                case DOUBLE:
                    statement.setDouble(
                            statementIndex, ((Number) row.getField(fieldIndex)).doubleValue());

Found a bug,
image
caseExpression.getElseExpression() may be null,so there will throw a NullPointException
image
@javalover123

@javalover123
Copy link
Contributor Author

Update, please check, thanks. @wu-a-ge

@wu-a-ge
Copy link
Contributor

wu-a-ge commented Aug 7, 2023

Update, please check, thanks. @wu-a-ge

Hello,One scenario is not supported

case when num3>2 and num='abc' then 7  end as tinyint1

case when is a complex logical expression

@javalover123
Copy link
Contributor Author

Update, please check, thanks. @wu-a-ge

@TyrantLucifer
Copy link
Member

@rewerma PTAL.

@javalover123
Copy link
Contributor Author

Update, please check, thanks. @rewerma

@EricJoy2048
Copy link
Member

EricJoy2048 commented Sep 11, 2023

It seems that different data types may have Bug that cannot be covered by test cases.
Can you add more e2e case to cover them? Hi, @wu-a-ge , do you have any suggestions ?

@zhilinli123
Copy link
Contributor

zhilinli123 commented Sep 12, 2023

I don't know what test cases are missing, personal advice,I think more type tests can be added to make sure it passes,For example, string,int,double,

SELECT 
  column_name,
  CASE 
    WHEN column_name IN (value_1, value_2, ...) THEN result_1
    ...
    ELSE test_in_int,
   CASE 
    WHEN column_name IN ('value_1','value_2', ...) THEN result_1
    ...
    ELSE test_in_string
    CASE 
    WHEN (column_name IN ('value_1','value_2', ...)  or column_name!='value3' or  or column_int!=10)  THEN result_1
    ...
    ELSE test_in_or_string,
    CASE 
    WHEN column_name IN (value_1, value_2 ) THEN value_2 
    ELSE test_in_return
  END AS output_column
FROM table_name;

@javalover123
Copy link
Contributor Author

@EricJoy2048 @zhilinli123 I may add this week

@EricJoy2048
Copy link
Member

@EricJoy2048 @zhilinli123 I may add this week

Hi, Are there any updates on this PR?

@javalover123
Copy link
Contributor Author

javalover123 commented Oct 18, 2023

Sorry, I've modified one test, but don't have any more time to work on this recently.

@zhilinli123
Copy link
Contributor

zhilinli123 commented Nov 27, 2023

I don't see any movement in this pr, can you assign it to me temporarily and I will find time to finish it in the near future
@EricJoy2048 cc @javalover123

@javalover123
Copy link
Contributor Author

I don't see any movement in this pr, can you assign it to me temporarily and I will find time to finish it in the near future @EricJoy2048 cc @javalover123

@zhilinli123 Sorry, I can't see assign button, @EricJoy2048 please try.

@EricJoy2048
Copy link
Member

Ok, I will close this pr.

@EricJoy2048 EricJoy2048 closed this Dec 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
First-time contributor First-time contributor
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants