chg: [domainclassifier] clean-up code

This commit is contained in:
Alexandre Dulaunoy 2022-07-30 15:51:06 +02:00
parent 115c48f65c
commit 8debd6c6b7
Signed by: adulau
GPG key ID: 09E2CD4944E6CBCD

View file

@ -12,16 +12,16 @@ import time
from datetime import date, timedelta from datetime import date, timedelta
try: try:
#python 3 # python 3
import urllib.request as urllib import urllib.request as urllib
except: except:
#python 2 # python 2
import urllib2 as urllib import urllib2 as urllib
try: try:
from pybgpranking import BGPRanking from pybgpranking import BGPRanking
except: except:
print ("pybgpranking is not installed - ranking of ASN values won't be possible") print("pybgpranking is not installed - ranking of ASN values won't be possible")
__author__ = "Alexandre Dulaunoy" __author__ = "Alexandre Dulaunoy"
__copyright__ = "Copyright 2012-2021, Alexandre Dulaunoy" __copyright__ = "Copyright 2012-2021, Alexandre Dulaunoy"
__license__ = "AGPL version 3" __license__ = "AGPL version 3"
@ -34,7 +34,7 @@ class Extract:
from a rawtext stream. When call, the rawtext parameter is a string from a rawtext stream. When call, the rawtext parameter is a string
containing the raw data to be process.""" containing the raw data to be process."""
def __init__(self, rawtext=None, nameservers=['8.8.8.8'], port= 53): def __init__(self, rawtext=None, nameservers=['8.8.8.8'], port=53):
self.rawtext = rawtext self.rawtext = rawtext
self.presolver = dns.resolver.Resolver() self.presolver = dns.resolver.Resolver()
self.presolver.nameservers = nameservers self.presolver.nameservers = nameservers
@ -52,7 +52,11 @@ class Extract:
def __origin(self, ipaddr=None): def __origin(self, ipaddr=None):
if ipaddr: if ipaddr:
clook = IPy.IP(str(ipaddr)).reverseName().replace('.in-addr.arpa.', '.origin.asn.cymru.com') clook = (
IPy.IP(str(ipaddr))
.reverseName()
.replace('.in-addr.arpa.', '.origin.asn.cymru.com')
)
try: try:
a = self.presolver.query(clook, 'TXT') a = self.presolver.query(clook, 'TXT')
except dns.resolver.NXDOMAIN: except dns.resolver.NXDOMAIN:
@ -62,23 +66,30 @@ class Extract:
if a: if a:
x = str(a[0]).split("|") x = str(a[0]).split("|")
# why so many spaces? # why so many spaces?
x = list( map(lambda t: t.replace("\"", "").strip(), x) ) x = list(map(lambda t: t.replace("\"", "").strip(), x))
return (x[0], x[2], a[0]) return (x[0], x[2], a[0])
else: else:
return None return None
"""__bgpanking return the ranking the float value of an ASN. """__bgpanking return the ranking the float value of an ASN.
""" """
def __bgpranking(self, asn=None): def __bgpranking(self, asn=None):
if asn: if asn:
bgpranking = BGPRanking() bgpranking = BGPRanking()
value = bgpranking.query(asn, date=(date.today() - timedelta(1)).isoformat()) value = bgpranking.query(
asn, date=(date.today() - timedelta(1)).isoformat()
)
return value['response']['ranking']['rank'] return value['response']['ranking']['rank']
def __updatelisttld(self): def __updatelisttld(self):
ianatldlist = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt" ianatldlist = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"
req = urllib.Request(ianatldlist) req = urllib.Request(ianatldlist)
req.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0') req.add_header(
tlds = ( urllib.urlopen(req).read() ).decode('utf8') 'User-Agent',
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0',
)
tlds = (urllib.urlopen(req).read()).decode('utf8')
tlds = tlds.split("\n") tlds = tlds.split("\n")
for tld in tlds: for tld in tlds:
self.listtld.append(tld.lower()) self.listtld.append(tld.lower())
@ -104,10 +115,12 @@ class Extract:
self.vdomain = [] self.vdomain = []
return True return True
return False return False
"""potentialdomain method extracts potential domains matching any """potentialdomain method extracts potential domains matching any
string that is a serie of string with maximun 63 character separated by a string that is a serie of string with maximun 63 character separated by a
dot. The method used the rawtext defined at the instantiation of the class. dot. The method used the rawtext defined at the instantiation of the class.
This return a list of a potential domain.""" This return a list of a potential domain."""
def potentialdomain(self, validTLD=True): def potentialdomain(self, validTLD=True):
self.domain = [] self.domain = []
domain = re.compile(r'\b([a-zA-Z\d-]{,63}(\.[a-zA-Z\d-]{,63})+)\b') domain = re.compile(r'\b([a-zA-Z\d-]{,63}(\.[a-zA-Z\d-]{,63})+)\b')
@ -124,7 +137,12 @@ class Extract:
returns a list of existing domain. If the extended flag is true, a set is returns a list of existing domain. If the extended flag is true, a set is
return with the associated DNS resources found.""" return with the associated DNS resources found."""
def validdomain(self, rtype=['A', 'AAAA', 'SOA', 'MX', 'CNAME'], extended=True, passive_dns=False): def validdomain(
self,
rtype=['A', 'AAAA', 'SOA', 'MX', 'CNAME'],
extended=True,
passive_dns=False,
):
if extended is False: if extended is False:
self.vdomain = set() self.vdomain = set()
else: else:
@ -143,7 +161,17 @@ class Extract:
rrset = answers.rrset.to_text().splitlines() rrset = answers.rrset.to_text().splitlines()
for dns_resp in rrset: for dns_resp in rrset:
dns_resp = dns_resp.split() dns_resp = dns_resp.split()
passive_dns_out = '{}||127.0.0.1||{}||{}||{}||{}||{}||{}||1\n'.format(time.time(), self.presolver.nameservers[0], dns_resp[2], domain, dnstype, dns_resp[4], answers.ttl) passive_dns_out = (
'{}||127.0.0.1||{}||{}||{}||{}||{}||{}||1\n'.format(
time.time(),
self.presolver.nameservers[0],
dns_resp[2],
domain,
dnstype,
dns_resp[4],
answers.ttl,
)
)
self.vdomain.add((passive_dns_out)) self.vdomain.add((passive_dns_out))
elif extended: elif extended:
self.vdomain.append((domain, dnstype, answers[0])) self.vdomain.append((domain, dnstype, answers[0]))
@ -188,7 +216,7 @@ class Extract:
orig = self.__origin(ipaddr=dom[2])[1] orig = self.__origin(ipaddr=dom[2])[1]
except: except:
continue continue
if(orig == cc): if orig == cc:
self.localdom.append(dom) self.localdom.append(dom)
elif dom[1] == 'CNAME': elif dom[1] == 'CNAME':
cname = str(dom[2]) cname = str(dom[2])
@ -197,7 +225,7 @@ class Extract:
orig = self.__origin(ipaddr=ip)[1] orig = self.__origin(ipaddr=ip)[1]
except: except:
continue continue
if(orig == cc): if orig == cc:
self.localdom.append(dom) self.localdom.append(dom)
return self.localdom return self.localdom
@ -276,32 +304,37 @@ class Extract:
if type(dom) == tuple: if type(dom) == tuple:
dom = dom[0] dom = dom[0]
if includefilter.search(dom): if includefilter.search(dom):
self.cleandomain.append(dom) self.cleandomain.append(dom)
return set(self.cleandomain) return set(self.cleandomain)
if __name__ == "__main__": if __name__ == "__main__":
c = Extract(rawtext="www.foo.lu www.xxx.com this is a text with a domain called test@foo.lu another test abc.lu something a.b.c.d.e end of 1.2.3.4 foo.be www.belnet.be http://www.cert.be/ www.public.lu www.allo.lu quuxtest www.eurodns.com something-broken-www.google.com www.google.lu trailing test www.facebook.com www.nic.ru www.youporn.com 8.8.8.8 201.1.1.1 abc.dontexist") c = Extract(
c.text(rawtext="www.abc.lu www.xxx.com random text a test bric broc www.lemonde.fr www.belnet.be www.foo.be") rawtext="www.foo.lu www.xxx.com this is a text with a domain called test@foo.lu another test abc.lu something a.b.c.d.e end of 1.2.3.4 foo.be www.belnet.be http://www.cert.be/ www.public.lu www.allo.lu quuxtest www.eurodns.com something-broken-www.google.com www.google.lu trailing test www.facebook.com www.nic.ru www.youporn.com 8.8.8.8 201.1.1.1 abc.dontexist"
print (c.potentialdomain()) )
print (c.potentialdomain(validTLD=True)) c.text(
print (c.validdomain(extended=True)) rawtext="www.abc.lu www.xxx.com random text a test bric broc www.lemonde.fr www.belnet.be www.foo.be"
print ("US:") )
print (c.localizedomain(cc='US')) print(c.potentialdomain())
print ("LU:") print(c.potentialdomain(validTLD=True))
print (c.localizedomain(cc='LU')) print(c.validdomain(extended=True))
print ("BE:") print("US:")
print (c.localizedomain(cc='BE')) print(c.localizedomain(cc='US'))
print ("Ranking:") print("LU:")
print (c.rankdomain()) print(c.localizedomain(cc='LU'))
print ("List of ip addresses:") print("BE:")
print (c.ipaddress(extended=False)) print(c.localizedomain(cc='BE'))
print ("Include dot.lu:") print("Ranking:")
print (c.include(expression=r'\.lu$')) print(c.rankdomain())
print ("Exclude dot.lu:") print("List of ip addresses:")
print (c.exclude(expression=r'\.lu$')) print(c.ipaddress(extended=False))
print("Include dot.lu:")
print(c.include(expression=r'\.lu$'))
print("Exclude dot.lu:")
print(c.exclude(expression=r'\.lu$'))
c.text(rawtext="www.lwn.net www.undeadly.org") c.text(rawtext="www.lwn.net www.undeadly.org")
print (c.potentialdomain(validTLD=True)) print(c.potentialdomain(validTLD=True))
c.validdomain() c.validdomain()
print (c.localizedomain(cc='US')) print(c.localizedomain(cc='US'))
print(c.validdomain(extended=False, passive_dns=True)) print(c.validdomain(extended=False, passive_dns=True))