On Friday August 14th. from 9 p.m. to 9:30 p.m., we will upgrade GitLab software and its cloud platform release. Service may not be available during this maintenance window. Please schedule your work accordingly.

Commit e2577a3d authored by Marcello Golfieri's avatar Marcello Golfieri

added option for listing also ignored vulnerabilities

parent 02851e35
......@@ -22,6 +22,7 @@ from pprint import pprint
from lxml import objectify
import ipaddress
def is_ip_address(addr):
try:
socket.inet_aton(addr)
......@@ -29,14 +30,15 @@ def is_ip_address(addr):
return False
return addr
def resolve_to_ip(host):
logger.debug("Resolving for: %s", host)
host = host.strip()
if ',' in host:
listing=[]
for h in host.split(','):
if "," in host:
listing = []
for h in host.split(","):
listing.append(str(resolve_to_ip(h)))
return ','.join(listing)
return ",".join(listing)
if host.endswith(".local"):
return None
try:
......@@ -55,17 +57,28 @@ def resolve_to_ip(host):
logger.debug("Can not resolve name: %s", host)
return None
def get_pqdn(host):
return host.lower().replace(".stanford.edu", "")
def get_fqdn(host):
if "." in host:
return host.lower()
else:
return host.lower() + ".stanford.edu"
class Qapi:
vm_fields = ["TYPE", "SEVERITY", 'QID', 'RESULTS', "PORT", "PROTOCOL", "LAST_FOUND_DATETIME"]
vm_fields = [
"TYPE",
"SEVERITY",
"QID",
"RESULTS",
"PORT",
"PROTOCOL",
"LAST_FOUND_DATETIME",
]
hosts = []
url = None
cache = None
......@@ -90,15 +103,15 @@ class Qapi:
conf = configparser.RawConfigParser()
buffer = StringIO(config)
conf.read_file(buffer)
self.url = "https://" + conf.get('info','hostname')
self.auth = (conf.get('info','username'), conf.get('info','password'))
self.url = "https://" + conf.get("info", "hostname")
self.auth = (conf.get("info", "username"), conf.get("info", "password"))
self.cache = cache
self._tag = 'TCG'
self._tag = "TCG"
@property
def tag(self):
return self._tag
@tag.setter
def tag(self, value):
self._tag = value
......@@ -112,8 +125,8 @@ class Qapi:
return ""
def _request_output(self, response, raw_output):
if response.status_code !=200:
raise Exception('Error, response was {}'.format(response.__dict__))
if response.status_code != 200:
raise Exception("Error, response was {}".format(response.__dict__))
if raw_output:
return response.text
else:
......@@ -126,7 +139,6 @@ class Qapi:
logging.info("POST: {}, params: {}, response: {}".format(url, params, response))
return self._request_output(response, raw_output)
def _get_xml(self, uri, params, headers=None, raw_output=False):
url = self.url + uri + "/" + self._make_get_params(params)
headers = self.headers if not headers else headers
......@@ -134,6 +146,12 @@ class Qapi:
logging.info("GET: {}".format(url))
return self._request_output(response, raw_output)
def print(self, sorted_list, vm_fields=None):
vm_fields = vm_fields or self.vm_fields
pprint(
[dict((key, x.__dict__.get(key)) for key in vm_fields) for x in sorted_list]
)
def get_rest_api_version(self):
return self._get_xml(
"/qps/rest/portal/version/",
......@@ -141,13 +159,22 @@ class Qapi:
headers={"Accept": "application/xml"},
raw_output=True,
)
def ignore_vuln(self, host, qids, comments="DEFAULT comment from qqualys for ignoring", action="ignore", days_to_reopen=None):
params = {"action": action,
def ignore_vuln(
self,
host,
qids,
comments="DEFAULT comment from qqualys for ignoring",
action="ignore",
days_to_reopen=None,
):
params = {
"action": action,
"ips": resolve_to_ip(host),
"comments": comments,
"qids": qids}
if days_to_reopen and 1<days_to_reopen<730:
"qids": qids,
}
if days_to_reopen and 1 < days_to_reopen < 730:
params.update({"reopen_ignored_days": days_to_reopen})
return self._get_xml("/msp/ignore_vuln.php", params)
......@@ -229,7 +256,7 @@ class Qapi:
"scan_title": "TCG_scanning_for_" + hostname,
"ip": resolve_to_ip(hostname),
"option_title": "ISO Official 3/4/5 (Site-wide)",
"default_scanner": 1
"default_scanner": 1,
}
# DO NOT ask me why, but somehow one day I realized start new scan is a post call that needs get style params???
return self._post_xml(
......@@ -320,10 +347,16 @@ class Qapi:
self.request(params, "/api/2.0/fo/scan/")
# Get host info about a specific IP address or hostname
def get_host_vulnerabilities(self, host):
params = {"action":"list","ips": str(resolve_to_ip(host)), "show_igs": 1}
def get_host_vulnerabilities(self, host, extra_params=None):
params = {"action": "list", "ips": str(resolve_to_ip(host)), "show_igs": 1}
if extra_params:
params.update(extra_params)
oxml = self._get_xml("/api/2.0/fo/asset/host/vm/detection/", params)
return oxml.xpath("//DETECTION[SEVERITY>2]")
return oxml.xpath("//DETECTION[SEVERITY>2]")
def get_host_ignored_vulnerabilities(self, host):
extra_params = {"include_ignored": 1}
return self.get_host_vulnerabilities(host, extra_params)
def get_qid_info(self, qid):
params = {"action": "list", "ids": qid, "details": "Basic", "echo_request": 1}
......@@ -380,7 +413,10 @@ if __name__ == "__main__":
"-s", "--scan", action="store_true", help="Scan the server specified with -H"
)
parser.add_argument(
"-T", "--tickets", action="store_true", help="Search for remediation tickets for host specified by -H"
"-T",
"--tickets",
action="store_true",
help="Search for remediation tickets for host specified by -H",
)
parser.add_argument(
"-u",
......@@ -423,6 +459,11 @@ if __name__ == "__main__":
dest="qids",
help="QID info to show (can be comma separated no spaces or range e.g. 90023-90040)",
)
parser.add_argument(
"--list_ignored_vulnerabilities",
action="store_true",
help="shows ignored vulnerabilities for a given host, needs -H",
)
parser.add_argument(
"-I",
"--ignore_vuln",
......@@ -448,7 +489,7 @@ if __name__ == "__main__":
action="store",
dest="action",
help="Action to use when ignoring with -I (ignore, restore)",
)
)
parser.add_argument(
"-i", "--interactive", action="store_true", help="don't return, go into Ipython"
)
......@@ -485,20 +526,37 @@ if __name__ == "__main__":
elif args.tickets:
sorted_list = q.get_remediation_tickets_list(args.host)
elif args.report:
sorted_list = q.get_host_vulnerabilities(args.host)
pprint([dict((key, x.__dict__.get(key)) for key in q.vm_fields) for x in sorted_list])
q.print(q.get_host_vulnerabilities(args.host))
elif args.ignore_vuln:
oxml = q.ignore_vuln(args.host, args.qids, args.comment, args.action, args.days_to_reopen)
pprint(oxml.xpath('//*/text()'))
oxml = q.ignore_vuln(
args.host, args.qids, args.comment, args.action, args.days_to_reopen
)
pprint(oxml.xpath("//*/text()"))
elif args.show_qca_asset_info:
print(q.get_asset_details_xml(args.host))
else: # Just args.host
elif args.list_ignored_vulnerabilities:
q.print(
q.get_host_ignored_vulnerabilities(args.host),
[
"TYPE",
"SEVERITY",
"QID",
"RESULTS",
"PORT",
"PROTOCOL",
"LAST_FOUND_DATETIME",
"IS_IGNORED",
],
)
else: # Just args.host
print("The -H option cannot be used alone")
elif args.qids:
sorted_list = q.get_qid_info(args.qids)
pprint(sorted_list.xpath('//RESPONSE/VULN_LIST/VULN//*[text()]'))
pprint(sorted_list.xpath("//RESPONSE/VULN_LIST/VULN//*[text()]"))
else:
pass
if args.interactive:
import IPython; IPython.embed() ##############################################################
import IPython
IPython.embed() #### import IPython; IPython.embed()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment