@@ -2124,50 +2124,93 @@ async def find_taint_sinks(
21242124 @mcp .tool ()
21252125 async def find_taint_flows (
21262126 session_id : str ,
2127- source_patterns : Optional [list ] = None ,
2128- sink_patterns : Optional [list ] = None ,
2129- max_path_length : int = 10 ,
2130- timeout : int = 30 ,
2131- limit : int = 100 ,
2127+ source_node_id : Optional [str ] = None ,
2128+ sink_node_id : Optional [str ] = None ,
2129+ source_location : Optional [str ] = None ,
2130+ sink_location : Optional [str ] = None ,
2131+ max_path_length : int = 20 ,
2132+ timeout : int = 60 ,
21322133 ) -> Dict [str , Any ]:
21332134 """
2134- Find dataflow paths from sources to sinks using Joern dataflow primitives.
2135+ Find dataflow paths from a specific source to a specific sink using Joern dataflow primitives.
2136+
2137+ Analyze data flow from a taint source (external input point or function call) to a taint sink
2138+ (security-sensitive operation) to identify potential vulnerabilities. Provides detailed path
2139+ information showing how data flows through the call graph and data dependencies.
2140+
2141+ This is a focused taint analysis task that works with specific source and sink identifiers,
2142+ making it practical for vulnerability investigation and security code review.
21352143
2136- Analyze data flow from taint sources (external input points) to taint sinks
2137- (security-sensitive operations) to identify potential vulnerabilities. This
2138- is a computationally expensive operation that may take significant time.
2144+ **Important**: Since function names like "malloc", "read", or "system" can appear many times
2145+ in a codebase, you MUST specify which specific call instance to analyze. Use node IDs (obtained
2146+ from find_taint_sources/find_taint_sinks) or specify exact locations.
21392147
21402148 Args:
21412149 session_id: The session ID from create_cpg_session
2142- source_patterns: Optional list of regex patterns for source function names
2143- If not provided, uses default patterns from config for the session's language
2144- sink_patterns: Optional list of regex patterns for sink function names
2145- If not provided, uses default patterns from config for the session's language
2146- max_path_length: Maximum length of dataflow paths to consider (default: 10)
2147- Flows with more than this many elements will be filtered out
2148- timeout: Maximum execution time in seconds (default: 30)
2149- limit: Maximum number of flow results to return (default: 100)
2150+ source_node_id: Node ID of the source call/method (recommended method - get from find_taint_sources)
2151+ Example: "12345" - the exact CPG node ID for a specific getenv() call
2152+ sink_node_id: Node ID of the sink call/method (recommended method - get from find_taint_sinks)
2153+ Example: "67890" - the exact CPG node ID for a specific system() call
2154+ source_location: Alternative to node_id: specify as "filename:line_number" or "filename:line_number:method_name"
2155+ Example: "main.c:42" or "main.c:42:main"
2156+ sink_location: Alternative to node_id: specify as "filename:line_number" or "filename:line_number:method_name"
2157+ Example: "main.c:100" or "main.c:100:execute_command"
2158+ max_path_length: Maximum length of dataflow paths to consider in elements (default: 20)
2159+ Paths with more elements will be filtered out to avoid extremely long chains
2160+ timeout: Maximum execution time in seconds (default: 60)
21502161
21512162 Returns:
21522163 {
21532164 "success": true,
2165+ "source": {
2166+ "node_id": "12345",
2167+ "code": "getenv(\"PATH\")",
2168+ "filename": "main.c",
2169+ "lineNumber": 42,
2170+ "method": "main"
2171+ },
2172+ "sink": {
2173+ "node_id": "67890",
2174+ "code": "system(cmd)",
2175+ "filename": "main.c",
2176+ "lineNumber": 100,
2177+ "method": "execute_command"
2178+ },
21542179 "flows": [
21552180 {
2156- "source_code": "getenv(\"PATH\")",
2157- "source_file": "main.c",
2158- "source_line": 42,
2159- "sink_code": "system(cmd)",
2160- "sink_file": "main.c",
2161- "sink_line": 100,
2162- "path_length": 3
2181+ "path_id": 0,
2182+ "path_length": 5,
2183+ "nodes": [
2184+ {
2185+ "step": 0,
2186+ "code": "getenv(\"PATH\")",
2187+ "filename": "main.c",
2188+ "lineNumber": 42,
2189+ "nodeType": "CALL"
2190+ },
2191+ {
2192+ "step": 1,
2193+ "code": "path_var",
2194+ "filename": "main.c",
2195+ "lineNumber": 45,
2196+ "nodeType": "IDENTIFIER"
2197+ },
2198+ ...
2199+ ]
21632200 }
21642201 ],
2165- "total": 1
2202+ "total_flows": 1
21662203 }
21672204 """
21682205 try :
21692206 validate_session_id (session_id )
21702207
2208+ # Validate that we have proper source and sink specifications
2209+ if not source_node_id and not source_location :
2210+ raise ValidationError ("Either source_node_id or source_location must be provided" )
2211+ if not sink_node_id and not sink_location :
2212+ raise ValidationError ("Either sink_node_id or sink_location must be provided" )
2213+
21712214 session_manager = services ["session_manager" ]
21722215 query_executor = services ["query_executor" ]
21732216
@@ -2180,64 +2223,158 @@ async def find_taint_flows(
21802223
21812224 await session_manager .touch_session (session_id )
21822225
2183- lang = session .language or "c"
2184- cfg = services ["config" ]
2185- taint_src_cfg = getattr (cfg .cpg , "taint_sources" , {}) if hasattr (cfg .cpg , "taint_sources" ) else {}
2186- taint_sink_cfg = getattr (cfg .cpg , "taint_sinks" , {}) if hasattr (cfg .cpg , "taint_sinks" ) else {}
2226+ # Resolve source to actual node
2227+ source_info = None
2228+ if source_node_id :
2229+ # Direct node ID lookup
2230+ query = f'cpg.id({ source_node_id } ).map(n => (n.id, n.code, n.file.name.headOption.getOrElse("unknown"), n.lineNumber.getOrElse(-1), Try(n.method.fullName).getOrElse("unknown"))).headOption'
2231+ else :
2232+ # Parse location: "filename:line_number" or "filename:line_number:method_name"
2233+ parts = source_location .split (":" )
2234+ if len (parts ) < 2 :
2235+ raise ValidationError ("source_location must be in format 'filename:line' or 'filename:line:method'" )
2236+
2237+ filename = parts [0 ]
2238+ line_num = parts [1 ]
2239+ method_name = parts [2 ] if len (parts ) > 2 else None
2240+
2241+ if method_name :
2242+ query = f'cpg.file.name("{ filename } ").call.lineNumber({ line_num } ).where(_.method.fullName.contains("{ method_name } ")).map(c => (c.id, c.code, c.file.name.headOption.getOrElse("unknown"), c.lineNumber.getOrElse(-1), c.method.fullName)).headOption'
2243+ else :
2244+ query = f'cpg.file.name("{ filename } ").call.lineNumber({ line_num } ).map(c => (c.id, c.code, c.file.name.headOption.getOrElse("unknown"), c.lineNumber.getOrElse(-1), c.method.fullName)).headOption'
2245+
2246+ result_src = await query_executor .execute_query (
2247+ session_id = session_id ,
2248+ cpg_path = "/workspace/cpg.bin" ,
2249+ query = query ,
2250+ timeout = 10 ,
2251+ limit = 1 ,
2252+ )
21872253
2188- srcs = source_patterns or taint_src_cfg .get (lang , [])
2189- snks = sink_patterns or taint_sink_cfg .get (lang , [])
2254+ if result_src .success and result_src .data and len (result_src .data ) > 0 :
2255+ item = result_src .data [0 ]
2256+ if isinstance (item , dict ) and item .get ("_1" ):
2257+ source_info = {
2258+ "node_id" : item .get ("_1" ),
2259+ "code" : item .get ("_2" ),
2260+ "filename" : item .get ("_3" ),
2261+ "lineNumber" : item .get ("_4" ),
2262+ "method" : item .get ("_5" ),
2263+ }
21902264
2191- if not srcs or not snks :
2192- # If either is empty, return empty result quickly
2193- return {"success" : True , "flows" : [], "total" : 0 }
2265+ # Resolve sink to actual node
2266+ sink_info = None
2267+ if sink_node_id :
2268+ # Direct node ID lookup
2269+ query = f'cpg.id({ sink_node_id } ).map(n => (n.id, n.code, n.file.name.headOption.getOrElse("unknown"), n.lineNumber.getOrElse(-1), Try(n.method.fullName).getOrElse("unknown"))).headOption'
2270+ else :
2271+ # Parse location: "filename:line_number" or "filename:line_number:method_name"
2272+ parts = sink_location .split (":" )
2273+ if len (parts ) < 2 :
2274+ raise ValidationError ("sink_location must be in format 'filename:line' or 'filename:line:method'" )
2275+
2276+ filename = parts [0 ]
2277+ line_num = parts [1 ]
2278+ method_name = parts [2 ] if len (parts ) > 2 else None
2279+
2280+ if method_name :
2281+ query = f'cpg.file.name("{ filename } ").call.lineNumber({ line_num } ).where(_.method.fullName.contains("{ method_name } ")).map(c => (c.id, c.code, c.file.name.headOption.getOrElse("unknown"), c.lineNumber.getOrElse(-1), c.method.fullName)).headOption'
2282+ else :
2283+ query = f'cpg.file.name("{ filename } ").call.lineNumber({ line_num } ).map(c => (c.id, c.code, c.file.name.headOption.getOrElse("unknown"), c.lineNumber.getOrElse(-1), c.method.fullName)).headOption'
2284+
2285+ result_snk = await query_executor .execute_query (
2286+ session_id = session_id ,
2287+ cpg_path = "/workspace/cpg.bin" ,
2288+ query = query ,
2289+ timeout = 10 ,
2290+ limit = 1 ,
2291+ )
21942292
2195- # Remove trailing parens from patterns for proper regex matching
2196- cleaned_srcs = [p .rstrip ("(" ) for p in srcs ]
2197- cleaned_snks = [p .rstrip ("(" ) for p in snks ]
2198- src_joined = "|" .join ([re .escape (p ) for p in cleaned_srcs ])
2199- snk_joined = "|" .join ([re .escape (p ) for p in cleaned_snks ])
2200-
2201- # Build a query that finds dataflow paths from sources to sinks
2202- # Use Joern's sink.reachableByFlows(source) to find actual dataflow paths
2203- # Note: The query must .l (list) the sources and sinks before passing to reachableByFlows
2204- # Filter by max_path_length to avoid excessively long paths
2293+ if result_snk .success and result_snk .data and len (result_snk .data ) > 0 :
2294+ item = result_snk .data [0 ]
2295+ if isinstance (item , dict ) and item .get ("_1" ):
2296+ sink_info = {
2297+ "node_id" : item .get ("_1" ),
2298+ "code" : item .get ("_2" ),
2299+ "filename" : item .get ("_3" ),
2300+ "lineNumber" : item .get ("_4" ),
2301+ "method" : item .get ("_5" ),
2302+ }
2303+
2304+ # If either source or sink not found, return early
2305+ if not source_info or not sink_info :
2306+ return {
2307+ "success" : True ,
2308+ "source" : source_info ,
2309+ "sink" : sink_info ,
2310+ "flows" : [],
2311+ "total_flows" : 0 ,
2312+ "message" : f"Could not resolve source or sink from provided identifiers"
2313+ }
2314+
2315+ # Build dataflow query using reachableByFlows
2316+ # This finds all dataflow paths from source to sink
2317+ source_id = source_info ["node_id" ]
2318+ sink_id = sink_info ["node_id" ]
2319+
22052320 query = (
2206- f'val sources = cpg.call.name(" { src_joined } " ).l\n '
2207- f'val sinks = cpg.call.name(" { snk_joined } " ).l\n '
2208- f'sinks.reachableByFlows(sources).filter(flow => flow.elements.size <= { max_path_length } ).map(flow => {{\n '
2209- f' val elems = flow. elements\n '
2210- f' (elems.head.code, elems.head.file.name.headOption.getOrElse("unknown"), elems.head.lineNumber.getOrElse(-1), '
2211- f'elems.last.code, elems.last. file.name.headOption.getOrElse("unknown"), elems.last. lineNumber.getOrElse(-1), '
2212- f'elems.size) \n '
2213- f'}}).take( { limit } )'
2321+ f'val source = cpg.id( { source_id } ).l\n '
2322+ f'val sink = cpg.id( { sink_id } ).l\n '
2323+ f'if (source.nonEmpty && sink.nonEmpty) {{\n '
2324+ f' val flows = sink.reachableByFlows(source).filter(f => f. elements.size <= { max_path_length } ).toList \n '
2325+ f' flows.zipWithIndex.map {{ case (flow, flowIdx) => \n '
2326+ f' (flowIdx, flow.elements.size, flow.elements.map(e => (e.code, e. file.name.headOption.getOrElse("unknown"), e. lineNumber.getOrElse(-1), e.label)).l) \n '
2327+ f' }} \n '
2328+ f'}} else List[(Int, Int, List[(String, String, Int, String)])]( )'
22142329 )
22152330
22162331 result = await query_executor .execute_query (
22172332 session_id = session_id ,
22182333 cpg_path = "/workspace/cpg.bin" ,
22192334 query = query ,
22202335 timeout = timeout ,
2221- limit = limit ,
2336+ limit = 1000 , # Allow many results to capture all paths
22222337 )
22232338
22242339 if not result .success :
2225- return {"success" : False , "error" : {"code" : "QUERY_ERROR" , "message" : result .error }}
2340+ return {
2341+ "success" : False ,
2342+ "error" : {"code" : "QUERY_ERROR" , "message" : result .error },
2343+ }
22262344
2345+ # Parse flows from result
22272346 flows = []
22282347 for item in result .data :
22292348 if isinstance (item , dict ):
2349+ flow_idx = item .get ("_1" )
2350+ path_length = item .get ("_2" )
2351+ nodes_data = item .get ("_3" , [])
2352+
2353+ # Build node list for this path
2354+ nodes = []
2355+ for step , node_data in enumerate (nodes_data ):
2356+ if isinstance (node_data , dict ):
2357+ nodes .append ({
2358+ "step" : step ,
2359+ "code" : node_data .get ("_1" , "" ),
2360+ "filename" : node_data .get ("_2" , "" ),
2361+ "lineNumber" : node_data .get ("_3" , - 1 ),
2362+ "nodeType" : node_data .get ("_4" , "" ),
2363+ })
2364+
22302365 flows .append ({
2231- "source_code" : item .get ("_1" ),
2232- "source_file" : item .get ("_2" ),
2233- "source_line" : item .get ("_3" ),
2234- "sink_code" : item .get ("_4" ),
2235- "sink_file" : item .get ("_5" ),
2236- "sink_line" : item .get ("_6" ),
2237- "path_length" : item .get ("_7" ),
2366+ "path_id" : flow_idx ,
2367+ "path_length" : path_length ,
2368+ "nodes" : nodes ,
22382369 })
22392370
2240- return {"success" : True , "flows" : flows , "total" : len (flows )}
2371+ return {
2372+ "success" : True ,
2373+ "source" : source_info ,
2374+ "sink" : sink_info ,
2375+ "flows" : flows ,
2376+ "total_flows" : len (flows ),
2377+ }
22412378
22422379 except (SessionNotFoundError , SessionNotReadyError , ValidationError ) as e :
22432380 logger .error (f"Error finding taint flows: { e } " )
0 commit comments