diff --git a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java index bdac277..c377f6e 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java @@ -55,6 +55,7 @@ public class ProgramMembersBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Object pageBuilderLock = new Object(); public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask { @@ -214,7 +215,9 @@ private Future createFutureTask(PluginTask task, RecordImporter recordImporte while (csvRecords.hasNext()) { Map csvRecord = csvRecords.next(); ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); - recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + synchronized (pageBuilderLock) { + recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + } imported = imported + 1; }