Merge pull request #101 from ARGOeu/devel
Preparing for Release
kkoumantaros authored Nov 9, 2018
2 parents f526422 + 277f734 commit 469178d
Showing 90 changed files with 9,441 additions and 1,937 deletions.
10 changes: 5 additions & 5 deletions .travis.yml
@@ -8,9 +8,9 @@ python:
script:
- pip install -r ./bin/requirements.txt
- pytest
- cd flink_jobs/ams_ingest_metric/ && mvn test
- cd ../batch_ar && mvn test
- cd ../batch_status && mvn test
- cd ../stream_status && mvn test
- cd ../ams_ingest_sync && mvn test
- cd flink_jobs/ams_ingest_metric/ && travis_wait mvn test
- cd ../batch_ar && travis_wait mvn test
- cd ../batch_status && travis_wait mvn test
- cd ../stream_status && travis_wait mvn test
- cd ../ams_ingest_sync && travis_wait mvn test

33 changes: 33 additions & 0 deletions README.md
@@ -57,6 +57,10 @@ Job optional cli parameters:

`--ams.verify` : (optional) turn SSL verification on or off

### Restart strategy
The job has a fixed-delay restart strategy: if it fails, it will try to restart up to a maximum of 10 attempts, with a retry interval of 2 minutes between attempts.
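
This maps onto Flink's fixed-delay restart strategy. As a rough illustration of the semantics only (not the actual Flink implementation), a retry loop with the same parameters might look like:

```
import time

def run_with_fixed_delay_restart(job, max_attempts=10, delay_seconds=120):
    """Illustrative only: retry a failing callable up to max_attempts times,
    sleeping delay_seconds (here 2 minutes) between consecutive attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return job()
        except Exception:
            if attempt == max_attempts:
                raise  # all attempts exhausted, surface the failure
            time.sleep(delay_seconds)  # fixed delay before the next attempt
```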

### Metric data HBase schema

Metric data are stored in HBase tables, using a different namespace for each tenant (e.g. HBase table name = '{TENANT_name}:metric_data').
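
For illustration only, a minimal sketch of reading from such a namespaced table, assuming the third-party `happybase` client and a hypothetical HBase Thrift host (neither is part of this repository):

```
import happybase

tenant = "TENANT_A"  # hypothetical tenant name
connection = happybase.Connection("hbase.example.org")  # hypothetical Thrift host
metric_table = connection.table("{0}:metric_data".format(tenant))  # '{TENANT_name}:metric_data'

# Scan a few rows just to confirm the namespaced table is reachable.
for key, data in metric_table.scan(limit=5):
    print(key, data)
```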
@@ -127,6 +131,9 @@ Job required cli parameters:

`--ams.verify` : (optional) turn SSL verification on or off

### Restart strategy
The job has a fixed-delay restart strategy: if it fails, it will try to restart up to a maximum of 10 attempts, with a retry interval of 2 minutes between attempts.

### Stream Status

@@ -210,6 +217,9 @@ Other optional cli parameters

`--ams.verify` : (optional) turn SSL verification on or off

### Restart strategy
The job has a fixed-delay restart strategy: if it fails, it will try to restart up to a maximum of 10 attempts, with a retry interval of 2 minutes between attempts.


### Status events schema
@@ -233,6 +243,25 @@ Status events are generated as JSON messages that are defined by the following c
A metric data message can produce zero, one or more status metric events. The system analyzes the new status introduced by the metric and then aggregates at the upper levels to see whether any other status changes are produced.
If the status of an item actually changes, an appropriate status event is produced based on the item type (endpoint_group, service, endpoint, metric).
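
As a rough conceptual sketch of that propagation (hypothetical helper names, not the actual Flink job code): the new status is compared against the stored status at each level, and an event is emitted only for levels whose status actually changes:

```
LEVELS = ["metric", "endpoint", "service", "endpoint_group"]

def process_metric_status(current, item_ids, new_metric_status, recompute):
    """current:   dict level -> {item_id: last known status}
    item_ids:  dict level -> id of the affected item on that level
    recompute: callable(level, item_id) -> status aggregated from the level below
    Returns the list of status events to emit (possibly empty)."""
    events = []
    candidate = new_metric_status
    for index, level in enumerate(LEVELS):
        item = item_ids[level]
        if current[level].get(item) == candidate:
            break  # no change at this level, so nothing above can change either
        current[level][item] = candidate
        events.append({"type": level, "id": item, "status": candidate})
        if index + 1 < len(LEVELS):
            next_level = LEVELS[index + 1]
            candidate = recompute(next_level, item_ids[next_level])
    return events
```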

## Threshold rule files
Each report can be accompanied by a threshold rules file, which contains rules on low-level metric data that may accompany a monitoring message in the field 'actual_data'.
The rules file is in JSON format and has the following schema:
```
{
"rules": [
{
"group" : "site-101",
"host" : "host.foo",
"metric": "org.namespace.metric",
"thresholds": "firstlabel=10s;30;50:60;0;100 secondlabel=5;0:10;20:30;50;30"
}
]
}
```
Each rule can contain multiple thresholds separated by whitespace. Each threshold has the following format:
`firstlabel=10s;30;50:60;0;100`, which corresponds to `{{label}}={{value}}{{uom}};{{warning-range}};{{critical-range}};{{min}};{{max}}`. Each range has the form `{{floor}}:{{ceiling}}`, but some shortcuts can be used in declarations.
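
For illustration, a minimal parsing sketch for a single rule string (hypothetical helper names; the bare-value range shortcut is assumed to follow the usual Nagios convention of meaning `0:{{value}}`):

```
import re

# {{label}}={{value}}{{uom}};{{warning-range}};{{critical-range}};{{min}};{{max}}
THRESHOLD_RE = re.compile(r"^(?P<label>[^=]+)=(?P<value>[-0-9.]+)(?P<uom>[a-zA-Z%]*)$")

def parse_range(text):
    """Return (floor, ceiling); a bare value N is treated as the range 0:N."""
    if ":" in text:
        floor, ceiling = text.split(":", 1)
        return float(floor), float(ceiling)
    return 0.0, float(text)

def parse_threshold(expression):
    value_part, warning, critical, minimum, maximum = expression.split(";")
    match = THRESHOLD_RE.match(value_part)
    return {
        "label": match.group("label"),
        "value": float(match.group("value")),
        "uom": match.group("uom"),
        "warning": parse_range(warning),
        "critical": parse_range(critical),
        "min": float(minimum),
        "max": float(maximum),
    }

# A rule string holds whitespace-separated thresholds.
rule = "firstlabel=10s;30;50:60;0;100 secondlabel=5;0:10;20:30;50;30"
print([parse_threshold(t) for t in rule.split()])
```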


## Batch Status

Flink batch job that calculates status results for a specific date
@@ -273,6 +302,8 @@ Job required cli parameters:

`--mongo.method` : MongoDB method to be used when storing the results ~ either: `insert` or `upsert`

`--thr` : (optional) file location of threshold rules


## Batch AR

@@ -318,6 +349,8 @@ Job required cli parameters:

`--mongo.method` : MongoDB method to be used when storing the results ~ either: `insert` or `upsert`

`--thr` : (optional) file location of threshold rules


## Flink job names
Running flink jobs can be listed either in the Flink dashboard by visiting `http://{{flink.webui.host}}:{{flink.webui.port}}`
191 changes: 102 additions & 89 deletions bin/ar_job_submit.py
@@ -5,88 +5,101 @@
import argparse
import datetime
from snakebite.client import Client
import ConfigParser
import logging
from urlparse import urlparse
from utils.argo_log import ArgoLogger
from utils.argo_mongo import ArgoMongoClient
from utils.common import cmd_toString, date_rollback, flink_job_submit, hdfs_check_path
from utils.common import cmd_to_string, date_rollback, flink_job_submit, hdfs_check_path, get_log_conf, get_config_paths
from utils.update_profiles import ArgoProfileManager
from utils.argo_config import ArgoConfig
from utils.recomputations import upload_recomputations

def compose_hdfs_commands(year, month, day, args, config, logger):

log = logging.getLogger(__name__)


def compose_hdfs_commands(year, month, day, args, config):

# set up the hdfs client to be used in order to check the files
client = Client(config.get("HDFS", "hdfs_host"), config.getint("HDFS", "hdfs_port"), use_trash=False)
namenode = config.get("HDFS", "namenode")
client = Client(namenode.hostname, namenode.port, use_trash=False)

# hdfs sync path for the tenant
hdfs_sync = config.get("HDFS", "hdfs_sync")
hdfs_sync = hdfs_sync.replace("{{hdfs_host}}", config.get("HDFS", "hdfs_host"))
hdfs_sync = hdfs_sync.replace("{{hdfs_port}}", config.get("HDFS", "hdfs_port"))
hdfs_sync = hdfs_sync.replace("{{hdfs_user}}", config.get("HDFS", "hdfs_user"))
hdfs_sync = hdfs_sync.replace("{{tenant}}", args.Tenant)

# hdfs metric path for the tenant
hdfs_metric = config.get("HDFS", "hdfs_metric")
hdfs_metric = hdfs_metric.replace("{{hdfs_host}}", config.get("HDFS", "hdfs_host"))
hdfs_metric = hdfs_metric.replace("{{hdfs_port}}", config.get("HDFS", "hdfs_port"))
hdfs_metric = hdfs_metric.replace("{{hdfs_user}}", config.get("HDFS", "hdfs_user"))
hdfs_metric = hdfs_metric.replace("{{tenant}}", args.Tenant)

hdfs_user = config.get("HDFS", "user")
tenant = args.tenant
hdfs_sync = config.get("HDFS", "path_sync")
hdfs_sync = hdfs_sync.fill(namenode=namenode.geturl(), hdfs_user=hdfs_user, tenant=tenant).geturl()

hdfs_metric = config.get("HDFS", "path_metric")

hdfs_metric = hdfs_metric.fill(namenode=namenode.geturl(), hdfs_user=hdfs_user, tenant=tenant).geturl()

# dictionary holding all the commands with their respective arguments' name
hdfs_commands = {}
hdfs_commands = dict()

# file location of previous day's metric data (local or hdfs)
hdfs_commands["--pdata"] = hdfs_check_path(hdfs_metric+"/"+str(datetime.date(year, month, day) - datetime.timedelta(1)), logger, client)
hdfs_commands["--pdata"] = hdfs_check_path(
hdfs_metric + "/" + str(datetime.date(year, month, day) - datetime.timedelta(1)), client)

# file location of target day's metric data (local or hdfs)
hdfs_commands["--mdata"] = hdfs_check_path(hdfs_metric+"/"+args.Date, logger, client)
hdfs_commands["--mdata"] = hdfs_check_path(hdfs_metric + "/" + args.date, client)

# file location of report configuration json file (local or hdfs)
hdfs_commands["--conf"] = hdfs_check_path(hdfs_sync+"/"+args.Tenant+"_"+args.Report+"_cfg.json", logger, client)
hdfs_commands["--conf"] = hdfs_check_path(hdfs_sync + "/" + args.tenant+"_"+args.report+"_cfg.json", client)

# file location of metric profile (local or hdfs)
hdfs_commands["--mps"] = date_rollback(hdfs_sync+"/"+args.Report+"/"+"metric_profile_"+"{{date}}"+".avro", year, month, day, config, logger, client)
hdfs_commands["--mps"] = date_rollback(
hdfs_sync + "/" + args.report + "/" + "metric_profile_" + "{{date}}" + ".avro", year, month, day, config,
client)

# file location of operations profile (local or hdfs)
hdfs_commands["--ops"] = hdfs_check_path(hdfs_sync+"/"+args.Tenant+"_ops.json", logger, client)
hdfs_commands["--ops"] = hdfs_check_path(hdfs_sync+"/"+args.tenant+"_ops.json", client)

# file location of aggregations profile (local or hdfs)
hdfs_commands["--apr"] = hdfs_check_path(hdfs_sync+"/"+args.Tenant+"_"+args.Report+"_ap.json", logger, client)
hdfs_commands["--apr"] = hdfs_check_path(hdfs_sync+"/"+args.tenant+"_"+args.report+"_ap.json", client)

if args.thresholds:
# file location of thresholds rules file (local or hdfs)
hdfs_commands["--thr"] = hdfs_check_path(
os.path.join(hdfs_sync, "".join([args.tenant, "_", args.report, "_thresholds.json"])), client)

# file location of endpoint group topology file (local or hdfs)
hdfs_commands["-egp"] = date_rollback(hdfs_sync+"/"+args.Report+"/"+"group_endpoints_"+"{{date}}"+".avro", year, month, day, config, logger, client)
hdfs_commands["-egp"] = date_rollback(
hdfs_sync + "/" + args.report + "/" + "group_endpoints_" + "{{date}}" + ".avro", year, month, day, config,
client)

# file location of group of groups topology file (local or hdfs)
hdfs_commands["-ggp"] = date_rollback(hdfs_sync+"/"+args.Report+"/"+"group_groups_"+"{{date}}"+".avro", year, month, day, config, logger, client)
hdfs_commands["-ggp"] = date_rollback(hdfs_sync + "/" + args.report + "/" + "group_groups_" + "{{date}}" + ".avro",
year, month, day, config, client)

# file location of weights file (local or hdfs)
hdfs_commands["--weights"] = date_rollback(hdfs_sync+"/"+args.Report+"/weights_"+"{{date}}"+".avro", year, month, day, config, logger, client)
hdfs_commands["--weights"] = date_rollback(hdfs_sync + "/" + args.report + "/weights_" + "{{date}}" + ".avro", year,
month, day, config, client)

# file location of downtimes file (local or hdfs)
hdfs_commands["--downtimes"] = hdfs_check_path(hdfs_sync+"/"+args.Report+"/downtimes_"+str(datetime.date(year, month, day))+".avro", logger, client)
hdfs_commands["--downtimes"] = hdfs_check_path(
hdfs_sync + "/" + args.report + "/downtimes_" + str(datetime.date(year, month, day)) + ".avro", client)

# file location of recomputations file (local or hdfs)
# first check if there is a recomputations file for the given date
if client.test(urlparse(hdfs_sync+"/recomp_"+args.Date+".json").path, exists=True):
hdfs_commands["--rec"] = hdfs_sync+"/recomp_"+args.Date+".json"
# recomputation lies in the hdfs in the form of
# /sync/recomp_TENANTNAME_ReportName_2018-08-02.json
if client.test(urlparse(hdfs_sync+"/recomp_"+args.tenant+"_"+args.report+"_"+args.date+".json").path, exists=True):
hdfs_commands["--rec"] = hdfs_sync+"/recomp_"+args.date+".json"
else:
hdfs_commands["--rec"] = hdfs_check_path(hdfs_sync+"/recomp.json", logger, client)
hdfs_commands["--rec"] = hdfs_check_path(hdfs_sync+"/recomp.json", client)

return hdfs_commands


def compose_command(config, args, hdfs_commands, logger=None):
def compose_command(config, args, hdfs_commands):

# job sumbission command
# job submission command
cmd_command = []

if args.Sudo is True:
if args.sudo is True:
cmd_command.append("sudo")

# create a simple stream_handler whenever tetsing
if logger is None:
logger = ArgoLogger()

# flink executable
cmd_command.append(config.get("FLINK", "path"))

@@ -102,105 +115,105 @@ def compose_command(config, args, hdfs_commands, logger=None):

# date the report will run for
cmd_command.append("--run.date")
cmd_command.append(args.Date)
cmd_command.append(args.date)

# MongoDB uri for outputting the results to (e.g. mongodb://localhost:21017/example_db)
cmd_command.append("--mongo.uri")
mongo_tenant = "TENANTS:"+args.Tenant+":MONGO"
mongo_uri = config.get(mongo_tenant, "mongo_uri")
mongo_uri = mongo_uri.replace("{{mongo_host}}", config.get(mongo_tenant, "mongo_host"))
mongo_uri = mongo_uri.replace("{{mongo_port}}", config.get(mongo_tenant, "mongo_port"))
cmd_command.append(mongo_uri)

if args.Method == "insert":
argo_mongo_client = ArgoMongoClient(args, config, logger, ["service_ar", "endpoint_group_ar"])
group_tenant = "TENANTS:"+args.tenant
mongo_endpoint = config.get("MONGO","endpoint").geturl()
mongo_uri = config.get(group_tenant, "mongo_uri").fill(mongo_endpoint=mongo_endpoint, tenant=args.tenant)
cmd_command.append(mongo_uri.geturl())

if args.method == "insert":
argo_mongo_client = ArgoMongoClient(args, config, ["service_ar", "endpoint_group_ar"])
argo_mongo_client.mongo_clean_ar(mongo_uri)

# MongoDB method to be used when storing the results, either insert or upsert
cmd_command.append("--mongo.method")
cmd_command.append(args.Method)
cmd_command.append(args.method)

# add the hdfs commands
for command in hdfs_commands:
cmd_command.append(command)
cmd_command.append(hdfs_commands[command])

# ams proxy
if config.getboolean("AMS", "proxy_enabled"):
# get optional ams proxy
proxy = config.get("AMS", "proxy")
if proxy is not None:
cmd_command.append("--ams.proxy")
cmd_command.append(config.get("AMS", "ams_proxy"))
cmd_command.append(proxy.geturl())

# ssl verify
cmd_command.append("--ams.verify")
if config.getboolean("AMS", "ssl_enabled"):
cmd_command.append("true")
ams_verify = config.get("AMS", "verify")
if ams_verify is not None:
cmd_command.append(str(ams_verify).lower())
else:
cmd_command.append("false")
# by default assume ams verify is always true
cmd_command.append("true")

return cmd_command


def main(args=None):

# make sure the argument are in the correct form
args.Tenant = args.Tenant.upper()
args.Method = args.Method.lower()

year, month, day = [int(x) for x in args.Date.split("-")]
# Get configuration paths
conf_paths = get_config_paths(args.config)

# set up the config parser
config = ConfigParser.ConfigParser()
# Get logger config file
get_log_conf(conf_paths['log'])

# check if config file has been given as cli argument else
# check if config file resides in /etc/argo-streaming/ folder else
# check if config file resides in local folder
if args.ConfigPath is None:
if os.path.isfile("/etc/argo-streaming/conf/conf.cfg"):
config.read("/etc/argo-streaming/conf/conf.cfg")
else:
config.read("../conf/conf.cfg")
else:
config.read(args.ConfigPath)
# Get main configuration and schema
config = ArgoConfig(conf_paths["main"], conf_paths["schema"])

# set up the logger
logger = ArgoLogger(log_name="batch-ar", config=config)
year, month, day = [int(x) for x in args.date.split("-")]

# check if configuration for the given tenant exists
if not config.has_section("TENANTS:"+args.Tenant):
logger.print_and_log(logging.CRITICAL, "Tenant: "+args.Tenant+" doesn't exist.", 1)
if not config.has("TENANTS:"+args.tenant):
log.info("Tenant: "+args.tenant+" doesn't exist.")
sys.exit(1)

# call update profiles
profile_mgr = ArgoProfileManager(args.ConfigPath)
profile_mgr.profile_update_check(args.Tenant, args.Report)
# check and upload recomputations
upload_recomputations(args.tenant, args.report, args.date, config)

# optional call to update profiles
if args.profile_check:
profile_mgr = ArgoProfileManager(config)
profile_type_checklist = ["operations", "aggregations", "reports", "thresholds"]
for profile_type in profile_type_checklist:
profile_mgr.profile_update_check(args.tenant, args.report, profile_type)

# dictionary containing the argument's name and the command assosciated with each name
hdfs_commands = compose_hdfs_commands(year, month, day, args, config, logger)
hdfs_commands = compose_hdfs_commands(year, month, day, args, config)

cmd_command = compose_command(config, args, hdfs_commands, logger)
cmd_command = compose_command(config, args, hdfs_commands)

logger.print_and_log(logging.INFO, "Getting ready to submit job")
logger.print_and_log(logging.INFO, cmd_toString(cmd_command)+"\n")
log.info("Getting ready to submit job")
log.info(cmd_to_string(cmd_command)+"\n")

# submit the script's command
flink_job_submit(config, logger, cmd_command)
flink_job_submit(config, cmd_command)


if __name__ == "__main__":

parser = argparse.ArgumentParser(description="Batch A/R Job submit script")
parser.add_argument(
"-t", "--Tenant", type=str, help="Name of the tenant", required=True)
"-t", "--tenant", metavar="STRING", help="Name of the tenant", required=True, dest="tenant")
parser.add_argument(
"-r", "--Report", type=str, help="Report status", required=True)
"-r", "--report", metavar="STRING", help="Report status", required=True, dest="report")
parser.add_argument(
"-d", "--Date", type=str, help="Date to run the job for", required=True)
"-d", "--date", metavar="DATE(YYYY-MM-DD)", help="Date to run the job for", required=True, dest="date")
parser.add_argument(
"-m", "--Method", type=str, help="Insert or Upsert data in mongoDB", required=True)
"-m", "--method", metavar="KEYWORD(insert|upsert)", help="Insert or Upsert data in mongoDB", required=True, dest="method")
parser.add_argument(
"-c", "--ConfigPath", type=str, help="Path for the config file")
"-c", "--config", metavar="PATH", help="Path for the config file", dest="config")
parser.add_argument(
"-u", "--Sudo", help="Run the submition as superuser", action="store_true")
"-u", "--sudo", help="Run the submition as superuser", action="store_true")
parser.add_argument("--profile-check", help="check if profiles are up to date before running job",
dest="profile_check", action="store_true")
parser.add_argument("--thresholds", help="check and use threshold rule file if exists",
dest="thresholds", action="store_true")

# Pass the arguments to main method
sys.exit(main(parser.parse_args()))