Skip to content

Commit d3a7b14

Browse files
authored
Update threattracer.py
The new release of the code optimizes the previous version by incorporating asynchronous HTTP requests using the aiohttp library.
1 parent c702ee7 commit d3a7b14

File tree

1 file changed

+73
-66
lines changed

1 file changed

+73
-66
lines changed

threattracer.py

Lines changed: 73 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import requests
1+
import asyncio
22
import re
33
from termcolor import colored
4-
from datetime import datetime
54
import json
5+
from aiohttp import ClientSession, TCPConnector, ClientTimeout
66
from pyExploitDb import PyExploitDb
77

88
art = """
@@ -11,109 +11,112 @@
1111
| | | |__ _ __ ___ __ _| |_ | |_ __ __ _ ___ ___ _ __
1212
| | | '_ \| '__/ _ \/ _` | __|| | '__/ _` |/ __/ _ \ '__|
1313
| | | | | | | | __/ (_| | |_ | | | | (_| | (_| __/ |
14-
|_| |_| |_|_| \___|\__,_|\__||_|_| \__,_|\___\___|_| Version 2.0
14+
|_| |_| |_|_| \___|\__,_|\__||_|_| \__,_|\___\___|_| Version 2.1
1515
A Script to identify CVE using CPE by name & version
1616
Credit: @FR13ND0x7F @0xCaretaker @meppohak5
1717
"""
1818

1919
print(colored(art, "cyan"))
2020

21-
def find_cpes(component, version):
21+
async def find_cpes_async(component, version):
2222
base_url = "https://nvd.nist.gov/products/cpe/search/results"
2323
params = {
2424
"namingFormat": "2.3",
2525
"keyword": f"{component} {version}"
2626
}
2727

28-
response = requests.get(base_url, params=params)
29-
print(f"URL Used: {response.url}") # Print the URL used to find CPE
30-
content = response.text
28+
async with ClientSession(connector=TCPConnector(ssl=False), timeout=ClientTimeout(total=10)) as session:
29+
async with session.get(base_url, params=params) as response:
30+
print(f"URL Used: {response.url}") # Print the URL used to find CPE
31+
content = await response.text()
3132

3233
cpe_matches = re.findall(r'cpe:(.*?)<', content)
3334
return cpe_matches
3435

35-
def synk_db(cve_id):
36-
res = requests.get(f"https://security.snyk.io/vuln/?search={cve_id}")
37-
a_tag_pattern = r'data-snyk-test="vuln table title".*>([^"]+)<!----><!---->'
38-
a_tag_matches = re.findall(a_tag_pattern, res.text)
36+
async def synk_db(cve_id):
37+
async with ClientSession(connector=TCPConnector(ssl=False), timeout=ClientTimeout(total=10)) as session:
38+
async with session.get(f"https://security.snyk.io/vuln/?search={cve_id}") as res:
39+
text = await res.text()
40+
a_tag_pattern = r'data-snyk-test="vuln table title".*>([^"]+)<!----><!---->'
41+
a_tag_matches = re.findall(a_tag_pattern, text)
3942

40-
if a_tag_matches:
41-
snyk_short_name = a_tag_matches[0].lstrip().rstrip()
42-
return snyk_short_name
43+
if a_tag_matches:
44+
snyk_short_name = a_tag_matches[0].strip()
45+
return snyk_short_name
4346

44-
def fetch_cve_details(cpe_string):
47+
async def fetch_cve_details(cpe_string):
4548
base_url = "https://services.nvd.nist.gov/rest/json/cves/1.0"
4649
results = []
4750

4851
cve_query_string = ":".join(cpe_string.split(":")[1:5]) # Extract relevant CPE part (vendor, product, version, update)
4952
url = f"{base_url}?cpeMatchString=cpe:/{cve_query_string}"
5053

51-
response = requests.get(url)
52-
53-
try:
54-
data = response.json()
55-
except json.JSONDecodeError:
56-
print(colored(f"Error decoding JSON for CPE: {cpe_string}. Skipping.", "red"))
57-
return []
58-
59-
if "result" in data:
60-
cves = data["result"]["CVE_Items"]
61-
for cve_item in cves:
62-
cve_id = cve_item["cve"]["CVE_data_meta"]["ID"]
63-
snyk_short_name = synk_db(cve_id)
64-
65-
description = cve_item["cve"]["description"]["description_data"][0]["value"]
66-
link = f"https://nvd.nist.gov/vuln/detail/{cve_id}"
67-
68-
weaknesses = []
69-
if "problemtype" in cve_item["cve"]:
70-
for problem_type in cve_item["cve"]["problemtype"]["problemtype_data"]:
71-
for description in problem_type["description"]:
72-
weaknesses.append(description["value"])
73-
74-
if "description_data" in cve_item["cve"]["description"]:
75-
description_text = cve_item["cve"]["description"]["description_data"][0]["value"]
76-
else:
77-
description_text = "Description not available."
78-
79-
# Check for public exploit using pyExploitDb
80-
pEdb = PyExploitDb()
81-
pEdb.debug = False
82-
pEdb.openFile()
83-
exploit_status = pEdb.searchCve(cve_id)
84-
if exploit_status:
85-
exploit_status = "Public Exploit Found"
86-
else:
87-
exploit_status = "No Public Exploit Found"
88-
89-
cve_details = {
90-
"CVE ID": cve_id,
91-
"Short Name": snyk_short_name,
92-
"Description": description_text,
93-
"Weaknesses": ", ".join(weaknesses),
94-
"Link": link,
95-
"Exploit Status": exploit_status
96-
}
97-
98-
results.append(cve_details)
54+
async with ClientSession(connector=TCPConnector(ssl=False), timeout=ClientTimeout(total=10)) as session:
55+
async with session.get(url) as response:
56+
try:
57+
data = await response.json()
58+
except json.JSONDecodeError:
59+
print(colored(f"Error decoding JSON for CPE: {cpe_string}. Skipping.", "red"))
60+
return []
61+
62+
if "result" in data:
63+
cves = data["result"]["CVE_Items"]
64+
for cve_item in cves:
65+
cve_id = cve_item["cve"]["CVE_data_meta"]["ID"]
66+
snyk_short_name = await synk_db(cve_id)
67+
68+
description = cve_item["cve"]["description"]["description_data"][0]["value"]
69+
link = f"https://nvd.nist.gov/vuln/detail/{cve_id}"
70+
71+
weaknesses = []
72+
if "problemtype" in cve_item["cve"]:
73+
for problem_type in cve_item["cve"]["problemtype"]["problemtype_data"]:
74+
for description in problem_type["description"]:
75+
weaknesses.append(description["value"])
76+
77+
if "description_data" in cve_item["cve"]["description"]:
78+
description_text = cve_item["cve"]["description"]["description_data"][0]["value"]
79+
else:
80+
description_text = "Description not available."
81+
82+
# Check for public exploit using pyExploitDb
83+
pEdb = PyExploitDb()
84+
pEdb.debug = False
85+
pEdb.openFile()
86+
exploit_status = pEdb.searchCve(cve_id)
87+
if exploit_status:
88+
exploit_status = "Public Exploit Found"
89+
else:
90+
exploit_status = "No Public Exploit Found"
91+
92+
cve_details = {
93+
"CVE ID": cve_id,
94+
"Short Name": snyk_short_name,
95+
"Description": description_text,
96+
"Weaknesses": ", ".join(weaknesses),
97+
"Link": link,
98+
"Exploit Status": exploit_status
99+
}
100+
101+
results.append(cve_details)
99102

100103
return results
101104

102-
if __name__ == "__main__":
105+
async def main():
103106
print(colored("CPE Finder Script", "green", attrs=["bold"]))
104107
print("This script searches for the CPEs of a component and version.\n")
105108

106109
component = input(colored("Enter the component (e.g., jquery): ", "cyan"))
107110
version = input(colored("Enter the version (e.g., 1.0.0): ", "cyan"))
108111

109-
cpe_strings = find_cpes(component, version)
112+
cpe_strings = await find_cpes_async(component, version)
110113
if cpe_strings:
111114
print(colored("CPEs Found:", "green"))
112115
for cpe_string in cpe_strings:
113116
print(colored(f" {cpe_string}", "green"))
114117

115118
for cpe_string in cpe_strings:
116-
results = fetch_cve_details(cpe_string)
119+
results = await fetch_cve_details(cpe_string)
117120
if results:
118121
print(colored("\nCVE Details", "cyan", attrs=["underline"]))
119122
for result in results:
@@ -130,3 +133,7 @@ def fetch_cve_details(cpe_string):
130133
print(colored(f"Exploit Status: {result['Exploit Status']}\n", "green"))
131134
else:
132135
print(colored("CPEs not found for the provided component and version.", "red"))
136+
137+
if __name__ == "__main__":
138+
asyncio.run(main())
139+

0 commit comments

Comments
 (0)