From 2c94e1b920345900fb45214c2dd4eb71dda463b1 Mon Sep 17 00:00:00 2001 From: aaronkaplan Date: Tue, 27 Apr 2021 15:31:13 +0200 Subject: [PATCH] Add a very simple parse for COF in python including validation of the format. --- example_code/python/README.md | 6 +++ example_code/python/__init__.py | 0 example_code/python/cofparser.py | 66 ++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+) create mode 100644 example_code/python/README.md create mode 100644 example_code/python/__init__.py create mode 100644 example_code/python/cofparser.py diff --git a/example_code/python/README.md b/example_code/python/README.md new file mode 100644 index 0000000..21c1c98 --- /dev/null +++ b/example_code/python/README.md @@ -0,0 +1,6 @@ +# Example python parser + +This little package can parse the Passive DNS Common Output Format (COF) and validate it. +It is given as example code. + + diff --git a/example_code/python/__init__.py b/example_code/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example_code/python/cofparser.py b/example_code/python/cofparser.py new file mode 100644 index 0000000..24217dc --- /dev/null +++ b/example_code/python/cofparser.py @@ -0,0 +1,66 @@ +""" +Example passive DNS Common Output Format]() parser. +It will parse the JSON file and validate it. + +Author: Aaron Kaplan +Copyright 2021, all rights reserved. + +License: AGPL v3. See https://www.gnu.org/licenses/agpl-3.0.en.html +""" + +import sys +import json # maybe use an ndjson library... + + +def is_valid(d: dict) -> bool: + # Check MANDATORY fields according to COF + if "rrname" not in d: + print("Missing MANDATORY field 'rrname'", file=sys.stderr) + return False + if not isinstance(d['rrname'], str): + print("Type error: 'rrname' is not a JSON string", file=sys.stderr) + return False + if "rrtype" not in d: + print("Missing MANDATORY field 'rrtype'", file=sys.stderr) + return False + if not isinstance(d['rrtype'], str): + print("Type error: 'rrtype' is not a JSON string", file=sys.stderr) + return False + if "rdata" not in d: + print("Missing MANDATORY field 'rdata'", file=sys.stderr) + return False + if "rdata" not in d: + print("Missing MANDATORY field 'rdata'", file=sys.stderr) + return False + if not isinstance(d['rdata'], str) and not isinstance(d['rdata'], list): + print("'rdata' is not a list and not a string.", file=sys.stderr) + return False + if not ("time_first" in d and "time_last" in d) or ("zone_time_first" in d and "zone_time_last" in d): + print("We are missing EITHER ('first_seen' and 'last_seen') OR ('zone_time_first' and zone_time_last') fields") + return False + # currently we don't check the OPTIONAL fields. Sorry... to be done later. + return True + + +def parse_line(input: str) -> dict: + d = None + try: + d = json.loads(input) + if not is_valid(d): + print("Warning: line %s does not conform to the COF standard." % input) + except Exception as ex: + print("error. Could not parse input '%s'. Reason: '%s'" %(input, str(ex)), file=sys.stderr) + return d + + +def parse_lines(multilines: str): + for line in multilines.split('\n'): + yield parse_line(line) + + +if __name__ == "__main__": + mock_input = """{"count":1909,"rdata":["cpa.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1315586409","time_last":"1449566799"} +{"count":2560,"rdata":["cpab.circl.lu"],"rrname":"www.circl.lu","rrtype":"CNAME","time_first":"1449584660","time_last":"1617676151"}""" + + for result in parse_lines(mock_input): + print("result: %r" % result)