Skip to content

Commit

Permalink
work in progress for #168
Browse files Browse the repository at this point in the history
  • Loading branch information
mmguero committed Apr 5, 2023
1 parent cca2aa3 commit 93583a3
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 24 deletions.
1 change: 1 addition & 0 deletions Dockerfiles/arkime.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ RUN sed -i "s/bullseye main/bullseye main contrib non-free/g" /etc/apt/sources.l
file \
geoip-bin \
gettext \
inotify-tools \
jq \
libcap2-bin \
libjson-perl \
Expand Down
7 changes: 6 additions & 1 deletion Dockerfiles/filebeat.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ ENV DEFAULT_GID $DEFAULT_GID
ENV PUSER "filebeat"
ENV PGROUP "filebeat"
# not dropping privileges globally: supervisord will take care of it
# on a case-by-case basis so that one script (filebeat-watch-zeeklogs-uploads-folder.sh)
# on a case-by-case basis so that one script (filebeat-watch-zeeklogs-uploads-folder.py)
# can chown uploaded files
ENV PUSER_PRIV_DROP false

Expand All @@ -37,6 +37,8 @@ ARG FILEBEAT_ZEEK_LOG_PATH="/zeek/current"
ARG FILEBEAT_ZEEK_LOG_LIVE_PATH="/zeek/live"
ARG FILEBEAT_SURICATA_LOG_PATH="/suricata"
ARG FILEBEAT_NGINX_LOG_PATH="/nginx"
ARG FILEBEAT_WATCHER_POLLING=false
ARG FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC=10
ARG LOG_CLEANUP_MINUTES=0
ARG ZIP_CLEANUP_MINUTES=0
ARG NGINX_LOG_ACCESS_AND_ERRORS=false
Expand Down Expand Up @@ -110,6 +112,7 @@ ADD filebeat/filebeat-nginx.yml /usr/share/filebeat-nginx/filebeat-nginx.yml
ADD filebeat/filebeat-tcp.yml /usr/share/filebeat-tcp/filebeat-tcp.yml
ADD filebeat/scripts /usr/local/bin/
ADD scripts/malcolm_utils.py /usr/local/bin/
ADD shared/bin/watch_common.py /usr/local/bin/
ADD shared/bin/opensearch_status.sh /usr/local/bin/
ADD filebeat/supervisord.conf /etc/supervisord.conf
RUN for INPUT in nginx tcp; do \
Expand All @@ -125,6 +128,8 @@ RUN for INPUT in nginx tcp; do \
ENV AUTO_TAG $AUTO_TAG
ENV LOG_CLEANUP_MINUTES $LOG_CLEANUP_MINUTES
ENV ZIP_CLEANUP_MINUTES $ZIP_CLEANUP_MINUTES
ENV FILEBEAT_WATCHER_POLLING $FILEBEAT_WATCHER_POLLING
ENV FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC $FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC
ENV FILEBEAT_SCAN_FREQUENCY $FILEBEAT_SCAN_FREQUENCY
ENV FILEBEAT_CLEAN_INACTIVE $FILEBEAT_CLEAN_INACTIVE
ENV FILEBEAT_IGNORE_OLDER $FILEBEAT_IGNORE_OLDER
Expand Down
4 changes: 4 additions & 0 deletions config/filebeat.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ FILEBEAT_CLOSE_RENAMED=true
FILEBEAT_CLOSE_REMOVED=true
FILEBEAT_CLOSE_EOF=true
FILEBEAT_CLEAN_REMOVED=true
# Whether or not to use polling vs. native inotify API to watch for files
FILEBEAT_WATCHER_POLLING=false
# When polling, seconds of inactivity to assume a file is closed and ready for processing
FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC=10
# Whether or not to expose a filebeat TCP input listener (see
# https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-tcp.html)
FILEBEAT_TCP_LISTEN=false
Expand Down
240 changes: 240 additions & 0 deletions filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Battelle Energy Alliance, LLC. All rights reserved.

###################################################################################################
# Monitor a directory for PCAP files for processing (by publishing their filenames to a ZMQ socket)
#
# Run the script with --help for options
###################################################################################################

import argparse
import glob
import logging
import magic
import os
import pathlib
import signal
import sys
import time

import malcolm_utils
from malcolm_utils import eprint, str2bool, touch
import watch_common

###################################################################################################
scriptName = os.path.basename(__file__)
scriptPath = os.path.dirname(os.path.realpath(__file__))
origPath = os.getcwd()
shuttingDown = [False]

SUPPORTED_MIME_TYPES = [
'application/gzip',
'application/x-gzip',
'application/x-7z-compressed',
'application/x-bzip2',
'application/x-cpio',
'application/x-lzip',
'application/x-lzma',
'application/x-rar-compressed',
'application/x-tar',
'application/x-xz',
'application/zip',
]


###################################################################################################
# handle sigint/sigterm and set a global shutdown variable
def shutdown_handler(signum, frame):
global shuttingDown
shuttingDown[0] = True


###################################################################################################
def file_processor(pathname, **kwargs):
mime_types = kwargs["mime_types"]
uid = kwargs["uid"]
gid = kwargs["gid"]
destination = kwargs["destination"]
logger = kwargs["logger"]

logger.debug(f"{scriptName}:\t👓\t{pathname}")

if os.path.isfile(pathname) and os.path.isdir(destination):
time.sleep(0.1)
try
os.chown(pathname, uid, gid)

# get the file magic mime type
fileMime = magic.from_file(pathname, mime=True)

if fileMime in mime_types:
# looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
logger.debug(f"{scriptName}:\t📩\t{pathname} ({fileMime})")
shutil.move(pathname, destination)

else:
# unhandled file type uploaded, delete it
logger.info(f"{scriptName}:\t\t{pathname} ({fileMime})")
os.unlink(pathname)

except Exception as genericError:
logger.warning(f"{scriptName}:\texception: {genericError}")


###################################################################################################
# main
def main():
global shuttingDown

parser = argparse.ArgumentParser(
description=scriptName,
add_help=False,
usage='{} <arguments>'.format(scriptName),
)
parser.add_argument(
'-r',
'--recursive-directory',
dest='recursiveDir',
help="If specified, monitor all directories with this name underneath --directory",
metavar='<name>',
type=str,
required=False,
)
parser.add_argument(
'-p',
'--polling',
dest='polling',
help="Use polling (instead of inotify)",
metavar='true|false',
type=str2bool,
nargs='?',
const=True,
default=os.getenv('FILEBEAT_WATCHER_POLLING', False),
required=False,
)
parser.add_argument(
'-c',
'--closed-sec',
dest='assumeClosedSec',
help="When polling, assume a file is closed after this many seconds of inactivity",
metavar='<seconds>',
type=int,
default=int(
os.getenv('FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC', str(watch_common.ASSUME_CLOSED_SEC_DEFAULT))
),
required=False,
)
requiredNamed.add_argument(
'-i',
'--in',
dest='srcDir',
help='Source directory to monitor',
metavar='<directory>',
type=str,
default=os.path.join(os.getenv('FILEBEAT_ZEEK_DIR', '/zeek'), 'upload'),
required=False,
)
requiredNamed.add_argument(
'-o',
'--out',
dest='dstDir',
help='Destination directory',
metavar='<directory>',
type=str,
default=os.getenv('FILEBEAT_ZEEK_DIR', '/zeek'),
required=False,
)
requiredNamed.add_argument(
'-u',
'--uid',
dest='chownUid',
help='UID to chown files',
metavar='<integer>',
type=int,
default=int(os.getenv('PUID', os.getenv('DEFAULT_UID', '1000'))),
required=False,
)
requiredNamed.add_argument(
'-g',
'--gid',
dest='chownGid',
help='UID to chown files',
metavar='<integer>',
type=int,
default=int(os.getenv('PGID', os.getenv('DEFAULT_GID', '1000'))),
required=False,
)
parser.add_argument(
'--start-sleep',
dest='startSleepSec',
help="Sleep for this many seconds before starting",
metavar='<seconds>',
type=int,
default=0,
required=False,
)

try:
parser.error = parser.exit
args = parser.parse_args()
except SystemExit:
parser.print_help()
exit(2)

args.verbose = logging.ERROR - (10 * args.verbose) if args.verbose > 0 else 0
logging.basicConfig(
level=args.verbose, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
logging.debug(os.path.join(scriptPath, scriptName))
logging.debug("Arguments: {}".format(sys.argv[1:]))
logging.debug("Arguments: {}".format(args))
if args.verbose > logging.DEBUG:
sys.tracebacklimit = 0

# handle sigint and sigterm for graceful shutdown
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGUSR1, pdb_handler)
signal.signal(signal.SIGUSR2, debug_toggle_handler)

# sleep for a bit if requested
sleepCount = 0
while (not shuttingDown[0]) and (sleepCount < args.startSleepSec):
time.sleep(1)
sleepCount += 1

# if directory to monitor doesn't exist, create it now
if not os.path.isdir(args.srcDir):
if debug:
eprint(f'{scriptName}:\tcreating "{args.srcDir}" to monitor')
pathlib.Path(args.srcDir).mkdir(parents=False, exist_ok=True)

# if recursion was requested, get list of directories to monitor
watchDirs = []
while len(watchDirs) == 0:
if args.recursiveDir is None:
watchDirs = [args.srcDir]
else:
watchDirs = glob.glob(f'{args.srcDir}/**/{args.recursiveDir}', recursive=True)

watch_common.WatchAndProcessDirectory(
watchDirs,
args.polling,
file_processor,
{
"logger": logging,
"destination": args.dstDir,
"uid": args.chownUid,
"gid": args.chownGid,
"mime_types": SUPPORTED_MIME_TYPES,
},
args.assumeClosedSec,
shuttingDown,
logging,
)


if __name__ == '__main__':
main()
22 changes: 0 additions & 22 deletions filebeat/scripts/filebeat-watch-zeeklogs-uploads-folder.sh

This file was deleted.

9 changes: 8 additions & 1 deletion filebeat/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ stdout_logfile_maxbytes=0
redirect_stderr=true

[program:watch-upload]
command=/bin/bash -c "sleep 30 && /usr/local/bin/filebeat-watch-zeeklogs-uploads-folder.sh"
command=python3 /usr/local/bin/filebeat-watch-zeeklogs-uploads-folder.py
--start-sleep 30
--polling "%(ENV_FILEBEAT_WATCHER_POLLING)s"
--closed-sec %(ENV_FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC)s
--in "%(ENV_FILEBEAT_ZEEK_DIR)s"/upload
--out "%(ENV_FILEBEAT_ZEEK_DIR)s"
--uid %(ENV_PUID)s
--gid %(ENV_PGID)s
user=root
startsecs=35
startretries=1
Expand Down

0 comments on commit 93583a3

Please sign in to comment.