Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Related to issue#335, Prevent concurrent malheur and support malheur … #343

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/reporting.conf
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ resublimit = 5
[malheur]
enabled = no
maxsimilar = 20
# Run malheur in increment mode (instead of cluster mode) once the number of analyses reaches increment_mode_threshold.
# https://github.com/spender-sandbox/cuckoo-modified/issues/335
# Default: 10000
# Set to 0 to disable increment mode (malheur then always runs in cluster mode).
increment_mode_threshold = 10000

[compression]
enabled = yes
Expand Down
110 changes: 81 additions & 29 deletions modules/reporting/malheur.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
import hashlib
import urllib
import random
import logging
import multiprocessing

from lib.cuckoo.common.constants import CUCKOO_ROOT
from lib.cuckoo.common.abstracts import Report
from lib.cuckoo.common.exceptions import CuckooReportError
from lib.cuckoo.core.database import Database
from lib.cuckoo.common.config import Config

db = Database()
log = logging.getLogger(__name__)

def sanitize_file(filename):
normals = filename.lower().replace('\\', ' ').replace('.', ' ').split(' ')
Expand Down Expand Up @@ -144,41 +151,86 @@ def run(self, results):
"""Runs Malheur processing
@return: Nothing. Results of this processing are obtained at an arbitrary future time.
"""
self.lock = multiprocessing.Lock()
if results["target"]["category"] in ["pcap"]:
return

basedir = os.path.join(CUCKOO_ROOT, "storage", "malheur")
cfgpath = os.path.join(CUCKOO_ROOT, "conf", "malheur.conf")
reportsdir = os.path.join(basedir, "reports")
task_id = str(results["info"]["id"])
outputfile = os.path.join(basedir, "malheur.txt." + hashlib.md5(str(random.random())).hexdigest())
mh_conf = Config("reporting").malheur
self.lock.acquire()
try:
os.makedirs(reportsdir)
except:
pass
if mh_conf.increment_mode_threshold == 0 or mh_conf.increment_mode_threshold:
increment_mode_threshold = mh_conf.increment_mode_threshold
else:
increment_mode_threshold = 10000

mist = mist_convert(results)
if mist:
with open(os.path.join(reportsdir, task_id + ".txt"), "w") as outfile:
outfile.write(mist)
try:
task_id = str(results["info"]["id"])
except KeyError as e:
raise CuckooReportError("No key in result. Error %s" % e)

# might need to prevent concurrent modifications to internal state of malheur by only allowing
# one analysis to be running malheur at a time
num_reports = db.count_tasks()

path, dirs, files = os.walk(reportsdir).next()
try:
cmdline = ["malheur", "-c", cfgpath, "-o", outputfile, "cluster", reportsdir]
run = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = run.communicate()
for line in err.splitlines():
if line.startswith("Warning: Discarding empty feature vector"):
badfile = line.split("'")[1].split("'")[0]
os.remove(os.path.join(reportsdir, badfile))

# replace previous classification state with new results atomically
os.rename(outputfile, outputfile[:-33])

except Exception as e:
raise CuckooReportError("Failed to perform Malheur classification: %s" % e)
if (increment_mode_threshold == 0) or (num_reports < increment_mode_threshold):
malheur_running_mode = "cluster"
reportsdir = os.path.join(basedir, "reports")
else:
malheur_running_mode = "increment"
# Write the MIST report into a separate per-task directory to reduce malheur execution time in increment mode
reportsdir = os.path.join(basedir, "reports", task_id)

new_output_file = os.path.join(basedir, "malheur.txt." + hashlib.md5(str(random.random())).hexdigest())
previous_malheur_output_file = new_output_file[:-33]

if malheur_running_mode == "increment":
log.info("malheur will be running in increment mode")

try:
os.makedirs(reportsdir)
except:
pass

mist = mist_convert(results)
if mist:
with open(os.path.join(reportsdir, task_id + ".txt"), "w") as outfile:
outfile.write(mist)
else:
# Without MIST data there is no point in running malheur
raise CuckooReportError("Failed to extract mist data for task: %s" % task_id)

try:
cmdline = ["malheur", "-c", cfgpath, "-o", new_output_file, malheur_running_mode, reportsdir]
run = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = run.communicate()
for line in err.splitlines():
if line.startswith("Warning: Discarding empty feature vector"):
badfile = line.split("'")[1].split("'")[0]
os.remove(os.path.join(reportsdir, badfile))

if os.path.exists(new_output_file):
if malheur_running_mode == "increment":
# Append the previous output to the newly created output file so that the old and new results are merged.
# This is needed for the "similar" tab of an analysis in the Django web UI.
with open(new_output_file,'ab') as new_output:
with open(previous_malheur_output_file, 'rb') as previous_output:
for line in previous_output:
if line.startswith("#"):
continue
elif line.startswith(task_id):
# Skip older entries for this task; they exist when reporting has been run more than once for the same task
continue
new_output.write(line)
else:
# Not expected to happen: malheur finished without producing an output file
raise CuckooReportError("Failed to generate output file from malheur execution for task: %s" % task_id)

# replace previous classification state with new results atomically
os.rename(new_output_file, previous_malheur_output_file)
except Exception as e:
raise CuckooReportError("Failed to perform Malheur classification: %s" % e)
finally:
# Always release the lock, even on failure; otherwise other malheur runs would wait indefinitely
self.lock.release()