Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zeta] [savepoint] Job resume from savepoint failed #5198

Closed
3 tasks done
kim-up opened this issue Aug 2, 2023 · 1 comment
Closed
3 tasks done

[Zeta] [savepoint] Job resume from savepoint failed #5198

kim-up opened this issue Aug 2, 2023 · 1 comment
Labels

Comments

@kim-up
Copy link
Contributor

kim-up commented Aug 2, 2023

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

https://seatunnel.apache.org/docs/seatunnel-engine/savepoint
When I did savePoint for Job, and then restore with savepoint.

I got the error,like this:

java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.notifyCheckpointComplete(SinkFlowLifeCycle.java:214)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:358)
	at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:133)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:372)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:372)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:358)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)

	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_311]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_311]
	at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1300) ~[?:1.8.0_311]
	at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284) ~[?:1.8.0_311]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_311]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_311]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:426) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:420) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:253) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.notifyCheckpointComplete(SinkFlowLifeCycle.java:214)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:358)
	at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:133)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:372)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:372)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:358)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)

	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:97) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
	... 5 more

It was savepoint successful.
image

SeaTunnel Version

2.3.3

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    result_table_name = "binlog"
    format= "compatible_debezium_json"
    parallelism = 1
    username = "****"
    password = "****"
    table-names = ["*****"]
    base-url = "jdbc:mysql://****:3306/***"
    debezium {
        snapshot.mode = "never"   }
  }
}

transform {
}

sink {
    kafka {
       # topic = "seatunnel"
        bootstrap.servers = "*****:9092"
        format = compatible_debezium_json
        kafka.request.timeout.ms = 60000
        #semantics = EXACTLY_ONCE
        kafka.config = {
          acks = "all"
          request.timeout.ms = 60000
          buffer.memory = 33554432
        }
    }
}

Running Command

./bin/seatunnel.sh -s {jobId}

./bin/seatunnel.sh -c {jobConfig} -r {jobId}

Error Exception

*

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@kim-up kim-up added the bug label Aug 2, 2023
@kim-up
Copy link
Contributor Author

kim-up commented Aug 2, 2023

#5152

@kim-up kim-up closed this as completed Aug 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant