@@ -170,6 +170,128 @@ async def test_create_knowledge_base(
170170 data = response .json ()
171171 assert data ["name" ] == "New KB"
172172
173+ @patch ("langflow.api.v1.knowledge_bases.KBStorageHelper.get_root_path" )
174+ async def test_create_kb_path_traversal_single_level (
175+ self , mock_root , client : AsyncClient , logged_in_headers , tmp_path
176+ ):
177+ """Single-level traversal '../victim_user/evil_kb' in POST must be blocked with 400/403.
178+
179+ VULNERABILITY: the create endpoint builds kb_path = kb_root_path / kb_user / kb_name
180+ without resolve() or is_relative_to(), so '../victim_user/evil_kb' escapes the user dir.
181+ """
182+ mock_root .return_value = tmp_path
183+ (tmp_path / "activeuser" ).mkdir (parents = True )
184+ victim_dir = tmp_path / "victim_user" / "evil_kb"
185+
186+ response = await client .post (
187+ "api/v1/knowledge_bases" ,
188+ headers = logged_in_headers ,
189+ json = {
190+ "name" : "../victim_user/evil_kb" ,
191+ "embedding_provider" : "OpenAI" ,
192+ "embedding_model" : "text-embedding-3-small" ,
193+ },
194+ )
195+
196+ assert response .status_code in (400 , 403 ), (
197+ f"VULNERABILITY CONFIRMED: create endpoint accepted traversal payload with status { response .status_code } "
198+ )
199+ assert not victim_dir .exists (), (
200+ "VULNERABILITY CONFIRMED: path traversal created a directory outside the user's KB root"
201+ )
202+
203+ @patch ("langflow.api.v1.knowledge_bases.KBStorageHelper.get_root_path" )
204+ async def test_create_kb_path_traversal_absolute_path (
205+ self , mock_root , client : AsyncClient , logged_in_headers , tmp_path
206+ ):
207+ """Absolute path in kb_name must be blocked — e.g. '/tmp/evil'.
208+
209+ VULNERABILITY: kb_root_path / kb_user / '/tmp/evil' resolves to '/tmp/evil' in Python
210+ because Path drops all previous components when a segment starts with '/'.
211+ """
212+ mock_root .return_value = tmp_path
213+ (tmp_path / "activeuser" ).mkdir (parents = True )
214+ evil_dir = tmp_path / "evil_absolute"
215+
216+ response = await client .post (
217+ "api/v1/knowledge_bases" ,
218+ headers = logged_in_headers ,
219+ json = {
220+ "name" : str (evil_dir ),
221+ "embedding_provider" : "OpenAI" ,
222+ "embedding_model" : "text-embedding-3-small" ,
223+ },
224+ )
225+
226+ assert response .status_code in (400 , 403 ), (
227+ f"VULNERABILITY CONFIRMED: create endpoint accepted absolute path payload "
228+ f"with status { response .status_code } "
229+ )
230+ assert not evil_dir .exists (), (
231+ "VULNERABILITY CONFIRMED: absolute path in kb_name created a directory outside the KB root"
232+ )
233+
234+ @patch ("langflow.api.v1.knowledge_bases.KBStorageHelper.get_root_path" )
235+ async def test_create_kb_path_traversal_prefix_ambiguity (
236+ self , mock_root , client : AsyncClient , logged_in_headers , tmp_path
237+ ):
238+ """Prefix-ambiguity attack on create: user='activeuser', target dir='activeuser_evil'.
239+
240+ With startswith('/root/activeuser'), the path '/root/activeuser_evil/secret_kb'
241+ incorrectly passes because the string starts with '/root/activeuser'.
242+ is_relative_to() closes this gap and must block the request with 400/403.
243+ """
244+ mock_root .return_value = tmp_path
245+
246+ (tmp_path / "activeuser" ).mkdir (parents = True )
247+ victim_kb = tmp_path / "activeuser_evil" / "secret_kb"
248+ victim_kb .mkdir (parents = True )
249+
250+ response = await client .post (
251+ "api/v1/knowledge_bases" ,
252+ headers = logged_in_headers ,
253+ json = {
254+ "name" : "../activeuser_evil/secret_kb" ,
255+ "embedding_provider" : "OpenAI" ,
256+ "embedding_model" : "text-embedding-3-small" ,
257+ },
258+ )
259+
260+ assert response .status_code in (400 , 403 ), (
261+ "VULNERABILITY CONFIRMED: prefix-ambiguity bypass succeeded on create endpoint — "
262+ "startswith() may still be in use instead of is_relative_to()"
263+ )
264+ assert not (tmp_path / "activeuser_evil" / "secret_kb_new" ).exists (), (
265+ "VULNERABILITY CONFIRMED: prefix-ambiguity attack created a directory outside the user's KB root"
266+ )
267+
268+ @patch ("langflow.api.v1.knowledge_bases.logger.warning" )
269+ @patch ("langflow.api.v1.knowledge_bases.KBStorageHelper.get_root_path" )
270+ async def test_create_kb_path_traversal_logs_warning (
271+ self , mock_root , mock_warning , client : AsyncClient , logged_in_headers , tmp_path
272+ ):
273+ """A traversal attempt on create must emit a warning log with user= and kb_name= context."""
274+ mock_root .return_value = tmp_path
275+
276+ (tmp_path / "activeuser" ).mkdir (parents = True )
277+ (tmp_path / "victim_user" / "secret_kb" ).mkdir (parents = True )
278+
279+ await client .post (
280+ "api/v1/knowledge_bases" ,
281+ headers = logged_in_headers ,
282+ json = {
283+ "name" : "../victim_user/secret_kb" ,
284+ "embedding_provider" : "OpenAI" ,
285+ "embedding_model" : "text-embedding-3-small" ,
286+ },
287+ )
288+
289+ mock_warning .assert_called_once ()
290+ warning_args = mock_warning .call_args [0 ]
291+ all_args_str = str (warning_args )
292+ assert "user=" in all_args_str , "Warning log must contain 'user=' in the format string"
293+ assert "kb_name=" in all_args_str , "Warning log must contain 'kb_name=' in the format string"
294+
173295 async def test_create_kb_name_too_short (self , client : AsyncClient , logged_in_headers ):
174296 response = await client .post (
175297 "api/v1/knowledge_bases" ,
0 commit comments