work in progress for moving inotify to watchdog, see #168
mmguero committed Apr 3, 2023
1 parent d719ba6 commit 36dba45
Showing 6 changed files with 147 additions and 71 deletions.
5 changes: 5 additions & 0 deletions Dockerfiles/pcap-monitor.Dockerfile
@@ -30,6 +30,8 @@ ARG PCAP_PATH=/pcap
ARG PCAP_PIPELINE_DEBUG=false
ARG PCAP_PIPELINE_DEBUG_EXTRA=false
ARG PCAP_PIPELINE_IGNORE_PREEXISTING=false
ARG PCAP_PIPELINE_POLLING=false
ARG PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC=10
ARG PCAP_NODE_NAME=malcolm
ARG ZEEK_PATH=/zeek

@@ -39,6 +41,8 @@ ENV PCAP_PATH $PCAP_PATH
ENV PCAP_PIPELINE_DEBUG $PCAP_PIPELINE_DEBUG
ENV PCAP_PIPELINE_DEBUG_EXTRA $PCAP_PIPELINE_DEBUG_EXTRA
ENV PCAP_PIPELINE_IGNORE_PREEXISTING $PCAP_PIPELINE_IGNORE_PREEXISTING
ENV PCAP_PIPELINE_POLLING $PCAP_PIPELINE_POLLING
ENV PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC $PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC
ENV PCAP_NODE_NAME $PCAP_NODE_NAME
ENV ZEEK_PATH $ZEEK_PATH

@@ -71,6 +75,7 @@ ADD pcap-monitor/supervisord.conf /etc/supervisord.conf
ADD pcap-monitor/scripts/ /usr/local/bin/
ADD shared/bin/pcap_watcher.py /usr/local/bin/
ADD shared/bin/pcap_utils.py /usr/local/bin/
ADD shared/bin/watch_common.py /usr/local/bin/
ADD scripts/malcolm_utils.py /usr/local/bin/

EXPOSE 30441
4 changes: 4 additions & 0 deletions config/upload-common.env.example
@@ -10,6 +10,10 @@ PCAP_PIPELINE_DEBUG=false
PCAP_PIPELINE_DEBUG_EXTRA=false
# Whether or not PCAP files extant in ./pcap/ will be ignored on startup
PCAP_PIPELINE_IGNORE_PREEXISTING=false
# Whether or not to use polling vs. native inotify API to watch for files
PCAP_PIPELINE_POLLING=false
# When polling, seconds of inactivity to assume a file is closed and ready for processing
PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC=10
# 'pcap-monitor' to match the name of the container providing the uploaded/captured PCAP file
# monitoring service
PCAP_MONITOR_HOST=pcap-monitor
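
These two settings choose between watchdog's native (inotify-backed) observer and its stat()-based polling observer, and, when polling, how long a file must go unmodified before it is assumed closed. A minimal sketch of how such settings could map onto watchdog (illustrative only; the real wiring is in pcap_watcher.py below):

import os

from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver

# hypothetical standalone reading of the two settings; Malcolm itself feeds
# them in as argparse defaults (see the pcap_watcher.py diff below)
use_polling = os.getenv('PCAP_PIPELINE_POLLING', 'false').lower() in ('true', '1', 'yes')
assume_closed_sec = int(os.getenv('PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC', '10'))

# PollingObserver periodically stat()s the watched tree, so it works where
# inotify events never arrive (e.g., NFS or some container bind mounts)
observer = PollingObserver() if use_polling else Observer()

Polling costs periodic directory scans, but it is the only option on filesystems that don't deliver inotify events.
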
2 changes: 2 additions & 0 deletions pcap-monitor/supervisord.conf
@@ -37,6 +37,8 @@ command=python3 /usr/local/bin/pcap_watcher.py
--opensearch-wait
--node "%(ENV_PCAP_NODE_NAME)s"
--ignore-existing "%(ENV_PCAP_PIPELINE_IGNORE_PREEXISTING)s"
--polling "%(ENV_PCAP_PIPELINE_POLLING)s"
--closed-sec %(ENV_PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC)s
--start-sleep 60
--directory "%(ENV_PCAP_PATH)s"/processed
user=%(ENV_PUSER)s
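
supervisord expands the %(ENV_...)s placeholders into plain strings, so --polling arrives as text like "false" and is converted by the str2bool type given to argparse (shown in the pcap_watcher.py diff below). A sketch of such a converter, assuming malcolm_utils.str2bool behaves along these lines:

import argparse

def str2bool(v):
    # permissive string-to-boolean conversion for use as an argparse 'type='
    # (a sketch; malcolm_utils.str2bool is assumed to behave similarly)
    if isinstance(v, bool):
        return v
    elif v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected')
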
1 change: 1 addition & 0 deletions scripts/demo/amazon_linux_2_malcolm_demo_setup.sh
@@ -559,6 +559,7 @@ function InstallMalcolm {
"LOGSTASH_REVERSE_DNS:'true'"
"LOGSTASH_SEVERITY_SCORING:'true'"
"PCAP_PIPELINE_IGNORE_PREEXISTING:'true'"
"PCAP_PIPELINE_POLLING:'true'"
"YARA_MAX_REQUESTS:4"
"ZEEK_AUTO_ANALYZE_PCAP_FILES:'true'"
"ZEEK_DISABLE_BEST_GUESS_ICS:''"
192 changes: 124 additions & 68 deletions shared/bin/pcap_watcher.py
@@ -16,7 +16,6 @@
import magic
import os
import pathlib
import pyinotify
import signal
import sys
import time
@@ -35,13 +34,19 @@
)
import malcolm_utils
from malcolm_utils import eprint, str2bool, ParseCurlFile, remove_prefix, touch
import watch_common

from collections import defaultdict
from multiprocessing.pool import ThreadPool

from opensearchpy import OpenSearch, Search
from opensearchpy.exceptions import ConnectionError, ConnectionTimeout
from urllib3.exceptions import NewConnectionError

from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.utils import WatchdogShutdown

###################################################################################################
MINIMUM_CHECKED_FILE_SIZE_DEFAULT = 24
MAXIMUM_CHECKED_FILE_SIZE_DEFAULT = 32 * 1024 * 1024 * 1024
@@ -61,21 +66,19 @@
scriptName = os.path.basename(__file__)
scriptPath = os.path.dirname(os.path.realpath(__file__))
origPath = os.getcwd()
shuttingDown = False
shuttingDown = [False]
DEFAULT_NODE_NAME = os.getenv('PCAP_NODE_NAME', 'malcolm')


###################################################################################################
# watch files written to and moved to this directory
class EventWatcher(pyinotify.ProcessEvent):
# notify on files written in-place then closed (IN_CLOSE_WRITE), and moved into this directory (IN_MOVED_TO)
_methods = ["IN_CLOSE_WRITE", "IN_MOVED_TO"]

class EventWatcher:
def __init__(self):
global args
global opensearchHttpAuth
global debug
global verboseDebug
global shuttingDown

super().__init__()

@@ -88,7 +91,7 @@ def __init__(self):
healthy = False

# create the connection to OpenSearch
while (not connected) and (not shuttingDown):
while (not connected) and (not shuttingDown[0]):
try:
try:
if debug:
@@ -139,7 +142,7 @@ def __init__(self):
break

# if requested, wait for at least "yellow" health in the cluster for the "files" index
while connected and args.opensearchWaitForHealth and (not healthy) and (not shuttingDown):
while connected and args.opensearchWaitForHealth and (not healthy) and (not shuttingDown[0]):
try:
if debug:
eprint(f"{scriptName}:\twaiting for OpenSearch to be healthy")
@@ -181,31 +184,28 @@ def __init__(self):
if debug:
eprint(f"{scriptName}:\tEventWatcher initialized")


###################################################################################################
# set up event processor to append processed events to the event queue
def event_process_generator(cls, method):
# actual method called when we are notified of a file
def _method_name(self, event):
###################################################################################################
# set up event processor to append processed events to the event queue
def processFile(self, pathname):
global args
global debug
global verboseDebug

if debug:
eprint(f"{scriptName}:\t👓\t{event.pathname}")
eprint(f"{scriptName}:\t👓\t{pathname}")

# the entity must be a regular PCAP file and actually exist
if (not event.dir) and os.path.isfile(event.pathname):
if os.path.isfile(pathname):
# get the file magic description and mime type
fileMime = magic.from_file(event.pathname, mime=True)
fileType = magic.from_file(event.pathname)
fileMime = magic.from_file(pathname, mime=True)
fileType = magic.from_file(pathname)

# get the file size, in bytes, to compare against sane values
fileSize = os.path.getsize(event.pathname)
fileSize = os.path.getsize(pathname)
if (args.minBytes <= fileSize <= args.maxBytes) and (
(fileMime in PCAP_MIME_TYPES) or ('pcap-ng' in fileType)
):
relativePath = remove_prefix(event.pathname, os.path.join(args.baseDir, ''))
relativePath = remove_prefix(pathname, os.path.join(args.baseDir, ''))

# check with Arkime's files index in OpenSearch and make sure it's not a duplicate
fileIsDuplicate = False
@@ -225,15 +225,15 @@ def _method_name(self, event):
if fileIsDuplicate:
# this is a duplicate file (it's been processed before) so ignore it
if debug:
eprint(f"{scriptName}:\t📋\t{event.pathname}")
eprint(f"{scriptName}:\t📋\t{pathname}")

else:
# the entity is a right-sized non-duplicate file, and it exists, so send it to get processed
if debug:
eprint(f"{scriptName}:\t📩\t{event.pathname}")
eprint(f"{scriptName}:\t📩\t{pathname}")
try:
fileInfo = {
FILE_INFO_DICT_NAME: event.pathname if args.includeAbsolutePath else relativePath,
FILE_INFO_DICT_NAME: pathname if args.includeAbsolutePath else relativePath,
FILE_INFO_DICT_SIZE: fileSize,
FILE_INFO_FILE_MIME: fileMime,
FILE_INFO_FILE_TYPE: fileType,
Expand All @@ -245,23 +245,24 @@ def _method_name(self, event):
eprint(f"{scriptName}:\t📫\t{fileInfo}")
except zmq.Again:
if verboseDebug:
eprint(f"{scriptName}:\t🕑\t{event.pathname}")
eprint(f"{scriptName}:\t🕑\t{pathname}")

else:
# too small/big to care about, or the wrong type, ignore it
if debug:
eprint(f"{scriptName}:\t\t{event.pathname}")
eprint(f"{scriptName}:\t\t{pathname}")

# assign process method to class
_method_name.__name__ = "process_{}".format(method)
setattr(cls, _method_name.__name__, _method_name)

def file_processor(pathname, **kwargs):
if "watcher" in kwargs and kwargs["watcher"]:
kwargs["watcher"].processFile(pathname)


###################################################################################################
# handle sigint/sigterm and set a global shutdown variable
def shutdown_handler(signum, frame):
global shuttingDown
shuttingDown = True
shuttingDown[0] = True


###################################################################################################
@@ -430,6 +431,28 @@ def main():
type=str,
required=False,
)
parser.add_argument(
'-p',
'--polling',
dest='polling',
help="Use polling (instead of inotify)",
metavar='true|false',
type=str2bool,
nargs='?',
const=True,
default=os.getenv('PCAP_PIPELINE_POLLING', False),
required=False,
)
parser.add_argument(
'-c',
'--closed-sec',
dest='assumeClosedSec',
help="When polling, assume a file is closed after this many seconds of inactivity",
metavar='<seconds>',
type=int,
default=int(os.getenv('PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC', str(watch_common.ASSUME_CLOSED_SEC_DEFAULT))),
required=False,
)
requiredNamed = parser.add_argument_group('required arguments')
requiredNamed.add_argument(
'-d', '--directory', dest='baseDir', help='Directory to monitor', metavar='<directory>', type=str, required=True
@@ -474,21 +497,17 @@ def main():

# sleep for a bit if requested
sleepCount = 0
while (not shuttingDown) and (sleepCount < args.startSleepSec):
while (not shuttingDown[0]) and (sleepCount < args.startSleepSec):
time.sleep(1)
sleepCount += 1

# add events to watch to EventWatcher class
for method in EventWatcher._methods:
event_process_generator(EventWatcher, method)

# if directory to monitor doesn't exist, create it now
if os.path.isdir(args.baseDir):
preexistingDir = True
else:
preexistingDir = False
if debug:
eprint(f'{scriptName}: creating "{args.baseDir}" to monitor')
eprint(f'{scriptName}:\tcreating "{args.baseDir}" to monitor')
pathlib.Path(args.baseDir).mkdir(parents=False, exist_ok=True)

# if recursion was requested, get list of directories to monitor
@@ -502,44 +521,81 @@ def main():
# begin threaded watch of path(s)
time.sleep(1)

event_notifier_started = False
watch_manager = pyinotify.WatchManager()
event_notifier = pyinotify.ThreadedNotifier(watch_manager, EventWatcher())
observer = PollingObserver() if args.polling else Observer()
handler = watch_common.FileOperationEventHandler(
logger=None,
polling=args.polling,
)
for watchDir in watchDirs:
watch_manager.add_watch(os.path.abspath(watchDir), pyinotify.ALL_EVENTS)
if debug:
eprint(f"{scriptName}: monitoring {watchDirs}")
time.sleep(2)
if not shuttingDown:
event_notifier.start()
event_notifier_started = True

# if there are any previously included files (and not ignoreExisting), "touch" them so that they will be noticed and processed
if preexistingDir and (not args.ignoreExisting) and (not shuttingDown):
filesTouched = 0
for watchDir in watchDirs:
for preexistingFile in [os.path.join(watchDir, x) for x in pathlib.Path(watchDir).iterdir() if x.is_file()]:
touch(preexistingFile)
filesTouched += 1
if debug and (filesTouched > 0):
eprint(f"{scriptName}: found {filesTouched} preexisting files to check")

# loop forever, or until we're told to shut down, whichever comes first
while not shuttingDown:
if pdbFlagged:
pdbFlagged = False
breakpoint()
time.sleep(0.2)

# graceful shutdown
if verboseDebug:
eprint(f"{scriptName}:\tScheduling {watchDir}")
observer.schedule(handler, watchDir, recursive=False)

observer.start()

if debug:
eprint(f"{scriptName}: shutting down...")
if event_notifier_started:
event_notifier.stop()
eprint(f"{scriptName}:\tmonitoring {watchDirs}")

try:
time.sleep(2)

# if there are any previously included files (and not ignoreExisting), "touch" them so that they will be noticed and processed
if preexistingDir and (not args.ignoreExisting) and (not shuttingDown[0]):
filesTouched = 0
for watchDir in watchDirs:
for preexistingFile in [
os.path.join(watchDir, x) for x in pathlib.Path(watchDir).iterdir() if x.is_file()
]:
touch(preexistingFile)
filesTouched += 1
if debug and (filesTouched > 0):
eprint(f"{scriptName}:\tfound {filesTouched} preexisting files to check")

# start the thread to actually handle the files as they're queued by the FileOperationEventHandler
workerThreadCount = malcolm_utils.AtomicInt(value=0)
ThreadPool(
1,
watch_common.ProcessFileEventWorker(
[
handler,
observer,
file_processor,
{'watcher': EventWatcher()},
args.assumeClosedSec,
workerThreadCount,
shuttingDown,
None,
],
),
)

# loop forever, or until we're told to shut down, whichever comes first
while (not shuttingDown[0]) and observer.is_alive():
if pdbFlagged:
pdbFlagged = False
breakpoint()
observer.join(1)

# graceful shutdown
if debug:
eprint(f"{scriptName}:\tshutting down...")

if shuttingDown[0]:
raise WatchdogShutdown()

except WatchdogShutdown:
observer.unschedule_all()

finally:
observer.stop()
observer.join()

time.sleep(1)
while workerThreadCount.value() > 0:
time.sleep(1)

if debug:
eprint(f"{scriptName}: finished monitoring {watchDirs}")
eprint(f"{scriptName}:\tfinished monitoring {watchDirs}")


if __name__ == '__main__':
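
Taken together, the new main() wires an Observer (or PollingObserver) to a watch_common.FileOperationEventHandler and drains the queued paths on a worker thread, replacing pyinotify's WatchManager/ThreadedNotifier pair. A minimal self-contained sketch of the same pattern (not Malcolm's actual watch_common implementation; the handler, queue, and path here are illustrative):

import time
from queue import Queue, Empty

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver

class QueueingHandler(FileSystemEventHandler):
    # push paths of files created in (or moved into) the watched directory
    # onto a queue for a worker to drain
    def __init__(self, q):
        self.q = q

    def on_created(self, event):
        if not event.is_directory:
            self.q.put(event.src_path)

    def on_moved(self, event):
        if not event.is_directory:
            self.q.put(event.dest_path)

if __name__ == '__main__':
    use_polling = False  # set True to force stat()-based polling instead of inotify
    q = Queue()
    observer = PollingObserver() if use_polling else Observer()
    observer.schedule(QueueingHandler(q), '/tmp/watched', recursive=False)
    observer.start()
    try:
        while True:
            try:
                print('file ready:', q.get(timeout=1))
            except Empty:
                pass
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

Swapping Observer for PollingObserver changes only how events are detected; the handler and worker code stay the same, which is what lets pcap_watcher.py select between them with a single --polling flag.
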
[diff for the remaining changed file — presumably shared/bin/watch_common.py, added in the Dockerfile and imported above — not rendered]
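
Since the watch_common.py diff is not rendered here, the precise role of --closed-sec / assumeClosedSec can only be inferred: a polling observer never sees a file-close event, so a file is presumably considered finished once it has stopped changing for that many seconds. A sketch of such a check, under that assumption (the function name and signature are hypothetical):

import os
import time

def wait_until_assumed_closed(path, assume_closed_sec=10, poll_interval=1.0):
    # heuristic stand-in for a close notification: consider the file closed
    # once its size has been stable for assume_closed_sec seconds
    last_size = -1
    stable_since = time.monotonic()
    while True:
        size = os.path.getsize(path)
        if size != last_size:
            last_size, stable_since = size, time.monotonic()
        elif time.monotonic() - stable_since >= assume_closed_sec:
            return
        time.sleep(poll_interval)
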
