Skip to content

Commit

Permalink
弱化依赖接口,只使用InputProvider即可
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Nov 24, 2024
1 parent dd51b3e commit f6f882d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.nop.batch.core.exceptions.BatchCancelException;
import io.nop.commons.util.IoHelper;
import io.nop.core.resource.IResource;
import io.nop.core.resource.record.IResourceRecordIO;
import io.nop.core.resource.record.IResourceRecordOutputProvider;
import io.nop.dataset.record.IRecordOutput;

import java.util.List;
Expand All @@ -32,7 +32,7 @@
*/
public class ResourceRecordConsumerProvider<R> extends AbstractBatchResourceHandler
implements IBatchConsumerProvider<R> {
private IResourceRecordIO<R> recordIO;
private IResourceRecordOutputProvider<R> recordIO;
private String encoding;

/**
Expand All @@ -54,11 +54,11 @@ static class ConsumerState<R> {
}


public IResourceRecordIO<R> getRecordIO() {
public IResourceRecordOutputProvider<R> getRecordIO() {
return recordIO;
}

public void setRecordIO(IResourceRecordIO<R> recordIO) {
public void setRecordIO(IResourceRecordOutputProvider<R> recordIO) {
this.recordIO = recordIO;
}

Expand Down Expand Up @@ -106,9 +106,9 @@ ConsumerState<R> newConsumerState(IBatchTaskContext context) {
context.onBeforeComplete(() -> {
Map<String, Object> trailer = aggregator.complete(null, state.combinedValue);
state.output.endWrite(trailer);
try{
try {
state.output.flush();
}catch(Exception e){
} catch (Exception e) {
throw NopException.adapt(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.nop.batch.core.common.AbstractBatchResourceHandler;
import io.nop.commons.util.IoHelper;
import io.nop.core.resource.IResource;
import io.nop.core.resource.record.IResourceRecordIO;
import io.nop.core.resource.record.IResourceRecordInputProvider;
import io.nop.dataset.record.IRecordInput;
import io.nop.dataset.record.IRowNumberRecord;
import io.nop.dataset.record.impl.RowNumberRecordInput;
Expand All @@ -40,7 +40,7 @@ public class ResourceRecordLoaderProvider<S> extends AbstractBatchResourceHandle
implements IBatchLoaderProvider<S> {
static final String VAR_PROCESSED_ROW_NUMBER = "processedRowNumber";

private IResourceRecordIO<S> recordIO;
private IResourceRecordInputProvider<S> recordIO;
private String encoding;

// 对读取的数据进行汇总处理,例如统计读入的总行数等,最后在complete时写入到数据库中
Expand Down Expand Up @@ -96,11 +96,11 @@ public void setFilter(IBatchRecordFilter<S> filter) {
this.filter = filter;
}

public IResourceRecordIO<S> getRecordIO() {
public IResourceRecordInputProvider<S> getRecordIO() {
return recordIO;
}

public void setRecordIO(IResourceRecordIO<S> recordIO) {
public void setRecordIO(IResourceRecordInputProvider<S> recordIO) {
this.recordIO = recordIO;
}

Expand Down Expand Up @@ -144,6 +144,7 @@ public IBatchLoader<S> setup(IBatchTaskContext context) {
return load(batchSize, state);
};
}

LoaderState<S> newLoaderState(IBatchTaskContext context) {
LoaderState<S> state = new LoaderState<>();
state.context = context;
Expand Down

0 comments on commit f6f882d

Please sign in to comment.