diff --git a/tests/santa/test_ballot_box.py b/tests/santa/test_ballot_box.py index 544f56a983..622a8bea9c 100644 --- a/tests/santa/test_ballot_box.py +++ b/tests/santa/test_ballot_box.py @@ -2,7 +2,7 @@ from django.test import TestCase from django.utils.crypto import get_random_string from zentral.contrib.santa.ballot_box import (AnonymousVoter, BallotBox, DuplicateVoteError, - Voter, VotingError, VotingNotAllowedError) + ResetNotAllowedError, Voter, VotingError, VotingNotAllowedError) from zentral.contrib.santa.models import Rule, Target, TargetState from .utils import (add_file_to_test_class, force_ballot, force_configuration, force_enrolled_machine, force_realm_user, force_target, force_voting_group) @@ -619,3 +619,49 @@ def test_ballot_box_allowlist_metabundle(self): self.assertEqual(rule.target.identifier, self.file_signing_id) self.assertEqual(rule.policy, Rule.Policy.ALLOWLIST) self.assertEqual(len(rule.primary_users), 0) + + # target state reset + + def test_ballot_box_target_state_reset_not_allowed(self): + realm, realm_user = force_realm_user() + configuration = force_configuration(voting_realm=realm) + force_voting_group(configuration, realm_user, can_unflag_target=True) + ballot_box = BallotBox.for_realm_user(self.file_target, realm_user, all_configurations=True) + with self.assertRaises(ResetNotAllowedError): + ballot_box.reset_target_state(configuration) + + def test_ballot_box_target_state_reset(self): + realm, realm_user = force_realm_user() + configuration = force_configuration(voting_realm=realm) + rules_qs = configuration.rule_set.all() + self.assertEqual(rules_qs.count(), 0) + votes_qs = configuration.vote_set.all() + self.assertEqual(votes_qs.count(), 0) + force_voting_group(configuration, realm_user, + ballot_target_types=[Target.Type.BINARY], + can_reset_target=True, voting_weight=100) + ballot_box = BallotBox.for_realm_user(self.file_target, realm_user, all_configurations=True) + ballot_box.cast_default_votes(False, None) + self.assertEqual(rules_qs.count(), 1) + rule = rules_qs.first() + self.assertEqual(rule.target, self.file_target) + self.assertEqual(rule.policy, Rule.Policy.BLOCKLIST) + self.assertEqual(votes_qs.count(), 1) + vote = votes_qs.first() + self.assertEqual(vote.weight, 100) + self.assertFalse(vote.was_yes_vote) + ballot_box = BallotBox.for_realm_user(self.file_target, realm_user, all_configurations=True) + ts = ballot_box.target_states[configuration] + self.assertEqual(ts.state, TargetState.State.BANNED) + self.assertTrue(ts.flagged) + self.assertEqual(ts.score, -100) + self.assertIsNone(ts.reset_at) + ballot_box.reset_target_state(configuration) + ts.refresh_from_db() + self.assertEqual(ts.state, TargetState.State.UNTRUSTED) + self.assertFalse(ts.flagged) + self.assertEqual(ts.score, 0) + self.assertIsNotNone(ts.reset_at) + self.assertEqual(rules_qs.count(), 0) + self.assertEqual(votes_qs.count(), 1) + self.assertEqual(votes_qs.first(), vote) diff --git a/tests/santa/test_targets_views.py b/tests/santa/test_targets_views.py index c7cd974437..806ce9fad5 100644 --- a/tests/santa/test_targets_views.py +++ b/tests/santa/test_targets_views.py @@ -1,17 +1,24 @@ from datetime import datetime, timedelta from functools import reduce +from importlib import import_module import operator from unittest.mock import patch from django.contrib.auth.models import Group, Permission +from django.conf import settings from django.db.models import Q +from django.http import HttpRequest from django.urls import reverse from django.test import TestCase, override_settings from django.utils.crypto import get_random_string from accounts.models import User +from realms.backends.views import finalize_session +from realms.models import RealmAuthenticationSession from zentral.contrib.inventory.models import Source, File from zentral.contrib.santa.models import Target, TargetCounter, TargetState from zentral.core.stores.conf import frontend_store -from .utils import add_file_to_test_class, force_ballot, force_configuration, force_realm_user, new_sha256 +from .utils import (add_file_to_test_class, force_ballot, force_configuration, + force_realm, force_realm_user, force_voting_group, + new_sha256) @override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage') @@ -22,6 +29,8 @@ def setUpTestData(cls): cls.user = User.objects.create_user("godzilla", "godzilla@zentral.io", get_random_string(12)) cls.group = Group.objects.create(name=get_random_string(12)) cls.user.groups.set([cls.group]) + cls.realm = force_realm(enabled_for_login=True) + _, cls.realm_user = force_realm_user(realm=cls.realm, username=cls.user.username, email=cls.user.email) # file tree add_file_to_test_class(cls) @@ -31,7 +40,7 @@ def _login_redirect(self, url): response = self.client.get(url) self.assertRedirects(response, "{u}?next={n}".format(u=reverse("login"), n=url)) - def _login(self, *permissions): + def _login(self, *permissions, realm_user=False): if permissions: permission_filter = reduce(operator.or_, ( Q(content_type__app_label=app_label, codename=codename) @@ -43,7 +52,36 @@ def _login(self, *permissions): self.group.permissions.set(list(Permission.objects.filter(permission_filter))) else: self.group.permissions.clear() - self.client.force_login(self.user) + if not realm_user: + self.client.force_login(self.user) + else: + # see https://github.com/django/django/blob/705066d186ce880bf64142e47084f3d8df3c2352/django/test/client.py#L785 # NOQA + request = HttpRequest() + # HACK + # see https://github.com/django/django/blob/705066d186ce880bf64142e47084f3d8df3c2352/django/contrib/auth/__init__.py#L141-L142 # NOQA + # so that the user is attached to the request. The realm callback expects a user on the request! + request.user = None + if self.client.session: + request.session = self.client.session + else: + engine = import_module(settings.SESSION_ENGINE) + request.session = engine.SessionStore() + ras = RealmAuthenticationSession.objects.create( + realm=self.realm, + callback="realms.utils.login_callback", + ) + finalize_session(ras, request, self.realm_user) + request.session.save() + session_cookie = settings.SESSION_COOKIE_NAME + self.client.cookies[session_cookie] = request.session.session_key + cookie_data = { + "max-age": None, + "path": "/", + "domain": settings.SESSION_COOKIE_DOMAIN, + "secure": settings.SESSION_COOKIE_SECURE or None, + "expires": None, + } + self.client.cookies[session_cookie].update(cookie_data) # targets @@ -809,3 +847,64 @@ def test_signing_id_target_store_redirect(self): response = self.client.get(reverse("santa:signingid_events_store_redirect", args=(self.file_signing_id,)), {"es": frontend_store.name}) self.assertTrue(response.url.startswith("/kibana/")) + + # reset target state + + def test_reset_target_state_redirect(self): + configuration = force_configuration() + ts = TargetState.objects.create(configuration=configuration, target=self.file_target) + self._login_redirect(reverse("santa:reset_target_state", args=(configuration.pk, ts.pk))) + + def test_reset_target_state_permission_denied(self): + configuration = force_configuration() + ts = TargetState.objects.create(configuration=configuration, target=self.file_target) + self._login() + response = self.client.get(reverse("santa:reset_target_state", args=(configuration.pk, ts.pk))) + self.assertEqual(response.status_code, 403) + + def test_reset_target_state_get_not_allowed(self): + configuration = force_configuration() + ts = TargetState.objects.create(configuration=configuration, target=self.file_target) + self._login("santa.view_target", realm_user=True) + response = self.client.get(reverse("santa:reset_target_state", args=(configuration.pk, ts.pk))) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "santa/targetstate_reset.html") + self.assertEqual(response.context["configuration"], configuration) + self.assertContains(response, 'id="reset-target-state" type="submit" disabled>') + + def test_reset_target_state_get_allowed(self): + configuration = force_configuration() + ts = TargetState.objects.create(configuration=configuration, target=self.file_target) + self._login("santa.view_target", realm_user=True) + force_voting_group(configuration, self.realm_user, can_reset_target=True) + response = self.client.get(reverse("santa:reset_target_state", args=(configuration.pk, ts.pk))) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "santa/targetstate_reset.html") + self.assertEqual(response.context["configuration"], configuration) + self.assertContains(response, 'id="reset-target-state" type="submit">') + + def test_rest_target_state_post_not_allowed(self): + configuration = force_configuration() + ts = TargetState.objects.create(configuration=configuration, target=self.file_target) + self._login("santa.view_target", realm_user=True) + response = self.client.post(reverse("santa:reset_target_state", args=(configuration.pk, ts.pk)), + follow=True) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "santa/target_detail.html") + self.assertContains(response, "Target state reset not allowed") + ts.refresh_from_db() + self.assertIsNone(ts.reset_at) + + def test_rest_target_state_post_allowed(self): + configuration = force_configuration() + ts = TargetState.objects.create(configuration=configuration, target=self.file_target) + self._login("santa.view_target", realm_user=True) + force_voting_group(configuration, self.realm_user, can_reset_target=True) + response = self.client.post(reverse("santa:reset_target_state", args=(configuration.pk, ts.pk)), + follow=True) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, "santa/target_detail.html") + self.assertContains(response, "Target state reset") + self.assertNotContains(response, "Target state reset not allowed") + ts.refresh_from_db() + self.assertIsNotNone(ts.reset_at) diff --git a/zentral/contrib/santa/ballot_box.py b/zentral/contrib/santa/ballot_box.py index f415a87aa2..65f901095c 100644 --- a/zentral/contrib/santa/ballot_box.py +++ b/zentral/contrib/santa/ballot_box.py @@ -1,3 +1,4 @@ +from datetime import datetime import logging from django.db import connection from django.db.models import Q @@ -9,7 +10,15 @@ logger = logging.getLogger("zentral.contrib.santa.ballot_box") -class VotingError(Exception): +class BallotBoxError(Exception): + pass + + +class ResetNotAllowedError(BallotBoxError): + pass + + +class VotingError(BallotBoxError): pass @@ -209,7 +218,7 @@ def publisher_info(self): def existing_ballot(self): if self.voter.is_anonymous: return None - #TODO decide what happens if we only have the user_uid + # TODO decide what happens if we only have the user_uid try: return Ballot.objects.get(target=self.target, realm_user=self.voter.realm_user, replaced_by__isnull=True) except Ballot.DoesNotExist: @@ -263,7 +272,7 @@ def check_voting_allowed_for_configuration(self, configuration, yes_vote): configuration, (Target.Type.CERTIFICATE, Target.Type.TEAM_ID) ): - #TODO better perm? + # TODO better perm? if ( rel_target_state["state"] == TargetState.State.BANNED and not self.voter.can_reset_target(configuration) @@ -410,6 +419,16 @@ def _update_target_state_state(self, target_state, score): return True return False + def reset_target_state(self, configuration): + if not self.voter.can_reset_target(configuration): + raise ResetNotAllowedError + target_state = self.target_states[configuration] + target_state.score = 0 + target_state.flagged = False + self._update_target_state_state(target_state, 0) + target_state.reset_at = datetime.utcnow() + target_state.save() + # rules def _iter_rule_targets(self): diff --git a/zentral/contrib/santa/templates/santa/_ballot_list.html b/zentral/contrib/santa/templates/santa/_ballot_list.html index 1a168b4061..4268384ce5 100644 --- a/zentral/contrib/santa/templates/santa/_ballot_list.html +++ b/zentral/contrib/santa/templates/santa/_ballot_list.html @@ -38,10 +38,10 @@ {{ ballot.user_uid }} {% endif %} - +