mirror of
https://github.com/adulau/aha.git
synced 2024-12-27 11:16:11 +00:00
aha-worker parses now also the messages and put them in a log file
This commit is contained in:
parent
3859a6d83b
commit
b30d45db44
1 changed files with 53 additions and 12 deletions
|
@ -9,15 +9,29 @@
|
||||||
#for the aha tak to take the decisions
|
#for the aha tak to take the decisions
|
||||||
#The aha framework can be launched then in screen
|
#The aha framework can be launched then in screen
|
||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
import dircache,os.path,time,sys,ConfigParser,getopt
|
import dircache,os.path,time,sys,ConfigParser,getopt
|
||||||
|
from ahalib import *
|
||||||
|
|
||||||
class PeriodTaks():
|
class PeriodTaks():
|
||||||
|
#Define message types
|
||||||
|
FROM_KERNEL = 1
|
||||||
|
TO_KERNEL = 2
|
||||||
|
|
||||||
def __init__(self,outqueue,inqueue, timeout,sleeptime):
|
def __init__(self,outqueue,inqueue, timeout,sleeptime, logfile):
|
||||||
self.outqueue= outqueue
|
self.outqueue= outqueue
|
||||||
self.inqueue = inqueue
|
self.inqueue = inqueue
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.sleeptime = sleeptime
|
self.sleeptime = sleeptime
|
||||||
|
self.logfile = logfile
|
||||||
|
#Log file descriptor
|
||||||
|
self.lfd = open(logfile,'a')
|
||||||
|
self.aha = AHAActions(inqueue,outqueue)
|
||||||
|
|
||||||
|
#Make close action externally available
|
||||||
|
def closeLogFile(self):
|
||||||
|
self.lfd.close()
|
||||||
|
|
||||||
def remove_old_msg(self,queue):
|
def remove_old_msg(self,queue):
|
||||||
#Get current date if the files are older than the timeout remove them
|
#Get current date if the files are older than the timeout remove them
|
||||||
|
@ -29,9 +43,13 @@ class PeriodTaks():
|
||||||
t1 = int(s[os.path.stat.ST_CTIME])
|
t1 = int(s[os.path.stat.ST_CTIME])
|
||||||
delta = t0 - t1
|
delta = t0 - t1
|
||||||
if (delta > self.timeout):
|
if (delta > self.timeout):
|
||||||
#Old file was found remove it
|
#Old file was found record it
|
||||||
os.unlink(af)
|
if queue == self.outqueue:
|
||||||
|
self.record_message(af,t1,PeriodTaks.FROM_KERNEL)
|
||||||
|
if queue == self.inqueue:
|
||||||
|
self.record_message(af,t1,PeriodTaks.TO_KERNEL)
|
||||||
|
#Remove it
|
||||||
|
self.aha.silent_clean(af)
|
||||||
|
|
||||||
def clean_input_queue(self):
|
def clean_input_queue(self):
|
||||||
try:
|
try:
|
||||||
|
@ -46,6 +64,22 @@ class PeriodTaks():
|
||||||
except OSError,e:
|
except OSError,e:
|
||||||
sys.stderr.write(str(e))
|
sys.stderr.write(str(e))
|
||||||
|
|
||||||
|
#Parse the file an put the information in a log file for later processing
|
||||||
|
#One log file is handier than for each message a file
|
||||||
|
#Take timestamps when the kernel created the file
|
||||||
|
def record_message(self,filename, ctime,type):
|
||||||
|
try:
|
||||||
|
if type == PeriodTaks.FROM_KERNEL:
|
||||||
|
msg = self.aha.load_file(filename)
|
||||||
|
logEntry = self.aha.serializeKernelMessage(msg,ctime)
|
||||||
|
self.lfd.write(logEntry)
|
||||||
|
|
||||||
|
if type == PeriodTaks.TO_KERNEL:
|
||||||
|
msg = self.aha.get_kernel_reply(filename)
|
||||||
|
logEntry=self.aha.serializeAhaReply(msg,ctime)
|
||||||
|
self.lfd.write(logEntry)
|
||||||
|
except IOError,e:
|
||||||
|
sys.stderr.write('Failed to record message: %s\n'%filename)
|
||||||
|
|
||||||
def usage(exitcode):
|
def usage(exitcode):
|
||||||
print """
|
print """
|
||||||
|
@ -68,7 +102,7 @@ LICENSE
|
||||||
|
|
||||||
configfile = None
|
configfile = None
|
||||||
isHelp = 0
|
isHelp = 0
|
||||||
|
p = None
|
||||||
try:
|
try:
|
||||||
opts,args = getopt.getopt(sys.argv[1:],"hc:",["help","config="])
|
opts,args = getopt.getopt(sys.argv[1:],"hc:",["help","config="])
|
||||||
for o,a in opts:
|
for o,a in opts:
|
||||||
|
@ -76,6 +110,9 @@ try:
|
||||||
usage(0)
|
usage(0)
|
||||||
if o in ('--config','-c'):
|
if o in ('--config','-c'):
|
||||||
configfile = a
|
configfile = a
|
||||||
|
if configfile == None:
|
||||||
|
sys.stderr.write('A configuration file needs to be specified\n')
|
||||||
|
sys.exit(1)
|
||||||
#Load config file and get opts
|
#Load config file and get opts
|
||||||
c=ConfigParser.ConfigParser()
|
c=ConfigParser.ConfigParser()
|
||||||
c.read(configfile)
|
c.read(configfile)
|
||||||
|
@ -83,20 +120,24 @@ try:
|
||||||
sleeptime = int(c.get('worker','sleeptime'))
|
sleeptime = int(c.get('worker','sleeptime'))
|
||||||
inqueue = c.get('common','inqueue')
|
inqueue = c.get('common','inqueue')
|
||||||
outqueue= c.get('common','outqueue')
|
outqueue= c.get('common','outqueue')
|
||||||
p = PeriodTaks(outqueue, inqueue, timeout,sleeptime)
|
logfile = c.get('worker','logfile')
|
||||||
|
p = PeriodTaks(outqueue, inqueue, timeout,sleeptime,logfile)
|
||||||
print "Start working ..."
|
print "Start working ..."
|
||||||
while True:
|
while True:
|
||||||
p.clean_input_queue()
|
p.clean_input_queue()
|
||||||
p.clean_output_queue()
|
p.clean_output_queue()
|
||||||
time.sleep(sleeptime)
|
time.sleep(sleeptime)
|
||||||
|
print "Resume ..."
|
||||||
|
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
except getopt.GetoptError,e:
|
except getopt.GetoptError,e:
|
||||||
usage(1)
|
usage(1)
|
||||||
except TypeError,e:
|
except ConfigParser.NoOptionError,e:
|
||||||
sys.stderr.write('Configuration file error\n')
|
sys.stderr.write('Configuration error. (%s)\n'%(str(e)))
|
||||||
except KeyboardInterrupt,e:
|
|
||||||
sys.exit(0)
|
|
||||||
sys.stderr.write(str(e))
|
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
except KeyboardInterrupt,e:
|
||||||
|
if p !=None:
|
||||||
|
p.closeLogFile()
|
||||||
|
sys.exit(0)
|
||||||
|
#Should not be reached
|
||||||
|
sys.exit(0)
|
||||||
|
|
Loading…
Reference in a new issue