#!/usr/bin/env python
"""Consume blacklist messages from RabbitMQ and apply them to local ipsets.

Each message is a JSON object with an ``ip_list`` plus routing metadata
(``name``, ``action``, ``timeout``, ``cluster``).  Depending on the set's
configuration the addresses are applied one-by-one or via a bulk
flush/restore/swap cycle on a shadow ``*_swp`` set.
"""

__version__ = "$Revision: 588 $"

import json
import logging
import logging.handlers
import os
import platform
import shlex
import subprocess
import uuid
from optparse import OptionParser
from subprocess import CalledProcessError
from time import sleep

from eslock import validate_blacklist, set_logger, manage_config
from eslock import add_ipset_line, del_ipset_line, add_ipset_bulk
from eslock import create_ipset, open_connection, get_cluster

logger = logging.getLogger(__name__)

default_config = {
    'log_file': '/var/log/eslock/ipset_consumer.log',
    'log_level_console': 'CRITICAL',
    'log_level_file': 'INFO',
    'log_level_main': 'DEBUG',
    'log_backupcounter': 3,
    'main_ipset_timeout': 3600,
    'sets': {
        'tmp': {'type': 'hash:net', 'size': '4096', 'timeout': '3600',
                'maxelem': '65536', 'enabled': True, 'bulk': False},
        # NOTE: 'enabled' was the string 'True' here; normalized to the bool
        # used by every other set (both are truthy, behavior unchanged).
        'static': {'type': 'hash:net', 'size': '16384', 'timeout': '0',
                   'maxelem': '65536', 'enabled': True, 'bulk': False},
        'pub': {'type': 'hash:net', 'size': '16384', 'timeout': '0',
                'maxelem': '131072', 'enabled': True, 'bulk': True},
        'wl': {'type': 'hash:net', 'size': '16384', 'timeout': '0',
               'maxelem': '65536', 'enabled': True, 'bulk': False},
    },
    'rmq_user': 'eslock',
    'rmq_pwd': 'PWD',
    'rmq_host': 'rabbitmq.it.dadainternal',
    'rmq_port': 5672,
    'rmq_vhost': '/IT',
    'rmq_exchange': 'eslock',
    'rmq_heartbeat': 60,
}


def open_channel(connection):
    """Open a channel on *connection* and bind an exclusive, per-host queue.

    The queue name embeds the hostname and a UUID so every consumer instance
    gets its own queue on the fanout exchange.  Returns the pika channel with
    :func:`callback` registered as the (no-ack) consumer.
    """
    channel = connection.channel()
    channel.exchange_declare(exchange=config['rmq_exchange'],
                             exchange_type='fanout')
    queue_name = (config['rmq_exchange'] + '-' + platform.node() + '-'
                  + str(uuid.uuid4()))
    channel.queue_declare(queue=queue_name, exclusive=True)
    channel.queue_bind(exchange=config['rmq_exchange'], queue=queue_name)
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    return channel


def _run_ipset(commandline):
    """Run an ipset command line, discarding its stdout.

    Raises CalledProcessError on a non-zero exit status.  Using ``with``
    here fixes the original code, which opened os.devnull on every callback
    invocation and never closed it (file-descriptor leak).
    """
    logger.debug('executing: ' + commandline)
    with open(os.devnull, 'w') as fnull:
        subprocess.check_call(shlex.split(commandline), stdout=fnull)


def callback(ch, method, properties, body):
    """Handle one RabbitMQ message: validate it and update the target ipset."""
    raw_message = json.loads(body)
    try:
        message = raw_message['ip_list']
    except KeyError:
        # Consistent with the missing-'name' case below: log and skip
        # instead of letting the consumer crash on a malformed message.
        logger.error('No ip_list specified')
        return
    try:
        payload_name = raw_message.pop('name')
    except KeyError:
        logger.error('No set specified')
        return
    payload_action = raw_message.pop('action', 'add')
    payload_timeout = raw_message.pop('timeout', 0)
    payload_cluster = raw_message.pop('cluster', ['all'])

    # If the set is disabled or not configured, do nothing.
    try:
        if not config['sets'][payload_name]['enabled']:
            logger.info('Set not enabled, skipping ' + payload_name)
            return
    except KeyError:
        logger.info('Unknown set, skipping ' + payload_name)
        return

    if not (get_cluster() in payload_cluster or 'all' in payload_cluster):
        logger.info('my cluster (%(cluster)s) is not affected, skipping'
                    % {'cluster': get_cluster()})
        return

    try:
        is_bulk = config['sets'][payload_name]['bulk']
    except KeyError:
        logger.debug('Bulk not defined for set ' + payload_name)
        is_bulk = False

    if is_bulk:
        # Bulk sets are rebuilt atomically: fill a shadow '_swp' set, then
        # swap it with the live one so there is no window with a partial set.
        logger.debug('set %(set)s is bulk, using swap method'
                     % {'set': payload_name})
        swap_set = payload_name + '_swp'
        ip_list = validate_blacklist(message, True)
        logger.info('bulk type (save/restore) ' + payload_name)
        logger.info('Received %(n)d items, validated %(m)d'
                    % {'n': len(message), 'm': len(ip_list)})
        try:
            _run_ipset('ipset flush bl_' + swap_set)
            logger.info('Flushed ' + swap_set)
        except CalledProcessError:
            logger.error('Error while flushing set ')
            return
        if not add_ipset_bulk(ip_list, swap_set):
            logger.error('Failed to bulk add address to set bl_' + swap_set)
            return
        try:
            _run_ipset('ipset swap bl_' + swap_set + ' bl_' + payload_name)
            logger.info('Swapped bl_' + swap_set + ' -> bl_' + payload_name)
        except CalledProcessError:
            logger.error('Error while swapping sets: bl_' + swap_set
                         + ' -> bl_' + payload_name)
        # Flush the swap set again to reclaim memory (success or failure).
        try:
            _run_ipset('ipset flush bl_' + swap_set)
            logger.info('Flushed again bl_' + swap_set)
        except CalledProcessError:
            logger.error('Error while flushing set bl_' + swap_set)
        return
    else:
        ip_list = validate_blacklist(message, is_net=True)
        logger.info('%(name)s type message received'
                    % {'name': payload_name})
        logger.info('Received %(n)d items, validated %(m)d'
                    % {'n': len(message), 'm': len(ip_list)})
        for item in ip_list:
            if payload_action == 'add':
                add_ipset_line(item, payload_name, timeout=payload_timeout)
                logger.debug('Added ' + item + ' to ' + payload_name)
            elif payload_action == 'del':
                del_ipset_line(item, payload_name)
                # was: 'Removed ... to ...' — wrong preposition in the log
                logger.debug('Removed ' + item + ' from ' + payload_name)
            else:
                # was: 'unkonwn' (typo) in the error message
                logger.error('Action ' + payload_action + ' unknown')


if __name__ == "__main__":
    parser = OptionParser(version=__version__)
    parser.add_option('-p', '--print-config', action='store_true',
                      dest='print_config', default=False,
                      help='Prints to . a sample cfg file')
    parser.add_option('-c', '--config-file', dest='config_file',
                      action='store', type='string',
                      default='/etc/eslock/ipset_consumer.yaml',
                      help='configuration file')
    (options, args) = parser.parse_args()
    if options.print_config:
        manage_config('ipset_consumer_sample.yaml', default_config)
        raise SystemExit

    # Write the pid file (with-block guarantees the handle is closed).
    with open("/var/run/ipset_consumer.pid", "w") as pid_file:
        pid_file.write(str(os.getpid()))

    config = manage_config(options.config_file, default_config)
    logger = set_logger(config['log_file'], config['log_level_main'],
                        config['log_level_file'], config['log_level_console'],
                        config['log_backupcounter'])
    logging.getLogger('pika').setLevel(logging.WARN)
    cluster = get_cluster()
    logger.info('START: ' + __name__ + ' ' + __version__)
    logger.info('Cluster name: ' + cluster)
    logger.debug('Active configuration:'
                 + json.dumps(config, indent=2, sort_keys=True))

    # Create every enabled set (and its '_swp' shadow for bulk sets).
    sets = config['sets']
    logger.debug('Sets creation')
    for set_name in sets:  # renamed from 'set': don't shadow the builtin
        logger.debug('Processing set ' + set_name)
        if not sets[set_name]['enabled']:
            continue
        logger.debug('set is enabled')
        if not create_ipset(set_name,
                            set_type=sets[set_name]['type'],
                            hashsize=sets[set_name]['size'],
                            timeout=sets[set_name]['timeout'],
                            maxelem=sets[set_name]['maxelem']):
            logger.critical('STOP: problem with sets')
            raise SystemExit
        # 'bulk' may be absent in a user-supplied config; callback() already
        # tolerates that, so tolerate it here too instead of raising KeyError.
        if sets[set_name].get('bulk', False):
            logger.debug('set is bulk')
            if not create_ipset(set_name + '_swp',
                                set_type=sets[set_name]['type'],
                                hashsize=sets[set_name]['size'],
                                timeout=sets[set_name]['timeout'],
                                maxelem=sets[set_name]['maxelem']):
                logger.critical('STOP: problem with sets')
                raise SystemExit
    logger.debug('Sets created')

    # Retry the broker connection forever; the service must come up even if
    # RabbitMQ is temporarily unreachable.
    while True:
        connection = open_connection(config['rmq_user'], config['rmq_pwd'],
                                     config['rmq_host'], config['rmq_port'],
                                     config['rmq_vhost'],
                                     config['rmq_heartbeat'])
        if connection:
            break
        logger.error('Failed connection to rmq, waiting 30 secs')
        sleep(30)
    logger.info('Rabbitmq connected')
    channel = open_channel(connection)
    logger.info('Rabbitmq channel opened')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        logger.info('STOP: keyboard interrupt')
        channel.stop_consuming()
        connection.close()