Skip to content

Commit 558b06c

Browse files
committed
🔒 Fix unchecked file path
1 parent 485fc04 commit 558b06c

3 files changed

Lines changed: 17 additions & 1 deletion

File tree

pystreamapi/loaders/__csv_loader.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import contextlib
2+
import os
23
from collections import namedtuple
34
from csv import reader
45

@@ -13,17 +14,28 @@ def csv(file_path: str, delimiter=',', encoding="utf-8") -> list:
1314
:param file_path: The path to the CSV file.
1415
:param delimiter: The delimiter used in the CSV file.
1516
"""
17+
file_path = __validate_path(file_path)
1618
with open(file_path, 'r', newline='', encoding=encoding) as csvfile:
1719
csvreader = reader(csvfile, delimiter=delimiter)
1820

1921
# Create a namedtuple type, casting the header values to int or float if possible
20-
Row = namedtuple('Row', list(next(csvreader)))
22+
Row = namedtuple('Row', list(next(csvreader, [])))
2123

2224
# Process the data, casting values to int or float if possible
2325
data = [Row(*[__try_cast(value) for value in row]) for row in csvreader]
2426

2527
return data
2628

29+
def __validate_path(file_path: str):
30+
"""Validate a path string to prevent path traversal attacks"""
31+
if not os.path.isabs(file_path):
32+
raise ValueError("The file_path must be an absolute path.")
33+
34+
if not os.path.exists(file_path):
35+
raise FileNotFoundError("The specified file does not exist.")
36+
37+
return file_path
38+
2739
def __try_cast(value):
2840
"""Try to cast value to primary data types from python (int, float, bool)"""
2941
for cast in (int, float):

tests/assets/empty.csv

Whitespace-only changes.

tests/test_loaders.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@ def test_csv_loader_with_custom_delimiter(self):
2323
self.assertEqual(len(data), 1)
2424
self.assertEqual(data[0].attr1, 1)
2525
self.assertIsInstance(data[0].attr1, int)
26+
27+
def test_csv_loader_with_empty_file(self):
28+
data = csv(f'{self.path}/empty.csv')
29+
self.assertEqual(len(data), 0)

0 commit comments

Comments
 (0)