Skip to content

Commit

Permalink
Support App tokens for single repo enumeration (#37)
Browse files Browse the repository at this point in the history
* Adding option to run with app tokens for single repo enum.

* Add unit test cases.

* Fixing checks to allow S2S tokens for single repo enumeration.
  • Loading branch information
AdnaneKhan authored Sep 23, 2024
1 parent d386dd3 commit 004dedd
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 23 deletions.
30 changes: 16 additions & 14 deletions gatox/caching/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,71 +43,73 @@ def get_workflow(self, repo_slug: str, workflow_name: str):
"""
Get a workflow from the in-memory dictionary.
"""
key = f"{repo_slug}:{workflow_name}"
key = f"{repo_slug.lower()}:{workflow_name}"
return self.workflow_cache.get(key, None)

def is_repo_cached(self, repo_slug: str):
"""
Check if a repository is in the in-memory dictionary.
"""
return repo_slug in self.repo_wf_lookup
return repo_slug.lower() in self.repo_wf_lookup

def is_action_cached(self, repo_slug: str, action_path: str, ref: str):
"""
Check if action is cached.
"""
key = f"{repo_slug}:{action_path}:{ref}"
key = f"{repo_slug.lower()}:{action_path}:{ref}"
return key in self.action_cache

def get_workflows(self, repo_slug: str):
"""
Get all workflows for a repository from the in-memory dictionary.
"""
wf_keys = self.repo_wf_lookup.get(repo_slug, None)
wf_keys = self.repo_wf_lookup.get(repo_slug.lower(), None)
if wf_keys:
return [self.workflow_cache[f"{repo_slug}:{key}"] for key in wf_keys]
return [
self.workflow_cache[f"{repo_slug.lower()}:{key}"] for key in wf_keys
]
else:
return set()

def get_action(self, repo_slug: str, action_path: str, ref: str):
"""
Get an action from the in-memory dictionary.
"""
key = f"{repo_slug}:{action_path}:{ref}"
key = f"{repo_slug.lower()}:{action_path}:{ref}"
return self.action_cache.get(key, None)

def set_repository(self, repository: Repository):
"""
Set a repository in the in-memory dictionary.
"""
key = repository.name
key = repository.name.lower()
self.repo_store[key] = repository

def get_repository(self, repo_slug: str):
"""
Get a repository from the in-memory dictionary.
"""
return self.repo_store.get(repo_slug, None)
return self.repo_store.get(repo_slug.lower(), None)

def set_workflow(self, repo_slug: str, workflow_name: str, value: Workflow):
"""
Set a workflow in the in-memory dictionary.
"""
key = f"{repo_slug}:{workflow_name}"
if repo_slug not in self.repo_wf_lookup:
self.repo_wf_lookup[repo_slug] = set()
self.repo_wf_lookup[repo_slug].add(workflow_name)
key = f"{repo_slug.lower()}:{workflow_name}"
if repo_slug.lower() not in self.repo_wf_lookup:
self.repo_wf_lookup[repo_slug.lower()] = set()
self.repo_wf_lookup[repo_slug.lower()].add(workflow_name)
self.workflow_cache[key] = value

def set_empty(self, repo_slug: str):
"""
Set an empty value in the in-memory dictionary for a repository.
"""
self.repo_wf_lookup[repo_slug] = set()
self.repo_wf_lookup[repo_slug.lower()] = set()

def set_action(self, repo_slug: str, action_path: str, ref: str, value: str):
"""
Set an action in the in-memory dictionary.
"""
key = f"{repo_slug}:{action_path}:{ref}"
key = f"{repo_slug.lower()}:{action_path}:{ref}"
self.action_cache[key] = value
17 changes: 11 additions & 6 deletions gatox/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,19 @@ def validate_arguments(args, parser):
re.match("gh[po]_[A-Za-z0-9]{36}$", gh_token)
or re.match("^[a-fA-F0-9]{40}$", gh_token)
):
if re.match("gh[usr]_[A-Za-z0-9]{36}$", gh_token):
parser.error(
f"{Fore.RED}[!]{Style.RESET_ALL} Gato-X only"
" supports GitHub OAuth and Personal Access Tokens."
)
if re.match("gh[s]_[A-Za-z0-9]{36}$", gh_token):
if not (args.machine and args.repository):
parser.error(
f"{Fore.RED}[!]{Style.RESET_ALL} Gato-X does"
" not support App tokens without machine flag."
)
else:
Output.info(
"Allowing the use of a GitHub App token for single repo enumeration."
)
else:
parser.error(
f"{Fore.RED}[!]{Style.RESET_ALL} Provided GitHub PAT is" " malformed!"
f"{Fore.RED}[!]{Style.RESET_ALL} Provided GitHub PAT is malformed or unsupported!"
)

args_dict = vars(args)
Expand Down
9 changes: 9 additions & 0 deletions gatox/cli/enumeration/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ def configure_parser_enumerate(parser):
action="store_true",
)

parser.add_argument(
"--machine",
help=(
"Run with a GitHub App token, which will allow running single repository\n"
" enumeration with server-to-server or user-to-server tokens."
),
action="store_true",
)

parser.add_argument(
"--output-json",
"-oJ",
Expand Down
20 changes: 20 additions & 0 deletions gatox/enumerate/enumerate.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,26 @@ def __init__(
self.org_e = OrganizationEnum(self.api)

def __setup_user_info(self):
"""Sets up user/app token information."""
if not self.user_perms and self.api.is_app_token():
installation_info = self.api.get_installation_repos()

if installation_info:
count = installation_info["total_count"]
if count > 0:
Output.info(
f"Gato-X is using valid a GitHub App installation token!"
)
self.user_perms = {
"user": "Github App",
"scopes": [],
"name": "GATO-X App Mode",
}

return True
else:
return False

if not self.user_perms:
self.user_perms = self.api.check_user()
if not self.user_perms:
Expand Down
7 changes: 6 additions & 1 deletion gatox/enumerate/ingest/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ def construct_workflow_cache(yml_results):

cache.set_workflow(owner, yml_name, wf_wrapper)

# If we are using app installation tokens, then
# the query might return empty for this field, but if
# we are here then we can read.
if not result["viewerPermission"]:
result["viewerPermission"] = "READ"

repo_data = {
"full_name": result["nameWithOwner"],
"html_url": result["url"],
Expand Down Expand Up @@ -221,6 +227,5 @@ def construct_workflow_cache(yml_results):
if env["node"]["name"] != "github-pages"
]
repo_data["environments"] = envs

repo_wrapper = Repository(repo_data)
cache.set_repository(repo_wrapper)
10 changes: 10 additions & 0 deletions gatox/github/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ def __verify_result(response: requests.Response, expected_code: int):
return False
return True

def is_app_token(self):
"""Returns if the API is using a GitHub App installation token."""
return self.pat.startswith("ghs_")

def call_get(self, url: str, params: dict = None, strip_auth=False):
"""Internal method to wrap a GET request so that proxies and headers
do not need to be repeated.
Expand Down Expand Up @@ -1729,6 +1733,12 @@ def retrieve_raw_action(self, repo: str, file_path: str, ref: str):

return None

def get_installation_repos(self):
""" """
response = self.call_get("/installation/repositories")
if response.status_code == 200:
return response.json()

def get_commit_merge_date(self, repo: str, sha: str):
"""Gets the date of the merge commit."""

Expand Down
23 changes: 21 additions & 2 deletions unit_test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,26 @@ def test_cli_s2s_token(capfd):
with pytest.raises(SystemExit):
cli.cli(["enumerate", "-t", "test"])
out, err = capfd.readouterr()
assert "supports GitHub OAuth and Personal Access Tokens" in err
assert "not support App tokens without machine flag" in err


def test_cli_s2s_token_no_machine(capfd):
"""Test case where a service-to-service token is provided."""
os.environ["GH_TOKEN"] = "ghs_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"

with pytest.raises(SystemExit):
cli.cli(["enumerate", "-r", "testOrg/testRepo"])
out, err = capfd.readouterr()
assert "not support App tokens without machine flag" in err


def test_cli_s2s_token_machine(capfd):
"""Test case where a service-to-service token is provided."""
os.environ["GH_TOKEN"] = "ghs_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"

cli.cli(["enumerate", "-r", "testOrg/testRepo", "--machine"])
out, err = capfd.readouterr()
assert "Allowing the use of a GitHub App token for single repo enumeration" in out


def test_cli_u2s_token(capfd):
Expand All @@ -55,7 +74,7 @@ def test_cli_u2s_token(capfd):
with pytest.raises(SystemExit):
cli.cli(["enumerate", "-t", "test"])
out, err = capfd.readouterr()
assert "supports GitHub OAuth and Personal Access Tokens" in err
assert "Provided GitHub PAT is malformed or unsupported" in err


@mock.patch("gatox.cli.cli.Enumerator")
Expand Down
28 changes: 28 additions & 0 deletions unit_test/test_enumerate.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def test_init(mock_api):
@patch("gatox.enumerate.enumerate.Api")
def test_self_enumerate(mock_api, capsys):
"""Test constructor for enumerator."""

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["repo", "workflow"],
Expand Down Expand Up @@ -104,6 +107,8 @@ def test_enumerate_repo_admin(mock_api, capsys):
skip_log=False,
)

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["repo", "workflow"],
Expand Down Expand Up @@ -137,6 +142,8 @@ def test_enumerate_repo_admin_no_wf(mock_api, capsys):
skip_log=False,
)

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["repo"],
Expand Down Expand Up @@ -170,6 +177,8 @@ def test_enumerate_repo_no_wf_no_admin(mock_api, capsys):
skip_log=False,
)

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["repo"],
Expand Down Expand Up @@ -205,6 +214,8 @@ def test_enumerate_repo_no_wf_maintain(mock_api, capsys):
skip_log=False,
)

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["repo", "workflow"],
Expand Down Expand Up @@ -239,6 +250,8 @@ def test_enumerate_repo_only(mock_api, capsys):
skip_log=False,
)

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["repo", "workflow"],
Expand Down Expand Up @@ -268,6 +281,8 @@ def test_enum_validate(mock_api, capfd):
"scopes": ["repo", "workflow"],
}

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_organizations.return_value = []

gh_enumeration_runner = Enumerator(
Expand All @@ -293,6 +308,8 @@ def test_enum_repo(mock_api, mock_time, capfd):
"scopes": ["repo", "workflow"],
}

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.get_repository.return_value = TEST_REPO_DATA

gh_enumeration_runner = Enumerator(
Expand All @@ -318,6 +335,8 @@ def test_enum_org(mock_api, mock_time, capfd):
"scopes": ["repo", "workflow", "admin:org"],
}

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.get_repository.return_value = TEST_REPO_DATA
mock_api.return_value.get_organization_details.return_value = TEST_ORG_DATA

Expand Down Expand Up @@ -396,6 +415,8 @@ def test_enum_repo_runner(mock_api, capfd):
"scopes": ["repo", "workflow"],
}

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.get_repo_runners.return_value = [
{
"id": 2,
Expand Down Expand Up @@ -457,6 +478,8 @@ def test_enum_repos(mock_api, mock_time, capfd):
"scopes": ["repo", "workflow"],
}

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.get_repository.return_value = TEST_REPO_DATA

gh_enumeration_runner = Enumerator(
Expand All @@ -481,6 +504,8 @@ def test_enum_repos_empty(mock_api, capfd):
"scopes": ["repo", "workflow"],
}

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.get_repository.return_value = TEST_REPO_DATA

gh_enumeration_runner = Enumerator(
Expand Down Expand Up @@ -508,6 +533,8 @@ def test_bad_token(mock_api):
skip_log=True,
)

mock_api.return_value.is_app_token.return_value = False

mock_api.return_value.check_user.return_value = None

val = gh_enumeration_runner.self_enumeration()
Expand All @@ -526,6 +553,7 @@ def test_unscoped_token(mock_api, capfd):
skip_log=True,
)

mock_api.return_value.is_app_token.return_value = False
mock_api.return_value.check_user.return_value = {
"user": "testUser",
"scopes": ["public_repo"],
Expand Down

0 comments on commit 004dedd

Please sign in to comment.