Rollback to primitive aha-worker: Task only record files

This commit is contained in:
Gerard Wagener 2010-01-20 18:22:58 +01:00
parent 51da827445
commit 9f5b296497

View file

@ -9,18 +9,16 @@
#for the aha tak to take the decisions #for the aha tak to take the decisions
#The aha framework can be launched then in screen #The aha framework can be launched then in screen
# #
#TODO implement signal handler HUP flushes the file
import dircache,os.path,time,sys,ConfigParser,getopt, traceback
import dircache,os.path,time,sys,ConfigParser,getopt
from ahalib import * from ahalib import *
class PeriodTaks(): class PeriodTaks():
#Define message types #Define message types
FROM_KERNEL = 1 FROM_KERNEL = 1
TO_KERNEL = 2 TO_KERNEL = 2
def debug(self,msg):
print "WDBG ",msg
def __init__(self,outqueue,inqueue, timeout,sleeptime, logfile): def __init__(self,outqueue,inqueue, timeout,sleeptime, logfile):
self.outqueue= outqueue self.outqueue= outqueue
self.inqueue = inqueue self.inqueue = inqueue
@ -30,103 +28,41 @@ class PeriodTaks():
#Log file descriptor #Log file descriptor
self.lfd = open(logfile,'a') self.lfd = open(logfile,'a')
self.aha = AHAActions(inqueue,outqueue) self.aha = AHAActions(inqueue,outqueue)
#Processtree related stuff
self.ptree = ProcessTrees()
#Make close action externally available #Make close action externally available
def closeLogFile(self): def closeLogFile(self):
self.lfd.close() self.lfd.close()
def remove_old_msg(self,queue): def remove_old_msg(self,queue):
msg = None
#Get current date if the files are older than the timeout remove them #Get current date if the files are older than the timeout remove them
t0 = int(time.strftime("%s")) t0 = int(time.strftime("%s"))
files = dircache.listdir(queue) files = dircache.listdir(queue)
mlist = []
for file in files: for file in files:
af = queue + os.sep + file af = queue + os.sep + file
#self.debug("found file : %s"%af)
s = os.stat(af) s = os.stat(af)
t1 = int(s[os.path.stat.ST_CTIME]) t1 = int(s[os.path.stat.ST_CTIME])
delta = t0 - t1 delta = t0 - t1
if (delta > self.timeout): if (delta > self.timeout):
#self.debug("%s exceeds threshold"%af)
#Old file was found record it #Old file was found record it
if queue == self.outqueue: if queue == self.outqueue:
msg = self.record_message(af,t1,PeriodTaks.FROM_KERNEL) self.record_message(af,t1,PeriodTaks.FROM_KERNEL)
mlist.append(msg)
if queue == self.inqueue: if queue == self.inqueue:
msg = self.record_message(af,t1,PeriodTaks.TO_KERNEL) self.record_message(af,t1,PeriodTaks.TO_KERNEL)
mlist.append(msg)
#Remove it #Remove it
self.aha.silent_clean(af) self.aha.silent_clean(af)
return mlist
def clean_input_queue(self): def clean_input_queue(self):
try: try:
self.remove_old_msg(self.inqueue) self.remove_old_msg(self.inqueue)
except OSError,e: except OSError,e:
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() sys.stderr.write(str(e))
traceback.print_exception(exceptionType, exceptionValue,
exceptionTraceback, file=sys.stderr)
traceback.print_tb(exceptionTraceback, file=sys.stdout)
def maintain_process_tree(self,mlist,exportFile):
if mlist == None:
return
for msg in mlist:
self.handle_msg(msg,exportFile)
def handle_msg(self,msg,exportFile):
try:
if msg:
type = int(msg['type'][0])
pid = int(msg['pid'][0])
ppid = int(msg['ppid'][0])
#sys_execve messages
if (type == 1):
self.debug('Got sys_execve message')
#Is there a new user
file = msg['file'][0]
self.debug('Got command: %s, pid=%d,ppid=%d'%(file,pid,ppid))
self.ptree.annotateProcessList(msg)
if file == '/usr/sbin/sshd':
self.debug("New user found %s"%pid)
self.ptree.addUser(pid)
#Annotate all the processes
#Check all pids and ppids
if self.ptree.searchTree(pid,ppid):
self.debug("User related command %d"%pid)
else:
self.debug("System related command")
#TODO free annotated list
# Remove dead processes from process tree
if (type == 3):
pid = int(msg['pid'][0])
#When the attacker disconnects, regenerate a status file
if self.ptree.userList.has_key(pid):
print "User disconnected export file"
self.ptree.exportUserListTxt(exportFile)
#self.ptree.silent_remove_pid(pid)
except KeyError,e:
print e
except ValueError,e:
print e
except IndexError,e:
print e
def clean_output_queue(self): def clean_output_queue(self):
try: try:
mlist = self.remove_old_msg(self.outqueue) self.remove_old_msg(self.outqueue)
#Propagate message list for further processor
return mlist
except OSError,e: except OSError,e:
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() sys.stderr.write(str(e))
traceback.print_exception(exceptionType, exceptionValue,
exceptionTraceback, file=sys.stderr)
traceback.print_tb(exceptionTraceback, file=sys.stdout)
#Parse the file an put the information in a log file for later processing #Parse the file an put the information in a log file for later processing
#One log file is handier than for each message a file #One log file is handier than for each message a file
@ -137,16 +73,13 @@ class PeriodTaks():
msg = self.aha.load_file(filename) msg = self.aha.load_file(filename)
logEntry = self.aha.serializeKernelMessage(msg,filename,ctime) logEntry = self.aha.serializeKernelMessage(msg,filename,ctime)
self.lfd.write(logEntry) self.lfd.write(logEntry)
return msg
if type == PeriodTaks.TO_KERNEL: if type == PeriodTaks.TO_KERNEL:
msg = self.aha.get_kernel_reply(filename) msg = self.aha.get_kernel_reply(filename)
logEntry=self.aha.serializeAhaReply(msg,filename,ctime) logEntry=self.aha.serializeAhaReply(msg,filename,ctime)
self.lfd.write(logEntry) self.lfd.write(logEntry)
return msg
except IOError,e: except IOError,e:
sys.stderr.write('Failed to record message: %s\n'%filename) sys.stderr.write('Failed to record message: %s\n'%filename)
return mlist
def usage(exitcode): def usage(exitcode):
print """ print """
@ -188,15 +121,11 @@ try:
inqueue = c.get('common','inqueue') inqueue = c.get('common','inqueue')
outqueue= c.get('common','outqueue') outqueue= c.get('common','outqueue')
logfile = c.get('worker','logfile') logfile = c.get('worker','logfile')
userlistFile = c.get('worker','exportdir') + os.sep + 'userlist'
p = PeriodTaks(outqueue, inqueue, timeout,sleeptime,logfile) p = PeriodTaks(outqueue, inqueue, timeout,sleeptime,logfile)
print "Start working ..." print "Start working ..."
while True: while True:
p.clean_input_queue() p.clean_input_queue()
mlist = p.clean_output_queue() p.clean_output_queue()
p.maintain_process_tree(mlist,userlistFile)
time.sleep(sleeptime) time.sleep(sleeptime)
print "Resume ..." print "Resume ..."