diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 14df7e17e..a1c75043d 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -158,7 +158,7 @@ public List> getExecutorBuilders(Settings settings) { settings, PROVISION_THREAD_POOL, OpenSearchExecutors.allocatedProcessors(settings), - 10, + 100, FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL ) ); diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java index 42d59e07f..706cd2c62 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java @@ -24,7 +24,9 @@ import java.util.Objects; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.flowframework.util.ParseUtils.buildStringToObjectMap; import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap; +import static org.opensearch.flowframework.util.ParseUtils.parseStringToObjectMap; import static org.opensearch.flowframework.util.ParseUtils.parseStringToStringMap; /** @@ -93,7 +95,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } else { for (Map map : (Map[]) e.getValue()) { - buildStringToStringMap(xContentBuilder, map); + buildStringToObjectMap(xContentBuilder, map); } } xContentBuilder.endArray(); @@ -150,9 +152,9 @@ public static WorkflowNode parse(XContentParser parser) throws IOException { } userInputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0])); } else { - List> mapList = new ArrayList<>(); + List> mapList = new ArrayList<>(); while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - mapList.add(parseStringToStringMap(parser)); + mapList.add(parseStringToObjectMap(parser)); } userInputs.put(inputFieldName, mapList.toArray(new Map[0])); } diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 9e3b8d067..6e1a506a1 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -84,6 +84,25 @@ public static void buildStringToStringMap(XContentBuilder xContentBuilder, Map map) throws IOException { + xContentBuilder.startObject(); + for (Entry e : map.entrySet()) { + if (e.getValue() instanceof String) { + xContentBuilder.field((String) e.getKey(), (String) e.getValue()); + } else { + xContentBuilder.field((String) e.getKey(), e.getValue()); + } + } + xContentBuilder.endObject(); + } + /** * Builds an XContent object representing a LLMSpec. * @@ -117,6 +136,30 @@ public static Map parseStringToStringMap(XContentParser parser) return map; } + /** + * Parses an XContent object representing a map of String keys to Object values. + * The Object value here can either be a string or a map + * @param parser An XContent parser whose position is at the start of the map object to parse + * @return A map as identified by the key-value pairs in the XContent + * @throws IOException on a parse failure + */ + public static Map parseStringToObjectMap(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + Map map = new HashMap<>(); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + // If the current token is a START_OBJECT, parse it as Map + map.put(fieldName, parseStringToStringMap(parser)); + } else { + // Otherwise, parse it as a string + map.put(fieldName, parser.text()); + } + } + return map; + } + /** * Parse content parser to {@link java.time.Instant}. * diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index 94fe7b01e..02222b9aa 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -60,6 +60,16 @@ public void testToInstantWithNotValue() throws IOException { assertNull(instant); } + public void testBuildAndParseStringToStringMap() throws IOException { + Map stringMap = Map.ofEntries(Map.entry("one", "two")); + XContentBuilder builder = XContentFactory.jsonBuilder(); + ParseUtils.buildStringToStringMap(builder, stringMap); + XContentParser parser = this.createParser(builder); + parser.nextToken(); + Map parsedMap = ParseUtils.parseStringToStringMap(parser); + assertEquals(stringMap.get("one"), parsedMap.get("one")); + } + public void testGetInputsFromPreviousSteps() { WorkflowData currentNodeInputs = new WorkflowData( Map.ofEntries(Map.entry("content1", 1), Map.entry("param1", 2), Map.entry("content3", "${{step1.output1}}")),