NIFI-13448 Added initial Python Processors
- Added pyproject.toml with source bundles directory
- Added Check Formatting step with Hatch
- Added Build Distribution step with Hatch

Signed-off-by: David Handermann <[email protected]>
exceptionfactory committed Jun 26, 2024
1 parent 5fa9099 commit 2687b7c
Showing 26 changed files with 3,203 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build.yml
@@ -43,3 +43,7 @@ jobs:
        run: |
          python -m pip install --upgrade pip
          pip install hatch
      - name: Check Formatting
        run: hatch fmt --check
      - name: Build Distribution
        run: hatch build
1 change: 1 addition & 0 deletions .ratignore
@@ -14,3 +14,4 @@ share/python-wheels/*
.idea/*
.git/*
.cache/*
.ruff_cache/*
20 changes: 20 additions & 0 deletions README.md
@@ -2,15 +2,35 @@

[![license](https://img.shields.io/github/license/apache/nifi-python-extensions)](https://github.com/apache/nifi-python-extensions/blob/main/LICENSE)
[![build](https://github.com/apache/nifi-python-extensions/actions/workflows/build.yml/badge.svg)](https://github.com/apache/nifi-python-extensions/actions/workflows/build.yml)
[![Hatch](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

The [Apache NiFi](https://nifi.apache.org) Python Extensions repository contains Processors implemented in [Python](https://www.python.org/)
for deployment in Apache NiFi 2.

## Building

This project uses [Hatch](https://hatch.pypa.io) to build distribution packages.

```
hatch build
```

The build command creates a source distribution in the `dist` directory.

The source distribution contains an `extensions` directory that can be copied into Apache NiFi to use the packaged Processors.
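
For illustration, a minimal sketch of installing the packaged Processors, assuming the source distribution has already been unpacked and that NiFi reads Python extensions from its default `python/extensions` directory (both paths are hypothetical and depend on the local installation):

```
import shutil
from pathlib import Path

# Hypothetical paths: the unpacked source distribution and the NiFi Python extensions directory
sdist_root = Path("dist/nifi_python_extensions-2.0.0.dev0")
nifi_extensions = Path("/opt/nifi/python/extensions")

# Copy each packaged extension bundle into the NiFi extensions directory
for bundle in (sdist_root / "extensions").iterdir():
    if bundle.is_dir():
        shutil.copytree(bundle, nifi_extensions / bundle.name, dirs_exist_ok=True)
```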

## Developing

The Apache NiFi [Python Developer's Guide](https://nifi.apache.org/documentation/nifi-2.0.0-M3/html/python-developer-guide.html)
provides the API and implementation guidelines for Python Processors.

The Hatch fmt command evaluates Python Processors against the formatting and lint rules configured in pyproject.toml.

```
hatch fmt --check
```
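
The Developer's Guide covers the full API; as a minimal sketch, a FlowFileTransform Processor follows the structure used by the Processors in this repository (the ReverseText name and behavior below are hypothetical, while the nifiapi interfaces match those used in this commit):

```
# SPDX-License-Identifier: Apache-2.0

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult


class ReverseText(FlowFileTransform):
    """Hypothetical example Processor that reverses FlowFile text content."""

    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0.dev0"
        description = "Reverses the text content of incoming FlowFiles"
        tags = ["example", "text"]

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        text = flowfile.getContentsAsBytes().decode()
        return FlowFileTransformResult("success", contents=text[::-1])
```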

## Documentation

The Apache NiFi [Documentation](https://nifi.apache.org/documentation/) includes reference information for project capabilities.
66 changes: 66 additions & 0 deletions pyproject.toml
@@ -0,0 +1,66 @@
# SPDX-License-Identifier: Apache-2.0

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "nifi-python-extensions"
dynamic = ["version"]
description = "Apache NiFi Processors implemented in Python"
requires-python = ">=3.11"
keywords = ["apache", "nifi", "extensions", "processors"]
readme = "README.md"
authors = [
    { name = "Apache NiFi Developers", email = "[email protected]" },
]
maintainers = [
    { name = "Apache NiFi Developers", email = "[email protected]" },
]
classifiers = [
    "Development Status :: 5 - Production/Stable",
    "License :: OSI Approved :: Apache Software License",
    "Intended Audience :: Developers",
    "Programming Language :: Python",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Framework :: Hatch",
]

[project.urls]
Homepage = "https://nifi.apache.org"
Issues = "https://issues.apache.org/jira/projects/NIFI/issues"
Source = "https://github.com/apache/nifi-python-extensions"

[tool.hatch.version]
path = "src/__about__.py"

[[tool.hatch.envs.all.matrix]]
python = ["3.11", "3.12"]

[tool.hatch.build.targets.wheel]
packages = ["src/extensions"]

[tool.ruff]
preview = true
lint.pep8-naming.extend-ignore-names = [
    "flowFile",
    "getPropertyDescriptors",
    "onScheduled",
]
lint.flake8-self.extend-ignore-names = [
    "_standard_validators"
]
lint.extend-select = [
    "CPY001"
]
lint.ignore = [
    "G004", # Allow f-string for logging
    "N999", # Allow Processor module names that do not follow pep8-naming
    "PERF401", # Allow manual list comprehension
    "RUF012", # Allow mutable class attributes without typing.ClassVar
    "S105", # Avoid checking for hardcoded-password-string values
]

[tool.ruff.lint.flake8-copyright]
notice-rgx = "# SPDX-License-Identifier: Apache-2.0\n"
3 changes: 3 additions & 0 deletions src/__about__.py
@@ -0,0 +1,3 @@
# SPDX-License-Identifier: Apache-2.0

__version__ = "2.0.0.dev0"
1 change: 1 addition & 0 deletions src/__init__.py
@@ -0,0 +1 @@
# SPDX-License-Identifier: Apache-2.0
277 changes: 277 additions & 0 deletions src/extensions/chunking/ChunkDocument.py
@@ -0,0 +1,277 @@
# SPDX-License-Identifier: Apache-2.0

import json

from langchain.text_splitter import Language
from nifiapi.documentation import ProcessorConfiguration, multi_processor_use_case, use_case
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, PropertyDescriptor, StandardValidators

SPLIT_BY_CHARACTER = "Split by Character"
SPLIT_CODE = "Split Code"
RECURSIVELY_SPLIT_BY_CHARACTER = "Recursively Split by Character"

TEXT_KEY = "text"
METADATA_KEY = "metadata"


@use_case(
    description="Create chunks of text from a single larger chunk.",
    notes="The input for this use case is expected to be a FlowFile whose content is a JSON Lines document, with each line having a 'text' and a 'metadata' element.",
    keywords=["embedding", "vector", "text", "rag", "retrieval augmented generation"],
    configuration="""
        Set "Input Format" to "Plain Text"
        Set "Element Strategy" to "Single Document"
        """,
)
@multi_processor_use_case(
    description="""
        Chunk Plaintext data in order to prepare it for storage in a vector store. The output is in "json-lines" format,
        containing the chunked data as text, as well as metadata pertaining to the chunk.""",
    notes="The input for this use case is expected to be a FlowFile whose content is a plaintext document.",
    keywords=["embedding", "vector", "text", "rag", "retrieval augmented generation"],
    configurations=[
        ProcessorConfiguration(
            processor_type="ParseDocument",
            configuration="""
                Set "Input Format" to "Plain Text"
                Set "Element Strategy" to "Single Document"
                Connect the 'success' Relationship to ChunkDocument.
                """,
        ),
        ProcessorConfiguration(
            processor_type="ChunkDocument",
            configuration="""
                Set the following properties:
                    "Chunking Strategy" = "Recursively Split by Character"
                    "Separator" = "\\n\\n,\\n, ,"
                    "Separator Format" = "Plain Text"
                    "Chunk Size" = "4000"
                    "Chunk Overlap" = "200"
                    "Keep Separator" = "false"
                Connect the 'success' Relationship to the appropriate destination to store data in the desired vector store.
                """,
        ),
    ],
)
@multi_processor_use_case(
    description="""
        Parse and chunk the textual contents of a PDF document in order to prepare it for storage in a vector store. The output is in "json-lines" format,
        containing the chunked data as text, as well as metadata pertaining to the chunk.""",
    notes="The input for this use case is expected to be a FlowFile whose content is a PDF document.",
    keywords=["pdf", "embedding", "vector", "text", "rag", "retrieval augmented generation"],
    configurations=[
        ProcessorConfiguration(
            processor_type="ParseDocument",
            configuration="""
                Set "Input Format" to "PDF"
                Set "Element Strategy" to "Single Document"
                Set "Include Extracted Metadata" to "false"
                Connect the 'success' Relationship to ChunkDocument.
                """,
        ),
        ProcessorConfiguration(
            processor_type="ChunkDocument",
            configuration="""
                Set the following properties:
                    "Chunking Strategy" = "Recursively Split by Character"
                    "Separator" = "\\n\\n,\\n, ,"
                    "Separator Format" = "Plain Text"
                    "Chunk Size" = "4000"
                    "Chunk Overlap" = "200"
                    "Keep Separator" = "false"
                Connect the 'success' Relationship to the appropriate destination to store data in the desired vector store.
                """,
        ),
    ],
)
class ChunkDocument(FlowFileTransform):
    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0.dev0"
        description = """Chunks incoming documents that are formatted as JSON Lines into chunks that are appropriately sized for creating Text Embeddings.
            The input is expected to be in "json-lines" format, with each line having a 'text' and a 'metadata' element.
            Each line will then be split into one or more lines in the output."""
        tags = [
            "text",
            "split",
            "chunk",
            "langchain",
            "embeddings",
            "vector",
            "machine learning",
            "ML",
            "artificial intelligence",
            "ai",
            "document",
        ]
        dependencies = ["langchain"]

    CHUNK_STRATEGY = PropertyDescriptor(
        name="Chunking Strategy",
        description="Specifies which splitter should be used to split the text",
        allowable_values=[RECURSIVELY_SPLIT_BY_CHARACTER, SPLIT_BY_CHARACTER, SPLIT_CODE],
        required=True,
        default_value=RECURSIVELY_SPLIT_BY_CHARACTER,
    )
    SEPARATOR = PropertyDescriptor(
        name="Separator",
        description="""Specifies the character sequence to use for splitting apart the text. If using a Chunking Strategy of Recursively Split by Character,
            it is a comma-separated list of character sequences. Meta-characters \\n, \\r and \\t are automatically un-escaped.""",
        required=True,
        default_value="\\n\\n,\\n, ,",
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    SEPARATOR_FORMAT = PropertyDescriptor(
        name="Separator Format",
        description="Specifies how to interpret the value of the <Separator> property",
        required=True,
        default_value="Plain Text",
        allowable_values=["Plain Text", "Regular Expression"],
        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)],
    )
    CHUNK_SIZE = PropertyDescriptor(
        name="Chunk Size",
        description="The maximum size of a chunk that should be returned",
        required=True,
        default_value="4000",
        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
    )
    CHUNK_OVERLAP = PropertyDescriptor(
        name="Chunk Overlap",
        description="The number of characters that should be overlapped between each chunk of text",
        required=True,
        default_value="200",
        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
    )
    KEEP_SEPARATOR = PropertyDescriptor(
        name="Keep Separator",
        description="Whether or not to keep the text separator in each chunk of data",
        required=True,
        default_value="false",
        allowable_values=["true", "false"],
        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)],
    )
    STRIP_WHITESPACE = PropertyDescriptor(
        name="Strip Whitespace",
        description="Whether or not to strip the whitespace at the beginning and end of each chunk",
        required=True,
        default_value="true",
        allowable_values=["true", "false"],
        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)],
    )
    LANGUAGE = PropertyDescriptor(
        name="Language",
        description="The language to use for the Code's syntax",
        required=True,
        default_value="python",
        allowable_values=[e.value for e in Language],
        dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_CODE)],
    )

    property_descriptors = [
        CHUNK_STRATEGY,
        SEPARATOR,
        SEPARATOR_FORMAT,
        CHUNK_SIZE,
        CHUNK_OVERLAP,
        KEEP_SEPARATOR,
        STRIP_WHITESPACE,
        LANGUAGE,
    ]

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        return self.property_descriptors

    def split_docs(self, context, flowfile, documents):
        # Import lazily so the langchain dependency is only loaded when the Processor runs
        from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter

        strategy = context.getProperty(self.CHUNK_STRATEGY).getValue()
        if strategy == SPLIT_BY_CHARACTER:
            text_splitter = CharacterTextSplitter(
                separator=context.getProperty(self.SEPARATOR).evaluateAttributeExpressions(flowfile).getValue(),
                keep_separator=context.getProperty(self.KEEP_SEPARATOR).asBoolean(),
                is_separator_regex=context.getProperty(self.SEPARATOR_FORMAT).getValue() == "Regular Expression",
                chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(),
                chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(),
                length_function=len,
                strip_whitespace=context.getProperty(self.STRIP_WHITESPACE).asBoolean(),
            )
        elif strategy == SPLIT_CODE:
            text_splitter = RecursiveCharacterTextSplitter.from_language(
                language=context.getProperty(self.LANGUAGE).getValue(),
                chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(),
                chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(),
            )
        else:
            # Recursively Split by Character: the Separator property is a comma-separated list of
            # separators with \n, \r and \t escape sequences converted to literal characters
            separator_text = context.getProperty(self.SEPARATOR).evaluateAttributeExpressions(flowfile).getValue()
            splits = separator_text.split(",")
            unescaped = []
            for split in splits:
                unescaped.append(split.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"))
            text_splitter = RecursiveCharacterTextSplitter(
                separators=unescaped,
                keep_separator=context.getProperty(self.KEEP_SEPARATOR).asBoolean(),
                is_separator_regex=context.getProperty(self.SEPARATOR_FORMAT).getValue() == "Regular Expression",
                chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(),
                chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(),
                length_function=len,
                strip_whitespace=context.getProperty(self.STRIP_WHITESPACE).asBoolean(),
            )

        return text_splitter.split_documents(documents)

    def to_json(self, docs) -> str:
        # Serialize each chunk as a JSON Lines record, adding chunk_index and chunk_count to the metadata
        json_docs = []

        for i, doc in enumerate(docs):
            doc.metadata["chunk_index"] = i
            doc.metadata["chunk_count"] = len(docs)

            json_doc = json.dumps({TEXT_KEY: doc.page_content, METADATA_KEY: doc.metadata})
            json_docs.append(json_doc)

        return "\n".join(json_docs)

    def load_docs(self, flowfile):
        from langchain.schema import Document

        # Read JSON Lines input: each non-empty line must provide a 'text' element and may provide 'metadata'
        flowfile_contents = flowfile.getContentsAsBytes().decode()
        docs = []
        for line in flowfile_contents.split("\n"):
            stripped = line.strip()
            if stripped == "":
                continue

            json_element = json.loads(stripped)
            page_content = json_element.get(TEXT_KEY)
            if page_content is None:
                continue

            metadata = json_element.get(METADATA_KEY)
            if metadata is None:
                metadata = {}

            doc = Document(page_content=page_content, metadata=metadata)
            docs.append(doc)

        return docs

    def transform(self, context, flowfile):
        documents = self.load_docs(flowfile)
        split_docs = self.split_docs(context, flowfile, documents)

        output_json = self.to_json(split_docs)
        attributes = {"document.count": str(len(split_docs))}
        return FlowFileTransformResult("success", contents=output_json, attributes=attributes)
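
As a concrete illustration of the JSON Lines contract documented above, a short sketch of one input record and the corresponding output record; the 'text' and 'metadata' field names come from the TEXT_KEY and METADATA_KEY constants in ChunkDocument.py, while the sample values are invented:

```
import json

# One input line for ChunkDocument: a 'text' element plus optional 'metadata'
input_line = json.dumps({
    "text": "Apache NiFi supports Processors implemented in Python.",
    "metadata": {"source": "example.txt"},
})

# Each output line carries the chunked text with chunk_index and chunk_count added by to_json();
# a short input like this fits in a single chunk, so the output also sets document.count to 1
output_line = json.dumps({
    "text": "Apache NiFi supports Processors implemented in Python.",
    "metadata": {"source": "example.txt", "chunk_index": 0, "chunk_count": 1},
})
```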