Skip to content

Commit

Permalink
[pydriller] 2024-11-11T16:30:04+02:00
Browse files Browse the repository at this point in the history
  • Loading branch information
babenek committed Dec 17, 2024
1 parent 8725d64 commit 17260c0
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 58 deletions.
3 changes: 3 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,8 @@ ignore_missing_imports = True
[mypy-docx.*]
ignore_missing_imports = True

[mypy-pydriller.*]
ignore_missing_imports = True

[mypy-base62.*]
ignore_missing_imports = True
101 changes: 99 additions & 2 deletions credsweeper/__main__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import base64
import binascii
import hashlib
import io
import logging
import os
import sys
import time
from argparse import ArgumentParser, ArgumentTypeError, Namespace
from typing import Any, Union, Optional, Dict
from typing import Any, Union, Optional, Dict, List, Tuple

from pydriller import Repository

from credsweeper import __version__
from credsweeper.app import APP_PATH, CredSweeper
Expand Down Expand Up @@ -116,6 +121,17 @@ def get_arguments() -> Namespace:
const="log.yaml",
dest="export_log_config",
metavar="PATH")
group.add_argument("--git", nargs="+", help="git repo to scan", dest="git", metavar="PATH")
parser.add_argument("--commits",
help="scan git repo for N commits only",
type=positive_int,
dest="commits",
default=0,
metavar="POSITIVE_INT")
parser.add_argument("--branch",
help="scan git repo for single branch, otherwise - all branches were scanned (slow)",
dest="branch",
type=str)
parser.add_argument("--rules",
help="path of rule config file (default: credsweeper/rules/config.yaml). "
f"severity:{[i.value for i in Severity]} "
Expand Down Expand Up @@ -316,9 +332,83 @@ def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Opt
return credsweeper.run(content_provider=content_provider)
except Exception as exc:
logger.critical(exc, exc_info=True)
logger.exception(exc)
return -1


def _commit_report_name(report_filename: str, commit_hash: str) -> str:
    """Insert the commit hash between the stem and the extension of a report file name."""
    ext = Util.get_extension(report_filename, False)
    # Explicit end index: ``name[:-len(ext)]`` collapses to "" when there is no extension.
    stem = report_filename[:len(report_filename) - len(ext)]
    return f"{stem}.{commit_hash}{ext}"


def scan_git(args: Namespace) -> Tuple[int, int, int]:
    """Scan the commits of a git repository for credentials.

    Progress is kept in a JSON journal whose relative file name is the base32
    SHA-1 of ``str(args.git)``; commits already present in the journal are
    skipped, and the journal is rewritten after every scanned commit.

    Args:
        args: parsed command line arguments. ``args.git`` is the repository,
            ``args.branch`` restricts the traversal to one branch and
            ``args.commits`` (0 means no limit) caps how many commits are
            scanned during this call, in traversal order.

    Returns:
        total credentials found (-1 when the scan failed with an exception)
        total scanned branches
        total scanned commits
    """
    total_credentials = 0
    total_branches = 0
    total_commits = 0
    # Honour --commits; 0 (the argparse default) or a missing attribute means "no limit".
    commits_limit = getattr(args, "commits", 0) or 0
    try:
        sha1git = hashlib.sha1(str(args.git).encode()).digest()
        repo_hash = base64.b32encode(sha1git).decode("ascii")
        journal_filename = f"{repo_hash}.json"
        logger.info(f"{args.git} sha1 in base32 {repo_hash}")
        repo_journal = Util.json_load(journal_filename)
        if not isinstance(repo_journal, dict):
            # No usable journal was loaded: create the file and start a fresh journal.
            with open(journal_filename, "w") as f:
                f.write("{}")
            repo_journal = {"repo": args.git}
        credsweeper = CredSweeper(rule_path=args.rule_path,
                                  config_path=args.config_path,
                                  sort_output=args.sort_output,
                                  use_filters=args.no_filters,
                                  pool_count=args.jobs,
                                  ml_batch_size=args.ml_batch_size,
                                  ml_threshold=args.ml_threshold,
                                  ml_providers=args.ml_providers,
                                  find_by_ext=args.find_by_ext,
                                  depth=args.depth,
                                  doc=args.doc,
                                  severity=args.severity,
                                  size_limit=args.size_limit,
                                  log_level=args.log)
        repository = Repository(args.git, only_in_branch=args.branch)
        for commit in repository.traverse_commits():
            if 0 < commits_limit <= total_commits:
                logger.info(f"Commits limit reached: {commits_limit}")
                break
            if commit.hash in repo_journal:
                logger.debug(f"Skip already scanned commit: {commit.hash}")
                continue
            logger.info(f"Scan commit: {commit.hash}")
            paths: List[Tuple[str, io.BytesIO]] = []
            for file in commit.modified_files:
                logger.info(f"FILE: {file.old_path} -> {file.new_path}")
                try:
                    # new_path is None when the file has no path after the commit.
                    if file.new_path is not None:
                        _io = io.BytesIO(file.content)
                        paths.append((file.filename, _io))
                except ValueError as exc:
                    logger.error("Possible missed submodule:%s", str(exc))
            provider = FilesProvider(paths)
            # Each commit gets its own report file: <stem>.<commit hash><ext>
            if args.json_filename:
                credsweeper.json_filename = _commit_report_name(args.json_filename, commit.hash)
            if args.xlsx_filename:
                credsweeper.xlsx_filename = _commit_report_name(args.xlsx_filename, commit.hash)

            commit_cred_number = credsweeper.run(provider)
            if credsweeper.is_ml_validator_inited:
                # reset not-pickled object for multiprocess
                credsweeper.ml_validator = None
            credsweeper.credential_manager.candidates.clear()
            total_credentials += commit_cred_number
            total_commits += 1
            repo_journal[commit.hash] = commit_cred_number
            # Persist after every commit so an interrupted scan can be resumed.
            Util.json_dump(repo_journal, journal_filename)
        total_branches += 1
    except Exception as exc:
        # Top-level boundary: log the failure and report it through the -1 sentinel.
        logger.critical(exc, exc_info=True)
        return -1, total_branches, total_commits
    return total_credentials, total_branches, total_commits


def main() -> int:
"""Main function"""
result = EXIT_FAILURE
Expand All @@ -327,7 +417,7 @@ def main() -> int:
if args.banner:
print(f"CredSweeper {__version__} crc32:{check_integrity():08x}")
Logger.init_logging(args.log, args.log_config_path)
logger.info(f"Init CredSweeper object with arguments: {args}")
logger.info(f"Init CredSweeper object with arguments: {args} CWD: {os.getcwd()}")
summary: Dict[str, int] = {}
if args.path:
logger.info(f"Run analyzer on path: {args.path}")
Expand All @@ -350,6 +440,13 @@ def main() -> int:
summary["Deleted File Credentials"] = del_credentials_number
if 0 <= add_credentials_number and 0 <= del_credentials_number:
result = EXIT_SUCCESS
elif args.git:
logger.info(f"Run analyzer on GIT: {args.git}")
credentials_number, branches_number, commits_number = scan_git(args)
summary[
f"Detected Credentials in {branches_number} branches and {commits_number} commits "] = credentials_number
if 0 <= credentials_number:
result = EXIT_SUCCESS
elif args.export_config:
logging.info(f"Exporting default config to file: {args.export_config}")
config_dict = Util.json_load(APP_PATH / "secret" / "config.json")
Expand Down
18 changes: 16 additions & 2 deletions credsweeper/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self,
sort_output: bool = False,
use_filters: bool = True,
pool_count: int = 1,
ml_batch_size: Optional[int] = None,
ml_batch_size: Optional[int] = 16,
ml_threshold: Union[float, ThresholdPreset] = ThresholdPreset.medium,
ml_config: Union[None, str, Path] = None,
ml_model: Union[None, str, Path] = None,
Expand Down Expand Up @@ -183,11 +183,18 @@ def _use_ml_validation(self) -> bool:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

@property
def is_ml_validator_inited(self) -> bool:
    """Report whether the ML validator object already exists, without creating it."""
    return True if self.__ml_validator else False

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

@property
def ml_validator(self) -> MlValidator:
"""ml_validator getter"""
from credsweeper.ml_model import MlValidator
if not self.__ml_validator:
if not self.is_ml_validator_inited:
self.__ml_validator: MlValidator = MlValidator(
threshold=self.ml_threshold, #
ml_config=self.ml_config, #
Expand Down Expand Up @@ -364,6 +371,7 @@ def post_processing(self) -> None:
if ml_cred_groups:
logger.info(f"Run ML Validation for {len(ml_cred_groups)} groups")
is_cred, probability = self.ml_validator.validate_groups(ml_cred_groups, self.ml_batch_size)
logger.info(f"DONE ML Validation for {len(is_cred)} results")
for i, (_, group_candidates) in enumerate(ml_cred_groups):
for candidate in group_candidates:
if candidate.use_ml:
Expand All @@ -387,6 +395,12 @@ def export_results(self) -> None:

credentials = self.credential_manager.get_credentials()

if credentials:
logger.info(f"Exporting {len(credentials)} credentials")
else:
logger.info("No credentials were found")
return

if self.sort_output:
credentials.sort(key=lambda x: ( #
x.line_data_list[0].path, #
Expand Down
9 changes: 8 additions & 1 deletion docs/source/guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ Get all argument list:
.. code-block:: text
usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH])
usage: python -m credsweeper [-h]
(--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH] | --git PATH [PATH ...])
[--commits POSITIVE_INT] [--branch BRANCH]
[--rules PATH] [--severity SEVERITY] [--config PATH] [--log_config PATH] [--denylist PATH]
[--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR]
[--ml_batch_size POSITIVE_INT] [--ml_config PATH] [--ml_model PATH] [--ml_providers STR]
Expand All @@ -31,6 +33,11 @@ Get all argument list:
exporting default config to file (default: config.json)
--export_log_config [PATH]
exporting default logger config to file (default: log.yaml)
--git PATH [PATH ...]
git repo to scan
--commits POSITIVE_INT
scan git repo for N commits only
--branch BRANCH scan git repo for single branch, otherwise - all branches were scanned (slow)
--rules PATH path of rule config file (default: credsweeper/rules/config.yaml). severity:['critical', 'high', 'medium', 'low', 'info'] type:['keyword', 'pattern', 'pem_key', 'multi']
--severity SEVERITY set minimum level for rules to apply ['critical', 'high', 'medium', 'low', 'info'](default: 'Severity.INFO', case insensitive)
--config PATH use custom config (default: built-in)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pandas==2.2.3; python_version >= '3.9'
password-strength==0.0.3.post2
pdfminer.six==20240706
pybase62==1.0.0
PyDriller==2.6
pyjks==20.0.0
python-dateutil==2.9.0.post0
python-docx==1.1.2
Expand Down
76 changes: 60 additions & 16 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import subprocess
import sys
import tempfile
from pathlib import Path
import time
from typing import AnyStr, Tuple
from unittest import TestCase

import deepdiff
import pytest
from git import Repo

from credsweeper.app import APP_PATH
from credsweeper.utils import Util
Expand All @@ -25,12 +27,12 @@ def setUp(self):

@staticmethod
def _m_credsweeper(args) -> Tuple[str, str]:
proc = subprocess.Popen(
with subprocess.Popen(
[sys.executable, "-m", "credsweeper", *args], #
cwd=APP_PATH.parent, #
stdout=subprocess.PIPE, #
stderr=subprocess.PIPE) #
_stdout, _stderr = proc.communicate()
cwd=APP_PATH.parent, #
stdout=subprocess.PIPE, #
stderr=subprocess.PIPE) as proc:
_stdout, _stderr = proc.communicate()

def transform(x: AnyStr) -> str:
if isinstance(x, bytes):
Expand Down Expand Up @@ -202,7 +204,10 @@ def test_it_works_n(self) -> None:
" | --diff_path PATH [PATH ...]" \
" | --export_config [PATH]" \
" | --export_log_config [PATH]" \
" | --git PATH [PATH ...]" \
")" \
" [--commits POSITIVE_INT]" \
" [--branch BRANCH]" \
" [--rules PATH]" \
" [--severity SEVERITY]" \
" [--config PATH]" \
Expand Down Expand Up @@ -234,6 +239,7 @@ def test_it_works_n(self) -> None:
" --diff_path" \
" --export_config" \
" --export_log_config" \
" --git" \
" is required "
expected = " ".join(expected.split())
self.assertEqual(expected, output)
Expand Down Expand Up @@ -333,7 +339,8 @@ def test_patch_save_json_p(self) -> None:
_stdout, _stderr = self._m_credsweeper(
["--diff_path", target_path, "--save-json", json_filename, "--log", "silence"])
self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json")))
self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json")))
# deleted patch contains no issues
self.assertFalse(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json")))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Expand Down Expand Up @@ -471,10 +478,7 @@ def test_find_by_ext_n(self) -> None:
json_filename = os.path.join(tmp_dir, f"{__name__}.json")
_stdout, _stderr = self._m_credsweeper(
["--path", tmp_dir, "--save-json", json_filename, "--log", "silence"])
self.assertTrue(os.path.exists(json_filename))
with open(json_filename, "r") as json_file:
report = json.load(json_file)
self.assertEqual(0, len(report))
self.assertFalse(os.path.exists(json_filename))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Expand Down Expand Up @@ -536,17 +540,29 @@ def test_denylist_p(self) -> None:
_stdout, _stderr = self._m_credsweeper([
"--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence"
])
with open(json_filename, "r") as json_file:
report = json.load(json_file)
self.assertEqual(0, len(report))
self.assertFalse(os.path.exists(json_filename))
with open(denylist_filename, "w") as f:
f.write('ghp_00000000000000000000000000000004WZ4EQ') # value only
f.write("abc")
_stdout, _stderr = self._m_credsweeper([
"--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence"
])
with open(json_filename, "r") as json_file:
report = json.load(json_file)
self.assertEqual(0, len(report))
self.assertEqual(1, len(report))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_denylist_line_p(self) -> None:
    """Scan password.gradle with a denylist holding one padded line; no JSON report file is expected."""
    sample = str(SAMPLES_PATH / "password.gradle")
    with tempfile.TemporaryDirectory() as work_dir:
        report_path = os.path.join(work_dir, f"{__name__}.json")
        denylist_path = os.path.join(work_dir, "list.txt")
        with open(denylist_path, "w") as denylist_file:
            denylist_file.write(' password = "cackle!" ')
        cli_args = [
            "--path", sample, "--denylist", denylist_path, "--save-json", report_path, "--log", "silence"
        ]
        _stdout, _stderr = self._m_credsweeper(cli_args)
        report_exists = os.path.exists(report_path)
        self.assertFalse(report_exists)

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Expand All @@ -560,6 +576,7 @@ def test_denylist_n(self) -> None:
_stdout, _stderr = self._m_credsweeper([
"--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence"
])
self.assertTrue(os.path.exists(json_filename))
with open(json_filename, "r") as json_file:
report = json.load(json_file)
self.assertEqual(1, len(report))
Expand All @@ -581,7 +598,7 @@ def test_rules_ml_p(self) -> None:
report_set = set([i["rule"] for i in report])
rules = Util.yaml_load(APP_PATH / "rules" / "config.yaml")
rules_set = set([i["name"] for i in rules])
missed = { #
missed = { # type: ignore
"ID_PASSWD_PAIR",
"SECRET_PAIR",
"IP_ID_PASSWORD_TRIPLE",
Expand Down Expand Up @@ -670,6 +687,33 @@ def test_doc_n(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_pydriller_p(self) -> None:
    """Commit a credential, overwrite it in a second commit, then check the --git scan reports it."""
    with tempfile.TemporaryDirectory() as repo_dir:
        with Repo.init(repo_dir) as repo:
            tracked = Path(repo_dir) / "with_cred"
            value = "GbdD@23#d0"
            # first commit holds the credential, second commit overwrites it
            revisions = (
                (f"git_password: {value}", "added file"),
                ("DELETED", "cleared file"),
            )
            for text, message in revisions:
                with open(tracked, "w") as f:
                    f.write(text)
                repo.index.add([tracked])
                repo.index.commit(message)
            # the working copy must no longer contain the value
            with open(tracked, "r") as f:
                current_text = f.read()
            self.assertNotIn(value, current_text)
            # scan the repository history
            cli_args = ["--log", "DEBUG", "--git", str(repo_dir)]
            _stdout, _stderr = self._m_credsweeper(cli_args)
            self.assertIn("Detected Credentials in 1 branches and 2 commits : 1", _stdout, _stdout)
            for stream in (_stdout, _stderr):
                self.assertNotIn("CRITICAL", stream, stream)
            # the value from the first commit must appear in stdout
            self.assertIn(value, _stdout, _stdout)

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_external_ml_n(self) -> None:
# not existed ml_config
_stdout, _stderr = self._m_credsweeper(
Expand Down
Loading

0 comments on commit 17260c0

Please sign in to comment.