Source code for podi_logging

#!/usr/bin/env python

"""

This module contains all routines to configure and start the multi-processing
safe logging. All log output is queued, and handled in sequence by a separate
logging process.

"""


import sys
import numpy
import os
import pyfits
import datetime
import scipy
import scipy.stats
import math
import scipy.spatial
import itertools
import logging

import time
import Queue
import threading
import multiprocessing

from podi_definitions import *
import podi_sitesetup as sitesetup


from random import choice, random

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

################################################################################
#
# Testing code goes here
#
################################################################################



# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def test_worker_process(log_setup):
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    podi_logger_setup(log_setup)

    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = "msg %d: %s" % (i+1, choice(MESSAGES))
        logger.log(level, message)

    print('Worker finished: %s' % name)


# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def log_slave_setup(queue):
    h = QueueHandler(queue)  # just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages; no other level or filter logic is applied here
    root.setLevel(logging.DEBUG)
def log_master_setup():
    root = logging.getLogger()
    h = logging.StreamHandler(stream=sys.stdout)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)


################################################################################
#
# Real code below
#
################################################################################
class QueueHandler(logging.Handler):
    """
    This is a logging handler which sends events to a multiprocessing queue.
    """

    def __init__(self, queue):
        """
        Initialise an instance, using the passed queue.
        """
        logging.Handler.__init__(self)
        self.queue = queue
        self.msgcount = 0
    def flush(self):
        pass
    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue.
        """
        self.msgcount += 1
        try:
            self.queue.put_nowait(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)
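
# Illustrative sketch (not part of the pipeline; the helper name
# _demo_queue_handler is ours): how QueueHandler behaves in isolation --
# attach it to a logger, log a message, and the raw LogRecord ends up on
# the queue instead of being formatted and written anywhere.
def _demo_queue_handler():
    demo_queue = multiprocessing.Queue()
    demo_logger = logging.getLogger('queuehandler-demo')
    demo_logger.addHandler(QueueHandler(demo_queue))
    demo_logger.setLevel(logging.DEBUG)
    demo_logger.info("hello via queue")
    # the queue now holds the unformatted logging.LogRecord itself
    record = demo_queue.get()
    print record.name, record.getMessage()  # -> queuehandler-demo hello via queue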
def log_master(queue, options):
    """
    This is the main process that handles all log output.

    Each log-entry is received via the queue that's being fed by all
    sub-processes, and then forwarded to the other log-handlers.
    """

    root = logging.getLogger()
    try:
        h = logging.NullHandler()
        f = logging.Formatter('ROOTHANDLER %(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
        h.setFormatter(f)
        root.addHandler(h)
    except AttributeError:
        # This happens in older Python versions that don't have a NullHandler
        pass
    except:
        raise
    root.propagate = False

    enable_debug = False
    if (cmdline_arg_isset("-debugfile") or sitesetup.debug_log_filename != None):
        debug_filename = cmdline_arg_set_or_default("-debugfile",
                                                    sitesetup.debug_log_filename)
        try:
            open_mode = "a" if (sitesetup.debug_log_append) else "w"
            debugfile = open(debug_filename, open_mode)
            enable_debug = True
            print >>debugfile, " ".join(sys.argv)

            debug_logger = logging.getLogger('debug')
            h = logging.StreamHandler(stream=debugfile)
            f = logging.Formatter('%(asctime)s -- %(levelname)-8s [ %(filename)30s : %(lineno)4s - %(funcName)30s() in %(processName)-12s] %(name)30s :: %(message)s')
            h.setFormatter(f)
            debug_logger.addHandler(h)
            debug_logger.propagate = False
        except:
            pass
    else:
        debug_logger = root

    info = logging.getLogger('info')
    h = logging.StreamHandler(stream=sys.stdout)
    f = logging.Formatter('%(name)s: %(message)s')
    h.setFormatter(f)
    info.addHandler(h)

    infolog_file = open("quickreduce.log", "w")
    h = logging.StreamHandler(stream=infolog_file)
    f = logging.Formatter('INFOHANDLER %(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    info.addHandler(h)
    info.propagate = False

    #
    # Check if we can connect to a RabbitMQ server
    #
    enable_pika = False
    try:
        debug_logger.debug("Trying to establish PIKA connection")
        import pika
        import podi_pikasetup
        credentials = pika.PlainCredentials(podi_pikasetup.user, podi_pikasetup.password)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(credentials=credentials,
                                      host=podi_pikasetup.host,
                                      virtual_host=podi_pikasetup.vhost)
        )
        channel = connection.channel()
        channel.queue_declare(queue=podi_pikasetup.jobstatus_queue, durable=True)
        debug_logger.debug("PIKA connection established!")
        enable_pika = True
    except:
        debug_logger.debug("No PIKA connection available")

    msg_received = 0
    while True:
        try:
            try:
                record = queue.get()
            except (KeyboardInterrupt, SystemExit):
                record = None
            except:
                raise

            if (record == None):
                break
            msg_received += 1

            if (enable_debug):
                debug_logger.handle(record)

            if ((record.levelno > logging.DEBUG) and
                (record.levelno <= logging.INFO)):
                info.handle(record)
            elif (record.levelno > logging.INFO):
                # no level or filter logic applied - just handle it
                info.handle(record)

            #
            # Only send select messages via Pika, and only if Pika has been
            # initialized successfully
            #
            if (enable_pika and (record.levelno >= logging.INFO)):
                pika_msg = "%s: %s" % (record.name, record.msg)
                try:
                    # use the optional formatting routine to make the message
                    # compatible with e.g. the IU-PPA system
                    pika_msg = podi_pikasetup.format_msg(record)
                except:
                    pass
                channel.basic_publish(exchange='',
                                      routing_key=podi_pikasetup.jobstatus_queue,
                                      properties=pika.BasicProperties(delivery_mode=2),  # persistent
                                      body=str(pika_msg))

            queue.task_done()
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import traceback
            print >>sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)

    if (enable_debug):
        print >>debugfile, "done with logging, closing file"
        debugfile.close()

    if (enable_pika):
        connection.close()
def podi_log_master_start(options):
    """
    This function creates the logging sub-process that handles all log output.

    This function also prepares the necessary information so we can activate
    the multiprocessing-safe logging in all sub-processes.
    """

    queue = multiprocessing.JoinableQueue()
    listener = multiprocessing.Process(target=log_master,
                                       kwargs={"queue": queue,
                                               "options": options})
    listener.start()

    worker_setup = {"queue": queue,
                    "configurer": log_slave_setup}
    log_master_info = {"queue": queue,
                       "listener": listener}

    # Also start a logger for the main process
    podi_logger_setup(worker_setup)

    return log_master_info, worker_setup
def podi_log_master_quit(log_master_info):
    """
    Shutdown the logging process
    """

    log_master_info['queue'].put_nowait(None)
    try:
        log_master_info['listener'].join()
    except (KeyboardInterrupt, SystemExit):
        pass

    return
def podi_logger_setup(setup):
    """
    This function re-directs all logging output to the logging queue that
    feeds the logging subprocess.
    """

    if (setup == None):
        return
    handler = QueueHandler(setup['queue'])

    logger = logging.getLogger()
    for h in logger.handlers:
        logger.removeHandler(h)
    logger.setLevel(logging.DEBUG)

    f = logging.Formatter('MYLOGGER = %(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    handler.setFormatter(f)
    logger.addHandler(handler)
    logger.propagate = True

    logger.debug("Started logging for process %s" % (multiprocessing.current_process().name))

    return
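
# Illustrative sketch (not part of the pipeline): the typical lifecycle of the
# functions above, mirroring what the __main__ block below does. The helper
# name _demo_lifecycle and the empty options dict are ours; real callers pass
# the options returned by podi_collectcells.read_options_from_commandline().
def _demo_lifecycle():
    # 1) fork the listener process; this also configures logging for the
    #    main process via podi_logger_setup(worker_setup)
    log_master_info, log_setup = podi_log_master_start(options={})

    # 2) every worker process calls podi_logger_setup(log_setup) once on
    #    startup; afterwards ordinary logging calls travel via the queue
    logging.getLogger("demo").info("handled by the listener process")

    # 3) send the shutdown sentinel and wait for the listener to drain
    podi_log_master_quit(log_master_info)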
if __name__ == "__main__": if (cmdline_arg_isset("-pikalisten")): try: import pika print "Found PIKA module!" except ImportError: print "There's no pika package installed, quitting" sys.exit(0) try: import podi_pikasetup print "Found podi-setup for PIKA!" except ImportError: print "No podi-setup found." print "Maybe you need to run `cp podi_pikasetup.py.example podi_pikasetup.py ?" sys.exit(0) print "Trying to connect to AMQP server" try: credentials = pika.PlainCredentials(podi_pikasetup.user, podi_pikasetup.password) connection = pika.BlockingConnection( pika.ConnectionParameters( credentials=credentials, host=podi_pikasetup.host, virtual_host=podi_pikasetup.vhost) ) channel = connection.channel() channel.queue_declare(queue=podi_pikasetup.jobstatus_queue, durable=True) except: print "Connection failed!" sys.exit(0) print "PIKA connection established!" def callback(ch, method, properties, body): print " [o] %r" % (body,) channel.basic_consume(callback, queue=podi_pikasetup.jobstatus_queue, no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' try: channel.start_consuming() except (KeyboardInterrupt, SystemExit): print "\nShutting down listener, good bye!" except: pass sys.exit(0) else: import podi_collectcells options = podi_collectcells.read_options_from_commandline() log_master_info, log_setup = podi_log_master_start(options) # Setup the multi-processing-safe logging podi_logger_setup(options['log_setup']) workers = [] # for i in range(10): # worker = multiprocessing.Process(target=worker_process, kwargs=worker_log) # workers.append(worker) # worker.start() # for w in workers: # w.join() print log_setup for i in range(1): worker = multiprocessing.Process(target=test_worker_process, kwargs={"log_setup": log_setup}) # args=(worker_log)) workers.append(worker) worker.start() for w in workers: w.join() logger = logging.getLogger("main process") logger.info('test info') logger.debug('test debug') logger.critical('test critical') logger.error('test error') podi_log_master_quit(log_master_info) # queue.put_nowait(None) # listener.join()