crl-monitor/client/pypssl/api.py

77 lines
2.6 KiB
Python
Raw Permalink Normal View History

2015-02-24 12:20:56 +00:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import socket
2015-08-27 12:34:21 +00:00
try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
2015-08-27 12:29:59 +00:00
import dateutil.parser
2015-02-24 12:20:56 +00:00
class PyPSSL(object):
2015-08-27 12:29:59 +00:00
def __init__(self, base_url='https://www.circl.lu/', api_version=2, basic_auth=None, auth_token=None):
self.base_url = base_url
self.api_version = api_version
2015-02-24 12:20:56 +00:00
self.session = requests.Session()
if basic_auth is not None:
# basic_auth has do be a tuple ('user_name', 'password')
self.session.auth = basic_auth
elif auth_token is not None:
self.session.headers.update({'Authorization': auth_token})
else:
# No authentication defined.
pass
2021-04-14 14:28:55 +00:00
def _query(self, url, timeout=None):
response = self.session.get(url, timeout=timeout)
2015-08-27 12:29:59 +00:00
try:
return response.json()
except:
raise Exception('Unable to decode JSON object: ' + response.text)
2015-02-24 12:20:56 +00:00
def _check_IP(self, ip):
if ':' in ip:
return {'error': 'IPv6 is not (yet) supported'}
splitted = ip.split('/')
try:
if len(splitted) == 2:
ip, block = splitted
if int(block) < 23:
return {'error': 'Maximum CIDR block size reached >/23'}
socket.inet_aton(ip)
except:
return {'error': 'Incorrect format'}
return None
2021-04-14 14:28:55 +00:00
def query(self, q, timeout=None):
2015-02-24 12:20:56 +00:00
check = self._check_IP(q)
if check is not None:
return check
2015-08-27 12:29:59 +00:00
if self.api_version == 1:
path = 'pssl/query/{}'.format(q)
else:
path = 'v2pssl/query/{}'.format(q)
2021-04-14 14:28:55 +00:00
return self._query(urljoin(self.base_url, path), timeout=timeout)
2015-08-27 12:29:59 +00:00
2021-04-14 14:28:55 +00:00
def query_cert(self, q, timeout=None):
2015-08-27 12:29:59 +00:00
if self.api_version != 2:
return {'error': 'Only available in API v2'}
path = 'v2pssl/cquery/{}'.format(q)
2021-04-14 14:28:55 +00:00
return self._query(urljoin(self.base_url, path), timeout=timeout)
2015-08-27 12:29:59 +00:00
2021-04-14 14:28:55 +00:00
def fetch_cert(self, q, make_datetime=True, timeout=None):
2015-08-27 12:29:59 +00:00
if self.api_version != 2:
return {'error': 'Only available in API v2'}
path = 'v2pssl/cfetch/{}'.format(q)
2021-04-14 14:28:55 +00:00
response = self._query(urljoin(self.base_url, path), timeout=timeout)
2015-08-27 12:29:59 +00:00
if response.get('error') or not make_datetime:
return response
# create python datetime, doesn't return a json object
response['info']['not_before'] = dateutil.parser.parse(response['info']['not_before'])
response['info']['not_after'] = dateutil.parser.parse(response['info']['not_after'])
return response