From 4f262dc7ea9f1ea169932dfd4addb85526d436a6 Mon Sep 17 00:00:00 2001 From: Peter Gyori Date: Thu, 8 Aug 2024 22:49:05 +0200 Subject: [PATCH] NIFI-13604 Python Source processors can be triggered without creating new FlowFiles Closes #9159 Signed-off-by: Marton Szasz --- .../main/asciidoc/python-developer-guide.adoc | 2 ++ .../python/processor/FlowFileSourceProxy.java | 3 ++ .../PythonControllerInteractionIT.java | 10 ++++++ .../resources/extensions/CreateNothing.py | 32 +++++++++++++++++++ 4 files changed, 47 insertions(+) create mode 100644 nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateNothing.py diff --git a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc index e96ea3386794..0bfd4368bdc4 100644 --- a/nifi-docs/src/main/asciidoc/python-developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/python-developer-guide.adoc @@ -323,6 +323,8 @@ Each processor based on the `FlowFileSource` API has a `success` relationship an created in the Processor's Python code. `attributes` and `contents` are both optional. If `attributes` is not provided, the FlowFile will still have the usual `filename`, `path` and `uuid` attributes, but no additional ones. If `contents` is not provided, a FlowFile with no contents (only attributes) will be created. +In case there is no useful information to return from the `create` method, `return None` can be used instead of returning an +empty `FlowFileSourceResult`. When `create` returns with `None`, the processor does not produce any output. diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java index f21c5c69b208..30b86e440894 100644 --- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/FlowFileSourceProxy.java @@ -49,6 +49,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final FlowFileSourceResult result; try { result = getTransform().createFlowFile(); + if (result == null) { + return; + } } catch (final Py4JNetworkException e) { throw new ProcessException("Failed to communicate with Python Process", e); } catch (final Exception e) { diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index adebe92a7f39..39a1bb91ebd2 100644 --- a/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -595,6 +595,16 @@ public void testCreateFlowFile() throws IOException { multiLineContent.getBytes(StandardCharsets.UTF_8)); } + @Test + public void testCreateNothing() { + // Test the use-case where the source processor returns with None. + final TestRunner runner = createProcessor("CreateNothing"); + waitForValid(runner); + runner.run(); + + runner.assertTransferCount("success", 0); + } + public interface StringLookupService extends ControllerService { Optional lookup(Map coordinates); } diff --git a/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateNothing.py b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateNothing.py new file mode 100644 index 000000000000..2ea3125ad246 --- /dev/null +++ b/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/CreateNothing.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfilesource import FlowFileSource + +class CreateNothing(FlowFileSource): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileSource'] + + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + description = '''A Python processor for testing a use-case where the Source processor + does not create any output.''' + tags = ['test', 'python', 'source'] + + def __init__(self, **kwargs): + pass + + def create(self, context): + return None