-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add batch api #71
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import requests | ||
|
||
from any_parser.async_parser import AsyncParser | ||
from any_parser.batch_parser import BatchParser | ||
from any_parser.constants import ProcessType | ||
from any_parser.sync_parser import ( | ||
ExtractKeyValueSyncParser, | ||
|
@@ -20,6 +21,10 @@ | |
from any_parser.utils import validate_file_inputs | ||
|
||
PUBLIC_SHARED_BASE_URL = "https://public-api.cambio-ai.com" | ||
# TODO: Update this to the correct batch endpoint | ||
PUBLIC_BATCH_BASE_URL = ( | ||
"http://AnyPar-ApiCo-cuKOBXasmUF1-1986145995.us-west-2.elb.amazonaws.com" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let me link this to our DNS. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I just created batch-api domain name forward for batch-api.cambio-ai.com. However, one thing I am not 100% sure is that for API gateway, we used to setup custom domain names in it. This is a directly public elb, so do we have to do anything on the AWS side besides the squarespace domains work that I have already done. I can merge this in first and you can address in a future PR.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just tested http://batch-api.cambio-ai.com, and the domain name forward is working. |
||
) | ||
TIMEOUT = 60 | ||
|
||
|
||
|
@@ -133,6 +138,7 @@ def __init__(self, api_key: str, base_url: str = PUBLIC_SHARED_BASE_URL) -> None | |
) | ||
self._sync_extract_pii = ExtractPIISyncParser(api_key, base_url) | ||
self._sync_extract_tables = ExtractTablesSyncParser(api_key, base_url) | ||
self.batches = BatchParser(api_key, PUBLIC_BATCH_BASE_URL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: use _batch to indicate this is a private method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, can we pass another batch_url and input and pass in the PUBLIC_BATCH_BASE_URL as argument to improve readability. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The initial design is, I want the usage to be such that the user can use ap.batches.retrieve(request_id) to invoke the API, similar to what OpenAI does. Let's sync later offline. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Let me update this one |
||
|
||
@handle_file_processing | ||
def parse( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
"""Batch parser implementation.""" | ||
|
||
from typing import List, Optional | ||
|
||
import requests | ||
from pydantic import BaseModel, Field | ||
|
||
from any_parser.base_parser import BaseParser | ||
|
||
TIMEOUT = 60 | ||
|
||
|
||
class UploadResponse(BaseModel): | ||
fileName: str | ||
requestId: str | ||
requestStatus: str | ||
|
||
|
||
class UsageResponse(BaseModel): | ||
pageLimit: int | ||
pageRemaining: int | ||
|
||
|
||
class FileStatusResponse(BaseModel): | ||
fileName: str | ||
fileType: str | ||
requestId: str | ||
requestStatus: str | ||
uploadTime: str | ||
completionTime: Optional[str] = None | ||
result: Optional[List[str]] = Field(default_factory=list) | ||
error: Optional[List[str]] = Field(default_factory=list) | ||
Comment on lines
+13
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: add a docstring and describe what each attribute means to ease the review process. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring added. Thanks! |
||
|
||
|
||
class BatchParser(BaseParser): | ||
def __init__(self, api_key: str, base_url: str) -> None: | ||
super().__init__(api_key, base_url) | ||
self._file_upload_url = f"{self._base_url}/files/" | ||
self._processing_status_url = f"{self._base_url}/files/" + "{request_id}" | ||
self._usage_url = f"{self._base_url}/users/current/usage" | ||
|
||
# remove "Content-Type" from headers | ||
self._headers.pop("Content-Type") | ||
Comment on lines
+42
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. qq: why we need to remove this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to upload a file in our request using batch api in any parser SDK, and the Content-Type in base-parser does not work for this scenario. |
||
|
||
def create(self, file_path: str) -> UploadResponse: | ||
"""Upload a single file for batch processing. | ||
|
||
Args: | ||
file_path: Path to the file to upload | ||
|
||
Returns: | ||
FileUploadResponse object containing upload details | ||
""" | ||
with open(file_path, "rb") as f: | ||
files = {"file": f} | ||
response = requests.post( | ||
self._file_upload_url, | ||
headers=self._headers, | ||
files=files, | ||
timeout=TIMEOUT, | ||
) | ||
print(response.json()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: let's not use print, but logger.info |
||
|
||
if response.status_code != 200: | ||
raise Exception(f"Upload failed: {response.text}") | ||
|
||
data = response.json() | ||
return UploadResponse( | ||
fileName=data["fileName"], | ||
requestId=data["requestId"], | ||
requestStatus=data["requestStatus"], | ||
) | ||
|
||
def retrieve(self, request_id: str) -> FileStatusResponse: | ||
"""Get the processing status of a file. | ||
|
||
Args: | ||
request_id: The ID of the file processing request | ||
|
||
Returns: | ||
FileProcessingStatus object containing status details | ||
""" | ||
response = requests.get( | ||
self._processing_status_url.format(request_id=request_id), | ||
headers=self._headers, | ||
timeout=TIMEOUT, | ||
) | ||
|
||
if response.status_code != 200: | ||
raise Exception(f"Status check failed: {response.text}") | ||
|
||
data = response.json() | ||
return FileStatusResponse(**data) | ||
|
||
def get_usage(self) -> UsageResponse: | ||
"""Get current usage information. | ||
|
||
Returns: | ||
UsageResponse object containing usage details | ||
""" | ||
response = requests.get( | ||
self._usage_url, | ||
headers=self._headers, | ||
timeout=TIMEOUT, | ||
) | ||
|
||
if response.status_code != 200: | ||
raise Exception(f"Usage check failed: {response.text}") | ||
|
||
data = response.json() | ||
return UsageResponse( | ||
pageLimit=data["pageLimit"], pageRemaining=data["pageRemaining"] | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
"""Testing Batch API Extraction""" | ||
|
||
import os | ||
import sys | ||
import unittest | ||
|
||
from dotenv import load_dotenv | ||
|
||
sys.path.append(".") | ||
load_dotenv(override=True) | ||
from any_parser import AnyParser # noqa: E402 | ||
|
||
|
||
class TestAnyParserBatchAPI(unittest.TestCase): | ||
"""Testing Any Parser Batch API""" | ||
|
||
def setUp(self): | ||
self.api_key = os.environ.get("CAMBIO_API_KEY") | ||
if not self.api_key: | ||
raise ValueError("CAMBIO_API_KEY is not set") | ||
self.ap = AnyParser(self.api_key) | ||
|
||
def test_batch_api_create(self): | ||
"""Batch API Create""" | ||
working_file = "./examples/sample_data/stoxx_index_guide_0003.pdf" | ||
|
||
response = self.ap.batches.create(working_file) | ||
|
||
self.assertIsNotNone(response) | ||
self.assertEqual(response.requestStatus, "UPLOADED") | ||
|
||
request_id = response.requestId | ||
status = self.ap.batches.retrieve(request_id) | ||
self.assertEqual(status.requestStatus, "UPLOADED") | ||
|
||
quota = self.ap.batches.get_usage() | ||
self.assertGreaterEqual(quota.pageRemaining, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job in updating the README.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also make sure you update the README that the cambioml.com website created API key will not be added into the batch api usage group and user should contact us to make sure your API key will have batch process permission manually added at this moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Let me update the readme here.