Skip to content

Commit 2169730

Browse files
committed
remove list_taint_paths for redundancy
1 parent 8b0a82d commit 2169730

2 files changed

Lines changed: 0 additions & 264 deletions

File tree

src/tools/mcp_tools.py

Lines changed: 0 additions & 228 deletions
Original file line numberDiff line numberDiff line change
@@ -2505,234 +2505,6 @@ async def check_method_reachability(
25052505
"error": {"code": "INTERNAL_ERROR", "message": str(e)},
25062506
}
25072507

2508-
def _scala_string_literal(value: str) -> str:
    """Return *value* as a double-quoted Scala string literal.

    Backslashes, double quotes and line breaks are escaped so that caller
    supplied text cannot terminate the literal early or inject query code.
    """
    escaped = (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


def _taint_endpoint_expr(node_id: Optional[str], pattern: Optional[str]) -> str:
    """Build the query expression that selects one endpoint (source or sink).

    A node ID takes precedence over a pattern. Each endpoint is resolved
    independently, so an ID on one side can be combined with a pattern on
    the other.
    """
    if node_id:
        return f"cpg.id({_scala_string_literal(node_id)})"
    # A trailing "(" (e.g. "system(") is dropped; the rest is passed through
    # as a regex, as documented, and only escaped for the string literal.
    cleaned = (pattern or ".*").rstrip("(")
    return f"cpg.call.name({_scala_string_literal(cleaned)})"


def _build_taint_paths_query(
    source_expr: str, sink_expr: str, max_paths: int, max_path_length: int
) -> str:
    """Assemble the data-flow query shared by the ID and pattern variants."""
    return (
        f"val sources = {source_expr}.l\n"
        f"val sinks = {sink_expr}.l\n"
        "val flows = if (sources.nonEmpty && sinks.nonEmpty) {\n"
        "  sinks.reachableByFlows(sources)\n"
        f"    .filter(flow => flow.elements.size <= {max_path_length})\n"
        f"    .take({max_paths})\n"
        "    .map(flow => {\n"
        "      val elems = flow.elements\n"
        '      (elems.head.code, elems.head.file.name.headOption.getOrElse("unknown"), '
        "elems.head.lineNumber.getOrElse(-1), "
        'elems.last.code, elems.last.file.name.headOption.getOrElse("unknown"), '
        "elems.last.lineNumber.getOrElse(-1), "
        "elems.size, "
        'elems.map(e => (e.code, e.file.name.headOption.getOrElse("unknown"), '
        "e.lineNumber.getOrElse(-1), e.label)))\n"
        "    })\n"
        "    .l\n"
        "} else List()\n"
        "flows.toJsonPretty"
    )


def _parse_taint_paths(rows: Any) -> list:
    """Convert the tuple-shaped rows (``_1`` .. ``_8``) into path dicts.

    Rows and nodes that are not dicts are skipped; ``path_id`` and ``step``
    keep the position of the entry in the raw result.
    """
    paths = []
    for idx, item in enumerate(rows or []):
        if not isinstance(item, dict):
            continue
        nodes = [
            {
                "step": step,
                "code": node.get("_1", ""),
                "filename": node.get("_2", ""),
                "lineNumber": node.get("_3", -1),
                "node_type": node.get("_4", "UNKNOWN"),
            }
            for step, node in enumerate(item.get("_8") or [])
            if isinstance(node, dict)
        ]
        paths.append({
            "path_id": f"path-{idx + 1}",
            "source": {
                "code": item.get("_1", ""),
                "filename": item.get("_2", ""),
                "lineNumber": item.get("_3", -1),
            },
            "sink": {
                "code": item.get("_4", ""),
                "filename": item.get("_5", ""),
                "lineNumber": item.get("_6", -1),
            },
            "path_length": item.get("_7", 0),
            "nodes": nodes,
        })
    return paths


@mcp.tool()
async def list_taint_paths(
    session_id: str,
    source_pattern: Optional[str] = None,
    sink_pattern: Optional[str] = None,
    source_node_id: Optional[str] = None,
    sink_node_id: Optional[str] = None,
    max_paths: int = 10,
    max_path_length: int = 15,
    timeout: int = 60,
) -> Dict[str, Any]:
    """
    List detailed taint flow paths from sources to sinks.

    For the given source and sink (each selected by a call-name regex or by
    a specific node ID), runs a data-flow query and returns every flow as
    an ordered node sequence. Each step carries its code, file name, line
    number and node type.

    Args:
        session_id: The session ID from create_cpg_session
        source_pattern: Regex for source call names (e.g., "getenv|input|request").
            Either source_pattern or source_node_id must be provided.
        sink_pattern: Regex for sink call names (e.g., "system|exec|eval").
            Either sink_pattern or sink_node_id must be provided.
        source_node_id: Specific node ID to use as source; takes precedence
            over source_pattern when both are given.
        sink_node_id: Specific node ID to use as sink; takes precedence
            over sink_pattern when both are given.
        max_paths: Maximum number of paths to return (default: 10)
        max_path_length: Maximum length of each path in nodes (default: 15)
        timeout: Maximum execution time in seconds (default: 60)

    Returns:
        On success:
        {
            "success": true,
            "paths": [
                {
                    "path_id": "path-1",
                    "source": {
                        "code": "getenv(\"PATH\")",
                        "filename": "main.c",
                        "lineNumber": 42
                    },
                    "sink": {
                        "code": "system(cmd)",
                        "filename": "main.c",
                        "lineNumber": 100
                    },
                    "path_length": 5,
                    "nodes": [
                        {
                            "step": 0,
                            "code": "getenv(\"PATH\")",
                            "filename": "main.c",
                            "lineNumber": 42,
                            "node_type": "CALL"
                        },
                        ...
                    ]
                }
            ],
            "total": 1
        }

        On failure: {"success": false, "error": {"code": ..., "message": ...}}.
        Exceptions are not propagated to the caller; validation and session
        errors use the upper-cased exception class name as the code, query
        failures use "QUERY_ERROR" and anything else "INTERNAL_ERROR".
    """
    try:
        validate_session_id(session_id)

        # Each endpoint needs at least one selector.
        if not source_pattern and not source_node_id:
            raise ValidationError("Either source_pattern or source_node_id must be provided")
        if not sink_pattern and not sink_node_id:
            raise ValidationError("Either sink_pattern or sink_node_id must be provided")

        session_manager = services["session_manager"]
        query_executor = services["query_executor"]

        session = await session_manager.get_session(session_id)
        if not session:
            raise SessionNotFoundError(f"Session {session_id} not found")

        if session.status != SessionStatus.READY.value:
            raise SessionNotReadyError(f"Session is in '{session.status}' status")

        await session_manager.touch_session(session_id)

        # Resolve source and sink independently so a node ID on one side is
        # honoured even when the other side is given as a pattern.
        query = _build_taint_paths_query(
            _taint_endpoint_expr(source_node_id, source_pattern),
            _taint_endpoint_expr(sink_node_id, sink_pattern),
            max_paths,
            max_path_length,
        )

        result = await query_executor.execute_query(
            session_id=session_id,
            cpg_path="/workspace/cpg.bin",
            query=query,
            timeout=timeout,
            limit=max_paths * 20,  # Allow for node expansion
        )

        if not result.success:
            return {
                "success": False,
                "error": {"code": "QUERY_ERROR", "message": result.error},
            }

        paths = _parse_taint_paths(result.data)
        return {
            "success": True,
            "paths": paths,
            "total": len(paths),
        }

    except (SessionNotFoundError, SessionNotReadyError, ValidationError) as e:
        logger.error(f"Error listing taint paths: {e}")
        return {
            "success": False,
            "error": {"code": type(e).__name__.upper(), "message": str(e)},
        }
    except Exception as e:
        logger.error(f"Unexpected error listing taint paths: {e}", exc_info=True)
        return {
            "success": False,
            "error": {"code": "INTERNAL_ERROR", "message": str(e)},
        }
2735-
27362508
@mcp.tool()
27372509
async def get_program_slice(
27382510
session_id: str,

tests/test_mcp_tools.py

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -758,42 +758,6 @@ async def test_check_method_reachability_success(self, fake_services, ready_sess
758758
assert "helper" in result["message"]
759759

760760
@pytest.mark.asyncio
761-
async def test_list_taint_paths_success(self, fake_services, ready_session):
    """A single flow row from the executor becomes one reported path."""
    tool_registry = FakeMCP()
    register_tools(tool_registry, fake_services)

    # One flow in the tuple-style shape (_1 .. _8) the tool consumes.
    getenv_call = 'getenv("PATH")'
    flow_row = {
        "_1": getenv_call,
        "_2": "main.c",
        "_3": 10,
        "_4": 'system(cmd)',
        "_5": "main.c",
        "_6": 100,
        "_7": 5,
        "_8": [
            {"_1": getenv_call, "_2": "main.c", "_3": 10, "_4": "CALL"},
            {"_1": "env_path", "_2": "main.c", "_3": 10, "_4": "IDENTIFIER"},
        ],
    }

    fake_services["session_manager"].get_session.return_value = ready_session
    fake_services["query_executor"].execute_query.return_value = QueryResult(
        success=True,
        data=[flow_row],
        row_count=1,
    )

    list_paths = tool_registry.registered["list_taint_paths"]
    outcome = await list_paths(
        session_id=ready_session.id,
        source_pattern="getenv",
        sink_pattern="system",
    )

    assert outcome["success"] is True
    assert len(outcome["paths"]) == 1
    assert outcome["paths"][0]["path_length"] == 5
796-
797761
@pytest.mark.asyncio
798762
async def test_get_program_slice_success(self, fake_services, ready_session, temp_workspace):
799763
"""Test successful program slice retrieval"""

0 commit comments

Comments
 (0)