Skip to content

Commit

Permalink
unittests work
Browse files Browse the repository at this point in the history
  • Loading branch information
TunahanGuler committed Jan 9, 2025
1 parent 1e16057 commit fa8f2f6
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 46 deletions.
17 changes: 1 addition & 16 deletions components/collector/src/source_collectors/bitbucket/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@
class BitbucketBase(SourceCollector, ABC):
"""Base class for Bitbucket collectors."""

async def _get_source_responses(self, *urls: URL) -> SourceResponses:
"""Extend to follow Bitbucket pagination links, if necessary."""
all_responses = responses = await super()._get_source_responses(*urls)
while next_urls := await self._next_urls(responses):
# Retrieving consecutive big responses without reading the response hangs the client, see
# https://github.com/aio-libs/aiohttp/issues/2217
for response in responses:
await response.read()
all_responses.extend(responses := await super()._get_source_responses(*next_urls))
return all_responses

def _basic_auth_credentials(self) -> tuple[str, str] | None:
"""Override to return None, as the private token is passed as header."""
return None
Expand All @@ -33,10 +22,6 @@ def _headers(self) -> dict[str, str]:
headers["Authorization"] = "Bearer " + str(private_token)
return headers

async def _next_urls(self, responses: SourceResponses) -> list[URL]:
"""Return the next (pagination) links from the responses."""
return [URL(next_url) for response in responses if (next_url := response.links.get("next", {}).get("url"))]


class BitbucketProjectBase(BitbucketBase, ABC):
"""Base class for Bitbucket collectors for a specific project."""
Expand All @@ -46,4 +31,4 @@ async def _bitbucket_api_url(self, api: str) -> URL:
url = await super()._api_url()
project = f"{self._parameter('owner')}/repos/{self._parameter('repository')}"
api_url = URL(f"{url}/rest/api/1.0/projects/{project}" + (f"/{api}" if api else ""))
return add_query(api_url, "pagelen=100&details=true")
return add_query(api_url, "limit=100&details=true")
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@
from datetime import datetime
from typing import cast

from collector_utilities.exceptions import NotFoundError
from base_collectors import BranchType, InactiveBranchesSourceCollector
from collector_utilities.date_time import parse_datetime
from collector_utilities.type import URL
from model import SourceResponses

from .base import BitbucketProjectBase

class BitbucketBranchInfoError(NotFoundError):
"""Bitbucket branch info is missing."""

def __init__(self, project: str) -> None:
tip = (
"Please check if the repository (name with owner) and access token (with repo scope) are "
"configured correctly."
)
super().__init__("Branch info for repository", project, extra=tip)

class BitbucketBranchType(BranchType):
"""Bitbucket branch information as returned by the API."""
Expand Down Expand Up @@ -43,6 +53,8 @@ async def _branches(self, responses: SourceResponses) -> list[Branch]:
)
for branch in json["values"]
])
if len(branches) == 0:
raise BitbucketBranchInfoError(f"projects/{self._parameter('owner')}/repos/{self._parameter('repository')}")
return branches

def _is_default_branch(self, branch: Branch) -> bool:
Expand All @@ -61,4 +73,5 @@ def _branch_landing_url(self, branch: Branch) -> URL:
"""Override to get the landing URL from the branch."""
instance_url=super()._parameter('url')
project = f"projects/{self._parameter('owner')}/repos/{self._parameter('repository')}/browse?at="
return cast(URL, f"{instance_url}/{project}{branch.get('id') or ""}")
branch_id = branch.get('id').lstrip('/')
return cast(URL, f"{instance_url}/{project}{branch_id or ""}")
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from datetime import datetime
from dateutil.tz import tzutc
from unittest.mock import AsyncMock
import pdb

from .base import BitbucketTestCase

Expand All @@ -25,7 +24,7 @@ def setUp(self):
self.branches = self.create_branches_json([main, unmerged, ignored, active_unmerged])
self.unmerged_branch_entity = self.create_entity("unmerged_branch")
self.entities = [self.unmerged_branch_entity]
self.landing_url = "https://bitbucket/rest/api/1.0/projects/owner/repos/repository/branches?pagelen=100&details=true"
self.landing_url = "https://bitbucket/rest/api/1.0/projects/owner/repos/repository/branches?limit=100&details=true"

def create_branch(
self, name: str, *, default: bool = False, active: bool = False
Expand Down Expand Up @@ -69,40 +68,26 @@ def create_entity(self, name: str) -> dict[str, str]:
async def test_inactive_branches(self):
"""Test that the number of inactive branches can be measured."""
response = await self.collect(get_request_json_return_value=self.branches)
# pdb.set_trace()
self.assert_measurement(response, value="1", entities=self.entities, landing_url=self.landing_url)

async def test_unmerged_inactive_branches(self):
"""Test that the number of unmerged inactive branches can be measured."""
self.set_source_parameter("branch_merge_status", ["unmerged"])
response = await self.collect(get_request_json_return_value=self.branches)
# pdb.set_trace()
self.assert_measurement(
response, value="1", entities=[self.unmerged_branch_entity], landing_url=self.landing_url
)

# async def test_merged_inactive_branches(self):
# """Test that the number of merged inactive branches can be measured."""
# self.set_source_parameter("branch_merge_status", ["merged"])
# response = await self.collect(get_request_json_return_value=self.branches)
# self.assert_measurement(response, value="0", entities=[self.unmerged_branch_entity], landing_url=self.landing_url)

# async def test_pagination(self):
# """Test that pagination works."""
# branches = [self.branches[:3], self.branches[3:]]
# links = {"next": {"url": "https://bitbucket/next_page"}}
# response = await self.collect(get_request_json_side_effect=branches, get_request_links=links)
# self.assert_measurement(response, value="2", entities=self.entities, landing_url=self.landing_url)
#
# async def test_private_token(self):
# """Test that the private token is used."""
# self.set_source_parameter("private_token", "token")
# inactive_branches_json = {"data": {"repository": None}}
# inactive_branches_response = AsyncMock()
# inactive_branches_response.json = AsyncMock(return_value=inactive_branches_json)
# response = await self.collect(get_request_json_return_value=self.branches)
# self.assert_measurement(
# response,
# landing_url=self.landing_url,
# connection_error="Pull request info for repository",
# )
async def test_private_token(self):
"""Test that the private token is used."""
self.set_source_parameter("private_token", "token")
inactive_branches_json = {"values": None}
inactive_branches_response = AsyncMock()
execute = AsyncMock(side_effect=[inactive_branches_response])
inactive_branches_response.json = AsyncMock(return_value=inactive_branches_json)
response = await self.collect(get_request_json_return_value=execute)
self.assert_measurement(
response,
landing_url=self.landing_url,
parse_error="Branch info for repository",
)

0 comments on commit fa8f2f6

Please sign in to comment.