From f6f882dd0f4f54f808a06de0b1322e8dd8951585 Mon Sep 17 00:00:00 2001 From: canonical Date: Sun, 24 Nov 2024 23:51:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=B1=E5=8C=96=E4=BE=9D=E8=B5=96=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=EF=BC=8C=E5=8F=AA=E4=BD=BF=E7=94=A8InputProvider?= =?UTF-8?q?=E5=8D=B3=E5=8F=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/ResourceRecordConsumerProvider.java | 12 ++++++------ .../core/loader/ResourceRecordLoaderProvider.java | 9 +++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java index df858f888..e0956a683 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/consumer/ResourceRecordConsumerProvider.java @@ -16,7 +16,7 @@ import io.nop.batch.core.exceptions.BatchCancelException; import io.nop.commons.util.IoHelper; import io.nop.core.resource.IResource; -import io.nop.core.resource.record.IResourceRecordIO; +import io.nop.core.resource.record.IResourceRecordOutputProvider; import io.nop.dataset.record.IRecordOutput; import java.util.List; @@ -32,7 +32,7 @@ */ public class ResourceRecordConsumerProvider extends AbstractBatchResourceHandler implements IBatchConsumerProvider { - private IResourceRecordIO recordIO; + private IResourceRecordOutputProvider recordIO; private String encoding; /** @@ -54,11 +54,11 @@ static class ConsumerState { } - public IResourceRecordIO getRecordIO() { + public IResourceRecordOutputProvider getRecordIO() { return recordIO; } - public void setRecordIO(IResourceRecordIO recordIO) { + public void setRecordIO(IResourceRecordOutputProvider recordIO) { this.recordIO = recordIO; } @@ -106,9 +106,9 @@ ConsumerState newConsumerState(IBatchTaskContext context) { context.onBeforeComplete(() -> { Map trailer = aggregator.complete(null, state.combinedValue); state.output.endWrite(trailer); - try{ + try { state.output.flush(); - }catch(Exception e){ + } catch (Exception e) { throw NopException.adapt(e); } }); diff --git a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/loader/ResourceRecordLoaderProvider.java b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/loader/ResourceRecordLoaderProvider.java index 22700a7ba..1aa683c25 100644 --- a/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/loader/ResourceRecordLoaderProvider.java +++ b/nop-batch/nop-batch-core/src/main/java/io/nop/batch/core/loader/ResourceRecordLoaderProvider.java @@ -16,7 +16,7 @@ import io.nop.batch.core.common.AbstractBatchResourceHandler; import io.nop.commons.util.IoHelper; import io.nop.core.resource.IResource; -import io.nop.core.resource.record.IResourceRecordIO; +import io.nop.core.resource.record.IResourceRecordInputProvider; import io.nop.dataset.record.IRecordInput; import io.nop.dataset.record.IRowNumberRecord; import io.nop.dataset.record.impl.RowNumberRecordInput; @@ -40,7 +40,7 @@ public class ResourceRecordLoaderProvider extends AbstractBatchResourceHandle implements IBatchLoaderProvider { static final String VAR_PROCESSED_ROW_NUMBER = "processedRowNumber"; - private IResourceRecordIO recordIO; + private IResourceRecordInputProvider recordIO; private String encoding; // 对读取的数据进行汇总处理,例如统计读入的总行数等,最后在complete时写入到数据库中 @@ -96,11 +96,11 @@ public void setFilter(IBatchRecordFilter filter) { this.filter = filter; } - public IResourceRecordIO getRecordIO() { + public IResourceRecordInputProvider getRecordIO() { return recordIO; } - public void setRecordIO(IResourceRecordIO recordIO) { + public void setRecordIO(IResourceRecordInputProvider recordIO) { this.recordIO = recordIO; } @@ -144,6 +144,7 @@ public IBatchLoader setup(IBatchTaskContext context) { return load(batchSize, state); }; } + LoaderState newLoaderState(IBatchTaskContext context) { LoaderState state = new LoaderState<>(); state.context = context;