import logging
import re
import typing as t

from scanoss.scanossapi import ScanossApi

from . import PostProcessor
from .. import ScanResults
8+
9+
10+
11+
class ScanossPostProcessor(PostProcessor):
    """Attach SCANOSS API scan data to 'scanoss' results that only carry a WFP.

    Results whose ``category`` is ``'scanoss'`` and whose ``data`` has no
    ``'scan'`` entry yet are collected, their WFPs are sent to the SCANOSS API
    in chunks, and every returned match is appended to the ``'scan'`` list of
    the result whose WFP contains the matching file hash.
    """

    # A WFP embeds the file hash as ``file=<md5>``, always 32 lowercase hex characters.
    _FILE_HASH_RE = re.compile(r'file=([a-f0-9]{32})')
    # Maximum number of WFPs joined into a single API request.
    _WFP_CHUNK_SIZE = 999

    def __init__(self, api_key: t.Optional[str] = None):
        """Remember the optional API key used when creating the API client.

        Args:
            api_key: Key passed to ``ScanossApi``; when falsy the client is
                created with its default arguments.
        """
        self._api_key = api_key

    def apply(self, results: ScanResults) -> ScanResults:
        """Fill in missing ``'scan'`` data on the given results, in place.

        Args:
            results: Mapping whose values are iterables of result objects
                exposing ``category`` and ``data``.

        Returns:
            The same ``results`` object that was passed in.
        """
        wfps = []
        analysis = {}

        for analysis_result in results.values():
            for res in analysis_result:
                # Only results that have no scan data yet need a lookup.
                if res.category != 'scanoss' or 'scan' in res.data:
                    continue

                wfp = res.data.get('wfp')
                if not isinstance(wfp, str):
                    # A missing or malformed WFP cannot be scanned; skip it
                    # instead of letting the regex raise a TypeError.
                    continue

                # Because of caching the file path is not reliable, so the file
                # hash extracted from the WFP is used to map results back later.
                file_hashes = self._FILE_HASH_RE.findall(wfp)
                if not file_hashes:
                    continue

                # Queue the WFP once, even when it holds several file entries,
                # so the same payload is not submitted multiple times.
                wfps.append(wfp)
                for file_hash in file_hashes:
                    analysis[file_hash] = res

        if not wfps:
            # Nothing to look up: avoid creating an API client at all.
            return results

        scanoss_api = ScanossApi(api_key=self._api_key) if self._api_key else ScanossApi()
        wfps_results = {}

        for start in range(0, len(wfps), self._WFP_CHUNK_SIZE):
            chunk = wfps[start:start + self._WFP_CHUNK_SIZE]
            try:
                # NOTE(review): the separator contains a space after the
                # newline; kept unchanged - confirm this is intended.
                response = scanoss_api.scan('\n '.join(chunk))
            except Exception:
                # Best effort: a failed chunk must not abort the remaining
                # ones, but the failure should be visible.
                logging.getLogger(__name__).warning(
                    'SCANOSS scan request failed for WFP chunk starting at index %d',
                    start,
                    exc_info=True,
                )
                continue

            # Anything that is not a dict (e.g. None) carries no usable results.
            if isinstance(response, dict):
                wfps_results.update(response)

        for wfp_results in wfps_results.values():
            for wfp_res in wfp_results:
                if (res_file_hash := wfp_res.get('file_hash')) and (file_analysis := analysis.get(res_file_hash)):
                    file_analysis.data.setdefault('scan', []).append(wfp_res)

        return results
0 commit comments