Skip to content

Commit

Permalink
Add sync_type key in Santa preflight response
Browse files Browse the repository at this point in the history
  • Loading branch information
np5 committed Apr 5, 2024
1 parent 3e4bf4f commit 8abd292
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 13 deletions.
53 changes: 42 additions & 11 deletions tests/santa/test_santa_api_views.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
import json
from unittest.mock import patch
import uuid
Expand Down Expand Up @@ -43,7 +44,9 @@ def post_as_json(self, url, data):

# preflight

def _get_preflight_data(self, enrolled=False):
def _get_preflight_data(self, version=None, enrolled=False):
if version is None:
version = datetime.utcnow().strftime("%Y.2")
if enrolled:
serial_number = self.machine_serial_number
hardware_uuid = self.enrolled_machine.hardware_uuid
Expand All @@ -52,7 +55,7 @@ def _get_preflight_data(self, enrolled=False):
hardware_uuid = uuid.uuid4()
data = {
"os_build": "20C69",
"santa_version": "2022.1",
"santa_version": version,
"hostname": "hostname",
"os_version": "11.1",
"client_mode": "LOCKDOWN",
Expand Down Expand Up @@ -106,7 +109,7 @@ def test_preflight(self, post_event):
json_response = response.json()
self.assertEqual(json_response["client_mode"], Configuration.PREFLIGHT_MONITOR_MODE)
self.assertEqual(json_response["full_sync_interval"], Configuration.DEFAULT_FULL_SYNC_INTERVAL)
self.assertEqual(json_response["clean_sync"], True)
self.assertEqual(json_response["sync_type"], "clean")
self.assertTrue(json_response["blocked_path_regex"].startswith("NON_MATCHING_PLACEHOLDER_"))
self.assertTrue(json_response["allowed_path_regex"].startswith("NON_MATCHING_PLACEHOLDER_"))

Expand Down Expand Up @@ -211,33 +214,61 @@ def test_preflight_binary_rule_count_negative(self):
enrolled_machine = EnrolledMachine.objects.get(enrollment=self.enrollment, hardware_uuid=hardware_uuid)
self.assertEqual(enrolled_machine.binary_rule_count, 0)

def test_preflight_enrollment_clean_sync(self):
# enrollment, clean sync not requested → clean sync
data, serial_number, hardware_uuid = self._get_preflight_data()
def test_preflight_enrollment_clean_sync_true(self):
# enrollment, clean sync not requested, legacy Santa version → clean sync True
data, serial_number, hardware_uuid = self._get_preflight_data(version="2022.1")
url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid))
response = self.post_as_json(url, data)
self.assertEqual(response.status_code, 200)
json_response = response.json()
self.assertTrue(json_response["clean_sync"])

def test_preflight_no_enrollment_no_clean_sync(self):
# no enrollment, clean sync not requested → no clean sync
def test_preflight_enrollment_sync_type_clean(self):
# enrollment, clean sync not requested → sync type clean
data, serial_number, hardware_uuid = self._get_preflight_data()
url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid))
response = self.post_as_json(url, data)
self.assertEqual(response.status_code, 200)
json_response = response.json()
self.assertEqual(json_response["sync_type"], "clean")

def test_preflight_no_enrollment_clean_sync_false(self):
# no enrollment, clean sync not requested, legacy Santa version → clean sync False
data, serial_number, hardware_uuid = self._get_preflight_data(version="2022.1", enrolled=True)
url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid))
response = self.post_as_json(url, data)
self.assertEqual(response.status_code, 200)
json_response = response.json()
self.assertFalse(json_response["clean_sync"])

def test_preflight_no_enrollment_sync_type_normal(self):
# no enrollment, clean sync not requested → sync type normal
data, serial_number, hardware_uuid = self._get_preflight_data(enrolled=True)
url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid))
response = self.post_as_json(url, data)
self.assertEqual(response.status_code, 200)
json_response = response.json()
self.assertFalse(json_response.get("clean_sync", False))
self.assertEqual(json_response["sync_type"], "normal")

def test_preflight_no_enrollment_legacy_clean_sync_requested(self):
# no enrollment, clean sync requested, legacy Santa version → clean sync True
data, serial_number, hardware_uuid = self._get_preflight_data(version="2022.1", enrolled=True)
data["request_clean_sync"] = True
url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid))
response = self.post_as_json(url, data)
self.assertEqual(response.status_code, 200)
json_response = response.json()
self.assertTrue(json_response["clean_sync"])

def test_preflight_no_enrollment_clean_sync_requested(self):
# no enrollment, clean sync requested → clean sync
# no enrollment, clean sync requested → sync type clean
data, serial_number, hardware_uuid = self._get_preflight_data(enrolled=True)
data["request_clean_sync"] = True
url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid))
response = self.post_as_json(url, data)
self.assertEqual(response.status_code, 200)
json_response = response.json()
self.assertTrue(json_response["clean_sync"])
self.assertEqual(json_response["sync_type"], "clean")

@patch("zentral.core.queues.backends.kombu.EventQueues.post_event")
def test_preflight_sync_not_ok_conf_without_severity_no_incident_update(self, post_event):
Expand Down
13 changes: 11 additions & 2 deletions zentral/contrib/santa/public_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,26 @@ def _commit_machine_snapshot(self):

def do_post(self):
self._commit_machine_snapshot()
comparable_santa_version = self.enrolled_machine.get_comparable_santa_version()

response_dict = self.enrolled_machine.enrollment.configuration.get_sync_server_config(
self.enrolled_machine.serial_number,
self.enrolled_machine.get_comparable_santa_version()
comparable_santa_version,
)

# clean sync?
clean_sync = self.request_data.get("request_clean_sync") or self.enrollment_action is not None
if clean_sync:
MachineRule.objects.filter(enrolled_machine=self.enrolled_machine).delete()
response_dict["clean_sync"] = True
if comparable_santa_version < (2024, 1):
response_dict["clean_sync"] = True
else:
response_dict["sync_type"] = "clean"
else:
if comparable_santa_version < (2024, 1):
response_dict["clean_sync"] = False
else:
response_dict["sync_type"] = "normal"

# sync incident update?
incident_update = None
Expand Down

0 comments on commit 8abd292

Please sign in to comment.