Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix null pointer exception when importing program members multithread. #126

Draft
wants to merge 58 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
98a90f1
delete underbar if prefix is empty (#1)
kazuki-yane Apr 20, 2020
73613af
add list and activity type
kazuki-yane May 11, 2020
5e21ae8
update README
kazuki-yane May 12, 2020
b68ba66
delete unnecessary comment
kazuki-yane May 14, 2020
b9ae6b1
Merge pull request #2 from trocco-io/add_list
kazuki-yane May 18, 2020
88ffe63
アクティビティのJSONデータを読み取りできるように修正
dododo8m May 25, 2020
818fc27
gitignoreの更新
dododo8m May 25, 2020
5547b3b
バージョンアップ
dododo8m May 25, 2020
0d37c5f
Merge pull request #3 from trocco-io/bugfix-activity-json-format
dododo8m May 25, 2020
6409f81
バージョンアップ
dododo8m May 25, 2020
1c04a31
Merge pull request #4 from trocco-io/version-up-0.6.21
dododo8m May 25, 2020
80227bf
バックスラッシュでjson parseエラーになるときの対応を追加
dododo8m May 31, 2020
802f826
バージョンアップ
dododo8m May 31, 2020
319c65c
Merge pull request #5 from trocco-io/bugfix-json-parse
dododo8m May 31, 2020
b5474ed
CSVパーサーを使用するように修正
dododo8m Jun 4, 2020
c0116e0
バージョンアップ
dododo8m Jun 4, 2020
47beabd
不要なログを削除
dododo8m Jun 4, 2020
b287bb6
CSVparserを使用するように修正
dododo8m Jun 8, 2020
19224e8
バージョンアップ
dododo8m Jun 8, 2020
07a8a1f
Merge pull request #6 from trocco-io/csv-parser
dododo8m Jun 8, 2020
2559847
id系のプレビューのデータを数字にするように修正
dododo8m Jun 23, 2020
b2fac65
Merge pull request #7 from trocco-io/change-id-mock-preview-data
dododo8m Jun 23, 2020
cb88d88
OOM対応
dododo8m Jun 30, 2020
3ec9df1
Merge pull request #8 from trocco-io/change-id-mock-preview-data
dododo8m Jun 30, 2020
5772b9f
データの取得期間を10日間に変更
dododo8m Jul 7, 2020
3766593
CSVデータの取得ロジックの修正
dododo8m Jul 16, 2020
2ec9a18
バージョンアップ
dododo8m Jul 16, 2020
789b612
Merge pull request #9 from trocco-io/change_range_extract
dododo8m Jul 16, 2020
3034eee
実装完了
kazuki-yane Jul 23, 2020
239cd83
fix_test
kazuki-yane Jul 24, 2020
e3bc837
version_up
kazuki-yane Jul 24, 2020
c1f6fab
Merge pull request #10 from trocco-io/add_endpoint
dododo8m Jul 26, 2020
6f5255c
tmpファイル生成時にログを追加
dododo8m Jul 27, 2020
fb9859a
バージョンアップ
dododo8m Jul 27, 2020
8935e18
バージョンアップ
dododo8m Jul 27, 2020
a9102c6
Merge pull request #11 from trocco-io/add-log-create-tmp-file
dododo8m Jul 27, 2020
2c03920
add gem-push workflow
tk3fftk Jan 6, 2022
e91857c
fix dir name for actions
tk3fftk Jan 6, 2022
83a2f53
Merge pull request #14 from trocco-io/add-push-action
tk3fftk Jan 6, 2022
3118daf
Merge pull request #15 from trocco-io/fix-workflows
tk3fftk Jan 6, 2022
5e9f60f
ログ変更 (#12)
kazuki-yane Jun 20, 2022
f59cf5e
Merge commit '3911a2cbd0d5fbfb4d394bc2c3e5640a87981180' into marge-tr…
t3t5u Jun 27, 2022
2126d8c
バージョンの付け方を変更
t3t5u Jun 29, 2022
0a09ba7
Merge pull request #18 from trocco-io/marge-treasure-data-109
t3t5u Jun 29, 2022
e006cd8
Merge tag 'v0.6.24' into marge-treasure-data-v0.6.24
t3t5u Jul 1, 2022
1361d01
タグ名の先頭に v を付けるよう修正
t3t5u Jul 1, 2022
547063e
バージョニングのルールを変更
t3t5u Jul 13, 2022
07e256e
Add support for folder
t3t5u Jul 8, 2022
b6a697f
Merge pull request #19 from trocco-io/marge-treasure-data-v0.6.24
t3t5u Jul 13, 2022
f4e6056
Fix tag pattern
t3t5u Jul 13, 2022
a28edf4
Merge pull request #20 from trocco-io/add-support-for-folder
t3t5u Jul 14, 2022
42a1d13
update-to-v0.6.26
pn-koshikawa Jan 23, 2024
0936314
update compileClasspath
pn-koshikawa Jan 23, 2024
ffb524a
fix build error
pn-koshikawa Jan 23, 2024
67003b8
Merge pull request #23 from trocco-io/update-to-v0.6.26
d-hrs Jan 29, 2024
8290924
handle-null-pointer-exception-when-multithread
pn-koshikawa Jan 30, 2024
f1939a8
update trocco version
pn-koshikawa Jan 30, 2024
ea6311a
update comment
pn-koshikawa Jan 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/gem-push.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Ruby Gem

on:
workflow_dispatch:
push:
tags:
- '*'

jobs:
build:
name: Build + Publish
runs-on: ubuntu-latest
permissions:
packages: write
contents: read
steps:
- uses: actions/checkout@v2
- name: Set up Ruby 2.7
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.7
- name: push gem
uses: trocco-io/push-gem-to-gpr-action@v1
with:
language: java
gem-path: "./build/gems/*.gem"
github-token: "${{ secrets.GITHUB_TOKEN }}"
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ build/
/.metadata/
.classpath
.project

bin/
example.yml
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ embulk-input-marketo is the gem preparing Embulk input plugins for [Marketo](htt
- Campaign(campaign)
- Assets Programs (program)
- Program Members (program_members)
- List (list)
- Activity Type (activity_type)
- Assets Folders (folder)

This plugin uses Marketo REST API.

Expand Down Expand Up @@ -199,6 +202,50 @@ Get Members by Program Ids or All Program.
|---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
| **program_ids** | false | null | Import Members by specified Program_ID (comma-separated). If not specified will import all Members by all Program IDs |

### List

List extract all list data from Marketo

`target: list`

Schema type: Static schema

Incremental support: no

Range ingestion: no

### Activity Type

Activity Type extract all activity type data from Marketo

`target: activity_type`

Schema type: Static schema

Incremental support: no

Range ingestion: no

### Assets folders

Get child folders from within a specified root folder or all folders if no root folder is specified.

`target: folder`

Configuration:

| name | required | default value | description |
|---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
| **root_id** | false | null | Parent folder id |
| **root_type** | false | folder | Parent folder type, supported values `folder`, `program` |
| **max_depth** | false | 2 | Maximum folder depth to traverse |
| **workspace** | false | null | Name of the workspace |

Schema type: Static schema

Incremental support: no

Range ingestion: no

## Example

Expand Down
21 changes: 20 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {
id "jacoco"
id "signing"
id "org.embulk.embulk-plugins" version "0.5.5"
id "com.palantir.git-version" version "0.12.3"
}

repositories {
Expand All @@ -13,7 +14,20 @@ repositories {

group = "com.treasuredata.embulk.plugins"
description = "Loads records from Marketo."
version = "0.6.26"
version = {
def baseVersion = "0.6.26"
def troccoVersion = "0.1.1"
def tag = "${baseVersion}-trocco-${troccoVersion}"
def vd = versionDetails()
if (vd.lastTag != "${tag}") {
logger.warn "lastTag '${vd.lastTag}' is not '${tag}'"
}
if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+([.-][.a-zA-Z0-9-]+)?/) {
vd.lastTag
} else {
"0.0.0.${vd.gitHash}"
}
}()

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand Down Expand Up @@ -60,6 +74,11 @@ dependencies {
implementation "com.sun.xml.bind:jaxb-impl:2.2.11"
implementation "javax.activation:activation:1.1.1"

compile 'com.google.guava:guava:18.0'
compile "com.google.code.findbugs:annotations:3.0.1"
compile 'org.apache.commons:commons-lang3:3.4'
compile 'org.apache.commons:commons-csv:1.8'

testImplementation "junit:junit:4.+"
testImplementation "org.embulk:embulk-core:$embulkVersion:tests"
testImplementation "org.embulk:embulk-junit4:$embulkVersion"
Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-locks/compileClasspath.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ javax.xml.bind:jaxb-api:2.2.11
net.jcip:jcip-annotations:1.0
org.apache.bval:bval-core:0.5
org.apache.bval:bval-jsr303:0.5
org.apache.commons:commons-csv:1.8
org.apache.commons:commons-lang3:3.12.0
org.eclipse.jetty:jetty-client:9.4.51.v20230217
org.eclipse.jetty:jetty-http:9.4.51.v20230217
Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-locks/runtimeClasspath.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ javax.xml.bind:jaxb-api:2.2.11
net.jcip:jcip-annotations:1.0
org.apache.bval:bval-core:0.5
org.apache.bval:bval-jsr303:0.5
org.apache.commons:commons-csv:1.8
org.apache.commons:commons-lang3:3.12.0
org.eclipse.jetty:jetty-client:9.4.51.v20230217
org.eclipse.jetty:jetty-http:9.4.51.v20230217
Expand Down
66 changes: 65 additions & 1 deletion src/main/java/org/embulk/input/marketo/CsvTokenizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.embulk.config.ConfigException;
import org.embulk.spi.DataException;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.util.config.Task;
import org.embulk.util.text.LineDecoder;
import org.embulk.util.text.Newline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
Expand All @@ -24,6 +34,8 @@
*/
public class CsvTokenizer
{
private static final Logger LOGGER = LoggerFactory.getLogger(CsvTokenizer.class);

enum RecordState
{
NOT_END, END,
Expand Down Expand Up @@ -130,6 +142,58 @@ public CsvTokenizer(String delimiter, char quote, char escape, String newline, b
this.nullStringOrNull = nullStringOrNull;
}

private Reader inputStream;

public CsvTokenizer(Reader inputStream)
{
this.inputStream = inputStream;
this.delimiterChar = 0; // Unused
this.delimiterFollowingString = null; // Unused
this.quote = 0; // Unused
this.escape = 0; // Unused
this.newline = null; // Unused;
this.trimIfNotQuoted = false; // Unused
this.maxQuotedSizeLimit = 0; // Unused
this.commentLineMarker = null; // Unused
this.input = null; // Unused
this.nullStringOrNull = null; // Unused
}

public CSVParser csvParse()
{
try {
String path = String.format("tmp_%d.csv", Calendar.getInstance().getTimeInMillis());
File file = new File(path);
FileWriter filewriter = new FileWriter(file);

LOGGER.info("create tmp file: " + path);

BufferedReader b = new BufferedReader(inputStream);
String line = b.readLine();
int count = 0;
while (true) {
filewriter.write(line);
line = b.readLine();
if (line == null) {
break;
}
filewriter.write("\r\n");
count += 1;
if (count % 10000 == 0) {
LOGGER.info("import record count: " + count);
}
}
filewriter.close();
inputStream.close();

CSVParser csvParser = CSVParser.parse(file, StandardCharsets.UTF_8, CSVFormat.DEFAULT.withFirstRecordAsHeader());
return csvParser;
}
catch (IOException e) {
throw new InvalidValueException(e.getMessage());
}
}

public long getCurrentLineNumber()
{
return lineNumber;
Expand Down Expand Up @@ -434,7 +498,7 @@ else if (isDelimiterFollowingFrom(linePos)) {
else if (isSpace(c)) {
// column has trailing spaces and quoted. TODO should this be rejected?
} else {
throw new InvalidValueException(String.format("Unexpected extra character '%c' after a value quoted by '%c'", c, quote));
columnState = ColumnState.QUOTED_VALUE;
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import org.embulk.base.restclient.RestClientInputPluginDelegate;
import org.embulk.config.ConfigException;
import org.embulk.input.marketo.delegate.ActivityBulkExtractInputPlugin;
import org.embulk.input.marketo.delegate.ActivityTypeInputPlugin;
import org.embulk.input.marketo.delegate.CampaignInputPlugin;
import org.embulk.input.marketo.delegate.CustomObjectInputPlugin;
import org.embulk.input.marketo.delegate.FolderInputPlugin;
import org.embulk.input.marketo.delegate.LeadBulkExtractInputPlugin;
import org.embulk.input.marketo.delegate.LeadWithListInputPlugin;
import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin;
import org.embulk.input.marketo.delegate.ListInputPlugin;
import org.embulk.input.marketo.delegate.ProgramInputPlugin;
import org.embulk.input.marketo.delegate.ProgramMembersBulkExtractInputPlugin;
import org.embulk.input.marketo.rest.MarketoRestClient;
Expand All @@ -33,7 +36,10 @@ public interface PluginTask
ProgramInputPlugin.PluginTask,
MarketoRestClient.PluginTask,
CustomObjectInputPlugin.PluginTask,
ProgramMembersBulkExtractInputPlugin.PluginTask
ProgramMembersBulkExtractInputPlugin.PluginTask,
ListInputPlugin.PluginTask,
ActivityTypeInputPlugin.PluginTask,
FolderInputPlugin.PluginTask
{
@Config("target")
Target getTarget();
Expand Down Expand Up @@ -79,7 +85,10 @@ public enum Target
ALL_LEAD_WITH_PROGRAM_ID(new LeadWithProgramInputPlugin()),
PROGRAM(new ProgramInputPlugin()),
CUSTOM_OBJECT(new CustomObjectInputPlugin()),
PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin());
PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()),
LIST(new ListInputPlugin()),
ACTIVITY_TYPE(new ActivityTypeInputPlugin()),
FOLDER(new FolderInputPlugin());

private final RestClientInputPluginDelegate restClientInputPluginDelegate;

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/input/marketo/MarketoService.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.embulk.input.marketo;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.embulk.input.marketo.delegate.FolderInputPlugin.RootType;
import org.embulk.input.marketo.model.MarketoField;

import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -48,4 +50,6 @@ public interface MarketoService
ObjectNode describeProgramMembers();

File extractProgramMembers(String exportID);

Iterable<ObjectNode> getFolders(Optional<Long> rootId, RootType rootType, Integer maxDepth, Optional<String> workspace);
}
11 changes: 11 additions & 0 deletions src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.embulk.input.marketo;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.embulk.input.marketo.delegate.FolderInputPlugin.RootType;
import org.embulk.input.marketo.model.BulkExtractRangeHeader;
import org.embulk.input.marketo.model.MarketoField;
import org.embulk.input.marketo.rest.MarketoRestClient;
Expand All @@ -21,6 +23,7 @@
import java.io.OutputStream;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -275,4 +278,12 @@ public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader)
}
});
}

@Override
public Iterable<ObjectNode> getFolders(Optional<Long> rootId, RootType rootType, Integer maxDepth, Optional<String> workspace)
{
String type = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rootType.name());
Optional<String> root = rootId.isPresent() ? Optional.of(String.format("{\"id\": %d, \"type\": \"%s\"}", rootId.get(), type)) : Optional.empty();
return marketoRestClient.getFolders(root, maxDepth, workspace);
}
}
Loading