Skip to content

Commit 42a050e

Browse files
committed
fix(auth): implement fixed-window rate limiting
- Update TokenAuth to track window start time - Reset rate limit counter after 1 hour window - Fixes permanent token blocking - Adds unit test for rate limit logic Signed-off-by: Scott R. Shinn <scott@atomicorp.com>
1 parent 7e3bebd commit 42a050e

2 files changed

Lines changed: 123 additions & 6 deletions

File tree

server/auth.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self, config_file: str = '/etc/chelon/chelon.conf'):
2323
"""Initialize token auth"""
2424
self.config_file = Path(config_file)
2525
self.tokens_file = Path('/var/lib/chelon/tokens.json')
26-
self.rate_limits = {} # token_id -> request count
26+
self.rate_limits = {} # token_id -> {'count': int, 'window_start': float}
2727

2828
# Load tokens
2929
self.tokens = self._load_tokens()
@@ -122,14 +122,29 @@ def validate_token(self, token: str) -> Dict:
122122
if secret_hash != token_info['secret_hash']:
123123
raise ValueError("Invalid token secret")
124124

125-
# Check rate limit (simple implementation)
126-
# TODO: Implement proper time-based rate limiting
127-
request_count = self.rate_limits.get(token_id, 0)
128-
if request_count >= token_info['rate_limit']:
125+
# Check rate limit (Fixed Window)
126+
now = datetime.now(UTC)
127+
window_size = 3600 # 1 hour in seconds
128+
129+
limit_data = self.rate_limits.get(token_id, {
130+
'count': 0,
131+
'window_start': now.timestamp()
132+
})
133+
134+
# Check if window has expired
135+
if now.timestamp() - limit_data['window_start'] > window_size:
136+
# Reset window
137+
limit_data = {
138+
'count': 0,
139+
'window_start': now.timestamp()
140+
}
141+
142+
if limit_data['count'] >= token_info['rate_limit']:
129143
raise ValueError("Rate limit exceeded")
130144

131145
# Increment request count
132-
self.rate_limits[token_id] = request_count + 1
146+
limit_data['count'] += 1
147+
self.rate_limits[token_id] = limit_data
133148

134149
# Update last used timestamp
135150
token_info['last_used'] = datetime.now(UTC).isoformat()

tests/test_rate_limit.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
2+
import sys
3+
import os
4+
import shutil
5+
import tempfile
6+
import unittest
7+
from unittest.mock import patch, MagicMock
8+
from datetime import datetime, timedelta, timezone
9+
10+
# Add server to path
11+
sys.path.insert(0, '/home/sshinn/src/chelon/server')
12+
13+
from auth import TokenAuth
14+
15+
class TestRateLimit(unittest.TestCase):
16+
def setUp(self):
17+
self.temp_dir = tempfile.mkdtemp()
18+
self.config_file = os.path.join(self.temp_dir, 'chelon.conf')
19+
self.tokens_file = os.path.join(self.temp_dir, 'tokens.json')
20+
21+
# Mock paths in TokenAuth
22+
self.auth_patcher = patch('auth.Path')
23+
self.mock_path = self.auth_patcher.start()
24+
25+
# We need to make sure tokens_file path resolves to our temp file
26+
# This is scanning for calls to Path('/var/lib/chelon/tokens.json')
27+
# Easier way: subclass or just patch the attributes after init if possible,
28+
# but init loads tokens.
29+
30+
with open(self.config_file, 'w') as f:
31+
f.write("TEST=1\n")
32+
33+
def tearDown(self):
34+
self.auth_patcher.stop()
35+
shutil.rmtree(self.temp_dir)
36+
37+
def test_rate_limit_window(self):
38+
# We'll just patch the tokens file attribute and load_tokens method to avoid filesystem issues
39+
# effectively testing the logic in validate_token
40+
41+
with patch('auth.TokenAuth._load_tokens', return_value={}) as mock_load, \
42+
patch('auth.TokenAuth._save_tokens'):
43+
44+
auth = TokenAuth(self.config_file)
45+
46+
# Manually inject a token
47+
token_id = "test-token"
48+
secret = "secret"
49+
import hashlib
50+
secret_hash = hashlib.sha256(secret.encode()).hexdigest()
51+
52+
auth.tokens = {
53+
token_id: {
54+
'secret_hash': secret_hash,
55+
'permissions': ['sign:rpm'],
56+
'rate_limit': 2,
57+
'created': datetime.now(timezone.utc).isoformat(),
58+
'last_used': None
59+
}
60+
}
61+
62+
token_str = f"{token_id}:{secret}"
63+
64+
# Start time: T=0
65+
start_time = datetime(2026, 1, 1, 10, 0, 0, tzinfo=timezone.utc)
66+
67+
with patch('auth.datetime') as mock_datetime:
68+
mock_datetime.now.return_value = start_time
69+
70+
# Request 1 (Count: 1) - Should succeed
71+
print("Request 1...")
72+
auth.validate_token(token_str)
73+
74+
# Request 2 (Count: 2) - Should succeed
75+
print("Request 2...")
76+
auth.validate_token(token_str)
77+
78+
# Request 3 (Count: 3 > Limit 2) - Should fail
79+
print("Request 3 (Should fail)...")
80+
with self.assertRaises(ValueError) as cm:
81+
auth.validate_token(token_str)
82+
self.assertEqual(str(cm.exception), "Rate limit exceeded")
83+
84+
# Advance time by 30 mins (Still in window)
85+
mock_datetime.now.return_value = start_time + timedelta(minutes=30)
86+
87+
# Request 4 (Should still fail)
88+
print("Request 4 (30m later, Should fail)...")
89+
with self.assertRaises(ValueError) as cm:
90+
auth.validate_token(token_str)
91+
92+
# Advance time by 61 mins (New window)
93+
mock_datetime.now.return_value = start_time + timedelta(minutes=61)
94+
95+
# Request 5 (Reset count: 1) - Should succeed
96+
print("Request 5 (1h+ later, Should succeed)...")
97+
auth.validate_token(token_str)
98+
99+
print("Rate limit test passed!")
100+
101+
if __name__ == '__main__':
102+
unittest.main()

0 commit comments

Comments
 (0)