Skip to content

Commit

Permalink
Remove unused var, add typehints
Browse files Browse the repository at this point in the history
  • Loading branch information
ablakley-r7 committed Jan 13, 2025
1 parent 3d1cdd2 commit caae2be
Showing 1 changed file with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datetime import datetime, timedelta, timezone
from insightconnect_plugin_runtime.exceptions import PluginException, APIException
from insightconnect_plugin_runtime.helper import hash_sha1
from typing import Any, Dict, Tuple, Union
from typing import Tuple, List, Dict, Any
from dataclasses import dataclass


Expand Down Expand Up @@ -101,7 +101,9 @@ def run(self, params={}, state={}, custom_config: dict = {}): # pylint: disable
###########################
# Make request
###########################
def get_alerts_palo_alto(self, state: dict, query_values: QueryValues, alert_limit: int):
def get_alerts_palo_alto(
self, state: dict, query_values: QueryValues, alert_limit: int
) -> Tuple[List[Dict[str, Any]], Dict, bool]:

post_body = self.build_post_body(
search_from=query_values.QUERY_SEARCH_FROM,
Expand All @@ -112,11 +114,11 @@ def get_alerts_palo_alto(self, state: dict, query_values: QueryValues, alert_lim

results, results_count, total_count = self.connection.xdr_api.get_response_alerts(post_body)

new_alerts, new_alert_hashes, last_alert_time = self._dedupe_and_get_highest_time(results, state)
new_alerts, new_alert_hashes = self._dedupe_and_get_highest_time(results, state)

state[CURRENT_COUNT] = state.get(CURRENT_COUNT, 0) + len(new_alerts)

is_paginating = results_count >= alert_limit
# We paginate if the total actual results returned in a page is equal to the alert limit
is_paginating = len(results) >= alert_limit

log_string = (
f"search_from = {query_values.QUERY_SEARCH_FROM} "
Expand Down Expand Up @@ -151,7 +153,7 @@ def _dedupe_and_get_highest_time(self, alerts: list, state: dict) -> Tuple[list,
:param alerts: list of alerts
:param state: state of the plugin
:return: list of unique alerts, list of new hashes, and the highest timestamp
:return: list of unique alerts, list of new hashes
"""
old_hashes = state.get(LAST_ALERT_HASH, [])
deduped_alerts = 0
Expand Down Expand Up @@ -185,9 +187,11 @@ def _dedupe_and_get_highest_time(self, alerts: list, state: dict) -> Tuple[list,
self.logger.debug(f"Last alert timestamp is {highest_timestamp}")
self.logger.debug(f"Last hash is {new_hashes}")

return new_alerts, new_hashes, highest_timestamp
return new_alerts, new_hashes

def calculate_query_values(self, custom_config: dict, now_unix: int, saved_state: dict, alert_limit: int):
def calculate_query_values(
self, custom_config: dict, now_unix: int, saved_state: dict, alert_limit: int
) -> QueryValues:
"""
Takes custom config from CPS and allows the specification of a new start time for alerts,
and allows the limit to be customised.
Expand Down Expand Up @@ -220,6 +224,7 @@ def calculate_query_values(self, custom_config: dict, now_unix: int, saved_state
)
self.logger.info("Adjusting start time to cutoff value")
start_time = max_lookback_unix
# Reset search_from and search_to if this is not a backfill
if not custom_config:
self.logger.info("Resetting search_from and search_to")
search_from = 0
Expand All @@ -235,7 +240,13 @@ def calculate_query_values(self, custom_config: dict, now_unix: int, saved_state
QUERY_SEARCH_TO=search_to,
)

def get_query_times(self, state, now_unix):
def get_query_times(self, state, now_unix) -> Tuple[int, int, datetime]:
"""
Get initial query times in unix for get alerts query, and max lookback date time
:param state:
:param now_unix:
:return: start time, end time, max lookback date time
"""
now_date_time = self.convert_unix_to_datetime(now_unix)

last_query_start_time = state.get(QUERY_START_TIME)
Expand All @@ -254,7 +265,14 @@ def get_query_times(self, state, now_unix):
###########################
# Custom Config
###########################
def _parse_custom_config(self, custom_config, now_datetime, start_time):
def _parse_custom_config(self, custom_config, now_datetime, start_time) -> Tuple[int, datetime]:
"""
Retrieve values from custom config
:param custom_config:
:param now_datetime:
:param start_time:
:return: start time and maxlookback time
"""
# Get custom config lookback value only if start_time in state is cleared
custom_timings = custom_config.get("lookback", {})
custom_date = custom_timings.get("date")
Expand Down Expand Up @@ -310,6 +328,7 @@ def build_post_body(self, search_from: int, search_to: int, start_time: int, end
# Datetime Helpers
###########################
def convert_unix_to_datetime(self, unix_time: int) -> datetime:
# Convert a unix timestamp to a datetime object
return datetime.fromtimestamp(unix_time / 1000, tz=timezone.utc)

def convert_datetime_to_unix(self, date_time: datetime) -> int:
Expand All @@ -319,7 +338,7 @@ def convert_datetime_to_unix(self, date_time: datetime) -> int:

return int(date_time.timestamp() * 1000)

def _get_current_time(self):
def _get_current_time(self) -> int:
# Gets the last 15 minutes in UNIX
last_15_min = datetime.utcnow() - timedelta(minutes=15)
return self.convert_datetime_to_unix(last_15_min)
Expand Down

0 comments on commit caae2be

Please sign in to comment.