Skip to content

Commit

Permalink
Merge pull request #48 from ARGOeu/devel
Browse files Browse the repository at this point in the history
Release 0.3.2
  • Loading branch information
kkoumantaros authored Dec 5, 2018
2 parents b5724c0 + 2cce344 commit 5b53fa2
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 70 deletions.
21 changes: 13 additions & 8 deletions argo-nagios-ams-publisher.spec
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
%endif

Name: argo-nagios-ams-publisher
Version: 0.3.1
Version: 0.3.2
Release: 1%{mydist}
Summary: Bridge from Nagios to the ARGO Messaging system

Expand Down Expand Up @@ -57,13 +57,13 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/
%files -f INSTALLED_FILES
%defattr(-,root,root,-)
%config(noreplace) %{_sysconfdir}/%{name}/ams-publisher.conf
%config(noreplace) %{_sysconfdir}/%{name}/metric_data.avsc
%dir %{python_sitelib}/%{underscore %{name}}
%{python_sitelib}/%{underscore %{name}}/*.py[co]
%defattr(-,nagios,nagios,-)
%dir %{_localstatedir}/log/%{name}/
%attr(0755,nagios,nagios) %dir %{_localstatedir}/run/%{name}/
%dir %{_localstatedir}/spool/%{name}/metrics/
%dir %{_localstatedir}/spool/%{name}/alarms/
%dir %{_localstatedir}/spool/%{name}/

%post
%if 0%{?el7:1}
Expand All @@ -83,8 +83,8 @@ if [ "$1" = 0 ]; then
%if 0%{?el7:1}
%systemd_preun ams-publisher.service
%else
/sbin/service ams-publisher stop > /dev/null 2>&1
/sbin/chkconfig --del ams-publisher
/sbin/service ams-publisher stop > /dev/null 2>&1
/sbin/chkconfig --del ams-publisher
%endif
fi
exit 0
Expand All @@ -94,21 +94,26 @@ exit 0
%systemd_postun_with_restart ams-publisher.service
%endif
if [ "$1" = 0 ]; then
rm -rf %{_localstatedir}/run/%{name}/
test -d %{_localstatedir}/run/%{name}/ && rm -rf %{_localstatedir}/run/%{name}/
fi
exit 0

%pre
if ! /usr/bin/id nagios &>/dev/null; then
/usr/sbin/useradd -r -m -d /var/log/nagios -s /bin/sh -c "nagios" nagios || \
/usr/sbin/useradd -r -m -d /var/log/nagios -s /bin/sh -c "nagios" nagios || \
logger -t nagios/rpm "Unexpected error adding user \"nagios\". Aborting installation."
fi
if ! /usr/bin/getent group nagiocmd &>/dev/null; then
/usr/sbin/groupadd nagiocmd &>/dev/null || \
/usr/sbin/groupadd nagiocmd &>/dev/null || \
logger -t nagios/rpm "Unexpected error adding group \"nagiocmd\". Aborting installation."
fi

%changelog
* Thu Nov 8 2018 Daniel Vrcic <[email protected]> - 0.3.2-1%{?dist}
- ARGO-1429 Improved msg counter stats for probe testing purposes
- ARGO-1408 Ensure correct permissions on pidfile directory
- ARGO-1348 Descriptive error in case delivery cache tool is called with queue
path not specified in configs
* Tue Jun 19 2018 Daniel Vrcic <[email protected]> - 0.3.1-1%{?dist}
- ARGO-1250 Inspection local socket is left with root permissions
- ARGO-1147 AMS publisher to add optional field
Expand Down
102 changes: 89 additions & 13 deletions bin/ams-publisherd
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,66 @@ shared = None
logger = None


def query_stats(last_minutes):
    """Ask the publisher's stats UNIX socket for per-worker counts of
    published and consumed messages over the last *last_minutes* minutes
    and log one INFO line per worker and metric.

    Uses the module-global ``shared`` (workers list, socket path, logger).
    Network errors and timeouts are logged, never raised.
    """
    def parse_result(answer):
        # Answers come back as "w:<worker>+<metric>:<count>". Return
        # (worker, count) or (worker, 'error') on a malformed answer.
        worker = answer
        try:
            worker, res = answer.split('+')
            worker = worker.split(':')[1]
            res = int(res.split(':')[1])
        # BUG FIX: the original returned `w` from the except branch, but `w`
        # is unbound when split('+') itself fails -> NameError. Also,
        # split()/indexing raise ValueError/IndexError, never KeyError.
        except (ValueError, IndexError):
            return (worker, 'error')
        return (worker, res)

    maxcmdlength = 128

    query_consumed = ''.join('w:{0}+g:consumed{1} '.format(w, last_minutes)
                             for w in shared.workers)
    query_published = ''.join('w:{0}+g:published{1} '.format(w, last_minutes)
                              for w in shared.workers)

    sock = None
    try:
        # One connect/send/recv round per metric; the stats server answers
        # one request per connection.
        for metric, query in (('published', query_published),
                              ('consumed', query_consumed)):
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            # settimeout() alone is enough; the original's setblocking(0)
            # was immediately overridden by it.
            sock.settimeout(15)
            sock.connect(shared.general['statsocket'])
            # BUG FIX: send()'s second argument is *flags*, not a length
            # limit - passing 128 meant MSG_EOR on Linux, not a cap.
            sock.send(query)
            data = sock.recv(maxcmdlength)
            for answer in data.split():
                # "t:<epoch>" entries are timestamps, not worker results.
                if answer.startswith('t:'):
                    continue
                w, r = parse_result(answer)
                shared.log.info('worker:{0} {1}:{2}'.format(w, metric, r))
            sock.close()

    except socket.timeout:
        shared.log.error('Socket response timeout after 15s')

    except socket.error as e:
        shared.log.error('Socket error: {0}'.format(str(e)))

    finally:
        # BUG FIX: guard against NameError when socket() itself failed
        # before sock was ever bound.
        if sock is not None:
            sock.close()


def setup_statssocket(path, uid, gid):
global shared

Expand All @@ -49,14 +109,12 @@ def setup_statssocket(path, uid, gid):

return sock


def get_userids(user):
    """Return the (uid, gid) pair for the system account *user*."""
    pwent = pwd.getpwnam(user)
    return pwent.pw_uid, pwent.pw_gid

def daemon_start(context_daemon, restart=False):
global shared
confopts = parse_config(shared.log)
shared = Shared(confopts=confopts)

def daemon_start(context_daemon, restart=False):
if context_daemon.pidfile.is_locked() and not \
context_daemon.pidfile.i_am_locking():
pid = context_daemon.pidfile.read_pid()
Expand Down Expand Up @@ -92,7 +150,12 @@ def daemon_start(context_daemon, restart=False):
with context_daemon:
init_dirq_consume(shared.workers, daemonized=True, sockstat=sock)


def daemon_stop(context_daemon, restart=False):
def on_terminate(proc):
if not restart:
shared.log.info('Stopping (%s)' % proc.pid)

if context_daemon.pidfile.is_locked():
pid = context_daemon.pidfile.read_pid()

Expand All @@ -104,10 +167,7 @@ def daemon_stop(context_daemon, restart=False):
shared.log.info('Not running - cleaning stale pidfile')
else:
process.terminate()
pgone, palive = psutil.wait_procs([process])

if not restart:
shared.log.info('Stopping (%s)' % pid)
pgone, palive = psutil.wait_procs([process], callback=on_terminate)

for p in palive:
p.kill()
Expand All @@ -117,6 +177,7 @@ def daemon_stop(context_daemon, restart=False):

return 0


def daemon_status(context_daemon):
if context_daemon.pidfile.is_locked() and not \
context_daemon.pidfile.i_am_locking():
Expand All @@ -135,18 +196,20 @@ def daemon_status(context_daemon):
shared.log.info('Not running')
return 3


def pidfiledir(pidfile):
    """Ensure the directory that will hold *pidfile* exists and is owned by
    the configured run-as user, so the daemon can still write its pidfile
    after dropping privileges (ARGO-1408).

    Exits the process on any filesystem error other than EEXIST.
    """
    try:
        dirp = os.path.dirname(pidfile)
        if not os.path.exists(dirp):
            os.makedirs(dirp)
        # NOTE(review): enforcing ownership unconditionally, even when the
        # directory already existed - the diff's indentation was ambiguous
        # about whether chown belongs inside the creation branch; confirm
        # against the upstream repository.
        uid, gid = get_userids(shared.general['runasuser'])
        os.chown(dirp, uid, gid)
    except (OSError, IOError) as e:
        # EEXIST may occur if the directory appears between the exists()
        # check and makedirs(); that race is benign.
        if e.args[0] != errno.EEXIST:
            shared.log.error('%s %s' % (os.strerror(e.args[0]), e.args[1]))
            raise SystemExit(1)


def daemonizer(args):
"""
Create DaemonContext for setting the behaviour and the options for
Expand All @@ -173,6 +236,7 @@ def daemonizer(args):
ret = daemon_status(context_daemon)
raise SystemExit(ret)


def main():
"""
Function fetch arguments from command line, initialize logger,
Expand All @@ -197,18 +261,30 @@ def main():
help='do not fork into background')
group.add_argument('-d', dest='daemon', type=str,
help='daemon arguments: start, stop, restart, status', metavar='')
group.add_argument('-q', dest='query', required=False, help='query for statistics for last n minutes',
nargs='?', type=int, metavar='number of minutes', const=180)
args = parser.parse_args()

if args.nofork:
try:
confopts = parse_config()
shared = Shared(confopts=confopts)
sock = setup_statssocket(shared.general['statsocket'])
uid, gid = get_userids(shared.general['runasuser'])
sock = setup_statssocket(shared.general['statsocket'], uid, gid)
init_dirq_consume(shared.workers, daemonized=False, sockstat=sock)
except KeyboardInterrupt:
raise SystemExit(1)

elif args.daemon:
confopts = parse_config(shared.log)
shared = Shared(confopts=confopts)
daemonizer(args)

elif args.query:
confopts = parse_config(shared.log)
shared = Shared(confopts=confopts)
shared.log.info('Asked for statistics for last %s minutes' % int(args.query))
query_stats(args.query)


main()
8 changes: 8 additions & 0 deletions pymod/alarmtoqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ def main():
except MessageError as e:
logger.error('Error constructing alarm - %s', repr(e))

except KeyError:
logger.error('No configured Queue for directory %s' % q)
queue_paths = list()
for (k, v) in confopts['queues'].items():
queue_paths.append('{0} - {1}'.format(k, v['directory']))
logger.error('Queues and directories found in config: %s' % ', '.join(queue_paths))
raise SystemExit(1)

except (OSError, IOError) as e:
logger.error(e)
raise SystemExit(1)
12 changes: 11 additions & 1 deletion pymod/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@

conf = '/etc/argo-nagios-ams-publisher/ams-publisher.conf'


def get_queue_granul(queue):
    """Return the configured granularity of the queue whose directory
    matches *queue* (prefix match).

    Raises KeyError when no configured queue directory matches, so callers
    (delivery-cache tools) can report a descriptive error for a queue path
    missing from the configuration (ARGO-1348).
    """
    confopts = parse_config()

    for name, opts in confopts['queues'].iteritems():
        # Prefix match: configured directory paths may carry a trailing
        # slash while callers pass the bare path.
        if opts['directory'].startswith(queue):
            return opts['granularity']

    # BUG FIX: the diff's version set a flag, broke out, and then read the
    # loop variable after the loop - an empty 'queues' section raised
    # NameError instead of the intended KeyError.
    raise KeyError(queue)


def parse_config(logger=None):
"""
Expand Down
6 changes: 4 additions & 2 deletions pymod/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ def run(self):
raise SystemExit(0)

def _increm_intervalcounters(self, num):
for i in range(len(self.shared.statint[self.name]['consumed'])):
self.shared.statint[self.name]['consumed'][i] += num
now = int(time.time())
counter = self.shared.statint[self.name]['consumed']
counter[now] = num + counter.get(now, 0)
self.shared.statint[self.name]['consumed_periodic'] += num

def consume_dirq_msgs(self, num=0):
def _inmemq_append(elem):
Expand Down
2 changes: 1 addition & 1 deletion pymod/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _init_filelog(self, logfile):
lf = logging.Formatter(fmt=lfs, datefmt='%Y-%m-%d %H:%M:%S')
lv = logging.INFO

sf = logging.handlers.RotatingFileHandler(logfile, maxBytes=512*1024, backupCount=5)
sf = logging.FileHandler(logfile)
self.logger.fileloghandle = sf.stream
sf.setFormatter(lf)
sf.setLevel(lv)
Expand Down
8 changes: 8 additions & 0 deletions pymod/metrictoqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def main():
except MessageError as e:
logger.error('Error constructing metric - %s', repr(e))

except KeyError:
logger.error('No configured Queue for directory %s' % q)
queue_paths = list()
for (k, v) in confopts['queues'].items():
queue_paths.append('{0} - {1}'.format(k, v['directory']))
logger.error('Queues and directories found in config: %s' % ', '.join(queue_paths))
raise SystemExit(1)

except (OSError, IOError) as e:
logger.error(e)
raise SystemExit(1)
6 changes: 4 additions & 2 deletions pymod/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def write(self, num=0):
pass

def _increm_intervalcounters(self, num):
for i in range(len(self.shared.statint[self.name]['published'])):
self.shared.statint[self.name]['published'][i] += num
now = int(time.time())
counter = self.shared.statint[self.name]['published']
counter[now] = num + counter.get(now, 0)
self.shared.statint[self.name]['published_periodic'] += num

class FilePublisher(Publish):
"""
Expand Down
31 changes: 23 additions & 8 deletions pymod/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from argo_nagios_ams_publisher.consume import ConsumerQueue
from argo_nagios_ams_publisher.stats import StatSock
from argo_nagios_ams_publisher.shared import Shared
from multiprocessing import Event, Lock, Array
from multiprocessing import Event, Lock, Value, Manager
from threading import Event as ThreadEvent

def init_dirq_consume(workers, daemonized, sockstat):
Expand All @@ -22,18 +22,33 @@ def init_dirq_consume(workers, daemonized, sockstat):
evsleep = 2
consumers = list()
localevents = dict()
manager = Manager()

for w in workers:
shared = Shared(worker=w)
# Create arrays of integers that will be shared across spawned processes
# and that will keep track of number of published and consumed messages
# in 15, 30, 60, 180, 360, 720 and 1440 minutes. Last integer will be
# used for periodic reports.
shared.statint[w]['published'] = Array('i', 8)
shared.statint[w]['consumed'] = Array('i', 8)

# Create dictionaries that hold number of (published, consumed) messages
# in seconds from epoch. Second from epoch is a key and number of
# (published, consumed) messages in given second is associated value:
#
# { int(time.time()): num_of_bulk_msgs, ... }
#
# Counter is read on queries from socket.
# collections.Counter cannot be shared between processes so
# manager.dict() is used.
shared.statint[w]['consumed'] = manager.dict()
shared.statint[w]['published'] = manager.dict()

# Create integer counters that will be shared across spawned processes
# and that will keep track of number of published and consumed messages.
# Counter is read on perodic status reports and signal SIGUSR1.
shared.statint[w]['consumed_periodic'] = Value('i', 1)
shared.statint[w]['published_periodic'] = Value('i', 1)

if not getattr(shared, 'runtime', False):
shared.runtime = dict()
shared.runtime['started'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
shared.runtime['started'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
shared.runtime['started_epoch'] = str(int(time.time()))

if shared.general['publishmsgfile']:
shared.runtime.update(publisher=FilePublisher)
Expand Down
Loading

0 comments on commit 5b53fa2

Please sign in to comment.