@@ -47,21 +47,6 @@ def process_csv(
4747) -> None :
4848 upload : Upload = Upload .objects .get (id = task_id )
4949
50- # Download data from S3/MinIO
51- with upload .blob as blob_file :
52- blob_file : BinaryIO = blob_file
53- csv_rows = list (
54- csv .DictReader (
55- StringIO (blob_file .read ().decode ('utf-8' )),
56- delimiter = delimiter ,
57- quotechar = quotechar ,
58- )
59- )
60-
61- # Cast entries in each row to appropriate type, if necessary
62- for i , row in enumerate (csv_rows ):
63- csv_rows [i ] = process_row (row , columns )
64-
6550 # Create new table
6651 table : Table = Table .objects .create (
6752 name = table_name ,
@@ -77,8 +62,30 @@ def process_csv(
7762 ]
7863 )
7964
80- # Insert rows
81- table .put_rows (csv_rows )
65+ # Download data from S3/MinIO
66+ with upload .blob as blob_file :
67+ blob_file : BinaryIO = blob_file
68+ csv_rows = list (
69+ csv .DictReader (
70+ StringIO (blob_file .read ().decode ('utf-8' )),
71+ delimiter = delimiter ,
72+ quotechar = quotechar ,
73+ )
74+ )
75+
76+ # Cast entries in each row to appropriate type, if necessary
77+ processed_rows = []
78+ for row in csv_rows :
79+ processed_rows .append (process_row (row , columns ))
80+
81+ # Insert rows
82+ if len (processed_rows ) == 100000 :
83+ table .put_rows (processed_rows )
84+ processed_rows = []
85+
86+ # Put remaining rows
87+ table .put_rows (processed_rows )
88+
8289
8390
8491def maybe_insert_join_statement (query : str , bind_vars : Dict , table_dict : Dict ) -> Tuple [str , Dict ]:
0 commit comments