diff --git a/tests/santa/test_santa_api_views.py b/tests/santa/test_santa_api_views.py index db446bedcd..41d882c054 100644 --- a/tests/santa/test_santa_api_views.py +++ b/tests/santa/test_santa_api_views.py @@ -1,3 +1,4 @@ +from datetime import datetime import json from unittest.mock import patch import uuid @@ -43,7 +44,9 @@ def post_as_json(self, url, data): # preflight - def _get_preflight_data(self, enrolled=False): + def _get_preflight_data(self, version=None, enrolled=False): + if version is None: + version = datetime.utcnow().strftime("%Y.2") if enrolled: serial_number = self.machine_serial_number hardware_uuid = self.enrolled_machine.hardware_uuid @@ -52,7 +55,7 @@ def _get_preflight_data(self, enrolled=False): hardware_uuid = uuid.uuid4() data = { "os_build": "20C69", - "santa_version": "2022.1", + "santa_version": version, "hostname": "hostname", "os_version": "11.1", "client_mode": "LOCKDOWN", @@ -106,7 +109,7 @@ def test_preflight(self, post_event): json_response = response.json() self.assertEqual(json_response["client_mode"], Configuration.PREFLIGHT_MONITOR_MODE) self.assertEqual(json_response["full_sync_interval"], Configuration.DEFAULT_FULL_SYNC_INTERVAL) - self.assertEqual(json_response["clean_sync"], True) + self.assertEqual(json_response["sync_type"], "clean") self.assertTrue(json_response["blocked_path_regex"].startswith("NON_MATCHING_PLACEHOLDER_")) self.assertTrue(json_response["allowed_path_regex"].startswith("NON_MATCHING_PLACEHOLDER_")) @@ -211,33 +214,61 @@ def test_preflight_binary_rule_count_negative(self): enrolled_machine = EnrolledMachine.objects.get(enrollment=self.enrollment, hardware_uuid=hardware_uuid) self.assertEqual(enrolled_machine.binary_rule_count, 0) - def test_preflight_enrollment_clean_sync(self): - # enrollment, clean sync not requested → clean sync - data, serial_number, hardware_uuid = self._get_preflight_data() + def test_preflight_enrollment_clean_sync_true(self): + # enrollment, clean sync not requested, legacy Santa version → clean sync True + data, serial_number, hardware_uuid = self._get_preflight_data(version="2022.1") url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid)) response = self.post_as_json(url, data) self.assertEqual(response.status_code, 200) json_response = response.json() self.assertTrue(json_response["clean_sync"]) - def test_preflight_no_enrollment_no_clean_sync(self): - # no enrollment, clean sync not requested → no clean sync + def test_preflight_enrollment_sync_type_clean(self): + # enrollment, clean sync not requested → sync type clean + data, serial_number, hardware_uuid = self._get_preflight_data() + url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid)) + response = self.post_as_json(url, data) + self.assertEqual(response.status_code, 200) + json_response = response.json() + self.assertEqual(json_response["sync_type"], "clean") + + def test_preflight_no_enrollment_clean_sync_false(self): + # no enrollment, clean sync not requested, legacy Santa version → clean sync False + data, serial_number, hardware_uuid = self._get_preflight_data(version="2022.1", enrolled=True) + url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid)) + response = self.post_as_json(url, data) + self.assertEqual(response.status_code, 200) + json_response = response.json() + self.assertFalse(json_response["clean_sync"]) + + def test_preflight_no_enrollment_sync_type_normal(self): + # no enrollment, clean sync not requested → sync type normal data, serial_number, hardware_uuid = self._get_preflight_data(enrolled=True) url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid)) response = self.post_as_json(url, data) self.assertEqual(response.status_code, 200) json_response = response.json() - self.assertFalse(json_response.get("clean_sync", False)) + self.assertEqual(json_response["sync_type"], "normal") + + def test_preflight_no_enrollment_legacy_clean_sync_requested(self): + # no enrollment, clean sync requested, legacy Santa version → clean sync True + data, serial_number, hardware_uuid = self._get_preflight_data(version="2022.1", enrolled=True) + data["request_clean_sync"] = True + url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid)) + response = self.post_as_json(url, data) + self.assertEqual(response.status_code, 200) + json_response = response.json() + self.assertTrue(json_response["clean_sync"]) def test_preflight_no_enrollment_clean_sync_requested(self): - # no enrollment, clean sync requested → clean sync + # no enrollment, clean sync requested → sync type clean data, serial_number, hardware_uuid = self._get_preflight_data(enrolled=True) data["request_clean_sync"] = True url = reverse("santa_public:preflight", args=(self.enrollment_secret.secret, hardware_uuid)) response = self.post_as_json(url, data) self.assertEqual(response.status_code, 200) json_response = response.json() - self.assertTrue(json_response["clean_sync"]) + self.assertEqual(json_response["sync_type"], "clean") @patch("zentral.core.queues.backends.kombu.EventQueues.post_event") def test_preflight_sync_not_ok_conf_without_severity_no_incident_update(self, post_event): diff --git a/zentral/contrib/santa/public_views.py b/zentral/contrib/santa/public_views.py index 37d0c67192..235ce0b02b 100644 --- a/zentral/contrib/santa/public_views.py +++ b/zentral/contrib/santa/public_views.py @@ -263,17 +263,26 @@ def _commit_machine_snapshot(self): def do_post(self): self._commit_machine_snapshot() + comparable_santa_version = self.enrolled_machine.get_comparable_santa_version() response_dict = self.enrolled_machine.enrollment.configuration.get_sync_server_config( self.enrolled_machine.serial_number, - self.enrolled_machine.get_comparable_santa_version() + comparable_santa_version, ) # clean sync? clean_sync = self.request_data.get("request_clean_sync") or self.enrollment_action is not None if clean_sync: MachineRule.objects.filter(enrolled_machine=self.enrolled_machine).delete() - response_dict["clean_sync"] = True + if comparable_santa_version < (2024, 1): + response_dict["clean_sync"] = True + else: + response_dict["sync_type"] = "clean" + else: + if comparable_santa_version < (2024, 1): + response_dict["clean_sync"] = False + else: + response_dict["sync_type"] = "normal" # sync incident update? incident_update = None