# hashlookup-server/bin/import-poc/import-hashlookup-server.py
# (246 lines, 9.6 KiB, Python)

import argparse
import csv
import json
import shutil
import sys
import time
import zipfile
from pathlib import Path

import pathlab
import redis
import wget
class import_hash:
    """ Download NSRL hash datasets and import them into a Redis database

    All settings (dataset URLs, import limits, local working path and the
    Redis connection) are read from ``config.json`` in the current working
    directory when the object is created.
    """

    def __init__(self):
        """ Load config.json and set up the Redis client
        """
        with open('config.json') as config_file:
            data = json.load(config_file)
        self.hash_datasets = data["nsrl_downloads"]
        self.max_value = data["import"]["max_value"]
        self.mod_lines = data["import"]["mod_lines"]
        self.local_path = data["local_path"]
        redis_host = data["redis"]["hostname"]
        redis_port = data["redis"]["port"]
        self.flushrdb = data["redis"]["flushdb_on_init"]
        self.rdb = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

    def download(self, dataset=False):
        """ Download a dataset
        :param dataset: The dataset to use. This is a key looked for in the config.json file to get the correct download URL
        """
        if not dataset:
            self.error("no dataset")
        url = self.hash_datasets[dataset]["url"]
        print("**DOWNLOAD** dataset {0} from {1} to {2} .".format(dataset, url, self.local_path))
        wget.download(url, self.local_path)
        print("\nDownload completed.")

    def __import_records(self, lines, store_record):
        """ Parse CSV lines and hand every data record to a storage callback
        :param lines: Iterable of text lines; the first line holds the column headers
        :param store_record: Callable that receives one dict (header -> value) per data record
        :return: The number of records handed to store_record
        """
        headers = None
        imported = 0
        # csv.reader honours quoting, so commas inside quoted fields
        # (e.g. "Acme, Inc.") no longer shift the remaining columns.
        for row in csv.reader(lines):
            if headers is None:
                headers = row
            elif row:  # blank lines yield an empty row and carry no record
                # zip() drops values that have no matching header
                store_record(dict(zip(headers, row)))
                imported += 1
                if self.mod_lines and imported % self.mod_lines == 0:
                    print(" Imported {0} records.".format(imported))
            # Checked after the header line too, so a max_value of 0 imports nothing
            if imported == self.max_value:
                break
        return imported

    def __process_nsrl_support(self, isofile, dataset_file, key):
        """ Process support NSRL data (OS, Product, Vendor/Manufacturer)
        :param isofile: The object to the ISO file. When set to False this indicates the NSRL is provided in a ZIP format
        :param dataset_file: The location of the dataset, is either a path in the ISO or a direct filepath
        :param key: type of support NSRL data
        """
        print("\n Work on {0}".format(dataset_file))
        # File name without directory and extension, whatever the extension's case
        stat_key = "stat:{0}-import".format(Path(dataset_file).stem)

        def store(record):
            self.rdb.sadd("s:{0}".format(key), record[key])
            self.rdb.hmset("h-{0}:{1}".format(key, record[key]), record)
            self.rdb.incrby(stat_key)

        if isofile:
            source = isofile.IsoPath("/{0}".format(dataset_file)).open()
        else:
            source = open(dataset_file, encoding='utf-8', newline='')
        with source as f:
            imported = self.__import_records(f, store)
        print(" Finished, importing {0} records.".format(imported))

    def __process_nsrl_base(self, isofile, dataset_file, rdbkey):
        """ Process base NSRL data (file hashes)
        :param isofile: The object to the ISO file. When set to False this indicates the NSRL is provided in a ZIP format
        :param dataset_file: The location of the dataset, is either a path in the ISO or a direct filepath
        :param rdbkey: redis database key (corresponds with key of dataset in config.json)
        """
        print("\n Work on {0}".format(dataset_file))
        if isofile:
            # We received the NSRL dataset as an ISO file
            # First get the ZIP from the ISO and then extract the ZIP
            local_zip = self.local_path + dataset_file
            with isofile.IsoPath("/{0}".format(dataset_file)).open("rb") as src:
                with open(local_zip, "wb") as dst:
                    # Copy in chunks instead of loading the whole archive into memory
                    shutil.copyfileobj(src, dst)
            with zipfile.ZipFile(local_zip) as zip_f:
                zip_f.extractall(self.local_path)
            local_dataset_file = self.local_path + "NSRLFile.txt"
        else:
            # No need to do additional actions
            # We probably received the NSRL as a ZIP file
            local_dataset_file = dataset_file

        stat_key = "stat:{0}".format(rdbkey)

        def store(record):
            # Add some meta data
            record['source'] = "NSRL"
            record['db'] = rdbkey
            record['insert-timestamp'] = time.time()
            # Base records: MD5 -> SHA-1 lookup plus the full record keyed by SHA-1
            self.rdb.set("l:{}".format(record['MD5']), record['SHA-1'])
            self.rdb.hmset("h:{}".format(record['SHA-1']), record)
            self.rdb.incrby(stat_key)

        # Same explicit encoding as the support files
        with open(local_dataset_file, "r", encoding='utf-8', newline='') as lines:
            imported = self.__import_records(lines, store)
        print(" Finished, importing {0} records.".format(imported))

    def process(self, dataset=False):
        """Process a dataset
        :param dataset: The dataset to process
        """
        if not dataset:
            self.error("no dataset")
        url = self.hash_datasets[dataset]["url"]
        local_dataset = self.local_path + url[url.rfind("/")+1:]
        print("**PROCESS** dataset {0} from location {1} .".format(dataset, local_dataset))
        if not Path(local_dataset).is_file():
            self.error("Cannot find file {0}".format(local_dataset))
        # Determine dataset file type; only the extension is lower-cased so the
        # path itself keeps working on case-sensitive file systems
        dataset_file_type = local_dataset[local_dataset.rfind(".")+1:].lower()
        if dataset_file_type == "iso":
            # We read directly from the ISO file
            isofile = pathlab.IsoAccessor(local_dataset)
            self.__process_nsrl_base(isofile, "NSRLFILE.ZIP", dataset)
            self.__process_nsrl_support(isofile, "NSRLMFG.TXT", "MfgCode")
            self.__process_nsrl_support(isofile, "NSRLOS.TXT", "OpSystemCode")
            self.__process_nsrl_support(isofile, "NSRLPROD.TXT", "ProductCode")
        elif dataset_file_type == "zip":
            # Extract the ZIP
            with zipfile.ZipFile(local_dataset) as zip_f:
                zip_f.extractall(self.local_path)
                names = zip_f.namelist()
            # NSRL ZIPs store the datafiles in a subdirectory; without one the
            # files end up directly in the extraction directory
            zip_extract_path = self.local_path
            if names and names[0].endswith("/"):
                zip_extract_path = self.local_path + names[0]
            # Indicate we don't have an ISO object
            isofile = False
            self.__process_nsrl_base(isofile, zip_extract_path + "NSRLFile.txt", dataset)
            self.__process_nsrl_support(isofile, zip_extract_path + "NSRLMfg.txt", "MfgCode")
            self.__process_nsrl_support(isofile, zip_extract_path + "NSRLOS.txt", "OpSystemCode")
            self.__process_nsrl_support(isofile, zip_extract_path + "NSRLProd.txt", "ProductCode")
        else:
            self.error("Unsupported dataset file type {0}".format(dataset_file_type))

    def init(self, dataset=False):
        """ Remove / Initialize a dataset
        :param dataset: Affected dataset
        """
        if not dataset:
            self.error("no dataset")
        print("**INIT** dataset {0} .".format(dataset))
        stat_key = "stat:{0}".format(dataset)
        if self.flushrdb:
            self.rdb.flushdb()
        else:
            self.rdb.delete(stat_key)
        self.rdb.set(stat_key, 0)

    def datasetlist(self):
        """ List the available datasets
        """
        for nsrl, details in self.hash_datasets.items():
            print("{0}\n {1}\n from: {2}\n".format(nsrl, details["description"], details["url"]))

    def valid_dataset(self, dataset):
        """ Verify if the dataset exists
        :param dataset: Affected dataset
        :return: True when the dataset is a key of the configured datasets, False otherwise
        """
        return dataset in self.hash_datasets

    def error(self, error):
        """ Print an error message and exit with a non-zero status
        :param error: Error message
        """
        print("!!ERROR!! {0}".format(error))
        sys.exit(1)
def build_parser():
    """ Build the command line parser
    :return: The argparse parser; --list, --import-dataset and --init-dataset are mutually exclusive
    """
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-l", "--list", action="store_true", help="List datasets available for download and import.")
    group.add_argument("-i", "--import-dataset", help="Import a dataset.")
    group.add_argument("-e", "--init-dataset", help="Remove / initialize a dataset.")
    parser.add_argument("-d", "--skip-download", action="store_true", help="Skip downloading the dataset.")
    parser.add_argument("-c", "--skip-init", action="store_true", help="Skip initialization of the database.")
    return parser


def main(argv=None):
    """ Run the action selected on the command line
    :param argv: Argument list to parse; None means the arguments of the running process
    :return: Process exit status, 0 on success and 1 when the dataset is unknown
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if not (args.list or args.import_dataset or args.init_dataset):
        # Nothing was requested: show the usage instead of silently doing nothing
        parser.print_help()
        return 0

    # Use a distinct name so the class import_hash is not rebound to an instance
    importer = import_hash()
    if args.list:
        importer.datasetlist()
        return 0

    dataset = args.import_dataset or args.init_dataset
    if not importer.valid_dataset(dataset):
        print("Dataset not found.")
        return 1

    if args.import_dataset:
        if not args.skip_download:
            importer.download(dataset=dataset)
        if not args.skip_init:
            importer.init(dataset=dataset)
        importer.process(dataset=dataset)
    else:
        importer.init(dataset=dataset)
    return 0


if __name__ == "__main__":
    sys.exit(main())