Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import argparse
- import code # for code.interact(local=dict(globals, **locals()) ) debugging
- import datetime
- import json
- import os
- import requests
- import psutil
- import sys
- import time
- from typing import Dict, List, Tuple
- ######################
- # compumancer
- #
- # A very small tool meant to be run under cron, so it will email somebody
- # if something is up. This exits with a nonzero status if any defined
- # trial is not entirely okay. Meant to be run with chronic or some other wrapper.
- #
- # This is meant to be useful if you need cron-based monitoring because you're working somewhere where
- # nagios is impractical. This is not meant as a replacement for good monitoring, just a stopgap.
- #
- # This uses conda, because Ubuntu's python doesn't come with psutil.
def main():
    """Run every trial in order and report the verdict.

    Prints "Compumancer is happy" when no trial fails. Otherwise prints the
    collected diagnostic messages (one per line, tab-indented) and exits
    with status 1.
    """
    cfg, jconf = handle_args()
    # The trials run in this fixed order; each takes (config dict, fatal_only)
    # and hands back (failed?, [messages]).
    trials = (
        trial_of_space,
        trial_of_attention,
        trial_of_whispers,
        trial_of_calligraphy,
        trial_of_the_burro,
        trial_of_loneliness,
    )
    displeased = False
    complaints = []
    for trial in trials:
        failed, reasons = trial(jconf, cfg.fatal_only)
        if failed:
            displeased = True
            complaints += reasons
    if not displeased:
        print("Compumancer is happy")
        return
    print("Compumancer is angry. Reasons:")
    print("\n".join("\t" + reason for reason in complaints))
    sys.exit(1)
def handle_args():
    """Parse the command line and load the JSON configuration.

    Unlike a conventional handle_args(), this also loads a JSON configuration
    from the file ``compumancer.cfg`` in the current working directory.

    Returns:
        A tuple ``(args, config)``: the argparse namespace (carrying the
        boolean ``fatal_only``) and the object parsed from the JSON file.

    Raises:
        FileNotFoundError: if ``compumancer.cfg`` does not exist.
    """
    parser = argparse.ArgumentParser(description="Mini monitoring")
    # --fatal_only makes the trials use their higher (error-level) thresholds,
    # so it makes the tool quieter, not noisier.
    parser.add_argument(
        "--fatal_only",
        action="store_true",
        help="Only complain about error-level problems (use the higher thresholds)",
    )
    ret = parser.parse_args()
    config_fn = 'compumancer.cfg'
    if not os.path.exists(config_fn):
        # The message must be an f-string so the filename is interpolated.
        raise FileNotFoundError(f'JSON Config file {config_fn} does not exist!')
    with open(config_fn, 'r') as jconf:
        jret = json.load(jconf)
    return ret, jret
- ###########
- # trials
- #
- # Trials should take a dict and a boolean (even if they ignore either/both).
- # Trials should not take extra optional parameters, and must not require additional parameters.
- # They should not refer to globals for machine-specific configuration
- # Trials must return a tuple of a boolean (whether the trial failed, where grounds for failure *may* be loosened by the parameter)
- # and a list of strings providing useful diagnostic information.
- # Trials *should not* write to stdout.
- # Trials *must* leave their list return value empty if their boolean is False.
- # Trials *should* be written with a sense of humour in their variable names.
def trial_of_space(jconf:Dict, fatal_only:bool=False) -> Tuple[bool, List[str]]:
    """Check that every configured filesystem exists and has room left.

    Reads the list of paths under ``jconf['filesystems']``. A path fails if
    it does not exist or if its usage percentage exceeds the threshold:
    80 normally, 90 when ``fatal_only`` is set.

    Returns:
        ``(failed, messages)``; ``messages`` is empty when ``failed`` is False.
    """
    limit = 90 if fatal_only else 80
    complaints = []
    for mount in jconf['filesystems']:
        if os.path.exists(mount):
            # disk_usage() yields (total, used, free, percent); only the
            # percentage matters here.
            fullness = psutil.disk_usage(mount).percent
            if fullness > limit:
                complaints.append(f"Filesystem {mount} is {fullness} full, over {limit} threshold")
        else:
            complaints.append(f"Filesystem {mount} does not exist")
    # Every complaint is a failure, so the flag follows from the list.
    return bool(complaints), complaints
def trial_of_attention(jconf:Dict, fatal_only:bool=False) -> Tuple[bool, List[str]]:
    """Check that the system load average is acceptable.

    Compares the five-minute load average from ``os.getloadavg()`` against a
    threshold of 10, or 30 when ``fatal_only`` is set. ``jconf`` is unused.

    Returns:
        ``(failed, messages)``; ``messages`` is empty when ``failed`` is False.
    """
    limit = 30 if fatal_only else 10
    # getloadavg() returns three figures; the middle one is the
    # five-minute average, picked just because.
    _, five_minute, _ = os.getloadavg()
    if five_minute > limit:
        return True, [f"Load average {five_minute} is over threshold {limit}"]
    return False, []
def trial_of_whispers(jconf:Dict, fatal_only:bool=False) -> Tuple[bool, List[str]]:
    """Visit each configured URL and check that it answers with HTTP 200.

    Meant to verify that some web service is running (and, for HTTPS, that
    its certificates work). Reads the URL list from ``jconf['urls']`` and an
    optional per-request timeout in seconds from ``jconf['url_timeout']``
    (default 10). ``fatal_only`` is unused.

    Returns:
        ``(failed, messages)``; ``messages`` is empty when ``failed`` is False.
    """
    anger = False
    ret = []
    # Without a timeout a hung server would stall the whole cron run.
    timeout = jconf.get('url_timeout', 10)
    for url in jconf['urls']:
        try:
            status = requests.get(url, timeout=timeout).status_code
        except requests.RequestException as err:
            # Connection refused, DNS failure, bad certificate, timeout...
            # an unreachable service is a failed trial, not a crash that
            # would hide the other trials' findings.
            anger = True
            ret.append(f'Could not visit {url}: {err}')
            continue
        if status != 200:
            anger = True
            ret.append(f'Got a {status} when visiting {url}')
    return anger, ret
def trial_of_calligraphy(jconf:Dict, fatal_only:bool=False) -> Tuple[bool, List[str]]:
    """Look for recent OOM-killer activity in a syslog-style logfile.

    Scans the single file named by ``jconf['logfile']`` for lines mentioning
    "killed process" (case-insensitively) and reports those stamped within
    the last three hours, so that a reboot is not needed to recover.
    Lines are expected to begin with ``Mon DD HH:MM:SS``. ``fatal_only`` is
    unused.

    Returns:
        ``(failed, messages)``; ``messages`` is empty when ``failed`` is False.
        A missing logfile counts as a failure.
    """
    logfile = jconf['logfile']
    if not os.path.isfile(logfile):
        return True, [f"Could not open logfile {logfile}"]

    anger = False
    ret = []
    window = 60 * 60 * 3  # only OOM kills in the last 3 hours count
    now = time.time()
    this_year = datetime.datetime.now().year
    # errors='replace' keeps stray non-UTF-8 bytes in the log from aborting the scan.
    with open(logfile, errors='replace') as syslog:
        for sline in syslog:
            if 'killed process' not in sline.lower():
                continue
            # Whitespace split copes with space-padded days ("Jan  5").
            fields = sline.split(None, 3)
            if len(fields) < 3:
                continue
            mon, day, clock = fields[:3]
            # Good-faith effort to get the time the OOM killer ran. The log
            # has no year, so assume the current one and drop anything that
            # lands in the future. This is weird around the turn of the
            # year, but survivably so.
            try:
                logzeit = time.mktime(
                    time.strptime(f'{this_year} {mon} {day} {clock}', '%Y %b %d %H:%M:%S')
                )
            except (ValueError, OverflowError):
                continue  # not a timestamp we understand; skip the line
            if logzeit > now:
                continue  # could retry with the previous year instead
            if now - logzeit > window:
                continue  # too old to care about
            anger = True
            ret.append(f'OOM killer at {clock}')
    return anger, ret
def trial_of_the_burro(jconf:Dict, fatal_only:bool=False) -> Tuple[bool, List[str]]:
    """Check that the system is not carrying too much in RAM.

    Compares ``psutil.virtual_memory().percent`` against a threshold of 70,
    or 90 when ``fatal_only`` is set. ``jconf`` is unused.

    Returns:
        ``(failed, messages)``; ``messages`` is empty when ``failed`` is False.
    """
    limit = 90 if fatal_only else 70
    burden = psutil.virtual_memory().percent
    if burden > limit:
        return True, [f'Memory usage at {burden}, over threshold of {limit}']
    return False, []
def trial_of_loneliness(jconf:Dict, fatal_only:bool=False) -> Tuple[bool, List[str]]:
    """Check that every process this box needs is running.

    Collects the names of all live processes via ``psutil.process_iter()``
    and reports each name in ``jconf['processes']`` that is not among them.
    ``fatal_only`` is unused.

    Returns:
        ``(failed, messages)``; ``messages`` is empty when ``failed`` is False.
    """
    # First pass: note every process name active on the system.
    company = set()
    for proc in psutil.process_iter():
        try:
            company.add(proc.as_dict(attrs=['pid', 'name'])['name'])
        except psutil.NoSuchProcess:
            # The process vanished between listing and inspection.
            continue
    # Second pass: anyone we were looking for who did not show up.
    absent = [
        f"Could not find process {sought}"
        for sought in jconf['processes']
        if sought not in company
    ]
    return bool(absent), absent
- #####
# Run the trials only when executed as a script (e.g. from cron), so that
# importing this module does not run the checks or call sys.exit().
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement