From 074e036011b53f38dbbe31fd2a097b8be81a4616 Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Thu, 19 Dec 2024 23:09:18 +0800
Subject: [PATCH 1/7] add input folder support for batch api

---
 any_parser/batch_parser.py     | 74 ++++++++++++++++++++++++++++++++--
 examples/parse_batch_fetch.py  | 60 +++++++++++++++++++++++++++
 examples/parse_batch_upload.py | 22 ++++++++++
 3 files changed, 152 insertions(+), 4 deletions(-)
 create mode 100644 examples/parse_batch_fetch.py
 create mode 100644 examples/parse_batch_upload.py

diff --git a/any_parser/batch_parser.py b/any_parser/batch_parser.py
index 7713184..66ff74f 100644
--- a/any_parser/batch_parser.py
+++ b/any_parser/batch_parser.py
@@ -1,5 +1,10 @@
 """Batch parser implementation."""
 
+import json
+import os
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
 from typing import List, Optional
 
 import requests
@@ -8,6 +13,8 @@ from any_parser.base_parser import BaseParser
 
 TIMEOUT = 60
+MAX_FILES = 1000
+MAX_WORKERS = 10
 
 
 class UploadResponse(BaseModel):
     fileName: str
@@ -43,14 +50,25 @@ def __init__(self, api_key: str, base_url: str) -> None:
         self._headers.pop("Content-Type")
 
     def create(self, file_path: str) -> UploadResponse:
-        """Upload a single file for batch processing.
+        """Upload a single file or folder for batch processing.
 
         Args:
-            file_path: Path to the file to upload
+            file_path: Path to the file or folder to upload
 
         Returns:
-            FileUploadResponse object containing upload details
+            If file: UploadResponse object containing upload details
+            If folder: Path to the JSONL file containing upload responses
         """
+        path = Path(file_path)
+        if path.is_file():
+            return self._upload_single_file(path)
+        elif path.is_dir():
+            return self._upload_folder(path)
+        else:
+            raise ValueError(f"Path {file_path} does not exist")
+
+    def _upload_single_file(self, file_path: Path) -> UploadResponse:
+        """Upload a single file for batch processing."""
         with open(file_path, "rb") as f:
             files = {"file": f}
             response = requests.post(
@@ -59,7 +77,6 @@ def create(self, file_path: str) -> UploadResponse:
                 files=files,
                 timeout=TIMEOUT,
             )
-        print(response.json())
 
         if response.status_code != 200:
             raise Exception(f"Upload failed: {response.text}")
@@ -71,6 +88,55 @@ def create(self, file_path: str) -> UploadResponse:
             requestStatus=data["requestStatus"],
         )
 
+    def _upload_folder(self, folder_path: Path) -> str:
+        """Upload all files in a folder for batch processing.
+
+        Args:
+            folder_path: Path to the folder containing files to upload
+
+        Returns:
+            Path to the JSONL file containing upload responses
+        """
+        # Get all files in folder and subfolders
+        files = []
+        for root, _, filenames in os.walk(folder_path):
+            for filename in filenames:
+                files.append(Path(root) / filename)
+
+        if len(files) > MAX_FILES:
+            raise ValueError(
+                f"Found {len(files)} files. Maximum allowed is {MAX_FILES}"
+            )
+
+        # Upload files concurrently using thread pool
+        responses = []
+        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+            future_to_file = {
+                executor.submit(self._upload_single_file, file_path): file_path
+                for file_path in files
+            }
+
+            for future in as_completed(future_to_file):
+                file_path = future_to_file[future]
+                try:
+                    response = future.result()
+                    responses.append(response.dict())
+                except Exception as e:
+                    print(f"Failed to upload {file_path}: {str(e)}")
+
+        # Save responses to JSONL file in parallel folder
+        folder_name = folder_path.name
+        folder_size = len(files)
+        current_time = time.strftime("%Y%m%d%H%M%S")
+        output_filename = f"{folder_name}_{folder_size}_{current_time}.jsonl"
+        output_path = folder_path.parent / output_filename
+
+        with open(output_path, "w") as f:
+            for response in responses:
+                f.write(json.dumps(response) + "\n")
+
+        return str(output_path)
+
     def retrieve(self, request_id: str) -> FileStatusResponse:
         """Get the processing status of a file.
 
diff --git a/examples/parse_batch_fetch.py b/examples/parse_batch_fetch.py
new file mode 100644
index 0000000..b82cdc9
--- /dev/null
+++ b/examples/parse_batch_fetch.py
@@ -0,0 +1,60 @@
+"""Test batch API folder fetch response"""
+
+import json
+import os
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+from dotenv import load_dotenv
+
+from any_parser import AnyParser
+
+# Load environment variables
+load_dotenv(override=True)
+
+MAX_WORKER = 10
+
+# Get API key and create parser
+api_key = os.environ.get("CAMBIO_API_KEY")
+if not api_key:
+    raise ValueError("CAMBIO_API_KEY is not set")
+ap = AnyParser(api_key)
+
+# Read responses from JSONL file
+# Change to your real output json from parse_batch_upload.py
+response_file = "./sample_data_20241219190049.jsonl"
+with open(response_file, "r") as f:
+    responses = [json.loads(line) for line in f]
+
+
+def process_response(response):
+    """Process a single response by retrieving markdown content"""
+    request_id = response["requestId"]
+    try:
+        markdown = ap.batches.retrieve(request_id)
+        if markdown:
+            response["result"] = [markdown.result[0] if markdown.result else ""]
+            response["requestStatus"] = "COMPLETED"
+            response["completionTime"] = markdown.completionTime
+    except Exception as e:
+        print(f"Error processing {request_id}: {str(e)}")
+        response["error"] = [str(e)]
+    return response
+
+
+# Process responses concurrently
+with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
+    future_to_response = {
+        executor.submit(process_response, response): response for response in responses
+    }
+
+    updated_responses = []
+    for future in as_completed(future_to_response):
+        updated_response = future.result()
+        updated_responses.append(updated_response)
+
+# Write all updated responses back to file
+with open(response_file, "w") as f:
+    for response in updated_responses:
+        f.write(json.dumps(response) + "\n")
+
+print(f"Updated all responses in {response_file} with markdown content")
diff --git a/examples/parse_batch_upload.py b/examples/parse_batch_upload.py
new file mode 100644
index 0000000..ab3883a
--- /dev/null
+++ b/examples/parse_batch_upload.py
@@ -0,0 +1,22 @@
+"""Batch API Folder Processing Upload Example"""
+
+import os
+
+from dotenv import load_dotenv
+
+from any_parser import AnyParser
+
+# Load environment variables
+load_dotenv(override=True)
+
+# Get API key and create parser
+api_key = os.environ.get("CAMBIO_API_KEY")
+if not api_key:
+    raise ValueError("CAMBIO_API_KEY is not set")
+ap = AnyParser(api_key)
+
+# Upload folder for batch processing
+WORKING_FOLDER = "./sample_data"
+response = ap.batches.create(WORKING_FOLDER)
+
+print(f"Upload response saved to: {response}")

From da914a2439640318d57cb90292c1e8196828546b Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Thu, 19 Dec 2024 23:25:09 +0800
Subject: [PATCH 2/7] update readme

---
 README.md | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/README.md b/README.md
index f5d4c14..d2b4bd0 100644
--- a/README.md
+++ b/README.md
@@ -76,6 +76,19 @@ request_id = response.requestId
 markdown = ap.batches.retrieve(request_id)
 ```
 
+Batch API for folder input:
+```python
+# Send the folder to begin batch extraction
+WORKING_FOLDER = "./sample_data"
+# This will generate a jsonl with filename and requestID
+response = ap.batches.create(WORKING_FOLDER)
+
+# Fetch the extracted content using the request ID
+markdown = ap.batches.retrieve(request_id)
+```
+For more details about code implementation of batch API, refer to
+[examples/parse_batch_upload.py](examples/parse_batch_upload.py) and [examples/parse_batch_fetch.py](examples/parse_batch_fetch.py)
+
 > ⚠️ **Note:** Batch extraction is currently in beta testing. Processing time may take up to 12 hours to complete.
 
 ## :scroll: Examples

From d001adfee1c3324ee74033f4a5629299aeb7d2b2 Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Mon, 23 Dec 2024 17:59:52 +0800
Subject: [PATCH 3/7] address comments in batch api logic

---
 any_parser/batch_parser.py     | 40 ++++++++++------------------------
 examples/parse_batch_upload.py | 14 ++++++++++--
 2 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/any_parser/batch_parser.py b/any_parser/batch_parser.py
index 66ff74f..4ab0492 100644
--- a/any_parser/batch_parser.py
+++ b/any_parser/batch_parser.py
@@ -1,11 +1,10 @@
 """Batch parser implementation."""
 
-import json
+import logging
 import os
-import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
-from typing import List, Optional
+from typing import List, Optional, Union
 
 import requests
 from pydantic import BaseModel, Field
@@ -13,9 +12,10 @@ from any_parser.base_parser import BaseParser
 
 TIMEOUT = 60
-MAX_FILES = 1000
 MAX_WORKERS = 10
 
+logger = logging.getLogger(__name__)
+
 
 class UploadResponse(BaseModel):
     fileName: str
@@ -49,15 +49,15 @@ def __init__(self, api_key: str, base_url: str) -> None:
         # remove "Content-Type" from headers
         self._headers.pop("Content-Type")
 
-    def create(self, file_path: str) -> UploadResponse:
+    def create(self, file_path: str) -> Union[UploadResponse, List[UploadResponse]]:
         """Upload a single file or folder for batch processing.
 
         Args:
             file_path: Path to the file or folder to upload
 
         Returns:
-            If file: UploadResponse object containing upload details
-            If folder: Path to the JSONL file containing upload responses
+            If file: Single UploadResponse object containing upload details
+            If folder: List of UploadResponse objects for each file
         """
         path = Path(file_path)
         if path.is_file():
@@ -88,14 +88,14 @@ def _upload_single_file(self, file_path: Path) -> UploadResponse:
             requestStatus=data["requestStatus"],
         )
 
-    def _upload_folder(self, folder_path: Path) -> str:
+    def _upload_folder(self, folder_path: Path) -> List[UploadResponse]:
         """Upload all files in a folder for batch processing.
 
         Args:
             folder_path: Path to the folder containing files to upload
 
         Returns:
-            Path to the JSONL file containing upload responses
+            List of UploadResponse objects for each uploaded file
         """
         # Get all files in folder and subfolders
         files = []
         for root, _, filenames in os.walk(folder_path):
             for filename in filenames:
                 files.append(Path(root) / filename)
 
-        if len(files) > MAX_FILES:
-            raise ValueError(
-                f"Found {len(files)} files. Maximum allowed is {MAX_FILES}"
-            )
-
         # Upload files concurrently using thread pool
         responses = []
         with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
@@ -120,22 +115,11 @@ def _upload_folder(self, folder_path: Path) -> List[UploadResponse]:
                 file_path = future_to_file[future]
                 try:
                     response = future.result()
-                    responses.append(response.dict())
+                    responses.append(response)
                 except Exception as e:
-                    print(f"Failed to upload {file_path}: {str(e)}")
-
-        # Save responses to JSONL file in parallel folder
-        folder_name = folder_path.name
-        folder_size = len(files)
-        current_time = time.strftime("%Y%m%d%H%M%S")
-        output_filename = f"{folder_name}_{folder_size}_{current_time}.jsonl"
-        output_path = folder_path.parent / output_filename
-
-        with open(output_path, "w") as f:
-            for response in responses:
-                f.write(json.dumps(response) + "\n")
+                    logger.error(f"Failed to upload {file_path}: {str(e)}")
 
-        return str(output_path)
+        return responses
 
     def retrieve(self, request_id: str) -> FileStatusResponse:
         """Get the processing status of a file.
diff --git a/examples/parse_batch_upload.py b/examples/parse_batch_upload.py
index ab3883a..d9f4cc4 100644
--- a/examples/parse_batch_upload.py
+++ b/examples/parse_batch_upload.py
@@ -1,6 +1,8 @@
 """Batch API Folder Processing Upload Example"""
 
+import json
 import os
+from datetime import datetime
 
 from dotenv import load_dotenv
 
@@ -17,6 +19,14 @@
 
 # Upload folder for batch processing
 WORKING_FOLDER = "./sample_data"
-response = ap.batches.create(WORKING_FOLDER)
+responses = ap.batches.create(WORKING_FOLDER)
 
-print(f"Upload response saved to: {response}")
+# Save responses to JSONL file with timestamp
+timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
+output_file = f"./sample_data_{timestamp}.jsonl"
+
+with open(output_file, "w") as f:
+    for response in responses:
+        f.write(json.dumps(response.model_dump()) + "\n")
+
+print(f"Upload responses saved to: {output_file}")

From a1bfe948ec4265d01b9d06517e40b313997f31bc Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Tue, 24 Dec 2024 12:35:14 +0800
Subject: [PATCH 4/7] trigger testcase

---
 .github/workflows/python-app.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index 189971e..eeb4994 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -17,7 +17,7 @@ jobs:
       matrix:
         python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
       max-parallel: 1 # Ensures the tests run sequentially
-
+  # ================================
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python

From ef2ebe420cc3a982b8280ac2bfbf6683e4d80d80 Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Tue, 24 Dec 2024 13:54:30 +0800
Subject: [PATCH 5/7] trigger testcase

---
 .github/workflows/python-app.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index eeb4994..189971e 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -17,7 +17,7 @@ jobs:
       matrix:
         python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
       max-parallel: 1 # Ensures the tests run sequentially
-  # ================================
+
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python

From 70fcbfddb0fc29633ca33fca062f3ccb446b14d8 Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Mon, 30 Dec 2024 15:15:19 +0800
Subject: [PATCH 6/7] fix logger and update readme

---
 README.md                     | 10 +++++++++-
 examples/parse_batch_fetch.py |  7 ++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index d2b4bd0..e6610be 100644
--- a/README.md
+++ b/README.md
@@ -82,8 +82,16 @@ Batch API for folder input:
 WORKING_FOLDER = "./sample_data"
 # This will generate a jsonl with filename and requestID
 response = ap.batches.create(WORKING_FOLDER)
+```
 
-# Fetch the extracted content using the request ID
+Each response in the JSONL file contains:
+- The filename
+- A unique request ID
+- Additional processing metadata
+
+You can later use these request IDs to retrieve the extracted content for each file:
+
+```python
+# Fetch the extracted content using the request ID from the jsonl file
 markdown = ap.batches.retrieve(request_id)
 ```
 For more details about code implementation of batch API, refer to
diff --git a/examples/parse_batch_fetch.py b/examples/parse_batch_fetch.py
index b82cdc9..4704c64 100644
--- a/examples/parse_batch_fetch.py
+++ b/examples/parse_batch_fetch.py
@@ -1,6 +1,7 @@
 """Test batch API folder fetch response"""
 
 import json
+import logging
 import os
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
@@ -8,6 +9,10 @@
 
 from any_parser import AnyParser
 
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
 # Load environment variables
 load_dotenv(override=True)
 
@@ -36,7 +41,7 @@ def process_response(response):
             response["requestStatus"] = "COMPLETED"
             response["completionTime"] = markdown.completionTime
     except Exception as e:
-        print(f"Error processing {request_id}: {str(e)}")
+        logger.error(f"Error processing {request_id}: {str(e)}")
         response["error"] = [str(e)]
     return response

From 6e24a8567b030e42f73c06a30e4f2e4e97d3d0ca Mon Sep 17 00:00:00 2001
From: Charles Yuan
Date: Tue, 31 Dec 2024 16:38:00 +0800
Subject: [PATCH 7/7] fix broken testcase

---
 tests/test_data.py | 28 ++++++++++++----------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/tests/test_data.py b/tests/test_data.py
index 5a23615..fa8c120 100644
--- a/tests/test_data.py
+++ b/tests/test_data.py
@@ -7,14 +7,12 @@
         "first_name": "the first name of the employee",
         "last_name": "the last name of the employee",
     },
-    "correct_output": [
-        {
-            "social_security_number": ["758-58-5787"],
-            "ein": ["78-8778788"],
-            "first_name": ["Jesan"],
-            "last_name": ["Rahaman"],
-        }
-    ],
+    "correct_output": {
+        "social_security_number": ["758-58-5787"],
+        "ein": ["78-8778788"],
+        "first_name": ["Jesan"],
+        "last_name": ["Rahaman"],
+    },
 },
 # {
 #     "working_file": "./examples/sample_data/test_w2.pptx",
@@ -58,13 +56,11 @@
         "first_name": "the first name of the employee",
         "last_name": "the last name of the employee",
     },
-    "correct_output": [
-        {
-            "social_security_number": ["758-58-5787"],
-            "ein": ["78-8778788"],
-            "first_name": ["Jesan"],
-            "last_name": ["Rahaman"],
-        }
-    ],
+    "correct_output": {
+        "social_security_number": ["758-58-5787"],
+        "ein": ["78-8778788"],
+        "first_name": ["Jesan"],
+        "last_name": ["Rahaman"],
+    },
 },
 ]