Skip to content

Commit

Permalink
work in progress for moving inotify to watchdog, see #168
Browse files Browse the repository at this point in the history
  • Loading branch information
mmguero committed Apr 5, 2023
1 parent 7d12b82 commit b571358
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 34 deletions.
2 changes: 1 addition & 1 deletion filebeat/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ command=python3 /usr/local/bin/filebeat-watch-zeeklogs-uploads-folder.py
--start-sleep 30
--polling "%(ENV_FILEBEAT_WATCHER_POLLING)s"
--closed-sec %(ENV_FILEBEAT_WATCHER_POLLING_ASSUME_CLOSED_SEC)s
--in "%(ENV_FILEBEAT_ZEEK_DIR)s"upload
--in "%(ENV_FILEBEAT_ZEEK_DIR)s"/upload
--out "%(ENV_FILEBEAT_ZEEK_DIR)s"
--uid %(ENV_PUID)s
--gid %(ENV_PGID)s
Expand Down
261 changes: 261 additions & 0 deletions pcap-monitor/scripts/watch-pcap-uploads-folder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2023 Battelle Energy Alliance, LLC. All rights reserved.

###################################################################################################
# Monitor a directory for PCAP files for processing (by publishing their filenames to a ZMQ socket)
#
# Run the script with --help for options
###################################################################################################

import argparse
import glob
import logging
import magic
import os
import pathlib
import shutil
import signal
import sys
import time

import malcolm_utils
from malcolm_utils import eprint, str2bool, remove_suffix
import watch_common

###################################################################################################
scriptName = os.path.basename(__file__)
scriptPath = os.path.dirname(os.path.realpath(__file__))
origPath = os.getcwd()
shuttingDown = [False]


###################################################################################################
# handle sigint/sigterm and set a global shutdown variable
def shutdown_handler(signum, frame):
global shuttingDown
shuttingDown[0] = True


###################################################################################################
def file_processor(pathname, **kwargs):
uid = kwargs["uid"]
gid = kwargs["gid"]
pcapDir = kwargs["destination"]
zeekDir = kwargs["zeek"]
logger = kwargs["logger"]

logger.debug(f"{scriptName}:\t👓\t{pathname}")

if os.path.isfile(pathname):
time.sleep(0.1)
try:
os.chown(pathname, uid, gid)

# get the file magic mime type
fileMime = magic.from_file(pathname, mime=True)
fileType = magic.from_file(pathname)

if os.path.isdir(pcapDir) and (
(fileMime in ('application/vnd.tcpdump.pcap', 'application/x-pcapng')) or ('pcap-ng' in fileType)
):
# a pcap file to be processed by dropping it into pcapDir
logger.debug(f"{scriptName}:\t🖅\t{pathname} ({fileMime}/{fileType}) to {pcapDir}")
shutil.move(pathname, pcapDir)

elif os.path.isdir(zeekDir) and (
fileMime
in [
'application/gzip',
'application/x-gzip',
'application/x-7z-compressed',
'application/x-bzip2',
'application/x-cpio',
'application/x-lzip',
'application/x-lzma',
'application/x-rar-compressed',
'application/x-tar',
'application/x-xz',
'application/zip',
]
):
# looks like this is a compressed file, we're assuming it's a zeek log archive to be processed by filebeat
logger.debug(f"{scriptName}:\t🖅\t{pathname} ({fileMime}/{fileType}) to {zeekDir}")
shutil.move(pathname, zeekDir)

else:
# unhandled file type uploaded, delete it
logger.info(f"{scriptName}:\t🗑\t{pathname} ({fileMime}/{fileType})")
os.unlink(pathname)

except Exception as genericError:
logger.warning(f"{scriptName}:\texception: {genericError}")


###################################################################################################
# main
def main():
global shuttingDown

parser = argparse.ArgumentParser(
description=scriptName,
add_help=False,
usage='{} <arguments>'.format(scriptName),
)
parser.add_argument('--verbose', '-v', action='count', default=1, help='Increase verbosity (e.g., -v, -vv, etc.)')
parser.add_argument(
'-r',
'--recursive-directory',
dest='recursiveDir',
help="If specified, monitor all directories with this name underneath --directory",
metavar='<name>',
type=str,
required=False,
)
parser.add_argument(
'-p',
'--polling',
dest='polling',
help="Use polling (instead of inotify)",
metavar='true|false',
type=str2bool,
nargs='?',
const=True,
default=os.getenv('PCAP_PIPELINE_POLLING', False),
required=False,
)
parser.add_argument(
'-c',
'--closed-sec',
dest='assumeClosedSec',
help="When polling, assume a file is closed after this many seconds of inactivity",
metavar='<seconds>',
type=int,
default=int(os.getenv('PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC', str(watch_common.ASSUME_CLOSED_SEC_DEFAULT))),
required=False,
)
parser.add_argument(
'-i',
'--in',
dest='srcDir',
help='Source directory to monitor',
metavar='<directory>',
type=str,
default=os.path.join(remove_suffix(os.getenv('PCAP_PATH', '/pcap'), '/'), 'upload'),
required=False,
)
parser.add_argument(
'-o',
'--out',
dest='dstDir',
help='Destination directory',
metavar='<directory>',
type=str,
default=os.path.join(remove_suffix(os.getenv('PCAP_PATH', '/pcap'), '/'), 'processed'),
required=False,
)
parser.add_argument(
'-z',
'--zeek',
dest='zeekDir',
help='Zeek upload directory',
metavar='<directory>',
type=str,
default=os.path.join(remove_suffix(os.getenv('ZEEK_PATH', '/zeek'), '/'), 'upload'),
required=False,
)
parser.add_argument(
'-u',
'--uid',
dest='chownUid',
help='UID to chown files',
metavar='<integer>',
type=int,
default=int(os.getenv('PUID', os.getenv('DEFAULT_UID', '1000'))),
required=False,
)
parser.add_argument(
'-g',
'--gid',
dest='chownGid',
help='UID to chown files',
metavar='<integer>',
type=int,
default=int(os.getenv('PGID', os.getenv('DEFAULT_GID', '1000'))),
required=False,
)
parser.add_argument(
'--start-sleep',
dest='startSleepSec',
help="Sleep for this many seconds before starting",
metavar='<seconds>',
type=int,
default=0,
required=False,
)

try:
parser.error = parser.exit
args = parser.parse_args()
except SystemExit:
parser.print_help()
exit(2)

args.verbose = logging.ERROR - (10 * args.verbose) if args.verbose > 0 else 0
logging.basicConfig(
level=args.verbose, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
logging.debug(os.path.join(scriptPath, scriptName))
logging.debug("Arguments: {}".format(sys.argv[1:]))
logging.debug("Arguments: {}".format(args))
if args.verbose > logging.DEBUG:
sys.tracebacklimit = 0

# handle sigint and sigterm for graceful shutdown
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

# sleep for a bit if requested
sleepCount = 0
while (not shuttingDown[0]) and (sleepCount < args.startSleepSec):
time.sleep(1)
sleepCount += 1

args.dstDir = remove_suffix(args.dstDir, '/')
args.srcDir = remove_suffix(args.srcDir, '/')
args.zeekDir = remove_suffix(args.zeekDir, '/')

# if directory to monitor doesn't exist, create it now
if not os.path.isdir(args.srcDir):
if debug:
eprint(f'{scriptName}:\tcreating "{args.srcDir}" to monitor')
pathlib.Path(args.srcDir).mkdir(parents=False, exist_ok=True)

# if recursion was requested, get list of directories to monitor
watchDirs = []
while len(watchDirs) == 0:
if args.recursiveDir is None:
watchDirs = [args.srcDir]
else:
watchDirs = glob.glob(f'{args.srcDir}/**/{args.recursiveDir}', recursive=True)

watch_common.WatchAndProcessDirectory(
watchDirs,
args.polling,
file_processor,
{
"logger": logging,
"destination": args.dstDir,
"zeek": args.zeekDir,
"uid": args.chownUid,
"gid": args.chownGid,
},
args.assumeClosedSec,
shuttingDown,
logging,
)


if __name__ == '__main__':
main()
32 changes: 0 additions & 32 deletions pcap-monitor/scripts/watch-pcap-uploads-folder.sh

This file was deleted.

10 changes: 9 additions & 1 deletion pcap-monitor/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface
serverurl=unix:///tmp/supervisor.sock

[program:watch-upload]
command=/bin/bash -c "sleep 30 && /usr/local/bin/watch-pcap-uploads-folder.sh"
command=python3 /usr/local/bin/watch-pcap-uploads-folder.py
--start-sleep 30
--polling "%(ENV_PCAP_PIPELINE_POLLING)s"
--closed-sec %(ENV_PCAP_PIPELINE_POLLING_ASSUME_CLOSED_SEC)s
--in "%(ENV_PCAP_PATH)s"/upload
--out "%(ENV_PCAP_PATH)s"/processed
--zeek "%(ENV_ZEEK_PATH)s"/upload
--uid %(ENV_PUID)s
--gid %(ENV_PGID)s
startsecs=35
startretries=1
stopasgroup=true
Expand Down

0 comments on commit b571358

Please sign in to comment.