diff --git a/gatox/caching/cache_manager.py b/gatox/caching/cache_manager.py index 6d43af4..c0547ec 100644 --- a/gatox/caching/cache_manager.py +++ b/gatox/caching/cache_manager.py @@ -43,29 +43,31 @@ def get_workflow(self, repo_slug: str, workflow_name: str): """ Get a workflow from the in-memory dictionary. """ - key = f"{repo_slug}:{workflow_name}" + key = f"{repo_slug.lower()}:{workflow_name}" return self.workflow_cache.get(key, None) def is_repo_cached(self, repo_slug: str): """ Check if a repository is in the in-memory dictionary. """ - return repo_slug in self.repo_wf_lookup + return repo_slug.lower() in self.repo_wf_lookup def is_action_cached(self, repo_slug: str, action_path: str, ref: str): """ Check if action is cached. """ - key = f"{repo_slug}:{action_path}:{ref}" + key = f"{repo_slug.lower()}:{action_path}:{ref}" return key in self.action_cache def get_workflows(self, repo_slug: str): """ Get all workflows for a repository from the in-memory dictionary. """ - wf_keys = self.repo_wf_lookup.get(repo_slug, None) + wf_keys = self.repo_wf_lookup.get(repo_slug.lower(), None) if wf_keys: - return [self.workflow_cache[f"{repo_slug}:{key}"] for key in wf_keys] + return [ + self.workflow_cache[f"{repo_slug.lower()}:{key}"] for key in wf_keys + ] else: return set() @@ -73,41 +75,41 @@ def get_action(self, repo_slug: str, action_path: str, ref: str): """ Get an action from the in-memory dictionary. """ - key = f"{repo_slug}:{action_path}:{ref}" + key = f"{repo_slug.lower()}:{action_path}:{ref}" return self.action_cache.get(key, None) def set_repository(self, repository: Repository): """ Set a repository in the in-memory dictionary. """ - key = repository.name + key = repository.name.lower() self.repo_store[key] = repository def get_repository(self, repo_slug: str): """ Get a repository from the in-memory dictionary. """ - return self.repo_store.get(repo_slug, None) + return self.repo_store.get(repo_slug.lower(), None) def set_workflow(self, repo_slug: str, workflow_name: str, value: Workflow): """ Set a workflow in the in-memory dictionary. """ - key = f"{repo_slug}:{workflow_name}" - if repo_slug not in self.repo_wf_lookup: - self.repo_wf_lookup[repo_slug] = set() - self.repo_wf_lookup[repo_slug].add(workflow_name) + key = f"{repo_slug.lower()}:{workflow_name}" + if repo_slug.lower() not in self.repo_wf_lookup: + self.repo_wf_lookup[repo_slug.lower()] = set() + self.repo_wf_lookup[repo_slug.lower()].add(workflow_name) self.workflow_cache[key] = value def set_empty(self, repo_slug: str): """ Set an empty value in the in-memory dictionary for a repository. """ - self.repo_wf_lookup[repo_slug] = set() + self.repo_wf_lookup[repo_slug.lower()] = set() def set_action(self, repo_slug: str, action_path: str, ref: str, value: str): """ Set an action in the in-memory dictionary. """ - key = f"{repo_slug}:{action_path}:{ref}" + key = f"{repo_slug.lower()}:{action_path}:{ref}" self.action_cache[key] = value diff --git a/gatox/cli/cli.py b/gatox/cli/cli.py index ce1236c..75e0cc9 100644 --- a/gatox/cli/cli.py +++ b/gatox/cli/cli.py @@ -97,14 +97,19 @@ def validate_arguments(args, parser): re.match("gh[po]_[A-Za-z0-9]{36}$", gh_token) or re.match("^[a-fA-F0-9]{40}$", gh_token) ): - if re.match("gh[usr]_[A-Za-z0-9]{36}$", gh_token): - parser.error( - f"{Fore.RED}[!]{Style.RESET_ALL} Gato-X only" - " supports GitHub OAuth and Personal Access Tokens." - ) + if re.match("gh[s]_[A-Za-z0-9]{36}$", gh_token): + if not (args.machine and args.repository): + parser.error( + f"{Fore.RED}[!]{Style.RESET_ALL} Gato-X does" + " not support App tokens without machine flag." + ) + else: + Output.info( + "Allowing the use of a GitHub App token for single repo enumeration." + ) else: parser.error( - f"{Fore.RED}[!]{Style.RESET_ALL} Provided GitHub PAT is" " malformed!" + f"{Fore.RED}[!]{Style.RESET_ALL} Provided GitHub PAT is malformed or unsupported!" ) args_dict = vars(args) diff --git a/gatox/cli/enumeration/config.py b/gatox/cli/enumeration/config.py index 9641b5c..9ff79c1 100644 --- a/gatox/cli/enumeration/config.py +++ b/gatox/cli/enumeration/config.py @@ -75,6 +75,15 @@ def configure_parser_enumerate(parser): action="store_true", ) + parser.add_argument( + "--machine", + help=( + "Run with a GitHub App token, which will allow running single repository\n" + " enumeration with server-to-server or user-to-server tokens." + ), + action="store_true", + ) + parser.add_argument( "--output-json", "-oJ", diff --git a/gatox/enumerate/enumerate.py b/gatox/enumerate/enumerate.py index 232d997..260d137 100644 --- a/gatox/enumerate/enumerate.py +++ b/gatox/enumerate/enumerate.py @@ -65,6 +65,26 @@ def __init__( self.org_e = OrganizationEnum(self.api) def __setup_user_info(self): + """Sets up user/app token information.""" + if not self.user_perms and self.api.is_app_token(): + installation_info = self.api.get_installation_repos() + + if installation_info: + count = installation_info["total_count"] + if count > 0: + Output.info( + f"Gato-X is using valid a GitHub App installation token!" + ) + self.user_perms = { + "user": "Github App", + "scopes": [], + "name": "GATO-X App Mode", + } + + return True + else: + return False + if not self.user_perms: self.user_perms = self.api.check_user() if not self.user_perms: diff --git a/gatox/enumerate/ingest/ingest.py b/gatox/enumerate/ingest/ingest.py index d7d100f..e4412ea 100644 --- a/gatox/enumerate/ingest/ingest.py +++ b/gatox/enumerate/ingest/ingest.py @@ -182,6 +182,12 @@ def construct_workflow_cache(yml_results): cache.set_workflow(owner, yml_name, wf_wrapper) + # If we are using app installation tokens, then + # the query might return empty for this field, but if + # we are here then we can read. + if not result["viewerPermission"]: + result["viewerPermission"] = "READ" + repo_data = { "full_name": result["nameWithOwner"], "html_url": result["url"], @@ -221,6 +227,5 @@ def construct_workflow_cache(yml_results): if env["node"]["name"] != "github-pages" ] repo_data["environments"] = envs - repo_wrapper = Repository(repo_data) cache.set_repository(repo_wrapper) diff --git a/gatox/github/api.py b/gatox/github/api.py index c9351e7..039e71e 100644 --- a/gatox/github/api.py +++ b/gatox/github/api.py @@ -272,6 +272,10 @@ def __verify_result(response: requests.Response, expected_code: int): return False return True + def is_app_token(self): + """Returns if the API is using a GitHub App installation token.""" + return self.pat.startswith("ghs_") + def call_get(self, url: str, params: dict = None, strip_auth=False): """Internal method to wrap a GET request so that proxies and headers do not need to be repeated. @@ -1729,6 +1733,12 @@ def retrieve_raw_action(self, repo: str, file_path: str, ref: str): return None + def get_installation_repos(self): + """ """ + response = self.call_get("/installation/repositories") + if response.status_code == 200: + return response.json() + def get_commit_merge_date(self, repo: str, sha: str): """Gets the date of the merge commit.""" diff --git a/unit_test/test_cli.py b/unit_test/test_cli.py index b7b4b39..9c761aa 100644 --- a/unit_test/test_cli.py +++ b/unit_test/test_cli.py @@ -45,7 +45,26 @@ def test_cli_s2s_token(capfd): with pytest.raises(SystemExit): cli.cli(["enumerate", "-t", "test"]) out, err = capfd.readouterr() - assert "supports GitHub OAuth and Personal Access Tokens" in err + assert "not support App tokens without machine flag" in err + + +def test_cli_s2s_token_no_machine(capfd): + """Test case where a service-to-service token is provided.""" + os.environ["GH_TOKEN"] = "ghs_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + + with pytest.raises(SystemExit): + cli.cli(["enumerate", "-r", "testOrg/testRepo"]) + out, err = capfd.readouterr() + assert "not support App tokens without machine flag" in err + + +def test_cli_s2s_token_machine(capfd): + """Test case where a service-to-service token is provided.""" + os.environ["GH_TOKEN"] = "ghs_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + + cli.cli(["enumerate", "-r", "testOrg/testRepo", "--machine"]) + out, err = capfd.readouterr() + assert "Allowing the use of a GitHub App token for single repo enumeration" in out def test_cli_u2s_token(capfd): @@ -55,7 +74,7 @@ def test_cli_u2s_token(capfd): with pytest.raises(SystemExit): cli.cli(["enumerate", "-t", "test"]) out, err = capfd.readouterr() - assert "supports GitHub OAuth and Personal Access Tokens" in err + assert "Provided GitHub PAT is malformed or unsupported" in err @mock.patch("gatox.cli.cli.Enumerator") diff --git a/unit_test/test_enumerate.py b/unit_test/test_enumerate.py index 56cd0e8..c815fe3 100644 --- a/unit_test/test_enumerate.py +++ b/unit_test/test_enumerate.py @@ -69,6 +69,9 @@ def test_init(mock_api): @patch("gatox.enumerate.enumerate.Api") def test_self_enumerate(mock_api, capsys): """Test constructor for enumerator.""" + + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["repo", "workflow"], @@ -104,6 +107,8 @@ def test_enumerate_repo_admin(mock_api, capsys): skip_log=False, ) + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["repo", "workflow"], @@ -137,6 +142,8 @@ def test_enumerate_repo_admin_no_wf(mock_api, capsys): skip_log=False, ) + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["repo"], @@ -170,6 +177,8 @@ def test_enumerate_repo_no_wf_no_admin(mock_api, capsys): skip_log=False, ) + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["repo"], @@ -205,6 +214,8 @@ def test_enumerate_repo_no_wf_maintain(mock_api, capsys): skip_log=False, ) + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["repo", "workflow"], @@ -239,6 +250,8 @@ def test_enumerate_repo_only(mock_api, capsys): skip_log=False, ) + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["repo", "workflow"], @@ -268,6 +281,8 @@ def test_enum_validate(mock_api, capfd): "scopes": ["repo", "workflow"], } + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_organizations.return_value = [] gh_enumeration_runner = Enumerator( @@ -293,6 +308,8 @@ def test_enum_repo(mock_api, mock_time, capfd): "scopes": ["repo", "workflow"], } + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.get_repository.return_value = TEST_REPO_DATA gh_enumeration_runner = Enumerator( @@ -318,6 +335,8 @@ def test_enum_org(mock_api, mock_time, capfd): "scopes": ["repo", "workflow", "admin:org"], } + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.get_repository.return_value = TEST_REPO_DATA mock_api.return_value.get_organization_details.return_value = TEST_ORG_DATA @@ -396,6 +415,8 @@ def test_enum_repo_runner(mock_api, capfd): "scopes": ["repo", "workflow"], } + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.get_repo_runners.return_value = [ { "id": 2, @@ -457,6 +478,8 @@ def test_enum_repos(mock_api, mock_time, capfd): "scopes": ["repo", "workflow"], } + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.get_repository.return_value = TEST_REPO_DATA gh_enumeration_runner = Enumerator( @@ -481,6 +504,8 @@ def test_enum_repos_empty(mock_api, capfd): "scopes": ["repo", "workflow"], } + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.get_repository.return_value = TEST_REPO_DATA gh_enumeration_runner = Enumerator( @@ -508,6 +533,8 @@ def test_bad_token(mock_api): skip_log=True, ) + mock_api.return_value.is_app_token.return_value = False + mock_api.return_value.check_user.return_value = None val = gh_enumeration_runner.self_enumeration() @@ -526,6 +553,7 @@ def test_unscoped_token(mock_api, capfd): skip_log=True, ) + mock_api.return_value.is_app_token.return_value = False mock_api.return_value.check_user.return_value = { "user": "testUser", "scopes": ["public_repo"],