Skip to content

Commit

Permalink
configuration validation thru Confuse templates. NOTE: confuse.OneOf(…
Browse files Browse the repository at this point in the history
…) is buggy - see beetbox/confuse#88
  • Loading branch information
buanzo committed May 17, 2020
1 parent 6c71d7b commit f3dbd06
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 29 deletions.
136 changes: 109 additions & 27 deletions humed.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
# See:
# https://github.com/beetbox/confuse/blob/master/example/__init__.py
config_template = { # TODO: add debug. check confuse.Bool()
'listen_url': confuse.String(),
'endpoint': confuse.String(),
'transfer_method': confuse.OneOf(TRANSFER_METHODS),
'remote_syslog': {
'server': confuse.String(),
Expand All @@ -63,17 +63,18 @@ def __init__(self, config):
# We will only expose config if needed
# self.config = config
self.debug = config['debug'].get()
self.listen_url = config['listen_url'].get()
self.endpoint = config['endpoint'].get()
self.transfer_method = config['transfer_method'].get()
self.transfer_method_args = config[self.transfer_method].get()
# TODO: improve
self.logger = logging.getLogger('humed-{}'.format(self.transfer_method))
self.logger.setLevel(logging.INFO)
if self.transfer_method is 'logstash':
self.transfer_method_args = config[self.transfer_method].get()
host = self.transfer_method_args['host'].get()
port = self.transfer_method_args['host'].get()
self.logger.addHandler(AsyncLSH(host, port, database_path='logstash.db'))
elif self.transfer_method is 'remote_syslog':
self.transfer_method_args = config[self.transfer_method].get()
server = self.config['remote_syslog']['server'].get()
port = self.config['remote_syslog']['port'].get()
proto = self.config['remote_syslog']['proto'].get()
Expand Down Expand Up @@ -159,23 +160,25 @@ def process_transfers(self):
for item in pendientes:
# TODO: send to master-hume
if self.transfer_method == 'logstash':
self.logstash(item=item)
ret = self.logstash(item=item)
elif self.transfer_method == 'syslog':
self.syslog(item=item)
ret = self.syslog(item=item) # using std SysLogHandler
elif self.transfer_method == 'remote_syslog':
ret = self.syslog(item=item) # using std SysLogHandler
# if sent ok then:
# self.transfer_ok(archivo=archivo)
# if error return(False)
return(True)

def logstash(self,item=None):
if item is None:
return() # FIX: should not happen
return(False) # FIX: should not happen
rowid = item[0]
ts = item[1]
try:
humepkt = json.loads(item[3])
except Exception as ex:
return() # FIX: malformed json at this stage? mmm
return(False) # FIX: malformed json at this stage? mmm
hume = humepkt['hume']
if 'process' in humepkt.keys(): # This data is optional in hume (-a)
process = humepkt['process']
Expand Down Expand Up @@ -211,23 +214,101 @@ def logstash(self,item=None):
# error -> error
# critical -> critical
# debug -> debug
if level == 'ok' or level == 'info':
# https://python-logstash-async.readthedocs.io/en/stable/usage.html#
self.logger.info('hume({}): {}'.format(hostname, msg), extra=extra)
elif level == 'warn':
self.logger.warning('hume({}) {}'.format(hostname, msg), extra=extra)
elif level == 'error':
self.logger.error('hume({}): {}'.format(hostname, msg), extra=extra)
elif level == 'critical':
self.logger.critical('hume({}): {}'.format(hostname, msg), extra=extra)
elif level == 'debug':
self.logger.debug('hume({}): {}'.format(hostname, msg), extra=extra)
try:
if level == 'ok' or level == 'info':
# https://python-logstash-async.readthedocs.io/en/stable/usage.html#
self.logger.info('hume({}): {}'.format(hostname, msg), extra=extra)
elif level == 'warn':
self.logger.warning('hume({}) {}'.format(hostname, msg), extra=extra)
elif level == 'error':
self.logger.error('hume({}): {}'.format(hostname, msg), extra=extra)
elif level == 'critical':
self.logger.critical('hume({}): {}'.format(hostname, msg), extra=extra)
elif level == 'debug':
self.logger.debug('hume({}): {}'.format(hostname, msg), extra=extra)
except Exception: # TODO: improve exception handling
return(False)
else:
return(True)

def syslog(self, item=None):
    """Send one queued hume packet through ``self.logger`` as a syslog line.

    Handles both local and remote syslog; which one is used depends on
    the handler attached to ``self.logger``
    (see ``logging.handlers.SysLogHandler``).

    Args:
        item: Indexable row for the pending transfer. Only ``item[3]`` is
            read; it must hold the JSON-encoded hume packet.

    Returns:
        bool: ``False`` if ``item`` is ``None``, if the packet cannot be
        decoded, or if the logging call raises; ``True`` otherwise.

    Raises:
        KeyError: if the decoded packet lacks ``'hume'`` or one of the
            required hume fields (``level``, ``msg``, ``task``, ``tags``,
            ``humecmd``, ``timestamp``).
    """
    if item is None:
        return False  # FIX: should not happen

    # Required data
    try:
        humepkt = json.loads(item[3])
    except (IndexError, TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return False  # FIX: malformed json at this stage? mmm
    hume = humepkt['hume']

    # Optional data: 'process' is only present when hume was run with -a
    process = humepkt.get('process')

    # Extract info from hume to prepare syslog message
    # TODO: decide if we should split these in the parent caller
    # pros: tidier
    # cons: makes development of other transfer methods more cumbersome?
    level = hume['level']
    msg = hume['msg']
    task = hume['task']
    tags = hume['tags']
    humecmd = hume['humecmd']
    timestamp = hume['timestamp']
    hostname = socket.getfqdn()  # FIX: add a hostname configuration keyword

    # We dont have the 'extra' field for syslog, in contrast to logstash,
    # so everything is flattened into the message text itself.
    msg = '[{}-{}-{}] TAGS=[{}] HUMECMD={} MSG={}'.format(timestamp,
                                                         task,
                                                         level,
                                                         tags,
                                                         humecmd,
                                                         msg)
    if process is not None:
        msg = '{} PROC={}'.format(msg, json.dumps(process))
    else:
        msg = '{} PROC=None'.format(msg)

    # Hume level does not relate completely, because 'ok' is not
    # a syslog severity, closest is info but... TODO: think about this
    # hume level -> syslog severity
    # ----------------------------
    # ok         -> info
    # info       -> info
    # warn       -> warning
    # error      -> error
    # critical   -> critical
    # debug      -> debug
    # NOTE(review): an unrecognised level logs nothing and still
    # returns True -- confirm this is intended.
    try:
        if level in ('ok', 'info'):
            self.logger.info('hume({}): {}'.format(hostname, msg))
        elif level == 'warn':
            self.logger.warning('hume({}) {}'.format(hostname, msg))
        elif level == 'error':
            self.logger.error('hume({}): {}'.format(hostname, msg))
        elif level == 'critical':
            self.logger.critical('hume({}): {}'.format(hostname, msg))
        elif level == 'debug':
            self.logger.debug('hume({}): {}'.format(hostname, msg))
    except Exception:  # TODO: improve exception handling
        return False
    return True

def run(self):
# Humed main loop
sock = zmq.Context().socket(zmq.PULL)
# print("Binding to '{}'".format(self.listen_url))
sock.bind(self.listen_url)
# print("Binding to '{}'".format(self.endpoint))
sock.bind(self.endpoint)
# 2a - Await hume message over zmp
while True:
hume = {}
Expand All @@ -250,20 +331,21 @@ def main():
# First, parse configuration
config = confuse.Configuration('humed')
# Config defaults
config['listen_url'] = 'tcp://localhost:198'
config['remote_syslog']['address'] = 'localhost'
config['endpoint'] = 'tcp://127.0.0.1:198'
config['remote_syslog']['server'] = 'localhost'
config['remote_syslog']['proto'] = 'udp'
config['remote_syslog']['port'] = 514
parser = argparse.ArgumentParser()
parser.add_argument('--listen_url',
help='Listening url for humed zeromq')
config['debug'] = DEBUG
parser.add_argument('--debug',
help='Enable debug')
help='Enables debug messages')
args = parser.parse_args()
config.set_args(args)
print('Reading configuration from {}/{}'.format(config.config_dir(),
confuse.CONFIG_FILENAME))
try:
valid = config.get(template=config_template)
except Exception as ex:
print('Humed: Config file validation error: {}'.format(ex))
sys.exit(2)
print('-----[ CONFIG DUMP ]-----')
print(config.dump())
print('Available Transfer Methods: {}'.format(TRANSFER_METHODS))
Expand Down
8 changes: 6 additions & 2 deletions sample_humed_config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
listen_url: tcp://127.0.0.1:198
# NOT a URL. Please see http://api.zeromq.org/2-1:zmq-tcp
endpoint: tcp://127.0.0.1:198
transfer_method: logstash
logstash:
host: 127.0.0.1
port: 24224

remote_syslog:
server: syslog.example.net
proto: udp
port: 514

1 comment on commit f3dbd06

@buanzo
Copy link
Owner Author

@buanzo buanzo commented on f3dbd06 May 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to mention in this commit that I renamed listen_url to endpoint to better reflect why tcp://localhost:198 does not work in zmq_tcp on zmq.bind() - See http://api.zeromq.org/2-1:zmq-tcp

Please sign in to comment.