Skip to content

Commit

Permalink
Support Memo in visibility (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored and meiliang86 committed May 30, 2019
1 parent ce02fea commit 3036ef5
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 12 deletions.
36 changes: 31 additions & 5 deletions src/main/java/com/uber/cadence/client/WorkflowOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.uber.cadence.internal.common.OptionsUtils;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;

public final class WorkflowOptions {
Expand Down Expand Up @@ -62,6 +63,7 @@ public static WorkflowOptions merge(
.setChildPolicy(o.getChildPolicy())
.setRetryOptions(RetryOptions.merge(methodRetry, o.getRetryOptions()))
.setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class))
.setMemo(o.getMemo())
.validateBuildWithDefaults();
}

Expand All @@ -83,6 +85,8 @@ public static final class Builder {

private String cronSchedule;

private Map<String, Object> memo;

public Builder() {}

public Builder(WorkflowOptions o) {
Expand All @@ -97,6 +101,7 @@ public Builder(WorkflowOptions o) {
this.childPolicy = o.childPolicy;
this.retryOptions = o.retryOptions;
this.cronSchedule = o.cronSchedule;
this.memo = o.memo;
}

/**
Expand Down Expand Up @@ -181,6 +186,12 @@ public Builder setCronSchedule(String cronSchedule) {
return this;
}

/** Specifies additional non-indexed information in result of list workflow. */
public Builder setMemo(Map<String, Object> memo) {
this.memo = memo;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -190,7 +201,8 @@ public WorkflowOptions build() {
taskList,
childPolicy,
retryOptions,
cronSchedule);
cronSchedule,
memo);
}

/**
Expand Down Expand Up @@ -235,7 +247,8 @@ public WorkflowOptions validateBuildWithDefaults() {
taskList,
childPolicy,
retryOptions,
cronSchedule);
cronSchedule,
memo);
}
}

Expand All @@ -255,6 +268,8 @@ public WorkflowOptions validateBuildWithDefaults() {

private String cronSchedule;

private Map<String, Object> memo;

private WorkflowOptions(
String workflowId,
WorkflowIdReusePolicy workflowIdReusePolicy,
Expand All @@ -263,7 +278,8 @@ private WorkflowOptions(
String taskList,
ChildPolicy childPolicy,
RetryOptions retryOptions,
String cronSchedule) {
String cronSchedule,
Map<String, Object> memo) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.executionStartToCloseTimeout = executionStartToCloseTimeout;
Expand All @@ -272,6 +288,7 @@ private WorkflowOptions(
this.childPolicy = childPolicy;
this.retryOptions = retryOptions;
this.cronSchedule = cronSchedule;
this.memo = memo;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -306,6 +323,10 @@ public String getCronSchedule() {
return cronSchedule;
}

public Map<String, Object> getMemo() {
return memo;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -318,7 +339,8 @@ public boolean equals(Object o) {
&& Objects.equals(taskList, that.taskList)
&& childPolicy == that.childPolicy
&& Objects.equals(retryOptions, that.retryOptions)
&& Objects.equals(cronSchedule, that.cronSchedule);
&& Objects.equals(cronSchedule, that.cronSchedule)
&& Objects.equals(memo, that.memo);
}

@Override
Expand All @@ -331,7 +353,8 @@ public int hashCode() {
taskList,
childPolicy,
retryOptions,
cronSchedule);
cronSchedule,
memo);
}

@Override
Expand All @@ -356,6 +379,9 @@ public String toString() {
+ ", cronSchedule='"
+ cronSchedule
+ '\''
+ ", memo='"
+ memo
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public final class StartWorkflowExecutionParameters {
Expand All @@ -50,6 +51,8 @@ public final class StartWorkflowExecutionParameters {

private String cronSchedule;

private Map<String, byte[]> memo;

/**
* Returns the value of the WorkflowId property for this object.
*
Expand Down Expand Up @@ -292,6 +295,14 @@ public void setCronSchedule(String cronSchedule) {
this.cronSchedule = cronSchedule;
}

public Map<String, byte[]> getMemo() {
return memo;
}

public void setMemo(Map<String, byte[]> memo) {
this.memo = memo;
}

public StartWorkflowExecutionParameters withRetryParameters(RetryParameters retryParameters) {
this.retryParameters = retryParameters;
return this;
Expand Down Expand Up @@ -364,6 +375,9 @@ public String toString() {
+ ", cronSchedule='"
+ cronSchedule
+ '\''
+ ", memo='"
+ memo
+ '\''
+ '}';
}

Expand All @@ -381,7 +395,8 @@ public boolean equals(Object o) {
&& childPolicy == that.childPolicy
&& workflowIdReusePolicy == that.workflowIdReusePolicy
&& Objects.equals(retryParameters, that.retryParameters)
&& Objects.equals(cronSchedule, that.cronSchedule);
&& Objects.equals(cronSchedule, that.cronSchedule)
&& Objects.equals(memo, that.memo);
}

@Override
Expand All @@ -396,7 +411,8 @@ public int hashCode() {
childPolicy,
workflowIdReusePolicy,
retryParameters,
cronSchedule);
cronSchedule,
memo);
result = 31 * result + Arrays.hashCode(input);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Scope;
import com.uber.m3.util.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -107,6 +109,7 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
if (!Strings.isNullOrEmpty(startParameters.getCronSchedule())) {
request.setCronSchedule(startParameters.getCronSchedule());
}
request.setMemo(toMemoThrift(startParameters.getMemo()));

// if(startParameters.getChildPolicy() != null) {
// request.setChildPolicy(startParameters.getChildPolicy());
Expand All @@ -127,6 +130,20 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
return execution;
}

private Memo toMemoThrift(Map<String, byte[]> memo) {
if (memo == null || memo.isEmpty()) {
return null;
}

Map<String, ByteBuffer> fields = new HashMap<>();
for (Map.Entry<String, byte[]> item : memo.entrySet()) {
fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
}
Memo memoThrift = new Memo();
memoThrift.setFields(fields);
return memoThrift;
}

private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {
return new RetryPolicy()
.setBackoffCoefficient(retryParameters.getBackoffCoefficient())
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.uber.cadence.client.WorkflowServiceException;
import com.uber.cadence.client.WorkflowStub;
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.converter.DataConverterException;
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
import com.uber.cadence.internal.common.SignalWithStartWorkflowExecutionParameters;
import com.uber.cadence.internal.common.StartWorkflowExecutionParameters;
Expand All @@ -41,6 +42,8 @@
import com.uber.cadence.internal.replay.QueryWorkflowParameters;
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -136,9 +139,26 @@ private StartWorkflowExecutionParameters getStartWorkflowExecutionParameters(
}
p.setInput(dataConverter.toData(args));
p.setWorkflowType(new WorkflowType().setName(workflowType.get()));
p.setMemo(convertMemoFromObjectToBytes(o.getMemo()));
return p;
}

private Map<String, byte[]> convertMemoFromObjectToBytes(Map<String, Object> memoFromOption) {
if (memoFromOption == null) {
return null;
}
Map<String, byte[]> memo = new HashMap<>();
for (Map.Entry<String, Object> item : memoFromOption.entrySet()) {
try {
memo.put(item.getKey(), dataConverter.toData(item.getValue()));
} catch (DataConverterException e) {
throw new DataConverterException(
"Cannot serialize memo for key " + item.getKey(), e.getCause());
}
}
return memo;
}

@Override
public WorkflowExecution start(Object... args) {
if (!options.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ private static void startWorkflow(
a.setAttempt(data.retryState.get().getAttempt());
}
a.setLastCompletionResult(data.lastCompletionResult);
a.setMemo(request.getMemo());
HistoryEvent event =
new HistoryEvent()
.setEventType(EventType.WorkflowExecutionStarted)
Expand Down
Loading

0 comments on commit 3036ef5

Please sign in to comment.