Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,52 @@
<html>
<head>
<title>Todo App</title>
<style>
.overdue { color: red; }
</style>
</head>
<body>
<h1>Todo App</h1>
<p><small>Signed in as: <strong>{{ username }}</strong> &mdash; you can only view and modify your own todos.</small></p>

<form action="/add" method="POST">
<input type="text" name="title" placeholder="Add a todo">
<input type="text" name="title" placeholder="Add a todo" required>
<input type="date" name="due_date">
<button type="submit">Add</button>
</form>

<hr>

{% if todos %}
<p>
Sort by:
<a href="/?sort=created_at">Created date</a> |
<a href="/?sort=due_date">Due date</a>
</p>
<table border="1" cellpadding="5">
<tr>
<th>ID</th>
<th>Task</th>
<th>Due Date</th>
<th>Status</th>
<th>Actions</th>
</tr>
{% for todo in todos %}
<tr>
{% set overdue = todo.due_date and todo.due_date < today and not todo.completed %}
<tr{% if overdue %} class="overdue"{% endif %}>
<td>{{ todo.id }}</td>
<td>{{ todo.title }}</td>
<td>{{ todo.due_date if todo.due_date else "—" }}</td>
<td>{{ "Done" if todo.completed else "Open" }}</td>
<td>
<!-- Server enforces ownership: only todos belonging to the authenticated user are shown and actions are verified server-side -->
<a href="/toggle/{{ todo.id }}">[toggle]</a>
<a href="/delete/{{ todo.id }}">[delete]</a>
Comment on lines 44 to 45
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The /toggle/{{ todo.id }} and /delete/{{ todo.id }} actions use GET requests (plain anchor links). This is a CSRF vulnerability — any page can trigger state-changing operations by embedding links or <img src="..."> tags that the authenticated user's browser will follow. These should use POST forms (with a CSRF token if the framework supports it), not GET links.

Why did I show this?

Category: security
Comment Quality: high

Influenced by requirements:

Tools used:

  1. list_changed_files, {'pattern': {'type': 'string', 'value': '**/*.py'}}
  2. list_changed_files, {'pattern': '**/*.py'}

<form action="/edit/{{ todo.id }}" method="POST" style="display:inline">
<input type="text" name="title" value="{{ todo.title }}" required>
<input type="date" name="due_date" value="{{ todo.due_date if todo.due_date else '' }}">
<button type="submit">[save]</button>
</form>
</td>
</tr>
{% endfor %}
Expand Down
156 changes: 151 additions & 5 deletions src/todo_app/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,80 @@
"""Flask todo application."""

import hmac
import logging
import os
import sqlite3
import time
from collections import defaultdict
from datetime import date
from functools import wraps
from pathlib import Path
from typing import Any, Callable

from flask import Flask, redirect, render_template, request, url_for
from flask import Flask, Response, redirect, render_template, request, url_for

logger = logging.getLogger(__name__)

app = Flask(__name__, template_folder=str(Path(__file__).parent.parent / "templates"))
DATABASE = os.environ.get("DATABASE_PATH", "todos.db")
AUTH_USERNAME = os.environ.get("AUTH_USERNAME", "admin")
AUTH_PASSWORD = os.environ.get("AUTH_PASSWORD")

if not AUTH_PASSWORD:
raise RuntimeError("AUTH_PASSWORD environment variable must be set to a non-empty value.")

_RATE_LIMIT_WINDOW = 60 # seconds
_RATE_LIMIT_MAX = 10 # max failed attempts per window
_failed_attempts: dict[str, list[float]] = defaultdict(list)


def _is_rate_limited(ip: str) -> bool:
"""Return True if the IP has exceeded the failed login rate limit."""
now = time.monotonic()
attempts = [t for t in _failed_attempts[ip] if now - t < _RATE_LIMIT_WINDOW]
_failed_attempts[ip] = attempts
return len(attempts) >= _RATE_LIMIT_MAX


def _rate_limit_response() -> Response | None:
"""Return a 429 Response if the current request IP is rate-limited, else None."""
ip = request.remote_addr or "unknown"
if _is_rate_limited(ip):
logger.warning("Rate limit exceeded for IP %s", ip)
return Response("Too many failed attempts.", 429)
return None


def _credentials_valid() -> bool:
"""Return True if the current request carries valid Basic Auth credentials."""
auth = request.authorization
return (
auth is not None
and hmac.compare_digest(auth.username, AUTH_USERNAME)
and hmac.compare_digest(auth.password, AUTH_PASSWORD)
)


def require_auth(f: Callable) -> Callable:
"""Require HTTP Basic Auth for a route.

Note: Deploy behind HTTPS to prevent credentials from being transmitted in plaintext.
"""
@wraps(f)
def decorated(*args: Any, **kwargs: Any) -> Any:
if (err := _rate_limit_response()):
return err
ip = request.remote_addr or "unknown"
if not _credentials_valid():
_failed_attempts[ip].append(time.monotonic())
logger.warning("Failed auth attempt from IP %s", ip)
return Response(
"Authentication required.",
401,
{"WWW-Authenticate": 'Basic realm="Todo App"'},
)
return f(*args, **kwargs)
return decorated


def get_db() -> sqlite3.Connection:
Expand All @@ -25,48 +92,127 @@ def init_db() -> None:
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
completed BOOLEAN NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
due_date TEXT,
created_by TEXT NOT NULL DEFAULT ''
)
""")
for column, definition in [("due_date", "TEXT"), ("created_by", "TEXT NOT NULL DEFAULT ''")]:
try:
conn.execute(f"ALTER TABLE todos ADD COLUMN {column} {definition}") # noqa: S608
except sqlite3.OperationalError:
pass # Column already exists
# Assign legacy todos (created before auth was added) to the current user
conn.execute(
"UPDATE todos SET created_by = ? WHERE created_by = ''",
(AUTH_USERNAME,),
)
conn.commit()
conn.close()


@app.route("/")
@require_auth
def index() -> str:
"""Display all todos."""
sort = request.args.get("sort", "created_at")
order_by = "due_date ASC, created_at DESC" if sort == "due_date" else "created_at DESC"
Comment on lines +118 to +119
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sort parameter is taken from user input and used directly in an f-string SQL query without validation. Although only two values are used, if the logic ever changes or is copy-pasted, this is a SQL injection vector. The allowed values should be explicitly validated:

Suggested change
sort = request.args.get("sort", "created_at")
order_by = "due_date ASC, created_at DESC" if sort == "due_date" else "created_at DESC"
sort = request.args.get("sort", "created_at")
if sort not in ("due_date", "created_at"):
sort = "created_at"
order_by = "due_date ASC, created_at DESC" if sort == "due_date" else "created_at DESC"
Why did I show this?

Category: security
Comment Quality: high

Based on general best practices

Tools used:

  1. get_file_lines, {'file_path': 'templates/index.html', 'start_line': '1', 'end_line': '200'}
  2. list_changed_files, {'pattern': '**/*.html'}
  3. get_file_lines, {'file_path': 'src/templates/index.html', 'start_line': '1', 'end_line': '200'}

conn = get_db()
todos = conn.execute("SELECT * FROM todos ORDER BY created_at DESC").fetchall()
username = request.authorization.username if request.authorization else AUTH_USERNAME
todos = conn.execute( # noqa: S608
f"SELECT * FROM todos WHERE created_by = ? ORDER BY {order_by}", (username,)
).fetchall()
conn.close()
return render_template("index.html", todos=todos)
return render_template("index.html", todos=todos, today=date.today().isoformat(), sort=sort, username=username)


def _current_user() -> str:
"""Return the authenticated username from the current request.

Raises RuntimeError if called outside an authenticated request context.
"""
if not request.authorization:
raise RuntimeError("_current_user called without an authenticated request")
return request.authorization.username


def _check_todo_access(conn: sqlite3.Connection, todo_id: int) -> Response | None:
"""Return an error Response if the authenticated user cannot access the todo.

Must only be called from routes protected by @require_auth, which already
enforces rate limiting and credential validation before this is reached.

Returns 404 if the todo does not exist, 403 if it exists but is not owned
by the authenticated user.
"""
username = _current_user()
row = conn.execute("SELECT created_by FROM todos WHERE id = ?", (todo_id,)).fetchone()
if row is None:
return Response("Not found", 404)
if row["created_by"] != username:
return Response("Forbidden", 403)
return None


@app.route("/add", methods=["POST"])
@require_auth
def add() -> str:
"""Add a new todo."""
title = request.form.get("title", "").strip()
due_date = request.form.get("due_date", "").strip() or None
if title:
conn = get_db()
conn.execute("INSERT INTO todos (title) VALUES (?)", (title,))
conn.execute(
"INSERT INTO todos (title, due_date, created_by) VALUES (?, ?, ?)",
(title, due_date, _current_user()),
)
conn.commit()
conn.close()
return redirect(url_for("index"))


@app.route("/toggle/<int:todo_id>")
@require_auth
def toggle(todo_id: int) -> str:
"""Toggle a todo's completed status."""
conn = get_db()
if (err := _check_todo_access(conn, todo_id)):
conn.close()
return err
conn.execute("UPDATE todos SET completed = NOT completed WHERE id = ?", (todo_id,))
conn.commit()
conn.close()
return redirect(url_for("index"))


@app.route("/edit/<int:todo_id>", methods=["POST"])
@require_auth
def edit(todo_id: int) -> str:
"""Edit a todo's title and due date."""
title = request.form.get("title", "").strip()
due_date = request.form.get("due_date", "").strip() or None
if title:
conn = get_db()
if (err := _check_todo_access(conn, todo_id)):
conn.close()
return err
conn.execute(
"UPDATE todos SET title = ?, due_date = ? WHERE id = ?",
(title, due_date, todo_id),
)
conn.commit()
conn.close()
return redirect(url_for("index"))


@app.route("/delete/<int:todo_id>")
@require_auth
def delete(todo_id: int) -> str:
"""Delete a todo."""
conn = get_db()
if (err := _check_todo_access(conn, todo_id)):
conn.close()
return err
conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,))
conn.commit()
conn.close()
Expand Down
Loading