A filter option added to allow filtering messages based on a key and a

value.

You can now filter with a list of -o option to filter the key and
values that you are interested in. The other values will be discarded.

python3 pdns.py -o Type,A -o Type,AAAA --debug
This commit is contained in:
Alexandre Dulaunoy 2017-04-23 11:35:50 +02:00
parent 377119de6f
commit d83eafd16e

View file

@ -17,16 +17,19 @@ from ripe.atlas.sagan import DnsResult
import dns.message import dns.message
import base64 import base64
import argparse import argparse
import logging
# Fields name are different in sagan and cousteau for parsed DNS # Fields name are different in sagan and cousteau for parsed DNS
fields = ['TYPE', 'NAME'] fields = ['TYPE', 'NAME']
fieldsSagan = ['Type', 'Name', 'TTL', 'Class', 'Serial', 'Rname', 'MasterServerName', 'MaintainerName', 'Data'] fieldsSagan = ['Type', 'Name', 'TTL', 'Class', 'Serial', 'Rname', 'MasterServerName', 'MaintainerName', 'Data']
filters = None
defaultloglevel = logging.INFO
def process_answers(data=None, sagan=False): def process_answers(data=None, sagan=False):
data = filterout(filters=filters, data=data)
if data is None: if data is None:
return False return False
if args.debug: if args.debug:
print ("{}".format(data)) print("{}".format(data))
if not sagan: if not sagan:
for answer in data: for answer in data:
for field in fields: for field in fields:
@ -42,6 +45,21 @@ def truncating(default=50):
r.truncating r.truncating
#ZREMRANGEBYRANK name 20 -1 #ZREMRANGEBYRANK name 20 -1
def filterout(filters=None, data=None):
if filters is None:
return data
flag = False
for filter_rule in filters:
k, v = filter_rule
if k in data:
if v in data[k]:
if data[k] == v:
flag = True
if flag is False:
return None
else:
return data
def on_result_response(*args ): def on_result_response(*args ):
#Store probe statistics in Redis #Store probe statistics in Redis
#print (args[0]['prb_id']) #print (args[0]['prb_id'])
@ -49,7 +67,9 @@ def on_result_response(*args ):
result = args[0]['result'] result = args[0]['result']
if 'answers' in result: if 'answers' in result:
#process_answers(data=result['answers']) #process_answers(data=result['answers'])
logging.disable(logging.WARNING)
res = DnsResult.get(args[0],parse_buf=True) res = DnsResult.get(args[0],parse_buf=True)
logging.basicConfig(level=defaultloglevel)
if res.is_error: if res.is_error:
return True return True
if (res.responses[0].abuf.answers): if (res.responses[0].abuf.answers):
@ -60,7 +80,9 @@ def on_result_response(*args ):
else: else:
# Some of the records are not automatically decoded and need to pass # Some of the records are not automatically decoded and need to pass
# into ripe.atlas.sagan firat # into ripe.atlas.sagan firat
logging.disable(logging.WARNING)
res = DnsResult.get(args[0],parse_buf=True) res = DnsResult.get(args[0],parse_buf=True)
logging.basicConfig(level=defaultloglevel)
if res.is_error: if res.is_error:
return True return True
if (res.responses[0].abuf.answers): if (res.responses[0].abuf.answers):
@ -69,21 +91,35 @@ def on_result_response(*args ):
#print (answer['raw_data']) #print (answer['raw_data'])
## TO add in debug option dns.message verus sagan (sagan seems faster) ## TO add in debug option dns.message verus sagan (sagan seems faster)
#print (dns.message.from_wire(base64.b64decode(args[0]['result']['abuf']))) #print (dns.message.from_wire(base64.b64decode(args[0]['result']['abuf'])))
print ('no answers')
parser = argparse.ArgumentParser(description='passive-dns-atlas') parser = argparse.ArgumentParser(description='passive-dns-atlas')
parser.add_argument("-d","--debug", default=False, action='store_true') parser.add_argument("-d","--debug", default=False, action='store_true')
parser.add_argument("-t","--timeout", default=400, type=float, help="set atlas stream timeout, default is 400 sec") parser.add_argument("-t","--timeout", default=400, type=float, help="set atlas stream timeout, default is 400 sec")
parser.add_argument("-o","--only", action='append', default=None, help="set a filter to allow check a specifity key and value in DNS. IN,A")
args = parser.parse_args() args = parser.parse_args()
if args.debug:
defaultloglevel = logging.DEBUG
logging.basicConfig(level=defaultloglevel)
if args.only is not None:
filters = []
for v in args.only:
filters.append(v.split(","))
logging.info("Filter applied: {}".format(filters))
atlas_stream = AtlasStream() atlas_stream = AtlasStream()
atlas_stream.connect() atlas_stream.connect()
channel = "atlas_result" channel = "atlas_result"
#channel = "atlas_subscribe"
atlas_stream.bind_channel(channel, on_result_response) atlas_stream.bind_channel(channel, on_result_response)
stream_parameters = {"type": "dns"} stream_parameters = {"type": "dns"}
#stream_parameters = {"startTime": 1489568000, "stopTime": 1489569100, "msm": 30001}
atlas_stream.start_stream(stream_type="result", **stream_parameters) atlas_stream.start_stream(stream_type="result", **stream_parameters)
atlas_stream.timeout(seconds=args.timeout) atlas_stream.timeout(seconds=args.timeout)