From 36dba458bf0558f3768c5f07f6a0e9e689aa3a41 Mon Sep 17 00:00:00 2001
From: SG
Date: Mon, 3 Apr 2023 15:46:40 -0600
Subject: [PATCH] work in progress for moving from inotify to watchdog, see
 idaholab/Malcolm#168

---
 Dockerfiles/pcap-monitor.Dockerfile           |   5 +
 config/upload-common.env.example              |   4 +
 pcap-monitor/supervisord.conf                 |   2 +
 .../demo/amazon_linux_2_malcolm_demo_setup.sh |   1 +
 shared/bin/pcap_watcher.py                    | 192 +++++++++++-------
 shared/bin/watch_common.py                    |  14 +-
 6 files changed, 147 insertions(+), 71 deletions(-)

diff --git a/Dockerfiles/pcap-monitor.Dockerfile b/Dockerfiles/pcap-monitor.Dockerfile
index 151309e82..27f298e64 100644
--- a/Dockerfiles/pcap-monitor.Dockerfile
+++ b/Dockerfiles/pcap-monitor.Dockerfile
@@ -30,6 +30,8 @@ ARG PCAP_PATH=/pcap
 ARG PCAP_PIPELINE_DEBUG=false
 ARG PCAP_PIPELINE_DEBUG_EXTRA=false
 ARG PCAP_PIPELINE_IGNORE_PREEXISTING=false
+ARG PCAP_PIPELINE_POLLING=false
+ARG PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC=10
 ARG PCAP_NODE_NAME=malcolm
 
 ARG ZEEK_PATH=/zeek
@@ -39,6 +41,8 @@ ENV PCAP_PATH $PCAP_PATH
 ENV PCAP_PIPELINE_DEBUG $PCAP_PIPELINE_DEBUG
 ENV PCAP_PIPELINE_DEBUG_EXTRA $PCAP_PIPELINE_DEBUG_EXTRA
 ENV PCAP_PIPELINE_IGNORE_PREEXISTING $PCAP_PIPELINE_IGNORE_PREEXISTING
+ENV PCAP_PIPELINE_POLLING $PCAP_PIPELINE_POLLING
+ENV PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC $PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC
 ENV PCAP_NODE_NAME $PCAP_NODE_NAME
 
 ENV ZEEK_PATH $ZEEK_PATH
@@ -71,6 +75,7 @@ ADD pcap-monitor/supervisord.conf /etc/supervisord.conf
 ADD pcap-monitor/scripts/ /usr/local/bin/
 ADD shared/bin/pcap_watcher.py /usr/local/bin/
 ADD shared/bin/pcap_utils.py /usr/local/bin/
+ADD shared/bin/watch_common.py /usr/local/bin/
 ADD scripts/malcolm_utils.py /usr/local/bin/
 
 EXPOSE 30441
diff --git a/config/upload-common.env.example b/config/upload-common.env.example
index edc9d577d..e5891711f 100644
--- a/config/upload-common.env.example
+++ b/config/upload-common.env.example
@@ -10,6 +10,10 @@ PCAP_PIPELINE_DEBUG=false
 PCAP_PIPELINE_DEBUG_EXTRA=false
 # Whether or not PCAP files extant in ./pcap/ will be ignored on startup
 PCAP_PIPELINE_IGNORE_PREEXISTING=false
+# Whether or not to use polling vs. native inotify API to watch for files
+PCAP_PIPELINE_POLLING=false
+# When polling, seconds of inactivity to assume a file is closed and ready for processing
+PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC=10
 # 'pcap-monitor' to match the name of the container providing the uploaded/captured PCAP file
 # monitoring service
 PCAP_MONITOR_HOST=pcap-monitor
diff --git a/pcap-monitor/supervisord.conf b/pcap-monitor/supervisord.conf
index a48fa7963..a4ad0d9f8 100644
--- a/pcap-monitor/supervisord.conf
+++ b/pcap-monitor/supervisord.conf
@@ -37,6 +37,8 @@ command=python3 /usr/local/bin/pcap_watcher.py
   --opensearch-wait
   --node "%(ENV_PCAP_NODE_NAME)s"
   --ignore-existing "%(ENV_PCAP_PIPELINE_IGNORE_PREEXISTING)s"
+  --polling "%(ENV_PCAP_PIPELINE_POLLING)s"
+  --closed-sec %(ENV_PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC)s
   --start-sleep 60
   --directory "%(ENV_PCAP_PATH)s"/processed
 user=%(ENV_PUSER)s
diff --git a/scripts/demo/amazon_linux_2_malcolm_demo_setup.sh b/scripts/demo/amazon_linux_2_malcolm_demo_setup.sh
index da6beadbf..0670f2345 100755
--- a/scripts/demo/amazon_linux_2_malcolm_demo_setup.sh
+++ b/scripts/demo/amazon_linux_2_malcolm_demo_setup.sh
@@ -559,6 +559,7 @@ function InstallMalcolm {
         "LOGSTASH_REVERSE_DNS:'true'"
         "LOGSTASH_SEVERITY_SCORING:'true'"
         "PCAP_PIPELINE_IGNORE_PREEXISTING:'true'"
+        "PCAP_PIPELINE_POLLING:'true'"
         "YARA_MAX_REQUESTS:4"
         "ZEEK_AUTO_ANALYZE_PCAP_FILES:'true'"
         "ZEEK_DISABLE_BEST_GUESS_ICS:''"
diff --git a/shared/bin/pcap_watcher.py b/shared/bin/pcap_watcher.py
index f7eb13ceb..fb972a66e 100755
--- a/shared/bin/pcap_watcher.py
+++ b/shared/bin/pcap_watcher.py
@@ -16,7 +16,6 @@
 import magic
 import os
 import pathlib
-import pyinotify
 import signal
 import sys
 import time
@@ -35,13 +34,19 @@
 )
 import malcolm_utils
 from malcolm_utils import eprint, str2bool, ParseCurlFile, remove_prefix, touch
+import watch_common
 
 from collections import defaultdict
+from multiprocessing.pool import ThreadPool
 from opensearchpy import OpenSearch, Search
 from opensearchpy.exceptions import ConnectionError, ConnectionTimeout
 from urllib3.exceptions import NewConnectionError
 
+from watchdog.observers import Observer
+from watchdog.observers.polling import PollingObserver
+from watchdog.utils import WatchdogShutdown
+
 ###################################################################################################
 MINIMUM_CHECKED_FILE_SIZE_DEFAULT = 24
 MAXIMUM_CHECKED_FILE_SIZE_DEFAULT = 32 * 1024 * 1024 * 1024
@@ -61,21 +66,19 @@
 scriptName = os.path.basename(__file__)
 scriptPath = os.path.dirname(os.path.realpath(__file__))
 origPath = os.getcwd()
-shuttingDown = False
+shuttingDown = [False]
 
 DEFAULT_NODE_NAME = os.getenv('PCAP_NODE_NAME', 'malcolm')
 
 ###################################################################################################
 # watch files written to and moved to this directory
-class EventWatcher(pyinotify.ProcessEvent):
-    # notify on files written in-place then closed (IN_CLOSE_WRITE), and moved into this directory (IN_MOVED_TO)
-    _methods = ["IN_CLOSE_WRITE", "IN_MOVED_TO"]
-
+class EventWatcher:
     def __init__(self):
         global args
         global opensearchHttpAuth
         global debug
         global verboseDebug
+        global shuttingDown
 
         super().__init__()
 
@@ -88,7 +91,7 @@ def __init__(self):
         healthy = False
 
         # create the connection to OpenSearch
-        while (not connected) and (not shuttingDown):
+        while (not connected) and (not shuttingDown[0]):
             try:
                 try:
                     if debug:
@@ -139,7 +142,7 @@ def __init__(self):
                 break
 
         # if requested, wait for at least "yellow" health in the cluster for the "files" index
-        while connected and args.opensearchWaitForHealth and (not healthy) and (not shuttingDown):
+        while connected and args.opensearchWaitForHealth and (not healthy) and (not shuttingDown[0]):
             try:
                 if debug:
                     eprint(f"{scriptName}:\twaiting for OpenSearch to be healthy")
@@ -181,31 +184,28 @@ def __init__(self):
         if debug:
             eprint(f"{scriptName}:\tEventWatcher initialized")
 
-
-###################################################################################################
-# set up event processor to append processed events from to the event queue
-def event_process_generator(cls, method):
-    # actual method called when we are notified of a file
-    def _method_name(self, event):
+    ###################################################################################################
+    # set up event processor to append processed events from to the event queue
+    def processFile(self, pathname):
         global args
         global debug
         global verboseDebug
 
         if debug:
-            eprint(f"{scriptName}:\t👓\t{event.pathname}")
+            eprint(f"{scriptName}:\t👓\t{pathname}")
 
         # the entity must be a regular PCAP file and actually exist
-        if (not event.dir) and os.path.isfile(event.pathname):
+        if os.path.isfile(pathname):
             # get the file magic description and mime type
-            fileMime = magic.from_file(event.pathname, mime=True)
-            fileType = magic.from_file(event.pathname)
+            fileMime = magic.from_file(pathname, mime=True)
+            fileType = magic.from_file(pathname)
 
             # get the file size, in bytes to compare against sane values
-            fileSize = os.path.getsize(event.pathname)
+            fileSize = os.path.getsize(pathname)
 
             if (args.minBytes <= fileSize <= args.maxBytes) and (
                 (fileMime in PCAP_MIME_TYPES) or ('pcap-ng' in fileType)
             ):
-                relativePath = remove_prefix(event.pathname, os.path.join(args.baseDir, ''))
+                relativePath = remove_prefix(pathname, os.path.join(args.baseDir, ''))
 
                 # check with Arkime's files index in OpenSearch and make sure it's not a duplicate
                 fileIsDuplicate = False
@@ -225,15 +225,15 @@ def _method_name(self, event):
                 if fileIsDuplicate:
                     # this is duplicate file (it's been processed before) so ignore it
                     if debug:
-                        eprint(f"{scriptName}:\t📋\t{event.pathname}")
+                        eprint(f"{scriptName}:\t📋\t{pathname}")
                 else:
                     # the entity is a right-sized non-duplicate file, and it exists, so send it to get processed
                     if debug:
-                        eprint(f"{scriptName}:\t📩\t{event.pathname}")
+                        eprint(f"{scriptName}:\t📩\t{pathname}")
                     try:
                         fileInfo = {
-                            FILE_INFO_DICT_NAME: event.pathname if args.includeAbsolutePath else relativePath,
+                            FILE_INFO_DICT_NAME: pathname if args.includeAbsolutePath else relativePath,
                             FILE_INFO_DICT_SIZE: fileSize,
                             FILE_INFO_FILE_MIME: fileMime,
                             FILE_INFO_FILE_TYPE: fileType,
@@ -245,23 +245,24 @@ def _method_name(self, event):
                             eprint(f"{scriptName}:\t📫\t{fileInfo}")
                     except zmq.Again:
                         if verboseDebug:
-                            eprint(f"{scriptName}:\t🕑\t{event.pathname}")
+                            eprint(f"{scriptName}:\t🕑\t{pathname}")
             else:
                 # too small/big to care about, or the wrong type, ignore it
                 if debug:
-                    eprint(f"{scriptName}:\t✋\t{event.pathname}")
+                    eprint(f"{scriptName}:\t✋\t{pathname}")
 
-    # assign process method to class
-    _method_name.__name__ = "process_{}".format(method)
-    setattr(cls, _method_name.__name__, _method_name)
+
+def file_processor(pathname, **kwargs):
+    if "watcher" in kwargs and kwargs["watcher"]:
+        kwargs["watcher"].processFile(pathname)
 
 
 ###################################################################################################
 # handle sigint/sigterm and set a global shutdown variable
 def shutdown_handler(signum, frame):
     global shuttingDown
-    shuttingDown = True
+    shuttingDown[0] = True
 
 
 ###################################################################################################
@@ -430,6 +431,28 @@ def main():
         type=str,
         required=False,
     )
+    parser.add_argument(
+        '-p',
+        '--polling',
+        dest='polling',
+        help="Use polling (instead of inotify)",
+        metavar='true|false',
+        type=str2bool,
+        nargs='?',
+        const=True,
+        default=os.getenv('PCAP_PIPELINE_POLLING', False),
+        required=False,
+    )
+    parser.add_argument(
+        '-c',
+        '--closed-sec',
+        dest='assumeClosedSec',
+        help="When polling, assume a file is closed after this many seconds of inactivity",
+        metavar='<seconds>',
+        type=int,
+        default=int(os.getenv('PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC', str(watch_common.ASSUME_CLOSED_SEC_DEFAULT))),
+        required=False,
+    )
     requiredNamed = parser.add_argument_group('required arguments')
     requiredNamed.add_argument(
         '-d', '--directory', dest='baseDir', help='Directory to monitor', metavar='<directory>', type=str, required=True
     )
@@ -474,21 +497,17 @@ def main():
 
     # sleep for a bit if requested
     sleepCount = 0
-    while (not shuttingDown) and (sleepCount < args.startSleepSec):
+    while (not shuttingDown[0]) and (sleepCount < args.startSleepSec):
         time.sleep(1)
         sleepCount += 1
 
-    # add events to watch to EventWatcher class
-    for method in EventWatcher._methods:
-        event_process_generator(EventWatcher, method)
-
     # if directory to monitor doesn't exist, create it now
     if os.path.isdir(args.baseDir):
         preexistingDir = True
     else:
         preexistingDir = False
         if debug:
-            eprint(f'{scriptName}: creating "{args.baseDir}" to monitor')
+            eprint(f'{scriptName}:\tcreating "{args.baseDir}" to monitor')
         pathlib.Path(args.baseDir).mkdir(parents=False, exist_ok=True)
 
     # if recursion was requested, get list of directories to monitor
@@ -502,44 +521,81 @@ def main():
 
     # begin threaded watch of path(s)
     time.sleep(1)
-    event_notifier_started = False
-    watch_manager = pyinotify.WatchManager()
-    event_notifier = pyinotify.ThreadedNotifier(watch_manager, EventWatcher())
+    observer = PollingObserver() if args.polling else Observer()
+    handler = watch_common.FileOperationEventHandler(
+        logger=None,
+        polling=args.polling,
+    )
     for watchDir in watchDirs:
-        watch_manager.add_watch(os.path.abspath(watchDir), pyinotify.ALL_EVENTS)
-
-    if debug:
-        eprint(f"{scriptName}: monitoring {watchDirs}")
-
-    time.sleep(2)
-    if not shuttingDown:
-        event_notifier.start()
-        event_notifier_started = True
-
-    # if there are any previously included files (and not ignoreExisting), "touch" them so that they will be notified on
-    if preexistingDir and (not args.ignoreExisting) and (not shuttingDown):
-        filesTouched = 0
-        for watchDir in watchDirs:
-            for preexistingFile in [os.path.join(watchDir, x) for x in pathlib.Path(watchDir).iterdir() if x.is_file()]:
-                touch(preexistingFile)
-                filesTouched += 1
-        if debug and (filesTouched > 0):
-            eprint(f"{scriptName}: found {filesTouched} preexisting files to check")
-
-    # loop forever, or until we're told to shut down, whichever comes first
-    while not shuttingDown:
-        if pdbFlagged:
-            pdbFlagged = False
-            breakpoint()
-        time.sleep(0.2)
-
-    # graceful shutdown
+        if verboseDebug:
+            eprint(f"{scriptName}:\tScheduling {watchDir}")
+        observer.schedule(handler, watchDir, recursive=False)
+
+    observer.start()
+
     if debug:
-        eprint(f"{scriptName}: shutting down...")
-    if event_notifier_started:
-        event_notifier.stop()
+        eprint(f"{scriptName}:\tmonitoring {watchDirs}")
+
+    try:
+        time.sleep(2)
+
+        # if there are any previously included files (and not ignoreExisting), "touch" them so that they will be notified on
+        if preexistingDir and (not args.ignoreExisting) and (not shuttingDown[0]):
+            filesTouched = 0
+            for watchDir in watchDirs:
+                for preexistingFile in [
+                    os.path.join(watchDir, x) for x in pathlib.Path(watchDir).iterdir() if x.is_file()
+                ]:
+                    touch(preexistingFile)
+                    filesTouched += 1
+            if debug and (filesTouched > 0):
+                eprint(f"{scriptName}:\tfound {filesTouched} preexisting files to check")
+
+        # start the thread to actually handle the files as they're queued by the FileOperationEventHandler handler
+        workerThreadCount = malcolm_utils.AtomicInt(value=0)
+        ThreadPool(
+            1,
+            watch_common.ProcessFileEventWorker,
+            ([
+                handler,
+                observer,
+                file_processor,
+                {'watcher': EventWatcher()},
+                args.assumeClosedSec,
+                workerThreadCount,
+                shuttingDown,
+                None,
+            ],
+            ),
+        )
+
+        # loop forever, or until we're told to shut down, whichever comes first
+        while (not shuttingDown[0]) and observer.is_alive():
+            if pdbFlagged:
+                pdbFlagged = False
+                breakpoint()
+            observer.join(1)
+
+        # graceful shutdown
+        if debug:
+            eprint(f"{scriptName}:\tshutting down...")
+
+        if shuttingDown[0]:
+            raise WatchdogShutdown()
+
+    except WatchdogShutdown:
+        observer.unschedule_all()
+
+    finally:
+        observer.stop()
+        observer.join()
+
+    time.sleep(1)
+    while workerThreadCount.value() > 0:
+        time.sleep(1)
 
     if debug:
-        eprint(f"{scriptName}: finished monitoring {watchDirs}")
+        eprint(f"{scriptName}:\tfinished monitoring {watchDirs}")
 
 
 if __name__ == '__main__':
diff --git a/shared/bin/watch_common.py b/shared/bin/watch_common.py
index 6971e2fab..827dc19a5 100644
--- a/shared/bin/watch_common.py
+++ b/shared/bin/watch_common.py
@@ -27,6 +27,8 @@
 from watchdog.observers import Observer
 from watchdog.observers.polling import PollingObserver
 
+ASSUME_CLOSED_SEC_DEFAULT = 10
+
 ###################################################################################################
 class FileOperationEventHandler(FileSystemEventHandler):
@@ -100,7 +102,7 @@ def on_deleted(self, event):
 
 ###################################################################################################
 def ProcessFileEventWorker(workerArgs):
-    handler, observer, fileProcessor, assumeClosedSec, workerThreadCount, shutDown, logger = (
+    handler, observer, fileProcessor, fileProcessorKwargs, assumeClosedSec, workerThreadCount, shutDown, logger = (
         workerArgs[0],
         workerArgs[1],
         workerArgs[2],
@@ -108,6 +110,7 @@ def ProcessFileEventWorker(workerArgs):
         workerArgs[4],
         workerArgs[5],
         workerArgs[6],
+        workerArgs[7],
     )
 
     with workerThreadCount as workerId:
@@ -125,8 +128,11 @@ def ProcessFileEventWorker(workerArgs):
                 else:
                     del d[fileName]
                     if fileProcessor is not None:
-                        fileProcessor(fileName)
-                        logger.debug(f"processed {fileName} at {(nowTime-eventTime) if (eventTime > 0) else 0} seconds")
+                        fileProcessor(fileName, **fileProcessorKwargs)
+                        if logger is not None:
+                            logger.debug(
+                                f"processed {fileName} at {(nowTime-eventTime) if (eventTime > 0) else 0} seconds"
+                            )
         time.sleep(1)
 
     if logger is not None:
@@ -137,6 +143,7 @@ def WatchAndProcessDirectory(
     directories,
     polling,
     fileProcessor,
+    fileProcessorKwargs,
     assumeClosedSec,
     shuttingDown,
     logger,
@@ -161,6 +168,7 @@ def WatchAndProcessDirectory(
         handler,
         observer,
         fileProcessor,
+        fileProcessorKwargs,
         assumeClosedSec,
         workerThreadCount,
         shuttingDown,
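
Note on the ThreadPool call in pcap_watcher.py: multiprocessing.pool.ThreadPool(processes, initializer, initargs) runs the initializer inside the pool's worker thread, which is what lets ProcessFileEventWorker's polling loop run in the background; calling watch_common.ProcessFileEventWorker([...]) directly instead would execute that loop in the main thread before the pool even exists, so the function object plus an initargs tuple is what must be passed. A minimal standalone sketch of the idiom (not part of the patch; the queue contents and the three-second run window are made up, standing in for the handler's real event deque):

    import time
    from collections import deque
    from multiprocessing.pool import ThreadPool

    shuttingDown = [False]  # one-element list: mutable across threads, as in the patch
    workQueue = deque(['one.pcap', 'two.pcap'])  # stand-in for the handler's event deque


    def worker(workerArgs):
        # drains the queue until told to shut down, like ProcessFileEventWorker
        queue, shutdown = workerArgs
        while not shutdown[0]:
            while queue:
                print(f"processing {queue.popleft()}")
            time.sleep(1)


    # initializer=worker, initargs=([...],): worker() starts looping in the pool's thread,
    # leaving the main thread free for the observer join/shutdown loop
    pool = ThreadPool(1, worker, ([workQueue, shuttingDown],))
    time.sleep(3)
    shuttingDown[0] = True
    pool.close()
    pool.join()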
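
The same pieces are reusable outside pcap_watcher.py via watch_common.WatchAndProcessDirectory(), which wires the handler, observer, and worker together end to end and blocks until the shutdown flag is set. A minimal sketch of a consumer under the revised signature (not part of the patch; the '/tmp/watched' path and the 'tag' keyword are hypothetical, while the parameter order and the one-element shuttingDown list convention come from this change):

    import logging
    import signal

    import watch_common

    shuttingDown = [False]


    def shutdown_handler(signum, frame):
        shuttingDown[0] = True


    def file_processor(pathname, **kwargs):
        # invoked by ProcessFileEventWorker as fileProcessor(fileName, **fileProcessorKwargs)
        print(f"[{kwargs.get('tag')}] file closed and ready: {pathname}")


    if __name__ == '__main__':
        logging.basicConfig(level=logging.DEBUG)
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, shutdown_handler)
        watch_common.WatchAndProcessDirectory(
            ['/tmp/watched'],  # directories to monitor (hypothetical, must exist)
            True,              # polling=True selects watchdog's PollingObserver
            file_processor,    # called for each file once it is considered closed
            {'tag': 'demo'},   # fileProcessorKwargs, threaded through to the callback
            10,                # assumeClosedSec: inactivity before a file counts as closed
            shuttingDown,      # set shuttingDown[0] = True to stop and clean up
            logging.getLogger(__name__),
        )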