From 09c2b827d8f686946ec7a2f2cdc53daab426093e Mon Sep 17 00:00:00 2001
From: vincenzofanizza
Date: Thu, 4 Jan 2024 02:35:32 +0100
Subject: [PATCH 1/2] included concurrency limit in RecursiveUrlLoader

---
 .../document_loaders/recursive_url_loader.py | 46 ++++++++++++-------
 1 file changed, 30 insertions(+), 16 deletions(-)

diff --git a/libs/community/langchain_community/document_loaders/recursive_url_loader.py b/libs/community/langchain_community/document_loaders/recursive_url_loader.py
index 6fca4edf86a80..617c2afc4eeb0 100644
--- a/libs/community/langchain_community/document_loaders/recursive_url_loader.py
+++ b/libs/community/langchain_community/document_loaders/recursive_url_loader.py
@@ -93,6 +93,7 @@ def __init__(
         link_regex: Union[str, re.Pattern, None] = None,
         headers: Optional[dict] = None,
         check_response_status: bool = False,
+        concurrent_requests_limit: Optional[int] = 100
     ) -> None:
         """Initialize with URL to crawl and any subdirectories to exclude.
 
@@ -117,6 +118,8 @@ def __init__(
             link_regex: Regex for extracting sub-links from the raw html of a web page.
             check_response_status: If True, check HTTP response status and skip URLs
                 with error responses (400-599).
+            concurrent_requests_limit: Maximum number of concurrent http requests when
+                using asyncronous loading.
         """
 
         self.url = url
@@ -142,6 +145,7 @@ def __init__(
         self._lock = asyncio.Lock() if self.use_async else None
         self.headers = headers
         self.check_response_status = check_response_status
+        self.concurrent_requests_limit = concurrent_requests_limit
 
     def _get_child_links_recursive(
         self, url: str, visited: Set[str], *, depth: int = 0
@@ -196,9 +200,10 @@ async def _async_get_child_links_recursive(
         self,
         url: str,
         visited: Set[str],
+        semaphore: asyncio.Semaphore,
         *,
         session: Optional[aiohttp.ClientSession] = None,
-        depth: int = 0,
+        depth: int = 0
     ) -> List[Document]:
         """Recursively get all child links starting with the path of the input URL.
 
@@ -206,6 +211,7 @@ async def _async_get_child_links_recursive(
             url: The URL to crawl.
             visited: A set of visited URLs.
             depth: To reach the current url, how many pages have been visited.
+            semaphore: An object to keep track of the number of concurrent requests.
         """
         try:
             import aiohttp
@@ -231,19 +237,26 @@ async def _async_get_child_links_recursive(
             )
         async with self._lock:  # type: ignore
             visited.add(url)
-        try:
-            async with session.get(url) as response:
-                text = await response.text()
-                if self.check_response_status and 400 <= response.status <= 599:
-                    raise ValueError(f"Received HTTP status {response.status}")
-        except (aiohttp.client_exceptions.InvalidURL, Exception) as e:
-            logger.warning(
-                f"Unable to load {url}. Received error {e} of type "
-                f"{e.__class__.__name__}"
-            )
-            if close_session:
-                await session.close()
-            return []
+
+        async with semaphore:
+            try:
+                async with session.get(url) as response:
+                    text = await response.text()
+                    if self.check_response_status and 400 <= response.status <= 599:
+                        raise ValueError(f"Received HTTP status {response.status}")
+            except (aiohttp.client_exceptions.InvalidURL, Exception) as e:
+                logger.warning(
+                    f"Unable to load {url}. Received error {e} of type "
+                    f"{e.__class__.__name__}"
+                )
+                if close_session:
+                    await session.close()
+                return []
+
+        # Wait if the concurrent requests limit is reached
+        if semaphore.locked():
+            await asyncio.sleep(1)
+
         results = []
         content = self.extractor(text)
         if content:
@@ -270,7 +283,7 @@ async def _async_get_child_links_recursive(
             for link in to_visit:
                 sub_tasks.append(
                     self._async_get_child_links_recursive(
-                        link, visited, session=session, depth=depth + 1
+                        link, visited, semaphore, session=session, depth=depth + 1
                     )
                 )
             next_results = await asyncio.gather(*sub_tasks)
@@ -291,8 +304,9 @@ def lazy_load(self) -> Iterator[Document]:
         but it will still work in the expected way, just not lazy."""
         visited: Set[str] = set()
         if self.use_async:
+            semaphore = asyncio.Semaphore(self.concurrent_requests_limit)
             results = asyncio.run(
-                self._async_get_child_links_recursive(self.url, visited)
+                self._async_get_child_links_recursive(self.url, visited, semaphore)
            )
             return iter(results or [])
         else:

From 8ccece4cfd8d811eca1067ffeb02a410a9441a31 Mon Sep 17 00:00:00 2001
From: vincenzofanizza
Date: Fri, 12 Jan 2024 18:24:17 +0100
Subject: [PATCH 2/2] fixed spelling mistake

---
 .../document_loaders/recursive_url_loader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libs/community/langchain_community/document_loaders/recursive_url_loader.py b/libs/community/langchain_community/document_loaders/recursive_url_loader.py
index 617c2afc4eeb0..fd5bc3ec4c8d7 100644
--- a/libs/community/langchain_community/document_loaders/recursive_url_loader.py
+++ b/libs/community/langchain_community/document_loaders/recursive_url_loader.py
@@ -119,7 +119,7 @@ def __init__(
         check_response_status: If True, check HTTP response status and skip URLs
             with error responses (400-599).
         concurrent_requests_limit: Maximum number of concurrent http requests when
-                using asyncronous loading.
+                using asynchronous loading.
         """
 
         self.url = url
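
Note for reviewers: a minimal usage sketch of the parameter this patch series adds. The URL, max_depth, and limit values below are illustrative placeholders, not taken from the patch.

    from langchain_community.document_loaders import RecursiveUrlLoader

    # Illustrative values only: concurrent_requests_limit is the parameter
    # added by this patch and only takes effect when use_async=True, where
    # lazy_load() creates an asyncio.Semaphore shared by all crawl tasks.
    loader = RecursiveUrlLoader(
        url="https://docs.python.org/3.9/",
        max_depth=2,
        use_async=True,
        concurrent_requests_limit=10,  # at most 10 HTTP requests in flight
    )
    docs = loader.load()

Since the semaphore is created once per lazy_load() call and passed down through every recursive _async_get_child_links_recursive task, the limit bounds the crawl as a whole rather than each page's sub-links; the synchronous code path is unaffected.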