1
- import asyncio
1
+ import requests
2
2
import re
3
3
from termcolor import colored
4
4
import json
5
- from aiohttp import ClientSession , TCPConnector , ClientTimeout
6
5
from pyExploitDb import PyExploitDb
7
6
8
7
art = """
18
17
19
18
print (colored (art , "cyan" ))
20
19
21
- async def find_cpes_async (component , version ):
20
+ def find_cpes (component , version ):
22
21
base_url = "https://nvd.nist.gov/products/cpe/search/results"
23
22
params = {
24
23
"namingFormat" : "2.3" ,
25
24
"keyword" : f"{ component } { version } "
26
25
}
27
26
28
- async with ClientSession (connector = TCPConnector (ssl = False ), timeout = ClientTimeout (total = 10 )) as session :
29
- async with session .get (base_url , params = params ) as response :
30
- print (f"URL Used: { response .url } " ) # Print the URL used to find CPE
31
- content = await response .text ()
27
+ response = requests .get (base_url , params = params )
28
+ print (f"URL Used: { response .url } " ) # Print the URL used to find CPE
29
+ content = response .text
32
30
33
31
cpe_matches = re .findall (r'cpe:(.*?)<' , content )
34
32
return cpe_matches
35
33
36
- async def synk_db (cve_id ):
37
- async with ClientSession (connector = TCPConnector (ssl = False ), timeout = ClientTimeout (total = 10 )) as session :
38
- async with session .get (f"https://security.snyk.io/vuln/?search={ cve_id } " ) as res :
39
- text = await res .text ()
40
- a_tag_pattern = r'data-snyk-test="vuln table title".*>([^"]+)<!----><!---->'
41
- a_tag_matches = re .findall (a_tag_pattern , text )
34
+ def synk_db (cve_id ):
35
+ res = requests .get (f"https://security.snyk.io/vuln/?search={ cve_id } " )
36
+ a_tag_pattern = r'data-snyk-test="vuln table title".*>([^"]+)<!----><!---->'
37
+ a_tag_matches = re .findall (a_tag_pattern , res .text )
42
38
43
- if a_tag_matches :
44
- snyk_short_name = a_tag_matches [0 ].strip ()
45
- return snyk_short_name
39
+ if a_tag_matches :
40
+ snyk_short_name = a_tag_matches [0 ].strip ()
41
+ return snyk_short_name
46
42
47
- async def fetch_cve_details (cpe_string ):
43
+ def fetch_cve_details (cpe_string ):
48
44
base_url = "https://services.nvd.nist.gov/rest/json/cves/1.0"
49
45
results = []
50
46
51
47
cve_query_string = ":" .join (cpe_string .split (":" )[1 :5 ]) # Extract relevant CPE part (vendor, product, version, update)
52
48
url = f"{ base_url } ?cpeMatchString=cpe:/{ cve_query_string } "
53
49
54
- async with ClientSession ( connector = TCPConnector ( ssl = False ), timeout = ClientTimeout ( total = 10 )) as session :
55
- async with session . get ( url ) as response :
56
- try :
57
- data = await response .json ()
58
- except json .JSONDecodeError :
59
- print (colored (f"Error decoding JSON for CPE: { cpe_string } . Skipping." , "red" ))
60
- return []
61
-
62
- if "result" in data :
63
- cves = data ["result" ]["CVE_Items" ]
64
- for cve_item in cves :
65
- cve_id = cve_item ["cve" ]["CVE_data_meta" ]["ID" ]
66
- snyk_short_name = await synk_db (cve_id )
67
-
68
- description = cve_item ["cve" ]["description" ]["description_data" ][0 ]["value" ]
69
- link = f"https://nvd.nist.gov/vuln/detail/{ cve_id } "
70
-
71
- weaknesses = []
72
- if "problemtype" in cve_item ["cve" ]:
73
- for problem_type in cve_item ["cve" ]["problemtype" ]["problemtype_data" ]:
74
- for description in problem_type ["description" ]:
75
- weaknesses .append (description ["value" ])
76
-
77
- if "description_data" in cve_item ["cve" ]["description" ]:
78
- description_text = cve_item ["cve" ]["description" ]["description_data" ][0 ]["value" ]
79
- else :
80
- description_text = "Description not available."
81
-
82
- # Check for public exploit using pyExploitDb
83
- pEdb = PyExploitDb ()
84
- pEdb .debug = False
85
- pEdb .openFile ()
86
- exploit_status = pEdb .searchCve (cve_id )
87
- if exploit_status :
88
- exploit_status = "Public Exploit Found"
89
- else :
90
- exploit_status = "No Public Exploit Found"
91
-
92
- cve_details = {
93
- "CVE ID" : cve_id ,
94
- "Short Name" : snyk_short_name ,
95
- "Description" : description_text ,
96
- "Weaknesses" : ", " .join (weaknesses ),
97
- "Link" : link ,
98
- "Exploit Status" : exploit_status
99
- }
100
-
101
- results .append (cve_details )
50
+ response = requests . get ( url )
51
+
52
+ try :
53
+ data = response .json ()
54
+ except json .JSONDecodeError :
55
+ print (colored (f"Error decoding JSON for CPE: { cpe_string } . Skipping." , "red" ))
56
+ return []
57
+
58
+ if "result" in data :
59
+ cves = data ["result" ]["CVE_Items" ]
60
+ for cve_item in cves :
61
+ cve_id = cve_item ["cve" ]["CVE_data_meta" ]["ID" ]
62
+ snyk_short_name = synk_db (cve_id )
63
+
64
+ description = cve_item ["cve" ]["description" ]["description_data" ][0 ]["value" ]
65
+ link = f"https://nvd.nist.gov/vuln/detail/{ cve_id } "
66
+
67
+ weaknesses = []
68
+ if "problemtype" in cve_item ["cve" ]:
69
+ for problem_type in cve_item ["cve" ]["problemtype" ]["problemtype_data" ]:
70
+ for description in problem_type ["description" ]:
71
+ weaknesses .append (description ["value" ])
72
+
73
+ if "description_data" in cve_item ["cve" ]["description" ]:
74
+ description_text = cve_item ["cve" ]["description" ]["description_data" ][0 ]["value" ]
75
+ else :
76
+ description_text = "Description not available."
77
+
78
+ # Check for public exploit using pyExploitDb
79
+ pEdb = PyExploitDb ()
80
+ pEdb .debug = False
81
+ pEdb .openFile ()
82
+ exploit_status = pEdb .searchCve (cve_id )
83
+ if exploit_status :
84
+ exploit_status = "Public Exploit Found"
85
+ else :
86
+ exploit_status = "No Public Exploit Found"
87
+
88
+ cve_details = {
89
+ "CVE ID" : cve_id ,
90
+ "Short Name" : snyk_short_name ,
91
+ "Description" : description_text ,
92
+ "Weaknesses" : ", " .join (weaknesses ),
93
+ "Link" : link ,
94
+ "Exploit Status" : exploit_status
95
+ }
96
+
97
+ results .append (cve_details )
102
98
103
99
return results
104
100
105
- async def main () :
101
+ if __name__ == "__main__" :
106
102
print (colored ("CPE Finder Script" , "green" , attrs = ["bold" ]))
107
103
print ("This script searches for the CPEs of a component and version.\n " )
108
104
109
105
component = input (colored ("Enter the component (e.g., jquery): " , "cyan" ))
110
106
version = input (colored ("Enter the version (e.g., 1.0.0): " , "cyan" ))
111
107
112
- cpe_strings = await find_cpes_async (component , version )
108
+ cpe_strings = find_cpes (component , version )
113
109
if cpe_strings :
114
110
print (colored ("CPEs Found:" , "green" ))
115
111
for cpe_string in cpe_strings :
116
112
print (colored (f" { cpe_string } " , "green" ))
117
113
118
114
for cpe_string in cpe_strings :
119
- results = await fetch_cve_details (cpe_string )
115
+ results = fetch_cve_details (cpe_string )
120
116
if results :
121
117
print (colored ("\n CVE Details" , "cyan" , attrs = ["underline" ]))
122
118
for result in results :
@@ -134,6 +130,3 @@ async def main():
134
130
else :
135
131
print (colored ("CPEs not found for the provided component and version." , "red" ))
136
132
137
- if __name__ == "__main__" :
138
- asyncio .run (main ())
139
-
0 commit comments