diff --git a/Dockerfiles/arkime.Dockerfile b/Dockerfiles/arkime.Dockerfile index b9026370d..76989be9d 100644 --- a/Dockerfiles/arkime.Dockerfile +++ b/Dockerfiles/arkime.Dockerfile @@ -142,6 +142,7 @@ RUN sed -i "s/bullseye main/bullseye main contrib non-free/g" /etc/apt/sources.l file \ geoip-bin \ gettext \ + inotify-tools \ jq \ libcap2-bin \ libjson-perl \ diff --git a/Dockerfiles/filebeat.Dockerfile b/Dockerfiles/filebeat.Dockerfile index 302d67438..e63eac2e8 100644 --- a/Dockerfiles/filebeat.Dockerfile +++ b/Dockerfiles/filebeat.Dockerfile @@ -17,7 +17,7 @@ ENV DEFAULT_GID $DEFAULT_GID ENV PUSER "filebeat" ENV PGROUP "filebeat" # not dropping privileges globally: supervisord will take care of it -# on a case-by-case basis so that one script (filebeat-watch-zeeklogs-uploads-folder.sh) +# on a case-by-case basis so that one script (filebeat-watch-zeeklogs-uploads-folder.py) # can chown uploaded files ENV PUSER_PRIV_DROP false @@ -37,6 +37,8 @@ ARG FILEBEAT_ZEEK_LOG_PATH="/zeek/current" ARG FILEBEAT_ZEEK_LOG_LIVE_PATH="/zeek/live" ARG FILEBEAT_SURICATA_LOG_PATH="/suricata" ARG FILEBEAT_NGINX_LOG_PATH="/nginx" +ARG FILEBEAT_WATCHER_POLLING=false +ARG FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC=10 ARG LOG_CLEANUP_MINUTES=0 ARG ZIP_CLEANUP_MINUTES=0 ARG NGINX_LOG_ACCESS_AND_ERRORS=false @@ -110,6 +112,7 @@ ADD filebeat/filebeat-nginx.yml /usr/share/filebeat-nginx/filebeat-nginx.yml ADD filebeat/filebeat-tcp.yml /usr/share/filebeat-tcp/filebeat-tcp.yml ADD filebeat/scripts /usr/local/bin/ ADD scripts/malcolm_utils.py /usr/local/bin/ +ADD shared/bin/watch_common.py /usr/local/bin/ ADD shared/bin/opensearch_status.sh /usr/local/bin/ ADD filebeat/supervisord.conf /etc/supervisord.conf RUN for INPUT in nginx tcp; do \ @@ -125,6 +128,8 @@ RUN for INPUT in nginx tcp; do \ ENV AUTO_TAG $AUTO_TAG ENV LOG_CLEANUP_MINUTES $LOG_CLEANUP_MINUTES ENV ZIP_CLEANUP_MINUTES $ZIP_CLEANUP_MINUTES +ENV FILEBEAT_WATCHER_POLLING $FILEBEAT_WATCHER_POLLING +ENV 
FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC $FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC ENV FILEBEAT_SCAN_FREQUENCY $FILEBEAT_SCAN_FREQUENCY ENV FILEBEAT_CLEAN_INACTIVE $FILEBEAT_CLEAN_INACTIVE ENV FILEBEAT_IGNORE_OLDER $FILEBEAT_IGNORE_OLDER diff --git a/config/filebeat.env.example b/config/filebeat.env.example index 6745b7e97..e0c9d9bda 100644 --- a/config/filebeat.env.example +++ b/config/filebeat.env.example @@ -9,6 +9,10 @@ FILEBEAT_CLOSE_RENAMED=true FILEBEAT_CLOSE_REMOVED=true FILEBEAT_CLOSE_EOF=true FILEBEAT_CLEAN_REMOVED=true +# Whether or not to use polling vs. native inotify API to watch for files +FILEBEAT_WATCHER_POLLING=false +# When polling, seconds of inactivity to assume a file is closed and ready for processing +FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC=10 # Whether or not to expose a filebeat TCP input listener (see # https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-tcp.html) FILEBEAT_TCP_LISTEN=false diff --git a/filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.py b/filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.py new file mode 100755 index 000000000..3809a0ee4 --- /dev/null +++ b/filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2023 Battelle Energy Alliance, LLC. All rights reserved. 
+
+###################################################################################################
+# Monitor a directory for uploaded Zeek log archives: files with a supported archive MIME type are
+# chown'ed and moved to the directory filebeat processes; any other uploaded file is deleted
+#
+# Run the script with --help for options
+###################################################################################################
+
+import argparse
+import glob
+import logging
+import magic
+import os
+import pathlib
+import shutil
+import signal
+import sys
+import time
+
+import malcolm_utils
+from malcolm_utils import eprint, str2bool, touch
+import watch_common
+
+###################################################################################################
+scriptName = os.path.basename(__file__)
+scriptPath = os.path.dirname(os.path.realpath(__file__))
+origPath = os.getcwd()
+# single-element lists so signal handlers can mutate the values in place
+shuttingDown = [False]
+configuredLogLevel = [logging.WARNING]
+
+SUPPORTED_MIME_TYPES = [
+    'application/gzip',
+    'application/x-gzip',
+    'application/x-7z-compressed',
+    'application/x-bzip2',
+    'application/x-cpio',
+    'application/x-lzip',
+    'application/x-lzma',
+    'application/x-rar-compressed',
+    'application/x-tar',
+    'application/x-xz',
+    'application/zip',
+]
+
+
+###################################################################################################
+# handle sigint/sigterm and set a global shutdown variable
+def shutdown_handler(signum, frame):
+    """Signal handler: flag that the script should stop (checked via shuttingDown[0])."""
+    shuttingDown[0] = True
+
+
+###################################################################################################
+# handle sigusr1 for a pdb breakpoint
+def pdb_handler(signum, frame):
+    """Signal handler: drop into the pdb debugger at the interrupted frame."""
+    import pdb
+
+    pdb.Pdb().set_trace(frame)
+
+
+###################################################################################################
+# handle sigusr2 for toggling debug logging
+def debug_toggle_handler(signum, frame):
+    """Signal handler: switch the root logger between DEBUG and the level set on the command line."""
+    rootLogger = logging.getLogger()
+    if rootLogger.level == logging.DEBUG:
+        rootLogger.setLevel(configuredLogLevel[0])
+    else:
+        rootLogger.setLevel(logging.DEBUG)
+
+
+###################################################################################################
+def file_processor(pathname, **kwargs):
+    """Handle one file found in a watched upload directory.
+
+    This is the callback handed to watch_common.WatchAndProcessDirectory. The file is chown'ed to
+    kwargs["uid"]:kwargs["gid"]; if its libmagic MIME type is in kwargs["mime_types"] it is moved
+    into kwargs["destination"], otherwise it is deleted. Nothing is done unless pathname is a
+    regular file and the destination is an existing directory.
+
+    Exceptions raised while handling the file are logged as warnings through kwargs["logger"]
+    rather than raised, so one bad file does not stop the watcher.
+    """
+    mime_types = kwargs["mime_types"]
+    uid = kwargs["uid"]
+    gid = kwargs["gid"]
+    destination = kwargs["destination"]
+    logger = kwargs["logger"]
+
+    logger.debug(f"{scriptName}:\t👓\t{pathname}")
+
+    if os.path.isfile(pathname) and os.path.isdir(destination):
+        # same brief pause the shell version of this watcher took before touching the file
+        time.sleep(0.1)
+        try:
+            os.chown(pathname, uid, gid)
+
+            # get the file magic mime type
+            fileMime = magic.from_file(pathname, mime=True)
+
+            if fileMime in mime_types:
+                # looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
+                logger.debug(f"{scriptName}:\t📩\t{pathname} ({fileMime})")
+                shutil.move(pathname, destination)
+
+            else:
+                # unhandled file type uploaded, delete it
+                logger.info(f"{scriptName}:\t✋\t{pathname} ({fileMime})")
+                os.unlink(pathname)
+
+        except Exception as genericError:
+            logger.warning(f"{scriptName}:\texception: {genericError}")
+
+
+###################################################################################################
+# main
+def main():
+    """Parse arguments, set up logging and signal handlers, then watch the upload directory.
+
+    Defaults for most options come from environment variables (FILEBEAT_WATCHER_POLLING,
+    FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC, FILEBEAT_ZEEK_DIR, PUID/PGID, DEFAULT_UID/
+    DEFAULT_GID). Exits with status 2 after printing help if the arguments cannot be parsed.
+    """
+    parser = argparse.ArgumentParser(
+        description=scriptName,
+        add_help=False,
+        usage='{} '.format(scriptName),
+    )
+    parser.add_argument(
+        '-v',
+        '--verbose',
+        dest='verbose',
+        help='Increase verbosity (e.g., -v, -vv, etc.)',
+        action='count',
+        default=1,
+        required=False,
+    )
+    parser.add_argument(
+        '-r',
+        '--recursive-directory',
+        dest='recursiveDir',
+        help="If specified, monitor all directories with this name underneath --directory",
+        metavar='',
+        type=str,
+        required=False,
+    )
+    parser.add_argument(
+        '-p',
+        '--polling',
+        dest='polling',
+        help="Use polling (instead of inotify)",
+        metavar='true|false',
+        type=str2bool,
+        nargs='?',
+        const=True,
+        default=os.getenv('FILEBEAT_WATCHER_POLLING', False),
+        required=False,
+    )
+    parser.add_argument(
+        '-c',
+        '--closed-sec',
+        dest='assumeClosedSec',
+        help="When polling, assume a file is closed after this many seconds of inactivity",
+        metavar='',
+        type=int,
+        default=int(
+            os.getenv('FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC', str(watch_common.ASSUME_CLOSED_SEC_DEFAULT))
+        ),
+        required=False,
+    )
+    # the remaining options all have defaults, so they are plain optional arguments
+    parser.add_argument(
+        '-i',
+        '--in',
+        dest='srcDir',
+        help='Source directory to monitor',
+        metavar='',
+        type=str,
+        default=os.path.join(os.getenv('FILEBEAT_ZEEK_DIR', '/zeek'), 'upload'),
+        required=False,
+    )
+    parser.add_argument(
+        '-o',
+        '--out',
+        dest='dstDir',
+        help='Destination directory',
+        metavar='',
+        type=str,
+        default=os.getenv('FILEBEAT_ZEEK_DIR', '/zeek'),
+        required=False,
+    )
+    parser.add_argument(
+        '-u',
+        '--uid',
+        dest='chownUid',
+        help='UID to chown files',
+        metavar='',
+        type=int,
+        default=int(os.getenv('PUID', os.getenv('DEFAULT_UID', '1000'))),
+        required=False,
+    )
+    parser.add_argument(
+        '-g',
+        '--gid',
+        dest='chownGid',
+        help='GID to chown files',
+        metavar='',
+        type=int,
+        default=int(os.getenv('PGID', os.getenv('DEFAULT_GID', '1000'))),
+        required=False,
+    )
+    parser.add_argument(
+        '--start-sleep',
+        dest='startSleepSec',
+        help="Sleep for this many seconds before starting",
+        metavar='',
+        type=int,
+        default=0,
+        required=False,
+    )
+
+    try:
+        # route argparse errors through exit() so they surface as SystemExit and the
+        # full help text is printed below
+        parser.error = parser.exit
+        args = parser.parse_args()
+    except SystemExit:
+        parser.print_help()
+        sys.exit(2)
+
+    # the default count of 1 maps to WARNING; each extra -v lowers the threshold by 10 (INFO, DEBUG, ...)
+    args.verbose = logging.ERROR - (10 * args.verbose) if args.verbose > 0 else 0
+    configuredLogLevel[0] = args.verbose
+    logging.basicConfig(
+        level=args.verbose, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    logging.debug(os.path.join(scriptPath, scriptName))
+    logging.debug("Arguments: {}".format(sys.argv[1:]))
+    logging.debug("Arguments: {}".format(args))
+    if args.verbose > logging.DEBUG:
+        sys.tracebacklimit = 0
+
+    # handle sigint and sigterm for graceful shutdown
+    signal.signal(signal.SIGINT, shutdown_handler)
+    signal.signal(signal.SIGTERM, shutdown_handler)
+    signal.signal(signal.SIGUSR1, pdb_handler)
+    signal.signal(signal.SIGUSR2, debug_toggle_handler)
+
+    # sleep for a bit if requested
+    sleepCount = 0
+    while (not shuttingDown[0]) and (sleepCount < args.startSleepSec):
+        time.sleep(1)
+        sleepCount += 1
+
+    # if directory to monitor doesn't exist, create it now (with parents, as the
+    # shell version of this watcher did with "mkdir -p")
+    if not os.path.isdir(args.srcDir):
+        logging.debug(f'{scriptName}:\tcreating "{args.srcDir}" to monitor')
+        pathlib.Path(args.srcDir).mkdir(parents=True, exist_ok=True)
+
+    # if recursion was requested, get list of directories to monitor
+    watchDirs = []
+    while (not shuttingDown[0]) and (not watchDirs):
+        if args.recursiveDir is None:
+            watchDirs = [args.srcDir]
+        else:
+            watchDirs = glob.glob(f'{args.srcDir}/**/{args.recursiveDir}', recursive=True)
+            if not watchDirs:
+                # nothing matches yet: wait before retrying rather than spinning on the glob
+                time.sleep(1)
+
+    # watchDirs is only empty here if shutdown was requested before anything was found
+    if watchDirs:
+        watch_common.WatchAndProcessDirectory(
+            watchDirs,
+            args.polling,
+            file_processor,
+            {
+                "logger": logging,
+                "destination": args.dstDir,
+                "uid": args.chownUid,
+                "gid": args.chownGid,
+                "mime_types": SUPPORTED_MIME_TYPES,
+            },
+            args.assumeClosedSec,
+            shuttingDown,
+            logging,
+        )
+
+
+if __name__ == '__main__':
+    main()
diff --git a/filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.sh b/filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.sh
deleted file mode 100755
index 09fc1b484..000000000
--- a/filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-
-# Copyright (c) 2023 Battelle Energy Alliance, LLC. All rights reserved.
-
-
-PROCESS_DIR=${FILEBEAT_ZEEK_DIR:-/zeek/}
-UPLOAD_DIR="${PROCESS_DIR}/upload"
-mkdir -p "$UPLOAD_DIR"
-
-# as new zeek log archives are closed for writing in /zeek/upload, move them to /zeek for processing
-inotifywait -m -e close_write --format '%w%f' "${UPLOAD_DIR}" | while read NEWFILE
-do
-  FILEMIME=$(file -b --mime-type "$NEWFILE")
-  if ( echo "$FILEMIME" | grep --quiet -P "(application/gzip|application/x-gzip|application/x-7z-compressed|application/x-bzip2|application/x-cpio|application/x-lzip|application/x-lzma|application/x-rar-compressed|application/x-tar|application/x-xz|application/zip)" ); then
-    # looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
-    sleep 0.1 && chown ${PUID:-${DEFAULT_UID}}:${PGID:-${DEFAULT_GID}} "$NEWFILE" && (>&2 mv -v "$NEWFILE" "$PROCESS_DIR/")
-  else
-    # unhandled file type uploaded, delete it
-    sleep 0.1 && chown ${PUID:-${DEFAULT_UID}}:${PGID:-${DEFAULT_GID}} && (>&2 rm "$NEWFILE") && echo "Removed \"$NEWFILE\", unhandled file type \"$FILEMIME\""
-  fi
-done
-
diff --git a/filebeat/supervisord.conf b/filebeat/supervisord.conf
index e19cf460a..f0b2ab382 100644
--- a/filebeat/supervisord.conf
+++ b/filebeat/supervisord.conf
@@ -65,7 +65,14 @@ stdout_logfile_maxbytes=0
 redirect_stderr=true
 
 [program:watch-upload]
-command=/bin/bash -c "sleep 30 && /usr/local/bin/filebeat-watch-zeeklogs-uploads-folder.sh" +command=python3 /usr/local/bin/filebeat-watch-zeeklogs-uploads-folder.py + --start-sleep 30 + --polling "%(ENV_FILEBEAT_WATCHER_POLLING)s" + --closed-sec %(ENV_FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC)s + --in "%(ENV_FILEBEAT_ZEEK_DIR)s"/upload + --out "%(ENV_FILEBEAT_ZEEK_DIR)s" + --uid %(ENV_PUID)s + --gid %(ENV_PGID)s user=root startsecs=35 startretries=1