Merged
3 changes: 3 additions & 0 deletions .gitignore
@@ -121,6 +121,9 @@ celerybeat.pid
# SageMath parsed files
*.sage.py

# Local idea tracking
NEW_IDEAS.md

# Environments
.env
.venv
103 changes: 103 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,103 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Commands

### Install dependencies
```bash
pip install -r requirements.txt
pip install -r requirements_ci.txt # dev/linting tools
```

### Run tests
```bash
# All tests
python -m unittest discover -s core_tests -p "*.py"

# Single test file
python -m unittest core_tests.runner_test
```

### Lint and format
```bash
pylint core/
isort --profile=black --check-only core/ core_tests/
isort --profile=black core/ core_tests/ # to fix imports
```

### Build
```bash
python -m build
```

### Run evals

Evals verify that the V2 agent scanner still catches expected vulnerabilities across 5 fixture files (`evals/fixtures/`). Run them after any change to `core/agent.py`, `core/code_scanner/agent_scanner.py`, or the system prompt.

```bash
# Standard model (gpt-4o-mini) — all fixtures should pass
set -a && source .env && set +a
python3 evals/run_evals.py --provider openai --model gpt-4o-mini

# Advanced model (gpt-4o) — stricter; also requires YAML deserialization,
# race condition, JWT algorithm confusion, and timing attack findings
python3 evals/run_evals.py --provider openai --model gpt-4o

# Single fixture only
python3 evals/run_evals.py --provider openai --model gpt-4o-mini --fixture auth_service.py
```

Findings are split into two tiers in `evals/expected_findings.json`:
- **standard** — required from any model (SQL injection, XSS, path traversal, pickle, hardcoded secrets, etc.)
- **advanced** — only required when running gpt-4o (YAML code execution, race conditions, JWT algorithm confusion, timing attacks)

Exit code 0 = all fixtures at or above the 80% threshold. Exit code 1 = one or more failed.
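
The pass/fail rule can be sketched roughly as follows. This is an illustration only: it assumes each fixture's entry in `evals/expected_findings.json` holds a `standard` and an `advanced` list, and that the score is the fraction of required findings detected. The real schema and scoring in `evals/run_evals.py` are not shown here, and `fixture_score` and `exit_code` are hypothetical names.

```python
THRESHOLD = 0.8  # the 80% threshold mentioned above


def fixture_score(expected: dict, found: set, advanced: bool) -> float:
    """Return the fraction of required findings detected for one fixture."""
    required = set(expected.get("standard", []))
    if advanced:
        # Advanced findings only count when running the stronger model.
        required |= set(expected.get("advanced", []))
    if not required:
        return 1.0
    return len(required & found) / len(required)


def exit_code(scores: dict) -> int:
    """Return 0 when every fixture meets the threshold, 1 otherwise."""
    return 0 if all(score >= THRESHOLD for score in scores.values()) else 1


if __name__ == "__main__":
    # Assumed shape of one fixture's expectations (labels are made up).
    expected = {
        "standard": ["sql_injection", "xss", "path_traversal", "pickle", "hardcoded_secret"],
        "advanced": ["timing_attack"],
    }
    found = {"sql_injection", "xss", "path_traversal", "pickle"}
    scores = {"auth_service.py": fixture_score(expected, found, advanced=False)}
    print(scores, exit_code(scores))
```

Under these assumptions a standard run that finds four of five required issues sits exactly at the threshold and passes, while the same findings in an advanced run (four of six) would fail.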

### Run the CLI locally
```bash
# V1 runner (sends all files to AI at once)
python3 -m core.runner --provider openai

# V2 runner (file-by-file via Pydantic-AI agent)
python3 -m core.runner_v2 --provider openai
```

## Architecture

CodeScanAI is a CLI tool that scans codebases for security vulnerabilities using AI models.

### Two parallel codepaths

There are two scanner implementations that share the same CLI argument surface (`core/utils/argument_parser.py`):

1. **V1 (`core/runner.py` → `core/code_scanner/code_scanner.py`)**: Aggregates all file content into a single code summary, sends it to the AI provider in one call, returns a markdown string. Uses the provider abstraction layer.

2. **V2 (`core/runner_v2.py` → `core/code_scanner/agent_scanner.py`)**: Iterates file-by-file, runs a Pydantic-AI `Agent` synchronously on each, and streams structured `FileScanResult` output to stdout. Also supports posting inline PR review comments via `GithubIntegration`. This is the more feature-rich path.

The active entrypoint is `core.runner_v2:main` (V2), set in `pyproject.toml` under `[project.scripts]`.
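
As a rough illustration of what "file-by-file" means in V2, each file is sent to the agent with every line prefixed by its number, and lines touched by the change under review tagged `[CHANGED]`. The sketch below mirrors the `_format_line` helper in `core/code_scanner/agent_scanner.py`; the standalone function name `number_lines` is hypothetical.

```python
from typing import Optional, Set


def number_lines(content: str, changed_lines: Optional[Set[int]] = None) -> str:
    """Prefix each line with its 1-based number, tagging changed lines."""
    changed = changed_lines or set()
    numbered = []
    for lineno, line in enumerate(content.splitlines(), start=1):
        marker = "[CHANGED] " if lineno in changed else ""
        numbered.append(f"{lineno}: {marker}{line}")
    return "\n".join(numbered)


if __name__ == "__main__":
    source = "import os\nos.system(user_input)"
    # Line 2 is treated as modified in the change under review.
    print(number_lines(source, changed_lines={2}))
```

The numbered text is what lets the model report a `line_number` that can later be mapped back to a position in the file or PR diff.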

### Provider abstraction (V1 only)

`core/providers/base_ai_provider.py` defines the `BaseAIProvider` interface with a single `scan_code(code_summary)` method. Concrete implementations:
- `OpenAIProvider` — uses `openai` SDK
- `GoogleGeminiAIProvider` — uses `google-generativeai`
- `CustomAIProvider` — HTTP requests to a self-hosted server (Ollama, etc.)

`core/utils/provider_creator.py` maps CLI `--provider` values to provider classes.
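
A minimal sketch of this pattern, assuming an abstract base class and a dictionary lookup. Only `BaseAIProvider` and `scan_code(code_summary)` come from the description above; `EchoProvider`, `PROVIDERS`, and `create_provider` are hypothetical stand-ins, and the real constructors likely take model and credential arguments not shown here.

```python
from abc import ABC, abstractmethod


class BaseAIProvider(ABC):
    """Interface every V1 provider implements."""

    @abstractmethod
    def scan_code(self, code_summary: str) -> str:
        """Return a markdown report for the aggregated code summary."""


class EchoProvider(BaseAIProvider):
    """Hypothetical provider that makes no network call (illustration only)."""

    def scan_code(self, code_summary: str) -> str:
        return f"## Findings\n\nScanned {len(code_summary)} characters."


# Hypothetical registry mapping --provider values to provider classes.
PROVIDERS = {"echo": EchoProvider}


def create_provider(name: str) -> BaseAIProvider:
    """Instantiate the provider registered under the given name."""
    try:
        return PROVIDERS[name]()
    except KeyError:
        raise ValueError(f"Unknown provider: {name}") from None


if __name__ == "__main__":
    print(create_provider("echo").scan_code("print('hello')"))
```

Keeping the runner dependent only on the base class means a new backend needs one subclass and one registry entry.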

### Pydantic-AI agent (V2 only)

`core/agent.py` defines structured output types (`Vulnerability`, `FileScanResult`) and factory functions. It also holds pre-configured system prompts for different scan modes: `SECURITY_AGENT_PROMPT`, `PERFORMANCE_AGENT_PROMPT`, `CLEAN_CODE_AGENT_PROMPT`. Custom providers route through the OpenAI-compatible interface via `OPENAI_BASE_URL`.
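
The custom-provider routing can be sketched as below. It follows the logic in `AgentScanner.__init__` (host, optional port, optional endpoint, optional token), but the helper names `build_base_url` and `configure_custom_backend` are hypothetical, and the Ollama URL is only an example value.

```python
import os
from typing import Optional


def build_base_url(host: str, port: Optional[int] = None, endpoint: Optional[str] = None) -> str:
    """Join host, optional port, and optional endpoint path into one URL."""
    url = f"{host}:{port}" if port else host
    if endpoint:
        url += endpoint
    return url


def configure_custom_backend(
    host: str,
    port: Optional[int] = None,
    endpoint: Optional[str] = None,
    token: Optional[str] = None,
) -> str:
    """Point OpenAI-compatible clients at a self-hosted server via env vars."""
    url = build_base_url(host, port, endpoint)
    os.environ["OPENAI_BASE_URL"] = url
    if token:
        os.environ["OPENAI_API_KEY"] = token
    return url


if __name__ == "__main__":
    # Example values for a local Ollama server; adjust for your own setup.
    print(configure_custom_backend("http://localhost", 11434, "/v1", token="unused"))
```

Because the variables are set process-wide, this approach assumes a single backend per run.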

### GitHub integration

`core/utils/github_integration.py` (`GithubIntegration`) is used only in V2. It posts inline PR review comments using PyGithub and falls back to a regular issue comment if the line isn't in the PR diff.
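
The fallback behaviour might look roughly like this. The sketch deliberately avoids real PyGithub calls: `post_inline` and `post_issue` are injected callables, and `LineNotInDiffError` is a hypothetical exception standing in for whatever error the GitHub API client raises when a line is outside the diff.

```python
from typing import Callable, Optional


class LineNotInDiffError(Exception):
    """Hypothetical error: the target line is not part of the PR diff."""


def post_comment(
    path: str,
    line: Optional[int],
    body: str,
    post_inline: Callable[[str, int, str], None],
    post_issue: Callable[[str], None],
) -> str:
    """Try an inline review comment first, then fall back to an issue comment."""
    if line is not None:
        try:
            post_inline(path, line, body)
            return "inline"
        except LineNotInDiffError:
            pass  # fall through to the issue-comment fallback
    # Include the path so the comment stays traceable without an inline anchor.
    post_issue(f"`{path}`\n\n{body}")
    return "issue"
```

Findings without a line number (architectural or multi-line issues) take the issue-comment path directly in this sketch.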

`core/utils/file_extractor.py` handles both local git-diff file discovery and PR file listing via the GitHub API, shared by both V1 and V2.

### Scan modes

Both runners support three modes driven by CLI args:
- **Full scan** (default): walks `--directory` and scans all files
- **Changes only** (`--changes_only`): scans files changed in local git repo
- **PR scan** (`--repo` + `--pr_number` + `--github_token`): fetches changed files from a GitHub PR
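
The mode selection can be sketched as a small dispatch on the parsed arguments. This follows the checks in `AgentScanner.scan`, which tests `repo` and `pr_number` before `changes_only`; the function name `select_mode` and the returned labels are hypothetical.

```python
from argparse import Namespace


def select_mode(args: Namespace) -> str:
    """Pick the scan mode from parsed CLI arguments."""
    if args.repo and args.pr_number:
        return "pr"  # changed files come from the GitHub PR
    if args.changes_only:
        return "changes_only"  # changed files come from the local git repo
    return "full"  # walk the whole directory


if __name__ == "__main__":
    print(select_mode(Namespace(repo="owner/name", pr_number=7, changes_only=False)))
    print(select_mode(Namespace(repo=None, pr_number=None, changes_only=True)))
    print(select_mode(Namespace(repo=None, pr_number=None, changes_only=False)))
```

In this sketch the PR arguments take precedence when both a PR and `--changes_only` are supplied.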
108 changes: 108 additions & 0 deletions core/agent.py
@@ -0,0 +1,108 @@
"""
Defines the structured output types, agent factory, and pre-configured system prompts
used by the V2 Pydantic-AI scanner.
"""

from typing import Optional, Type

from pydantic import BaseModel, Field
from pydantic_ai import Agent


class Vulnerability(BaseModel):
    """Represents a single security vulnerability found in a file."""

    line_number: Optional[int] = Field(
        default=None,
        description=(
            "The exact line number where the issue is found inside the file. "
            "Omit if the issue is architectural or spans multiple lines."
        ),
    )
    description: str = Field(
        description="A detailed description of the issue and why it is a security risk."
    )
    remediation: str = Field(
        description="Actionable suggestion or code snippet on how to fix this specific vulnerability."
    )
    severity: str = Field(description="Severity measure (Low, Medium, High, Critical)")
    vulnerability_type: str = Field(
        description="The category of issue (e.g. SQL Injection, Big-O inefficiency, etc.)"
    )


class FileScanResult(BaseModel):
    """Structured result returned by the agent for a single scanned file."""

    vulnerabilities: list[Vulnerability] = Field(
        description="List of issues found in the file. Empty if zero issues are found."
    )


def get_pydantic_ai_model(provider: str, model: Optional[str]) -> str:
    """Map a CLI provider name and optional model string to a Pydantic-AI model identifier."""
    if provider == "openai":
        return f"openai:{model or 'gpt-4o-mini'}"
    if provider == "gemini":
        return f"gemini:{model or 'gemini-1.5-flash'}"
    if provider == "custom":
        # Falls back to the OpenAI-compatible interface via OPENAI_BASE_URL
        return f"openai:{model or 'custom-model'}"
    return "openai:gpt-4o-mini"


def create_agent(
    model_str: str, system_prompt: str, result_type: Type[BaseModel] = FileScanResult
) -> Agent:
    """
    Creates and returns a Pydantic-AI Agent configured for a custom, laser-focused task.
    By passing different `system_prompt` and `result_type` schemas, you can deploy
    multiple types of agents.
    """
    return Agent(
        model_str,
        result_type=result_type,
        system_prompt=system_prompt,
    )


# --- Define pre-configured Agent Prompts for laser-focused tasks ---

SECURITY_AGENT_PROMPT = (
    "You are an expert in software security analysis, adept at identifying and explaining "
    "potential vulnerabilities in code. "
    "You will be given complete code snippets from various applications. "
    "EVERY line of the source code is prefixed with its exact line number "
    "(e.g. `14: def foo():`). "
    "Your task is to analyze the provided code, pinpoint potential security risks, "
    "and offer clear suggestions for enhancing the application's security posture. "
    "Focus on the critical issues that could impact the overall security of the application. "
    "You MUST be exhaustive. Carefully audit the entire script from top to bottom "
    "and return EVERY vulnerability you find. Do not stop at the first issue. "
    "If any are found, use the explicitly provided line numbers to pinpoint the defect "
    "where possible. For architectural or multi-line issues, you may omit the line number. "
    "Also, strictly provide an actionable `remediation` that makes suggestions on how to "
    "rewrite or fix the code securely. "
    "If no vulnerabilities are found, return an empty list. "
    "When scanning a pull request or diff, some lines will be marked with `[CHANGED]` "
    "after the line number (e.g. `14: [CHANGED] def foo():`). "
    "These lines are newly added or modified in the change under review. "
    "Prioritise your analysis on `[CHANGED]` lines, but use the full file context — "
    "imports, surrounding functions, class definitions, and data flow — to assess "
    "whether those changes introduce or worsen a vulnerability."
)

PERFORMANCE_AGENT_PROMPT = (
    "You are a Senior Staff Software Engineer laser-focused on performance optimization. "
    "Analyze the following code for memory leaks, O(N^2) bottlenecks, or CPU inefficiencies. "
    "Pinpoint exact line numbers and return a list of performance issues. "
    "If none are found, return an empty list."
)

CLEAN_CODE_AGENT_PROMPT = (
    "You are an expert in code refactoring and Clean Code methodologies. "
    "Analyze the code for anti-patterns, confusing variable names, massive functions, "
    "or high cyclomatic complexity. "
    "Pinpoint exact line numbers and return a list of maintainability issues. "
    "If the code is perfectly clean, return an empty list."
)
162 changes: 162 additions & 0 deletions core/code_scanner/agent_scanner.py
@@ -0,0 +1,162 @@
"""
V2 agent-based scanner. Scans source files one at a time using a Pydantic-AI Agent
and streams structured FileScanResult output to stdout.
"""

import logging
import os

from core.agent import (
    SECURITY_AGENT_PROMPT,
    FileScanResult,
    create_agent,
    get_pydantic_ai_model,
)
from core.utils.file_extractor import (
    get_changed_files_in_pr,
    get_changed_files_in_repo,
    get_local_changed_line_numbers,
    get_pr_changed_line_numbers,
)
from core.utils.github_integration import GithubIntegration

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class AgentScanner:
    """
    Scans source code via the Pydantic AI agent, file by file.
    Optionally posts inline review comments to GitHub PRs.
    """

    def __init__(self, args) -> None:
        self.args = args
        model_str = get_pydantic_ai_model(args.provider, args.model)

        # Set OPENAI_BASE_URL for custom providers so Pydantic-AI targets the correct backend.
        if args.provider == "custom" and args.host:
            host_url = f"{args.host}:{args.port}" if args.port else args.host
            if args.endpoint:
                host_url += args.endpoint
            os.environ["OPENAI_BASE_URL"] = host_url
            if args.token:
                os.environ["OPENAI_API_KEY"] = args.token

        self.agent = create_agent(
            model_str=model_str,
            system_prompt=SECURITY_AGENT_PROMPT,
            result_type=FileScanResult,
        )
        self.github_integration = (
            GithubIntegration(args)
            if args.repo and args.pr_number and args.github_token
            else None
        )

    def scan(self):
        """
        Scans the code by identifying files based on PR context or local directory
        and iterates through them using the Pydantic AI agent.
        """
        if self.args.changes_only or (self.args.repo and self.args.pr_number):
            return self._scan_changes()
        return self._scan_files()

    def _scan_changes(self):
        try:
            if self.args.repo and self.args.pr_number:
                changed_files = get_changed_files_in_pr(
                    self.args.repo, self.args.pr_number, self.args.github_token
                )
                changed_line_map = get_pr_changed_line_numbers(
                    self.args.repo, self.args.pr_number, self.args.github_token
                )
            else:
                changed_files = get_changed_files_in_repo(self.args.directory)
                changed_line_map = None
        except ValueError as e:
            logging.error(e)
            return

        if not changed_files:
            logging.info("No changes detected.")
            return

        for filename in changed_files:
            filepath = os.path.join(self.args.directory, filename)
            if changed_line_map is not None:
                changed_lines = changed_line_map.get(filename, set())
            else:
                changed_lines = get_local_changed_line_numbers(self.args.directory, filename)
            self._scan_single_file(filepath, display_name=filename, changed_lines=changed_lines)

    def _scan_files(self):
        file_paths = []
        for root, _, files in os.walk(self.args.directory):
            for file in files:
                file_paths.append(os.path.join(root, file))

        for filepath in file_paths:
            self._scan_single_file(
                filepath, display_name=os.path.relpath(filepath, self.args.directory)
            )

    def _scan_single_file(self, file_path: str, display_name: str, changed_lines: set = None):
        """Scan a single file and print any vulnerabilities found."""
        if not os.path.isfile(file_path):
            logging.warning("Skipping %s: Not a valid file or not found locally.", file_path)
            return

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
        except Exception as e:  # pylint: disable=broad-exception-caught
            logging.warning("Skipping %s: %s", file_path, e)
            return

        if not content.strip():
            return

        logging.info("Scanning file: %s ...", display_name)

        def _format_line(idx, line):
            lineno = idx + 1
            if changed_lines and lineno in changed_lines:
                return f"{lineno}: [CHANGED] {line}"
            return f"{lineno}: {line}"

        numbered_content = "\n".join([_format_line(idx, line) for idx, line in enumerate(content.splitlines())])

        try:
            result = self.agent.run_sync(f"File: {display_name}\n\n{numbered_content}")
            scan_result = result.data

            if scan_result.vulnerabilities:
                print(f"\n--- Vulnerabilities found in {display_name} ---")
                md_output = ""
                for vuln in scan_result.vulnerabilities:
                    line_info = f"Line {vuln.line_number}: " if vuln.line_number else ""
                    md_output += f" - **{line_info}[{vuln.severity}] {vuln.vulnerability_type}**\n"
                    md_output += f" - **Issue**: {vuln.description}\n"
                    md_output += f" - **Fix**: {vuln.remediation}\n"
                print(md_output)

                if self.github_integration:
                    for vuln in scan_result.vulnerabilities:
                        comment_body = (
                            f"**[{vuln.severity.upper()} SEVERITY] {vuln.vulnerability_type}**"
                            f"\n\n{vuln.description}"
                            f"\n\n**Suggested Fix:**\n{vuln.remediation}"
                        )
                        self.github_integration.post_inline_comment(
                            path=display_name,
                            line=vuln.line_number,
                            body=comment_body,
                        )
            else:
                logging.info("No vulnerabilities found in %s.", display_name)

        except Exception as e:  # pylint: disable=broad-exception-caught
            logging.error("Error scanning %s: %s", display_name, e)