diff --git a/filebeat/supervisord.conf b/filebeat/supervisord.conf
index 78faf96e1..f0b2ab382 100644
--- a/filebeat/supervisord.conf
+++ b/filebeat/supervisord.conf
@@ -69,7 +69,7 @@ command=python3 /usr/local/bin/filebeat-watch-zeeklogs-uploads-folder.py
   --start-sleep 30
   --polling "%(ENV_FILEBEAT_WATCHER_POLLING)s"
   --closed-sec %(ENV_FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC)s
-  --in "%(ENV_FILEBEAT_ZEEK_DIR)s"upload
+  --in "%(ENV_FILEBEAT_ZEEK_DIR)s"/upload
   --out "%(ENV_FILEBEAT_ZEEK_DIR)s"
   --uid %(ENV_PUID)s
   --gid %(ENV_PGID)s
diff --git a/pcap-monitor/scripts/watch-pcap-uploads-folder.py b/pcap-monitor/scripts/watch-pcap-uploads-folder.py
new file mode 100755
index 000000000..a69ee8792
--- /dev/null
+++ b/pcap-monitor/scripts/watch-pcap-uploads-folder.py
@@ -0,0 +1,287 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2023 Battelle Energy Alliance, LLC. All rights reserved.
+
+###################################################################################################
+# Monitor an upload directory and sort each new file by type: PCAP files are moved to the PCAP
+# processing directory, archives to the Zeek upload directory, and anything else is deleted
+#
+# Run the script with --help for options
+###################################################################################################
+
+import argparse
+import glob
+import logging
+import magic
+import os
+import pathlib
+import shutil
+import signal
+import sys
+import time
+
+import malcolm_utils
+from malcolm_utils import eprint, str2bool, remove_suffix
+import watch_common
+
+###################################################################################################
+scriptName = os.path.basename(__file__)
+scriptPath = os.path.dirname(os.path.realpath(__file__))
+origPath = os.getcwd()
+shuttingDown = [False]
+
+
+###################################################################################################
+# handle sigint/sigterm and set a global shutdown variable
+def shutdown_handler(signum, frame):
+    """Signal handler: request shutdown by setting shuttingDown[0] to True."""
+    global shuttingDown
+    shuttingDown[0] = True
+
+
+###################################################################################################
+def file_processor(pathname, **kwargs):
+    """Route one uploaded file according to its libmagic-detected type.
+
+    PCAP/PCAPNG files are moved to kwargs["destination"], common archive formats
+    (assumed to be Zeek log archives) are moved to kwargs["zeek"], and any other
+    file is deleted. A pathname that is not a regular file is ignored.
+
+    Required keyword arguments:
+        uid, gid:    numeric owner/group the file is chown'ed to before being routed
+        destination: directory that receives PCAP files
+        zeek:        directory that receives archives
+        logger:      object providing debug/info/warning logging methods
+
+    Exceptions raised while handling the file are logged as warnings, not re-raised.
+    """
+    uid = kwargs["uid"]
+    gid = kwargs["gid"]
+    pcapDir = kwargs["destination"]
+    zeekDir = kwargs["zeek"]
+    logger = kwargs["logger"]
+
+    logger.debug(f"{scriptName}:\t👓\t{pathname}")
+
+    if os.path.isfile(pathname):
+        # brief pause before touching the file (carried over from the shell script this replaces)
+        time.sleep(0.1)
+        try:
+            os.chown(pathname, uid, gid)
+
+            # get the file magic mime type
+            fileMime = magic.from_file(pathname, mime=True)
+            fileType = magic.from_file(pathname)
+
+            if os.path.isdir(pcapDir) and (
+                (fileMime in ('application/vnd.tcpdump.pcap', 'application/x-pcapng')) or ('pcap-ng' in fileType)
+            ):
+                # a pcap file to be processed by dropping it into pcapDir
+                logger.debug(f"{scriptName}:\t🖅\t{pathname} ({fileMime}/{fileType}) to {pcapDir}")
+                shutil.move(pathname, pcapDir)
+
+            elif os.path.isdir(zeekDir) and (
+                fileMime
+                in [
+                    'application/gzip',
+                    'application/x-gzip',
+                    'application/x-7z-compressed',
+                    'application/x-bzip2',
+                    'application/x-cpio',
+                    'application/x-lzip',
+                    'application/x-lzma',
+                    'application/x-rar-compressed',
+                    'application/x-tar',
+                    'application/x-xz',
+                    'application/zip',
+                ]
+            ):
+                # looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
+                logger.debug(f"{scriptName}:\t🖅\t{pathname} ({fileMime}/{fileType}) to {zeekDir}")
+                shutil.move(pathname, zeekDir)
+
+            else:
+                # unhandled file type uploaded, delete it
+                logger.info(f"{scriptName}:\t🗑\t{pathname} ({fileMime}/{fileType})")
+                os.unlink(pathname)
+
+        except Exception as genericError:
+            logger.warning(f"{scriptName}:\texception: {genericError}")
+
+
+###################################################################################################
+# main
+def main():
+    """Parse the command line, then watch the upload directory until shutdown is signaled.
+
+    Defaults for most options come from environment variables (PCAP_PATH, ZEEK_PATH,
+    PCAP_PIPELINE_POLLING, PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC, PUID/PGID).
+    """
+    global shuttingDown
+
+    parser = argparse.ArgumentParser(
+        description=scriptName,
+        add_help=False,
+        usage='{} '.format(scriptName),
+    )
+    parser.add_argument('--verbose', '-v', action='count', default=1, help='Increase verbosity (e.g., -v, -vv, etc.)')
+    parser.add_argument(
+        '-r',
+        '--recursive-directory',
+        dest='recursiveDir',
+        help="If specified, monitor all directories with this name underneath --in",
+        metavar='',
+        type=str,
+        required=False,
+    )
+    parser.add_argument(
+        '-p',
+        '--polling',
+        dest='polling',
+        help="Use polling (instead of inotify)",
+        metavar='true|false',
+        type=str2bool,
+        nargs='?',
+        const=True,
+        default=os.getenv('PCAP_PIPELINE_POLLING', False),
+        required=False,
+    )
+    parser.add_argument(
+        '-c',
+        '--closed-sec',
+        dest='assumeClosedSec',
+        help="When polling, assume a file is closed after this many seconds of inactivity",
+        metavar='',
+        type=int,
+        default=int(os.getenv('PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC', str(watch_common.ASSUME_CLOSED_SEC_DEFAULT))),
+        required=False,
+    )
+    parser.add_argument(
+        '-i',
+        '--in',
+        dest='srcDir',
+        help='Source directory to monitor',
+        metavar='',
+        type=str,
+        default=os.path.join(remove_suffix(os.getenv('PCAP_PATH', '/pcap'), '/'), 'upload'),
+        required=False,
+    )
+    parser.add_argument(
+        '-o',
+        '--out',
+        dest='dstDir',
+        help='Destination directory',
+        metavar='',
+        type=str,
+        default=os.path.join(remove_suffix(os.getenv('PCAP_PATH', '/pcap'), '/'), 'processed'),
+        required=False,
+    )
+    parser.add_argument(
+        '-z',
+        '--zeek',
+        dest='zeekDir',
+        help='Zeek upload directory',
+        metavar='',
+        type=str,
+        default=os.path.join(remove_suffix(os.getenv('ZEEK_PATH', '/zeek'), '/'), 'upload'),
+        required=False,
+    )
+    parser.add_argument(
+        '-u',
+        '--uid',
+        dest='chownUid',
+        help='UID to chown files',
+        metavar='',
+        type=int,
+        default=int(os.getenv('PUID', os.getenv('DEFAULT_UID', '1000'))),
+        required=False,
+    )
+    parser.add_argument(
+        '-g',
+        '--gid',
+        dest='chownGid',
+        help='GID to chown files',
+        metavar='',
+        type=int,
+        default=int(os.getenv('PGID', os.getenv('DEFAULT_GID', '1000'))),
+        required=False,
+    )
+    parser.add_argument(
+        '--start-sleep',
+        dest='startSleepSec',
+        help="Sleep for this many seconds before starting",
+        metavar='',
+        type=int,
+        default=0,
+        required=False,
+    )
+
+    try:
+        parser.error = parser.exit
+        args = parser.parse_args()
+    except SystemExit:
+        parser.print_help()
+        sys.exit(2)
+
+    args.verbose = logging.ERROR - (10 * args.verbose) if args.verbose > 0 else 0
+    logging.basicConfig(
+        level=args.verbose, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    logging.debug(os.path.join(scriptPath, scriptName))
+    logging.debug("Arguments: {}".format(sys.argv[1:]))
+    logging.debug("Arguments: {}".format(args))
+    if args.verbose > logging.DEBUG:
+        sys.tracebacklimit = 0
+
+    # handle sigint and sigterm for graceful shutdown
+    signal.signal(signal.SIGINT, shutdown_handler)
+    signal.signal(signal.SIGTERM, shutdown_handler)
+
+    # sleep for a bit if requested
+    sleepCount = 0
+    while (not shuttingDown[0]) and (sleepCount < args.startSleepSec):
+        time.sleep(1)
+        sleepCount += 1
+
+    args.dstDir = remove_suffix(args.dstDir, '/')
+    args.srcDir = remove_suffix(args.srcDir, '/')
+    args.zeekDir = remove_suffix(args.zeekDir, '/')
+
+    # if directory to monitor doesn't exist, create it now
+    if not os.path.isdir(args.srcDir):
+        logging.info(f'{scriptName}:\tcreating "{args.srcDir}" to monitor')
+        pathlib.Path(args.srcDir).mkdir(parents=False, exist_ok=True)
+
+    # if recursion was requested, get list of directories to monitor; when nothing matches yet,
+    # retry once a second (instead of spinning) until a match appears or shutdown is requested
+    watchDirs = []
+    while (not shuttingDown[0]) and (not watchDirs):
+        if args.recursiveDir is None:
+            watchDirs = [args.srcDir]
+        else:
+            watchDirs = glob.glob(f'{args.srcDir}/**/{args.recursiveDir}', recursive=True)
+            if not watchDirs:
+                time.sleep(1)
+
+    # watchDirs is only empty here if shutdown was requested before any directory was found
+    if watchDirs:
+        watch_common.WatchAndProcessDirectory(
+            watchDirs,
+            args.polling,
+            file_processor,
+            {
+                "logger": logging,
+                "destination": args.dstDir,
+                "zeek": args.zeekDir,
+                "uid": args.chownUid,
+                "gid": args.chownGid,
+            },
+            args.assumeClosedSec,
+            shuttingDown,
+            logging,
+        )
+
+
+if __name__ == '__main__':
+    main()
diff --git a/pcap-monitor/scripts/watch-pcap-uploads-folder.sh b/pcap-monitor/scripts/watch-pcap-uploads-folder.sh
deleted file mode 100755
index a72d088d8..000000000
--- a/pcap-monitor/scripts/watch-pcap-uploads-folder.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-
-# Copyright (c) 2023 Battelle Energy Alliance, LLC. All rights reserved.
-
-PCAP_BASE_PATH=${PCAP_PATH:-"/pcap"}
-ZEEK_BASE_PATH=${ZEEK_PATH:-"/zeek"}
-
-PROCESS_DIR="$PCAP_BASE_PATH/processed"
-PCAP_UPLOAD_DIR="$PCAP_BASE_PATH/upload"
-ZEEK_UPLOAD_DIR="$ZEEK_BASE_PATH/upload"
-mkdir -p "$PCAP_UPLOAD_DIR"
-
-# as new pcaps are closed for writing in /pcap/upload, move them to /pcap/processed for processing
-inotifywait -m -e close_write --format '%w%f' "${PCAP_UPLOAD_DIR}" | while read NEWFILE
-do
-  FILEMAGIC=$(file -b "$NEWFILE")
-  FILEMIME=$(file -b --mime-type "$NEWFILE")
-  if [[ "$FILEMIME" == 'application/vnd.tcpdump.pcap' ]] || [[ "$FILEMIME" == 'application/x-pcapng' ]] || [[ "$FILEMAGIC" == *"pcap-ng"* ]]; then
-    # a pcap file to be processed by dropping it into $PROCESS_DIR
-    sleep 0.1 && chown ${PUID:-${DEFAULT_UID}}:${PGID:-${DEFAULT_GID}} "$NEWFILE" && (>&2 mv -v "$NEWFILE" "$PROCESS_DIR/")
-
-  elif [[ -d "$ZEEK_UPLOAD_DIR" ]] && ( echo "$FILEMIME" | grep --quiet -P "(application/gzip|application/x-gzip|application/x-7z-compressed|application/x-bzip2|application/x-cpio|application/x-lzip|application/x-lzma|application/x-rar-compressed|application/x-tar|application/x-xz|application/zip)" ); then
-    # looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
-    sleep 0.1 && chown ${PUID:-${DEFAULT_UID}}:${PGID:-${DEFAULT_GID}} "$NEWFILE" && (>&2 mv -v "$NEWFILE" "$ZEEK_UPLOAD_DIR/")
-
-  else
-    # unhandled file type uploaded, delete it
-    sleep 0.1 && (>&2 rm "$NEWFILE")
-    echo "Removed \"$NEWFILE\", unhandled file type \"$FILEMIME\""
-
-  fi
-done
diff --git a/pcap-monitor/supervisord.conf b/pcap-monitor/supervisord.conf
index a4ad0d9f8..50c232c33 100644
--- a/pcap-monitor/supervisord.conf
+++ b/pcap-monitor/supervisord.conf
@@ -18,7 +18,15 @@ supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface
 serverurl=unix:///tmp/supervisor.sock
 
 [program:watch-upload]
-command=/bin/bash -c "sleep 30 && /usr/local/bin/watch-pcap-uploads-folder.sh"
+command=python3 /usr/local/bin/watch-pcap-uploads-folder.py
+  --start-sleep 30
+  --polling "%(ENV_PCAP_PIPELINE_POLLING)s"
+  --closed-sec %(ENV_PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC)s
+  --in "%(ENV_PCAP_PATH)s"/upload
+  --out "%(ENV_PCAP_PATH)s"/processed
+  --zeek "%(ENV_ZEEK_PATH)s"/upload
+  --uid %(ENV_PUID)s
+  --gid %(ENV_PGID)s
 startsecs=35
 startretries=1
 stopasgroup=true