Skip to content

Commit

Permalink
Fix Santa ballot box bug
Browse files Browse the repository at this point in the history
The reset checks where not done per configuration, but globally.
  • Loading branch information
np5 committed Nov 25, 2024
1 parent 65f794e commit c5c7a85
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
58 changes: 58 additions & 0 deletions tests/santa/test_ballot_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,3 +824,61 @@ def test_ballot_box_target_state_reset(self):
self.assertEqual(rules_qs.count(), 0)
self.assertEqual(votes_qs.count(), 1)
self.assertEqual(votes_qs.first(), vote)

# update target states

def test_update_target_states(self):
realm, realm_user = force_realm_user()
configuration = force_configuration(voting_realm=realm)
force_ballot(
self.file_target, realm_user,
[(configuration, True, configuration.partially_allowlisted_threshold)]
)
target_state, _ = TargetState.objects.update_or_create(
target=self.file_target,
configuration=configuration,
state=TargetState.State.UNTRUSTED,
reset_at=datetime.utcnow()
)
configuration2 = force_configuration(voting_realm=realm)
# second target state in unrelated configurations must not interfere
TargetState.objects.update_or_create(
target=self.file_target,
configuration=configuration2,
score=configuration2.partially_allowlisted_threshold,
state=TargetState.State.PARTIALLY_ALLOWLISTED,
)
ballot_box = BallotBox.for_realm_user(self.file_target, realm_user, all_configurations=True)
ballot_box._update_target_states([(configuration, True)])
target_state.refresh_from_db()
self.assertEqual(target_state.score, 0)

# partially allowlist

def test_partially_allowlist_rules(self):
realm, realm_user = force_realm_user()
configuration = force_configuration(voting_realm=realm)
force_ballot(
self.file_target, realm_user,
[(configuration, True, configuration.partially_allowlisted_threshold)]
)
target_state, _ = TargetState.objects.update_or_create(
target=self.file_target,
configuration=configuration,
state=TargetState.State.UNTRUSTED,
reset_at=datetime.utcnow()
)
self.assertEqual(configuration.rule_set.count(), 0)
configuration2 = force_configuration(voting_realm=realm)
# second target state in unrelated configurations must not interfere
TargetState.objects.update_or_create(
target=self.file_target,
configuration=configuration2,
score=configuration2.partially_allowlisted_threshold,
state=TargetState.State.PARTIALLY_ALLOWLISTED,
)
ballot_box = BallotBox.for_realm_user(self.file_target, realm_user, all_configurations=True)
with self.assertRaises(AssertionError) as cm:
ballot_box._partially_allowlist(configuration)
self.assertEqual(cm.exception.args[0], "No primary users found")
self.assertEqual(configuration.rule_set.count(), 0)
11 changes: 9 additions & 2 deletions zentral/contrib/santa/ballot_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,10 @@ def _update_target_states(self, votes):
"select sum(v.weight * (case when v.was_yes_vote then 1 else -1 end)) "
"from santa_vote v "
"join santa_ballot b on (v.ballot_id = b.id) "
"left join santa_targetstate ts on (b.target_id = ts.target_id) "
"left join santa_targetstate ts on ("
" b.target_id = ts.target_id"
" and v.configuration_id = ts.configuration_id"
") "
"where b.target_id = %s and b.replaced_by_id is null "
"and (ts.reset_at is null or v.created_at > ts.reset_at) "
"and v.configuration_id = %s",
Expand Down Expand Up @@ -551,7 +554,10 @@ def _partially_allowlist(self, configuration):
"from santa_ballot b "
"left join realms_realmuser u on (b.realm_user_id = u.uuid) "
"join santa_vote v on (b.id = v.ballot_id) "
"join santa_targetstate ts on (b.target_id = ts.target_id) "
"join santa_targetstate ts on ("
" b.target_id = ts.target_id"
" and v.configuration_id = ts.configuration_id"
") "
"where b.target_id = %s "
"and b.replaced_by_id is null "
"and v.configuration_id = %s "
Expand All @@ -560,6 +566,7 @@ def _partially_allowlist(self, configuration):
[self.target.pk, configuration.pk]
)
primary_users = list(r[0] for r in cursor.fetchall() if r)
assert len(primary_users) > 0, "No primary users found"
self._update_or_create_rules(configuration, Rule.Policy.ALLOWLIST, primary_users)

def _blocklist(self, configuration):
Expand Down

0 comments on commit c5c7a85

Please sign in to comment.