11"""Flask todo application."""
22
3+ import hmac
4+ import logging
35import os
46import sqlite3
7+ import time
8+ from collections import defaultdict
9+ from datetime import date
10+ from functools import wraps
511from pathlib import Path
12+ from typing import Any , Callable
613
7- from flask import Flask , redirect , render_template , request , url_for
14+ from flask import Flask , Response , redirect , render_template , request , url_for
15+
16+ logger = logging .getLogger (__name__ )
817
918app = Flask (__name__ , template_folder = str (Path (__file__ ).parent .parent / "templates" ))
1019DATABASE = os .environ .get ("DATABASE_PATH" , "todos.db" )
20+ AUTH_USERNAME = os .environ .get ("AUTH_USERNAME" , "admin" )
21+ AUTH_PASSWORD = os .environ .get ("AUTH_PASSWORD" )
22+
23+ if not AUTH_PASSWORD :
24+ raise RuntimeError ("AUTH_PASSWORD environment variable must be set to a non-empty value." )
25+
26+ _RATE_LIMIT_WINDOW = 60 # seconds
27+ _RATE_LIMIT_MAX = 10 # max failed attempts per window
28+ _failed_attempts : dict [str , list [float ]] = defaultdict (list )
29+
30+
31+ def _is_rate_limited (ip : str ) -> bool :
32+ """Return True if the IP has exceeded the failed login rate limit."""
33+ now = time .monotonic ()
34+ attempts = [t for t in _failed_attempts [ip ] if now - t < _RATE_LIMIT_WINDOW ]
35+ _failed_attempts [ip ] = attempts
36+ return len (attempts ) >= _RATE_LIMIT_MAX
37+
38+
39+ def _rate_limit_response () -> Response | None :
40+ """Return a 429 Response if the current request IP is rate-limited, else None."""
41+ ip = request .remote_addr or "unknown"
42+ if _is_rate_limited (ip ):
43+ logger .warning ("Rate limit exceeded for IP %s" , ip )
44+ return Response ("Too many failed attempts." , 429 )
45+ return None
46+
47+
48+ def _credentials_valid () -> bool :
49+ """Return True if the current request carries valid Basic Auth credentials."""
50+ auth = request .authorization
51+ return (
52+ auth is not None
53+ and hmac .compare_digest (auth .username , AUTH_USERNAME )
54+ and hmac .compare_digest (auth .password , AUTH_PASSWORD )
55+ )
56+
57+
58+ def require_auth (f : Callable ) -> Callable :
59+ """Require HTTP Basic Auth for a route.
60+
61+ Note: Deploy behind HTTPS to prevent credentials from being transmitted in plaintext.
62+ """
63+ @wraps (f )
64+ def decorated (* args : Any , ** kwargs : Any ) -> Any :
65+ if (err := _rate_limit_response ()):
66+ return err
67+ ip = request .remote_addr or "unknown"
68+ if not _credentials_valid ():
69+ _failed_attempts [ip ].append (time .monotonic ())
70+ logger .warning ("Failed auth attempt from IP %s" , ip )
71+ return Response (
72+ "Authentication required." ,
73+ 401 ,
74+ {"WWW-Authenticate" : 'Basic realm="Todo App"' },
75+ )
76+ return f (* args , ** kwargs )
77+ return decorated
1178
1279
1380def get_db () -> sqlite3 .Connection :
@@ -25,48 +92,127 @@ def init_db() -> None:
2592 id INTEGER PRIMARY KEY AUTOINCREMENT,
2693 title TEXT NOT NULL,
2794 completed BOOLEAN NOT NULL DEFAULT 0,
28- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
95+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
96+ due_date TEXT,
97+ created_by TEXT NOT NULL DEFAULT ''
2998 )
3099 """ )
100+ for column , definition in [("due_date" , "TEXT" ), ("created_by" , "TEXT NOT NULL DEFAULT ''" )]:
101+ try :
102+ conn .execute (f"ALTER TABLE todos ADD COLUMN { column } { definition } " ) # noqa: S608
103+ except sqlite3 .OperationalError :
104+ pass # Column already exists
105+ # Assign legacy todos (created before auth was added) to the current user
106+ conn .execute (
107+ "UPDATE todos SET created_by = ? WHERE created_by = ''" ,
108+ (AUTH_USERNAME ,),
109+ )
31110 conn .commit ()
32111 conn .close ()
33112
34113
35114@app .route ("/" )
115+ @require_auth
36116def index () -> str :
37117 """Display all todos."""
118+ sort = request .args .get ("sort" , "created_at" )
119+ order_by = "due_date ASC, created_at DESC" if sort == "due_date" else "created_at DESC"
38120 conn = get_db ()
39- todos = conn .execute ("SELECT * FROM todos ORDER BY created_at DESC" ).fetchall ()
121+ username = request .authorization .username if request .authorization else AUTH_USERNAME
122+ todos = conn .execute ( # noqa: S608
123+ f"SELECT * FROM todos WHERE created_by = ? ORDER BY { order_by } " , (username ,)
124+ ).fetchall ()
40125 conn .close ()
41- return render_template ("index.html" , todos = todos )
126+ return render_template ("index.html" , todos = todos , today = date .today ().isoformat (), sort = sort , username = username )
127+
128+
129+ def _current_user () -> str :
130+ """Return the authenticated username from the current request.
131+
132+ Raises RuntimeError if called outside an authenticated request context.
133+ """
134+ if not request .authorization :
135+ raise RuntimeError ("_current_user called without an authenticated request" )
136+ return request .authorization .username
137+
138+
139+ def _check_todo_access (conn : sqlite3 .Connection , todo_id : int ) -> Response | None :
140+ """Return an error Response if the authenticated user cannot access the todo.
141+
142+ Must only be called from routes protected by @require_auth, which already
143+ enforces rate limiting and credential validation before this is reached.
144+
145+ Returns 404 if the todo does not exist, 403 if it exists but is not owned
146+ by the authenticated user.
147+ """
148+ username = _current_user ()
149+ row = conn .execute ("SELECT created_by FROM todos WHERE id = ?" , (todo_id ,)).fetchone ()
150+ if row is None :
151+ return Response ("Not found" , 404 )
152+ if row ["created_by" ] != username :
153+ return Response ("Forbidden" , 403 )
154+ return None
42155
43156
44157@app .route ("/add" , methods = ["POST" ])
158+ @require_auth
45159def add () -> str :
46160 """Add a new todo."""
47161 title = request .form .get ("title" , "" ).strip ()
162+ due_date = request .form .get ("due_date" , "" ).strip () or None
48163 if title :
49164 conn = get_db ()
50- conn .execute ("INSERT INTO todos (title) VALUES (?)" , (title ,))
165+ conn .execute (
166+ "INSERT INTO todos (title, due_date, created_by) VALUES (?, ?, ?)" ,
167+ (title , due_date , _current_user ()),
168+ )
51169 conn .commit ()
52170 conn .close ()
53171 return redirect (url_for ("index" ))
54172
55173
56174@app .route ("/toggle/<int:todo_id>" )
175+ @require_auth
57176def toggle (todo_id : int ) -> str :
58177 """Toggle a todo's completed status."""
59178 conn = get_db ()
179+ if (err := _check_todo_access (conn , todo_id )):
180+ conn .close ()
181+ return err
60182 conn .execute ("UPDATE todos SET completed = NOT completed WHERE id = ?" , (todo_id ,))
61183 conn .commit ()
62184 conn .close ()
63185 return redirect (url_for ("index" ))
64186
65187
188+ @app .route ("/edit/<int:todo_id>" , methods = ["POST" ])
189+ @require_auth
190+ def edit (todo_id : int ) -> str :
191+ """Edit a todo's title and due date."""
192+ title = request .form .get ("title" , "" ).strip ()
193+ due_date = request .form .get ("due_date" , "" ).strip () or None
194+ if title :
195+ conn = get_db ()
196+ if (err := _check_todo_access (conn , todo_id )):
197+ conn .close ()
198+ return err
199+ conn .execute (
200+ "UPDATE todos SET title = ?, due_date = ? WHERE id = ?" ,
201+ (title , due_date , todo_id ),
202+ )
203+ conn .commit ()
204+ conn .close ()
205+ return redirect (url_for ("index" ))
206+
207+
66208@app .route ("/delete/<int:todo_id>" )
209+ @require_auth
67210def delete (todo_id : int ) -> str :
68211 """Delete a todo."""
69212 conn = get_db ()
213+ if (err := _check_todo_access (conn , todo_id )):
214+ conn .close ()
215+ return err
70216 conn .execute ("DELETE FROM todos WHERE id = ?" , (todo_id ,))
71217 conn .commit ()
72218 conn .close ()
0 commit comments