diff --git a/libs/community/langchain_community/document_loaders/recursive_url_loader.py b/libs/community/langchain_community/document_loaders/recursive_url_loader.py
index 6fca4edf86a80..fd5bc3ec4c8d7 100644
--- a/libs/community/langchain_community/document_loaders/recursive_url_loader.py
+++ b/libs/community/langchain_community/document_loaders/recursive_url_loader.py
@@ -93,6 +93,7 @@ def __init__(
         link_regex: Union[str, re.Pattern, None] = None,
         headers: Optional[dict] = None,
         check_response_status: bool = False,
+        concurrent_requests_limit: int = 100,
     ) -> None:
         """Initialize with URL to crawl and any subdirectories to exclude.
 
@@ -117,6 +118,8 @@ def __init__(
             link_regex: Regex for extracting sub-links from the raw html of a web page.
             check_response_status: If True, check HTTP response status and skip
                 URLs with error responses (400-599).
+            concurrent_requests_limit: Maximum number of concurrent HTTP requests
+                when using asynchronous loading.
         """
 
         self.url = url
@@ -142,6 +145,7 @@ def __init__(
         self._lock = asyncio.Lock() if self.use_async else None
         self.headers = headers
         self.check_response_status = check_response_status
+        self.concurrent_requests_limit = concurrent_requests_limit
 
     def _get_child_links_recursive(
         self, url: str, visited: Set[str], *, depth: int = 0
@@ -196,9 +200,10 @@ async def _async_get_child_links_recursive(
         self,
         url: str,
         visited: Set[str],
+        semaphore: asyncio.Semaphore,
         *,
         session: Optional[aiohttp.ClientSession] = None,
         depth: int = 0,
     ) -> List[Document]:
         """Recursively get all child links starting with the path of the input URL.
 
@@ -206,6 +211,7 @@ async def _async_get_child_links_recursive(
             url: The URL to crawl.
             visited: A set of visited URLs.
             depth: To reach the current url, how many pages have been visited.
+            semaphore: An asyncio.Semaphore that limits concurrent requests.
         """
         try:
             import aiohttp
@@ -231,19 +237,26 @@ async def _async_get_child_links_recursive(
         )
         async with self._lock:  # type: ignore
             visited.add(url)
-        try:
-            async with session.get(url) as response:
-                text = await response.text()
-                if self.check_response_status and 400 <= response.status <= 599:
-                    raise ValueError(f"Received HTTP status {response.status}")
-        except (aiohttp.client_exceptions.InvalidURL, Exception) as e:
-            logger.warning(
-                f"Unable to load {url}. Received error {e} of type "
-                f"{e.__class__.__name__}"
-            )
-            if close_session:
-                await session.close()
-            return []
+
+        async with semaphore:
+            try:
+                async with session.get(url) as response:
+                    text = await response.text()
+                    if self.check_response_status and 400 <= response.status <= 599:
+                        raise ValueError(f"Received HTTP status {response.status}")
+            except (aiohttp.client_exceptions.InvalidURL, Exception) as e:
+                logger.warning(
+                    f"Unable to load {url}. Received error {e} of type "
+                    f"{e.__class__.__name__}"
+                )
+                if close_session:
+                    await session.close()
+                return []
+
+        # Wait if the concurrent requests limit is reached
+        if semaphore.locked():
+            await asyncio.sleep(1)
+
         results = []
         content = self.extractor(text)
         if content:
@@ -270,7 +283,7 @@ async def _async_get_child_links_recursive(
         for link in to_visit:
             sub_tasks.append(
                 self._async_get_child_links_recursive(
-                    link, visited, session=session, depth=depth + 1
+                    link, visited, semaphore, session=session, depth=depth + 1
                 )
             )
         next_results = await asyncio.gather(*sub_tasks)
@@ -291,8 +304,9 @@ def lazy_load(self) -> Iterator[Document]:
         but it will still work in the expected way, just not lazy."""
         visited: Set[str] = set()
         if self.use_async:
+            semaphore = asyncio.Semaphore(self.concurrent_requests_limit)
             results = asyncio.run(
-                self._async_get_child_links_recursive(self.url, visited)
+                self._async_get_child_links_recursive(self.url, visited, semaphore)
             )
             return iter(results or [])
         else:
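
For reviewers, a minimal usage sketch of the new knob. It relies only on constructor arguments visible in the hunks above (`url`, `use_async`, `check_response_status`, and the new `concurrent_requests_limit`); the target URL is a placeholder, not part of this change.

```python
from langchain_community.document_loaders import RecursiveUrlLoader

# Placeholder URL; any page whose sub-links share its prefix works.
loader = RecursiveUrlLoader(
    "https://docs.python.org/3.9/",
    use_async=True,  # the semaphore only guards the async code path
    check_response_status=True,
    concurrent_requests_limit=20,  # new: at most 20 requests in flight
)
docs = loader.load()
print(f"loaded {len(docs)} documents")
```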
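And the concurrency pattern itself, reduced to a self-contained sketch with assumed names (`fetch`, `LIMIT`, the example URLs are illustrative): a single `asyncio.Semaphore` shared by all tasks admits at most `LIMIT` coroutines into the request block, so `asyncio.gather` can schedule every crawl task while only `LIMIT` connections are open at once. The `semaphore.locked()` back-off in the diff additionally pauses one second between recursion levels whenever all slots are taken, so a deep crawl does not pile new tasks onto an already saturated pool.

```python
import asyncio

import aiohttp

LIMIT = 10  # plays the role of concurrent_requests_limit


async def fetch(
    url: str, session: aiohttp.ClientSession, sem: asyncio.Semaphore
) -> str:
    # At most LIMIT coroutines are inside this block at any moment;
    # the rest suspend here until a slot is released.
    async with sem:
        async with session.get(url) as response:
            return await response.text()


async def main() -> None:
    sem = asyncio.Semaphore(LIMIT)
    urls = [f"https://example.com/page/{i}" for i in range(50)]  # placeholders
    async with aiohttp.ClientSession() as session:
        # All 50 tasks are created up front, but the shared semaphore
        # ensures no more than LIMIT requests run concurrently.
        pages = await asyncio.gather(*(fetch(u, session, sem) for u in urls))
    print(f"fetched {len(pages)} pages")


asyncio.run(main())
```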