55import os
66import re
77import time
8+ from libbs .artifacts import _art_from_dict
9+ from libbs .api import DecompilerInterface
10+ from libbs .decompilers .binja .interface import BinjaInterface as BNInterface
811from concurrent .futures import ThreadPoolExecutor , as_completed
912from revengai .utils import rename_function as rename_function_util
13+ from libbs .artifacts import (
14+ Function ,
15+ FunctionArgument ,
16+ GlobalVariable ,
17+ Enum ,
18+ Struct ,
19+ Typedef ,
20+ )
21+ #from revengai.utils.datatypes import apply_types, apply_type, _load_many_artifacts_from_list
1022
1123class MatchFunctions :
1224 def __init__ (self , config ):
@@ -379,7 +391,7 @@ def _process_data_type_batch(self, chunk: List[Dict]) -> List[Dict]:
379391 data = response .get ("data" , {})
380392 items = data .get ("items" , [])
381393 pending_count = sum (1 for item in items if item .get ("status" ) == "pending" )
382- log_info (f"RevEng.AI | { pending_count } items still pending... trying again " )
394+ log_info (f"RevEng.AI | { pending_count } items still pending..." )
383395 if not pending_count :
384396 break
385397 time .sleep (3 )
@@ -390,9 +402,20 @@ def _process_data_type_batch(self, chunk: List[Dict]) -> List[Dict]:
390402 continue
391403 for result in chunk :
392404 if result ['nearest_neighbor_id' ] == item ['function_id' ]:
393- signature = self .make_signature (item ['data_types' ])
394- if signature != "N/A" :
395- signatures .append ({"nearest_neighbor_id" : result ['nearest_neighbor_id' ], "signature" : signature })
405+ signature = "N/A"
406+ item2 = item .get ("data_types" , {})
407+ func_types = item2 .get ("func_types" , None )
408+ func_deps = item2 .get ("func_deps" , [])
409+ log_info (f"RevEng.AI | Func types: { func_types } " )
410+ if func_types is not None :
411+ fnc : Function = _art_from_dict (func_types )
412+ if fnc .name is None :
413+ log_info (f"Function { item ['function_id' ]} has no name, skipping signature application." )
414+ continue
415+ log_info (f"Applying signature for { fnc .name } " )
416+ signature = self .function_to_str (fnc )
417+ if signature != "N/A" :
418+ signatures .append ({"nearest_neighbor_id" : result ['nearest_neighbor_id' ], "signature" : signature , "data_types" : item ['data_types' ], "signature_data" : {"deps" : func_deps , "function" : fnc }})
396419 break
397420
398421 #log_info(f"RevEng.AI | Total count: {total_count}")
@@ -404,6 +427,22 @@ def _process_data_type_batch(self, chunk: List[Dict]) -> List[Dict]:
404427 log_error (f"RevEng.AI | Error processing data type batch: { str (e )} " )
405428 return []
406429
430+ def function_arguments (self , fnc : Function ) -> list [str ]:
431+ args = []
432+ for k in fnc .header .args :
433+ arg : FunctionArgument = fnc .header .args [k ]
434+ args .append (
435+ f"{ arg .type } { arg .name } "
436+ )
437+ return args
438+
439+
440+ def function_to_str (self , fnc : Function ) -> str :
441+ # convert the signature to a string representation
442+ return f"{ fnc .type } { fnc .name } " \
443+ f"({ ', ' .join (self .function_arguments (fnc ))} )"
444+
445+
407446 def make_signature (self , data_types : List [Dict ]) -> str :
408447 try :
409448 #log_info(f"RevEng.AI | Making signature for {data_types}")
@@ -463,3 +502,122 @@ def fetch_data_types(self, bv: BinaryView, selected_results: List[Dict]) -> Tupl
463502 except Exception as e :
464503 log_error (f"RevEng.AI | Error fetching data types: { str (e )} " )
465504 return False , str (e )
505+
506+ def test (self , bv : BinaryView , selected_results : List [Dict ]) -> Tuple [bool , Dict [str , Any ]]:
507+ try :
508+ log_info ("RevEng.AI | Starting test" )
509+
510+ decompiler = DecompilerInterface .discover (force_decompiler = "binja" )
511+
512+ log_info (f"RevEng.AI | Decompiler: { decompiler } " )
513+ log_info (f"RevEng.AI | Type: { type (decompiler )} " )
514+
515+ for addr , func in decompiler .functions .items ():
516+ log_info (f"RevEng.AI | { addr } : { func } " )
517+
518+
519+ log_info (f"RevEng.AI | Decompiler: { decompiler } " )
520+ for result in selected_results :
521+ if result .get ('signature_data' , None ) is not None :
522+ log_info (f"RevEng.AI | Testing 0x{ result ['function_address' ]:x} " )
523+ self ._apply_data_types (result ['function_address' ], result ['signature_data' ], decompiler )
524+
525+
526+
527+ return True , "Test completed"
528+ except Exception as e :
529+ log_error (f"RevEng.AI | Error testing: { str (e )} " )
530+ return False , str (e )
531+
532+ def _apply_type (
533+ self ,
534+ deci : DecompilerInterface ,
535+ artifact ,
536+ soft_skip = False
537+ ) -> None | str :
538+ supported_types = [
539+ Function ,
540+ GlobalVariable ,
541+ Enum ,
542+ Struct ,
543+ Typedef
544+ ]
545+
546+ if not any (isinstance (artifact , t ) for t in supported_types ):
547+ return "Unsupported artifact type: " \
548+ f"{ artifact .__class__ .__name__ } "
549+
550+ try :
551+ log_info (f"RevEng.AI | Applying artifact: { artifact .name } " )
552+ log_info (f"RevEng.AI | Artifact type: { artifact .__class__ .__name__ } " )
553+ log_info (f"RevEng.AI | Artifact Name: { artifact .name } " )
554+ log_info (f"RevEng.AI | Decompiler: { deci } " )
555+ if isinstance (artifact , Function ):
556+ deci .functions [artifact .addr ] = artifact
557+ elif isinstance (artifact , GlobalVariable ):
558+ deci .global_vars [artifact .addr ] = artifact
559+ elif isinstance (artifact , Enum ):
560+ deci .enums [artifact .name ] = artifact
561+ elif isinstance (artifact , Struct ):
562+ deci .structs [artifact .name ] = artifact
563+ elif isinstance (artifact , Typedef ):
564+ deci .typedefs [artifact .name ] = artifact
565+
566+
567+
568+ except Exception as e :
569+ log_error (f"Error while applying artifact '{ artifact .name } '"
570+ f" of type { artifact .__class__ .__name__ } : { e } " )
571+ if not soft_skip :
572+ return f"Error while applying artifact '{ artifact .name } '" \
573+ f" of type { artifact .__class__ .__name__ } : { e } "
574+
575+ return None
576+
577+ def _apply_types (self , deci : DecompilerInterface , artifacts : List ) -> None | str :
578+ for artifact in artifacts :
579+ error = self ._apply_type (deci , artifact , True )
580+ if error is not None :
581+ return error
582+ return None
583+
584+ def _load_many_artifacts_from_list (self , artifacts : list [dict ]) -> list :
585+ _artifacts = []
586+ for artifact in artifacts :
587+ art = _art_from_dict (artifact )
588+ if art is not None :
589+ _artifacts .append (art )
590+ return _artifacts
591+
592+ def _apply_data_types (self , function_addr : int = 0 ,
593+ signature = None ,
594+ deci : DecompilerInterface = None ,) -> None :
595+ if not deci :
596+ log_error ("RevEng.AI | Unable to find a decompiler" )
597+ return
598+
599+ try :
600+ # get the function signature from the table
601+ function : Function = signature .get ("function" )
602+ deps = signature .get ("deps" )
603+
604+ function .addr = function_addr
605+
606+ # fisrt apply the dependencies
607+ res = self ._apply_types (deci , self ._load_many_artifacts_from_list (deps ))
608+ if res is not None :
609+ log_error (
610+ f"Failed to apply function dependencies: { res } " )
611+ return
612+
613+ # then apply the function signature
614+ res = self ._apply_type (deci , function )
615+ if res is not None :
616+ log_error (f"Failed to apply function signature: { res } " )
617+ return
618+
619+ # show success message
620+ log_info ("Successfully applied function signature and dependencies" )
621+
622+ except Exception as e :
623+ log_error (f"Error: { e } " )
0 commit comments