diff --git a/.github/workflows/test-cloud.yml b/.github/workflows/test-cloud.yml index f5efab9..5687417 100644 --- a/.github/workflows/test-cloud.yml +++ b/.github/workflows/test-cloud.yml @@ -156,6 +156,7 @@ jobs: AWS_S3_ACCESS_KEY_ID: ${{ secrets.AWS_S3_ACCESS_KEY_ID }} AWS_S3_SECRET_ACCESS_KEY: ${{ secrets.AWS_S3_SECRET_ACCESS_KEY }} AWS_STORAGE_BUCKET_NAME: ${{ secrets.AWS_STORAGE_BUCKET_NAME }} + SVIX_TOKEN: ${{ secrets.SVIX_TOKEN }} GOTENBERG_URL: http://localhost:3000 REDIS_URL: redis://localhost:6379/0 run: pytest --cov-fail-under=50 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0255084..65e7eb9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -80,6 +80,7 @@ jobs: AWS_S3_ACCESS_KEY_ID: ${{ secrets.AWS_S3_ACCESS_KEY_ID }} AWS_S3_SECRET_ACCESS_KEY: ${{ secrets.AWS_S3_SECRET_ACCESS_KEY }} AWS_STORAGE_BUCKET_NAME: ${{ secrets.AWS_STORAGE_BUCKET_NAME }} + SVIX_TOKEN: ${{ secrets.SVIX_TOKEN }} DATABASE_URL: postgres://test_user:test_password@localhost:5432/test_db GOTENBERG_URL: http://localhost:3000 run: pytest diff --git a/web/accounts/migrations/0003_customuser_svix_application_id.py b/web/accounts/migrations/0003_customuser_svix_application_id.py new file mode 100644 index 0000000..041aa51 --- /dev/null +++ b/web/accounts/migrations/0003_customuser_svix_application_id.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.1 on 2024-11-15 15:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('accounts', '0002_customuser_token'), + ] + + operations = [ + migrations.AddField( + model_name='customuser', + name='svix_application_id', + field=models.CharField(blank=True, max_length=255), + ), + ] diff --git a/web/accounts/migrations/0004_customuser_svix_endpoint_id.py b/web/accounts/migrations/0004_customuser_svix_endpoint_id.py new file mode 100644 index 0000000..3a3c2bb --- /dev/null +++ b/web/accounts/migrations/0004_customuser_svix_endpoint_id.py @@ -0,0 +1,18 @@ +# Generated by Django 5.1.1 on 2024-11-20 22:37 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('accounts', '0003_customuser_svix_application_id'), + ] + + operations = [ + migrations.AddField( + model_name='customuser', + name='svix_endpoint_id', + field=models.CharField(blank=True, max_length=255), + ), + ] diff --git a/web/accounts/models.py b/web/accounts/models.py index 9adc853..1c7205c 100644 --- a/web/accounts/models.py +++ b/web/accounts/models.py @@ -12,6 +12,8 @@ class CustomUser(AbstractUser): tier = models.CharField(max_length=50, choices=TIER, default="free") stripe_customer_id = models.CharField(max_length=255, blank=True) stripe_subscription_id = models.CharField(max_length=255, blank=True) + svix_application_id = models.CharField(max_length=255, blank=True) + svix_endpoint_id = models.CharField(max_length=255, blank=True) token = models.CharField(max_length=255, blank=True) def __str__(self) -> str: diff --git a/web/api/tests/tests.py b/web/api/tests/tests.py index 0517c95..14f39d7 100644 --- a/web/api/tests/tests.py +++ b/web/api/tests/tests.py @@ -6,12 +6,21 @@ from accounts.models import CustomUser from api.middleware import add_slash from api.models import Collection, Document, Page, PageEmbedding -from api.views import (Bearer, QueryFilter, QueryIn, filter_collections, - filter_documents, filter_query, router) +from api.views import ( + Bearer, + QueryFilter, + QueryIn, + filter_collections, + filter_documents, + filter_query, + router, +) from django.core.exceptions import ValidationError as DjangoValidationError from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import override_settings from ninja.testing import TestAsyncClient from pydantic import ValidationError +from svix.api import ApplicationOut, EndpointOut, EndpointSecretOut pytestmark = [pytest.mark.django_db(transaction=True, reset_sequences=True)] @@ -309,6 +318,189 @@ async def test_delete_collection_not_found(async_client, user, collection): """ Document tests """ +async def test_add_webhook(async_client, user): + # Define a mock webhook URL + webhook_url = "http://localhost:8000/webhook-receive" + + # Mock SvixAsync endpoint creation + with patch("api.views.SvixAsync") as MockSvixAsync: + # Create a mock instance of SvixAsync + mock_svix = AsyncMock() + MockSvixAsync.return_value = mock_svix + + # Set return values for the mocked methods + mock_svix.application.create.return_value = ApplicationOut( + id="app_id", + name="app_name", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + ) + mock_svix.endpoint.create.return_value = EndpointOut( + id="endpoint_id", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + description="endpoint_description", + url="endpoint_url", + version="v1", + ) + mock_svix.endpoint.get_secret.return_value = EndpointSecretOut(key="secret_key") + + # Register the webhook by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response is successful + assert response.status_code == 200, "Failed to register webhook" + + # Verify that Svix application.create was called + ( + mock_svix.application.create.assert_called_once(), + "Svix application.create was not called", + ) + + # Verify that Svix endpoint.create was called + ( + mock_svix.endpoint.create.assert_called_once(), + "Svix endpoint.create was not called", + ) + + # Verify that Svix endpoint.get_secret was called + ( + mock_svix.endpoint.get_secret.assert_called_once(), + "Svix endpoint.get_secret was not called", + ) + + +@override_settings(SVIX_TOKEN="") +async def test_add_webhook_no_token(async_client, user): + # Define a mock webhook URL + webhook_url = "http://localhost:8000/webhook-receive" + + # Register the webhook by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response status code is 400 + assert response.status_code == 400 + + +async def test_add_webhook_error(async_client, user): + # Define a mock webhook URL + webhook_url = "http://localhost:8000/webhook-receive" + + # Mock SvixAsync endpoint creation + with patch("api.views.SvixAsync") as MockSvixAsync: + # Create a mock instance of SvixAsync + mock_svix = AsyncMock() + MockSvixAsync.return_value = mock_svix + + # Simulate an exception being raised during the application.create call + mock_svix.application.create.side_effect = Exception( + "Failed to create application" + ) + + # Register the webhook by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response status code is 400 + assert response.status_code == 400 + + # Verify that Svix application.create was called + ( + mock_svix.application.create.assert_called_once(), + "Svix application.create was not called", + ) + + +async def test_add_webhook_twice(async_client, user): + # Define a mock webhook URL + webhook_url = "http://localhost:8000/webhook-receive" + + # Mock SvixAsync endpoint creation + with patch("api.views.SvixAsync") as MockSvixAsync: + # Create a mock instance of SvixAsync + mock_svix = AsyncMock() + MockSvixAsync.return_value = mock_svix + + # Set return values for the mocked methods + mock_svix.application.create.return_value = ApplicationOut( + id="app_id", + name="app_name", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + ) + mock_svix.endpoint.create.return_value = EndpointOut( + id="endpoint_id", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + description="endpoint_description", + url="endpoint_url", + version="v1", + ) + mock_svix.endpoint.update.return_value = EndpointOut( + id="endpoint_id", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + description="endpoint_description", + url="endpoint_url", + version="v1", + ) + mock_svix.endpoint.get_secret.return_value = EndpointSecretOut(key="secret_key") + + # Register the webhook by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response is successful + assert response.status_code == 200, "Failed to register webhook" + + # Register the webhook again by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response is successful + assert response.status_code == 200, "Failed to register webhook" + + # Verify that Svix application.create was called + ( + mock_svix.application.create.assert_called_once(), + "Svix application.create was not called", + ) + + # Verify that Svix endpoint.create was called once + ( + mock_svix.endpoint.create.assert_called_once(), + "Svix endpoint.create was not called", + ) + + # Verify that Svix endpoint.update was called once + ( + mock_svix.endpoint.update.assert_called_once(), + "Svix endpoint.update was not called", + ) + + async def test_create_document_pdf_url_await(async_client, user): response = await async_client.post( "/documents/upsert-document/", @@ -343,6 +535,81 @@ async def test_create_document_pdf_url_async(async_client, user): assert response.status_code == 202 +async def test_create_document_pdf_url_async_webhook(async_client, user): + # Define a mock webhook URL + webhook_url = "http://localhost:8000/webhook-receive" + + # Mock SvixAsync endpoint creation + with patch("api.views.SvixAsync") as MockSvixAsync: + # Create a mock instance of SvixAsync + mock_svix = AsyncMock() + MockSvixAsync.return_value = mock_svix + + # Set return values for the mocked methods + mock_svix.application.create.return_value = ApplicationOut( + id="app_id", + name="app_name", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + ) + mock_svix.endpoint.create.return_value = EndpointOut( + id="endpoint_id", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + description="endpoint_description", + url="endpoint_url", + version="v1", + ) + mock_svix.endpoint.get_secret.return_value = EndpointSecretOut(key="secret_key") + + # Register the webhook by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response is successful + assert response.status_code == 200, "Failed to register webhook" + + # Verify that Svix application.create was called + ( + mock_svix.application.create.assert_called_once(), + "Svix application.create was not called", + ) + + # Verify that Svix endpoint.create was called + ( + mock_svix.endpoint.create.assert_called_once(), + "Svix endpoint.create was not called", + ) + + # Create a document with a PDF URL + response = await async_client.post( + "/documents/upsert-document/", + json={ + "name": "Test Document Fixture", + "url": "https://pdfobject.com/pdf/sample.pdf", + }, + headers={"Authorization": f"Bearer {user.token}"}, + ) + assert response.status_code == 202 + + # Wait for all pending tasks to complete + pending_tasks = [ + task for task in asyncio.all_tasks() if task is not asyncio.current_task() + ] + await asyncio.gather(*pending_tasks) + + # Verify that Svix message.create was called + ( + mock_svix.message.create.assert_called_once(), + "Svix message.create was not called", + ) + + # the update in upsert async def test_create_document_pdf_url_update_await( async_client, user, document, collection @@ -1791,6 +2058,113 @@ async def test_document_fetch_failure_async(async_client, user): mock_email_instance.send.assert_called_once() +async def test_document_fetch_failure_async_webhook(async_client, user): + AIOHTTP_GET_PATH = "api.models.aiohttp.ClientSession.get" + AIOHTTP_HEAD_PATH = "api.models.aiohttp.ClientSession.head" + + # Mock for HEAD request + mock_head_response = AsyncMock() + mock_head_response.status = 200 + mock_head_response.headers = { + "Content-Type": "application/pdf", + "Content-Length": "1000", + } + mock_head_response.__aenter__.return_value = mock_head_response + + # Mock for GET request (failing response) + mock_get_response = AsyncMock() + mock_get_response.status = 500 + mock_get_response.headers = {} + mock_get_response.read = AsyncMock(return_value=b"") + mock_get_response.__aenter__.return_value = mock_get_response + + # Patch both HEAD and GET methods + with patch(AIOHTTP_HEAD_PATH, return_value=mock_head_response) as mock_head, patch( + AIOHTTP_GET_PATH, return_value=mock_get_response + ) as mock_get: + # Define a mock webhook URL + webhook_url = "http://localhost:8000/webhook-receive" + + # Mock SvixAsync endpoint creation + with patch("api.views.SvixAsync") as MockSvixAsync: + # Create a mock instance of SvixAsync + mock_svix = AsyncMock() + MockSvixAsync.return_value = mock_svix + + # Set return values for the mocked methods + mock_svix.application.create.return_value = ApplicationOut( + id="app_id", + name="app_name", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + ) + mock_svix.endpoint.create.return_value = EndpointOut( + id="endpoint_id", + created_at="2021-01-01T00:00:00Z", + metadata={}, + updated_at="2021-01-01T00:00:00Z", + description="endpoint_description", + url="endpoint_url", + version="v1", + ) + mock_svix.endpoint.get_secret.return_value = EndpointSecretOut( + key="secret_key" + ) + + # Register the webhook by calling the /webhook/ endpoint + response = await async_client.post( + "/webhook/", + json={"url": webhook_url}, + headers={"Authorization": f"Bearer {user.token}"}, + ) + + # Assert that the response is successful + assert response.status_code == 200, "Failed to register webhook" + + # Verify that Svix application.create was called + ( + mock_svix.application.create.assert_called_once(), + "Svix application.create was not called", + ) + + # Verify that Svix endpoint.create was called + ( + mock_svix.endpoint.create.assert_called_once(), + "Svix endpoint.create was not called", + ) + + # Create a document with a PDF URL + # Perform the POST request + response = await async_client.post( + "/documents/upsert-document/", + json={ + "name": "Test Document Fetch Failure", + "url": "https://example.com/nonexistent.pdf", + }, + headers={"Authorization": f"Bearer {user.token}"}, + ) + assert response.status_code == 202 + + # Wait for all pending tasks to complete + pending_tasks = [ + task + for task in asyncio.all_tasks() + if task is not asyncio.current_task() + ] + await asyncio.gather(*pending_tasks) + + # Assert that both HEAD and GET were called + mock_head.assert_called_once() + mock_get.assert_called_once() + + # Verify that Svix message.create was called + ( + mock_svix.message.create.assert_called_once(), + "Svix message.create was not called", + ) + + async def test_document_file_too_big(async_client, user): AIOHTTP_GET_PATH = "api.models.aiohttp.ClientSession.get" AIOHTTP_HEAD_PATH = "api.models.aiohttp.ClientSession.head" diff --git a/web/api/views.py b/web/api/views.py index 793c67c..6e03191 100644 --- a/web/api/views.py +++ b/web/api/views.py @@ -18,6 +18,7 @@ from ninja.security import HttpBearer from pgvector.utils import HalfVector from pydantic import Field, model_validator +from svix.api import ApplicationIn, EndpointIn, EndpointUpdate, MessageIn, SvixAsync from typing_extensions import Self from .models import Collection, Document, MaxSim, Page @@ -69,7 +70,7 @@ async def authenticate( class CollectionIn(Schema): name: str - metadata: Optional[dict] = Field(default_factory=dict) + metadata: Optional[dict] = Field(default_factory=lambda: {}) @model_validator(mode="after") def validate_name(self) -> Self: @@ -347,7 +348,7 @@ class DocumentOut(Schema): class DocumentInPatch(Schema): name: Optional[str] = None - metadata: Optional[dict] = Field(default_factory=dict) + metadata: Optional[dict] = Field(default_factory=lambda: {}) collection_name: Optional[str] = Field( "default_collection", description="""The name of the collection to which the document belongs. If not provided, the document will be added to the default_collection. Use 'all' to access all collections belonging to the user.""", @@ -410,6 +411,31 @@ async def process_upsert_document( ) ) logger.info(f"Document {document.name} processed successfully.") + + if ( + not payload.wait + and request.auth.svix_application_id + and settings.SVIX_TOKEN != "" + ): + # send an event to the webhook + svix = SvixAsync(settings.SVIX_TOKEN) + await svix.message.create( + request.auth.svix_application_id, + MessageIn( + event_type="upsert.success", + payload={ + "type": "upsert.success", + "message": "Document upserted successfully", + "id": document.id, + "name": document.name, + "metadata": document.metadata, + "url": await document.get_url(), + "num_pages": document.num_pages, + "collection_name": document.collection.name, + }, + ), + ) + return 201, DocumentOut( id=document.id, name=document.name, @@ -422,7 +448,30 @@ async def process_upsert_document( except Exception as e: logger.error(f"Error processing document: {str(e)}") - if not payload.wait: # only send an email in the async case + if ( + not payload.wait + and request.auth.svix_application_id + and settings.SVIX_TOKEN != "" + ): + # send an event to the webhook + svix = SvixAsync(settings.SVIX_TOKEN) + await svix.message.create( + request.auth.svix_application_id, + MessageIn( + event_type="upsert.fail", + payload={ + "type": "upsert.fail", + "message": "There was an error processing your document", + "name": payload.name, + "metadata": payload.metadata, + "collection_name": payload.collection_name, + "error": str(e), + }, + ), + ) + elif ( + not payload.wait + ): # only send an email in the async case and if there's no webhook user_email = request.auth.email admin_email = settings.ADMINS[0][1] from_email = settings.DEFAULT_FROM_EMAIL @@ -1273,3 +1322,94 @@ async def embeddings( # change object to _object output_data["_object"] = output_data.pop("object") return 200, EmbeddingsOut(**output_data) + + +""" Webhooks """ + + +class WebhookIn(Schema): + url: str + + +class WebhookOut(Schema): + app_id: str + endpoint_id: str + webhook_secret: str + + +@router.post( + "/webhook/", + auth=Bearer(), + tags=["webhook"], + response={200: WebhookOut, 400: GenericError}, +) +async def add_webhook( + request: Request, payload: WebhookIn +) -> Tuple[int, GenericError] | Tuple[int, WebhookOut]: + """ + Add a webhook to the service. + + This endpoint allows the user to add a webhook to the service. The webhook will be called when a document is upserted + with the upsertion status. + + Events are document upsert successful, document upsert failed. + + Args: + request: The HTTP request object, which includes the user information. + url (str): The URL of the webhook. + + Returns: + A message indicating that the webhook was added successfully. + + Raises: + HttpError: If the webhook is invalid. + """ + try: + # Throw an error if SVIX_TOKEN is not set + if settings.SVIX_TOKEN == "": + raise ValueError( + "SVIX_TOKEN is not set in the environment variables. Please set it before adding a webhook." + ) + + svix = SvixAsync(settings.SVIX_TOKEN) + + if not request.auth.svix_application_id: + app = await svix.application.create(ApplicationIn(name=request.auth.email)) + app_id = app.id + else: + app_id = request.auth.svix_application_id + + if request.auth.svix_endpoint_id: + # update the webhook + endpoint_out = await svix.endpoint.update( + app_id, + request.auth.svix_endpoint_id, + EndpointUpdate( + url=payload.url, + ), + ) + else: + # create the webhook + endpoint_out = await svix.endpoint.create( + app_id, + EndpointIn( + url=payload.url, + version=1, + description="User webhook", + ), + ) + + # save the app_id and endpoint_id to the user + request.auth.svix_application_id = app_id + request.auth.svix_endpoint_id = endpoint_out.id + await request.auth.asave() + + endpoint_secret_out = await svix.endpoint.get_secret(app_id, endpoint_out.id) + + return 200, WebhookOut( + app_id=app_id, + endpoint_id=endpoint_out.id, + webhook_secret=endpoint_secret_out.key, + ) + except Exception as e: + return 400, GenericError(detail="Error adding webhook: " + str(e)) diff --git a/web/config/settings.py b/web/config/settings.py index 2223e40..f35aed7 100644 --- a/web/config/settings.py +++ b/web/config/settings.py @@ -283,6 +283,9 @@ AWS_S3_SECRET_ACCESS_KEY = env("AWS_S3_SECRET_ACCESS_KEY", default="dummy_key") AWS_STORAGE_BUCKET_NAME = env("AWS_STORAGE_BUCKET_NAME", default="dummy_bucket") +# Svix +SVIX_TOKEN = env("SVIX_TOKEN", default="") + STORAGES = { "default": { "BACKEND": "storages.backends.s3.S3Storage", diff --git a/web/requirements.in b/web/requirements.in index 5d9198d..ccc1795 100644 --- a/web/requirements.in +++ b/web/requirements.in @@ -20,4 +20,5 @@ pytest-django==4.8.0 pytest-asyncio==0.24.0 pytest-cov==5.0.0 mypy==1.11 -django-stubs[compatible-mypy]==5.1.0 \ No newline at end of file +django-stubs[compatible-mypy]==5.1.0 +svix==1.40.0 \ No newline at end of file