mirror of
https://github.com/adulau/mmdb-server.git
synced 2024-11-07 11:36:24 +00:00
77 lines
2 KiB
Python
77 lines
2 KiB
Python
|
#!/usr/bin/env python3
|
||
|
#
|
||
|
# mmdb-server is an open source fast API server to lookup IP addresses for their geographic location.
|
||
|
#
|
||
|
# The server is released under the AGPL version 3 or later.
|
||
|
#
|
||
|
# Copyright (C) 2022 Alexandre Dulaunoy
|
||
|
|
||
|
import configparser
|
||
|
import time
|
||
|
from ipaddress import ip_address
|
||
|
from wsgiref.simple_server import make_server
|
||
|
|
||
|
import falcon
|
||
|
import maxminddb
|
||
|
|
||
|
version = "0.1"
|
||
|
config = configparser.ConfigParser()
|
||
|
config.read('../etc/server.conf')
|
||
|
mmdb_file = config['global'].get('mmdb_file')
|
||
|
pubsub = config['global'].getboolean('lookup_pubsub')
|
||
|
if pubsub:
|
||
|
import redis
|
||
|
|
||
|
rdb = redis.Redis(host='127.0.0.1')
|
||
|
|
||
|
meta = {}
|
||
|
q = maxminddb.open_database(mmdb_file, maxminddb.MODE_MEMORY)
|
||
|
meta['description'] = q.metadata().description
|
||
|
meta['build_db'] = time.strftime(
|
||
|
'%Y-%m-%d %H:%M:%S', time.localtime(q.metadata().build_epoch)
|
||
|
)
|
||
|
meta['db_source'] = q.metadata().database_type
|
||
|
meta['nb_nodes'] = q.metadata().node_count
|
||
|
|
||
|
|
||
|
def validIPAddress(IP: str) -> bool:
|
||
|
try:
|
||
|
type(ip_address(IP))
|
||
|
return True
|
||
|
except ValueError:
|
||
|
return False
|
||
|
|
||
|
|
||
|
def pubLookup(value: str) -> bool:
|
||
|
if not pubsub:
|
||
|
return False
|
||
|
rdb.publish('mmdb-server::lookup', f'{value}')
|
||
|
return True
|
||
|
|
||
|
|
||
|
class GeoLookup:
|
||
|
def on_get(self, req, resp, value):
|
||
|
ret = []
|
||
|
ua = req.get_header('User-Agent')
|
||
|
ips = req.access_route
|
||
|
if not validIPAddress(value):
|
||
|
resp.status = falcon.HTTP_422
|
||
|
resp.media = "IPv4 or IPv6 address is in an incorrect format. Dotted decimal for IPv4 or textual representation for IPv6 are required."
|
||
|
return
|
||
|
pubLookup(value=f'{value} via {ips} using {ua}')
|
||
|
georesult = q.get(value)
|
||
|
ret.append(georesult)
|
||
|
georesult['meta'] = meta
|
||
|
resp.media = ret
|
||
|
return
|
||
|
|
||
|
|
||
|
app = falcon.App()
|
||
|
|
||
|
app.add_route('/geolookup/{value}', GeoLookup())
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
with make_server('', 8000, app) as httpd:
|
||
|
print('Serving on port 8000...')
|
||
|
httpd.serve_forever()
|