1111from sqlmap_ai .parser import extract_sqlmap_info
1212from sqlmap_ai .ai_analyzer import ai_suggest_next_steps
1313class AdaptiveTestingEngine :
14- def __init__ (self , runner , interactive_mode = False , default_timeout = 120 ):
14+ def __init__ (self , runner , interactive_mode = False , default_timeout = 120 , test_parameter = None ):
1515 self .runner = runner
1616 self .interactive_mode = interactive_mode
1717 self .default_timeout = default_timeout
@@ -20,6 +20,15 @@ def __init__(self, runner, interactive_mode=False, default_timeout=120):
2020 self .detected_waf = False
2121 self .vulnerable_params = []
2222 self .tamper_scripts_used = []
23+ self .found_databases = [] # Track databases found across all steps
24+ self .test_parameter = test_parameter # Specific parameter(s) to test
25+
26+ def _add_param_option (self , options : List [str ]) -> List [str ]:
27+ """Add -p parameter option if specified"""
28+ if self .test_parameter :
29+ options .append (f"-p { self .test_parameter } " )
30+ return options
31+
2332 def run_adaptive_test (self , target_url : str ) -> Dict [str , Any ]:
2433 if not self ._validate_url (target_url ):
2534 return {
@@ -66,8 +75,9 @@ def run_adaptive_test(self, target_url: str) -> Dict[str, Any]:
6675 if initial_info ["databases" ]:
6776 print_success ("SQL injection vulnerability confirmed!" )
6877 self .vulnerable_params = initial_info ["vulnerable_parameters" ]
78+ self .found_databases = initial_info ["databases" ] # Store databases found in Step 1
6979 if any (db .lower () in ["mysql" , "mssql" , "oracle" , "postgresql" ] for db in initial_info ["techniques" ]):
70- self .detected_dbms = next ((db for db in initial_info ["techniques" ]
80+ self .detected_dbms = next ((db for db in initial_info ["techniques" ]
7181 if db .lower () in ["mysql" , "mssql" , "oracle" , "postgresql" ]), None )
7282 print_success (f"DBMS identified: { self .detected_dbms } " )
7383 step3_result = self ._run_step3_dbms_specific (target_url )
@@ -114,19 +124,23 @@ def _validate_url(self, url: str) -> bool:
114124 return False
115125 return True
116126 def _run_step1_assessment (self , target_url : str ) -> Optional [str ]:
117- print_info ("Running initial assessment with --batch --dbs --threads=5" )
127+ options = ["--batch" , "--dbs" , "--threads=5" ]
128+ options = self ._add_param_option (options )
129+ print_info (f"Running initial assessment with { ' ' .join (options )} " )
118130 result = self .runner .run_sqlmap (
119131 target_url = target_url ,
120- options = [ "--batch" , "--dbs" , "--threads=5" ] ,
132+ options = options ,
121133 timeout = self .default_timeout ,
122134 interactive_mode = self .interactive_mode
123135 )
124136 return result
125137 def _run_step2_identify_dbms (self , target_url : str ) -> Optional [str ]:
126- print_info ("Running DBMS fingerprinting with --threads=5" )
138+ options = ["--batch" , "--fingerprint" , "--threads=5" ]
139+ options = self ._add_param_option (options )
140+ print_info (f"Running DBMS fingerprinting with { ' ' .join (options )} " )
127141 result = self .runner .run_sqlmap (
128142 target_url = target_url ,
129- options = [ "--batch" , "--fingerprint" , "--threads=5" ] ,
143+ options = options ,
130144 timeout = self .default_timeout ,
131145 interactive_mode = self .interactive_mode
132146 )
@@ -171,13 +185,22 @@ def _run_step3_dbms_specific(self, target_url: str) -> Dict[str, Any]:
171185 result = limited_result
172186 dbms_info = extract_sqlmap_info (result )
173187 databases = dbms_info .get ("databases" , [])
188+ # Update found_databases if new ones are discovered
189+ if databases :
190+ for db in databases :
191+ if db not in self .found_databases :
192+ self .found_databases .append (db )
174193 self .scan_history .append ({
175194 "step" : "dbms_specific_scan" ,
176195 "command" : f"sqlmap -u { target_url } --batch --dbms={ self .detected_dbms .lower ()} --tables --threads=5" ,
177196 "result" : dbms_info
178197 })
179- if databases :
180- print_success (f"Found { len (databases )} databases" )
198+ # Check if databases exist from any previous step
199+ if self .found_databases :
200+ if databases :
201+ print_success (f"Found { len (databases )} databases" )
202+ else :
203+ print_info (f"Using { len (self .found_databases )} databases from previous steps" )
181204 tables = dbms_info .get ("tables" , [])
182205 if tables :
183206 print_success (f"Found { len (tables )} tables" )
@@ -283,11 +306,12 @@ def _run_step3_dbms_specific(self, target_url: str) -> Dict[str, Any]:
283306 "scan_history" : self .scan_history ,
284307 "message" : "Found databases but unable to enumerate tables or extract data. "
285308 "The database might be empty or protected against enumeration." ,
286- "databases_found" : databases
309+ "databases_found" : self . found_databases
287310 }
288311 return enhanced_result
289312 else :
290- print_warning ("No databases enumerated. Moving to enhanced testing." )
313+ # This should rarely happen now since we track databases globally
314+ print_warning ("No databases found yet. Moving to enhanced testing." )
291315 return self ._run_step4_enhanced_testing (target_url )
292316 def _run_step4_enhanced_testing (self , target_url : str ) -> Dict [str , Any ]:
293317 print_info ("🔴 Step 4: Enhanced Testing" )
@@ -770,10 +794,11 @@ def _prepare_final_results(self, results: Dict[str, Any], methods_tested: List[s
770794 "message" : "All testing methods failed. Target may not be vulnerable." ,
771795 "scan_history" : self .scan_history
772796 }
773- def run_adaptive_test_sequence (runner , target_url , interactive_mode = False , timeout = 120 ):
797+ def run_adaptive_test_sequence (runner , target_url , interactive_mode = False , timeout = 120 , test_parameter = None ):
774798 engine = AdaptiveTestingEngine (
775799 runner = runner ,
776800 interactive_mode = interactive_mode ,
777- default_timeout = timeout
801+ default_timeout = timeout ,
802+ test_parameter = test_parameter
778803 )
779804 return engine .run_adaptive_test (target_url )
0 commit comments