|
1 | 1 | import csv |
2 | 2 | from io import StringIO |
3 | | -from typing import Any, BinaryIO, Dict |
| 3 | +from typing import Any, BinaryIO, Dict, Tuple |
4 | 4 |
|
5 | 5 | from celery import shared_task |
6 | 6 | from celery.utils.log import get_task_logger |
7 | | -from django.shortcuts import get_object_or_404 |
8 | 7 |
|
9 | 8 | from multinet.api.models import Network, Table, TableTypeAnnotation, Upload, Workspace |
10 | 9 | from multinet.api.utils.arango import ArangoQuery |
@@ -71,127 +70,173 @@ def process_csv( |
71 | 70 | table.put_rows(csv_rows) |
72 | 71 |
|
73 | 72 |
|
74 | | -def create_csv_network(workspace: Workspace, serializer): |
75 | | - """Create a network from a link of tables (in request thread).""" |
76 | | - from multinet.api.views.serializers import CSVNetworkCreateSerializer |
77 | | - |
78 | | - serializer: CSVNetworkCreateSerializer |
79 | | - serializer.is_valid(raise_exception=True) |
| 73 | +def maybe_insert_join_statement(query: str, bind_vars: Dict, table_dict: Dict) -> Tuple[str, Dict]: |
| 74 | + """ |
| 75 | + Return mutated query and bind_vars to account for joins. |
80 | 76 |
|
81 | | - # Perform joins before edge creation, since new tables are created |
82 | | - mapped_tables = {} |
83 | | - for table, mapping in serializer.validated_data.get('joins', {}).items(): |
84 | | - foreign_table = mapping['foreign_column']['table'] |
85 | | - foreign_column = mapping['foreign_column']['column'] |
86 | | - joined_table, created = Table.objects.get_or_create( |
87 | | - name=f'{foreign_table}-joined-{table}', workspace=workspace |
| 77 | + This function expects a variable defined in AQL as `new_doc`, and supply a `final_doc` |
| 78 | + variable, which will either contain the joined data, or be identical to `new_doc`. |
| 79 | + """ |
| 80 | + join_table = None |
| 81 | + join_table_excluded = [] |
| 82 | + join_col_local = None |
| 83 | + join_col_foreign = None |
| 84 | + if table_dict.get('joined') is not None: |
| 85 | + join_table = table_dict['joined']['table']['name'] |
| 86 | + join_table_excluded = table_dict['joined']['table']['excluded'] |
| 87 | + join_col_local = table_dict['joined']['link']['local'] |
| 88 | + join_col_foreign = table_dict['joined']['link']['foreign'] |
| 89 | + |
| 90 | + # Conditionally insert join vars and query text |
| 91 | + if join_table is None: |
| 92 | + query += """ |
| 93 | + LET final_doc = new_doc |
| 94 | + """ |
| 95 | + else: |
| 96 | + bind_vars.update( |
| 97 | + { |
| 98 | + '@JOINING_TABLE': join_table, |
| 99 | + 'JOINING_TABLE_EXCLUDED': join_table_excluded, |
| 100 | + 'JOIN_COL_LOCAL': join_col_local, |
| 101 | + 'JOIN_COL_FOREIGN': join_col_foreign, |
| 102 | + } |
88 | 103 | ) |
89 | | - |
90 | | - # Clear table rows if already exists |
91 | | - joined_table: Table |
92 | | - mapped_tables[foreign_table] = joined_table.name |
93 | | - if created: |
94 | | - joined_table.get_arango_collection(readonly=False).truncate() |
95 | | - |
96 | | - # Begin AQL query for joining |
97 | | - bind_vars = { |
98 | | - '@TABLE': table, |
99 | | - 'TABLE_COL': mapping['column'], |
100 | | - '@FOREIGN_TABLE': foreign_table, |
101 | | - 'FOREIGN_TABLE_COL': foreign_column, |
102 | | - '@JOINED_TABLE': joined_table.name, |
103 | | - } |
104 | | - query_str = """ |
105 | | - FOR foreign_doc in @@FOREIGN_TABLE |
106 | | - // Find matching doc |
107 | | - LET table_doc = FIRST( |
108 | | - FOR doc in @@TABLE |
109 | | - FILTER doc.@TABLE_COL == foreign_doc.@FOREIGN_TABLE_COL |
| 104 | + query += """ |
| 105 | + // Perform join |
| 106 | + LET foreign_doc = ( |
| 107 | + FIRST( |
| 108 | + FOR doc in @@JOINING_TABLE |
| 109 | + FILTER new_doc.@JOIN_COL_LOCAL == doc.@JOIN_COL_FOREIGN |
110 | 110 | return doc |
111 | 111 | ) || {} |
112 | | -
|
113 | | - LET new_doc = MERGE( |
114 | | - UNSET(foreign_doc, ['_id', '_key', 'rev']), |
115 | | - UNSET(table_doc, ['_id', '_key', 'rev']) |
116 | | - ) |
117 | | - INSERT new_doc IN @@JOINED_TABLE |
| 112 | + ) |
| 113 | + LET foreign_excluded = APPEND(['_id', '_key', 'rev'], @JOINING_TABLE_EXCLUDED) |
| 114 | + LET new_foreign_doc = UNSET(foreign_doc, foreign_excluded) |
| 115 | + LET final_doc = MERGE(new_doc, new_foreign_doc) |
118 | 116 | """ |
119 | | - query = ArangoQuery( |
120 | | - workspace.get_arango_db(readonly=False), |
121 | | - query_str=query_str, |
122 | | - bind_vars=bind_vars, |
123 | | - ) |
124 | | - query.execute() |
125 | 117 |
|
126 | | - source_edge_table: Table = get_object_or_404( |
127 | | - Table, |
128 | | - workspace=workspace, |
129 | | - name=serializer.validated_data['edge_table']['name'], |
130 | | - ) |
| 118 | + return query, bind_vars |
131 | 119 |
|
132 | | - # Create new edge table |
133 | | - network_name = serializer.validated_data['name'] |
134 | | - new_edge_table: Table = Table.objects.create( |
135 | | - name=f'{network_name}_edges', workspace=workspace, edge=True |
136 | | - ) |
137 | 120 |
|
138 | | - # Use mapped source table if joined on, otherwise original |
139 | | - edge_table_data = serializer.validated_data['edge_table'] |
140 | | - foreign_source_table = mapped_tables.get( |
141 | | - edge_table_data['source']['foreign_column']['table'], |
142 | | - edge_table_data['source']['foreign_column']['table'], |
143 | | - ) |
| 121 | +# CSV Network functions |
| 122 | +def create_table(workspace: Workspace, network_name: str, table_dict: Dict) -> str: |
| 123 | + """Create table from definition, including joins.""" |
| 124 | + # table_dict has the shape of the FullTable serializer |
| 125 | + original_table_name = table_dict['name'] |
| 126 | + new_table_name = f'{network_name}--{table_dict["name"]}' |
| 127 | + excluded_columns = table_dict['excluded'] |
| 128 | + |
| 129 | + # Create table, deleting any data if it already exists |
| 130 | + table, created = Table.objects.get_or_create(workspace=workspace, name=new_table_name) |
| 131 | + if not created: |
| 132 | + table.get_arango_collection(readonly=False).truncate() |
| 133 | + |
| 134 | + # AQL query for copying doc, excluding certain columns, and joining |
| 135 | + bind_vars = { |
| 136 | + '@ORIGINAL_TABLE': original_table_name, |
| 137 | + '@TABLE': new_table_name, |
| 138 | + 'EXCLUDED_COLS': excluded_columns, |
| 139 | + } |
| 140 | + query_str = """ |
| 141 | + FOR og_doc in @@ORIGINAL_TABLE |
| 142 | + // Copy doc, excluding specified columns |
| 143 | + LET excluded = APPEND(['_id', '_key', 'rev'], @EXCLUDED_COLS) |
| 144 | + LET new_doc = UNSET(og_doc, excluded) |
| 145 | + """ |
144 | 146 |
|
145 | | - # Use mapped target table if joined on, otherwise original |
146 | | - foreign_target_table = mapped_tables.get( |
147 | | - edge_table_data['target']['foreign_column']['table'], |
148 | | - edge_table_data['target']['foreign_column']['table'], |
| 147 | + # Add join statements if needed |
| 148 | + query_str, bind_vars = maybe_insert_join_statement(query_str, bind_vars, table_dict) |
| 149 | + |
| 150 | + # Add final insert |
| 151 | + query_str += """ |
| 152 | + INSERT final_doc IN @@TABLE |
| 153 | + """ |
| 154 | + |
| 155 | + # Execute query |
| 156 | + ArangoQuery( |
| 157 | + workspace.get_arango_db(readonly=False), |
| 158 | + query_str=query_str, |
| 159 | + bind_vars=bind_vars, |
| 160 | + ).execute() |
| 161 | + |
| 162 | + return new_table_name |
| 163 | + |
| 164 | + |
| 165 | +def create_csv_network(workspace: Workspace, serializer): |
| 166 | + """Create a network from a link of tables (in request thread).""" |
| 167 | + from multinet.api.views.serializers import CSVNetworkCreateSerializer |
| 168 | + |
| 169 | + serializer: CSVNetworkCreateSerializer |
| 170 | + serializer.is_valid(raise_exception=True) |
| 171 | + |
| 172 | + # Create source/target tables |
| 173 | + data = serializer.validated_data |
| 174 | + network_name = data['name'] |
| 175 | + source_table = create_table(workspace, network_name, data['source_table']) |
| 176 | + target_table = create_table(workspace, network_name, data['target_table']) |
| 177 | + |
| 178 | + # Create table, deleting any data if it already exists |
| 179 | + edge_table_name = data['edge']['table']['name'] |
| 180 | + new_edge_table_name = f'{network_name}--{edge_table_name}' |
| 181 | + table, created = Table.objects.get_or_create( |
| 182 | + workspace=workspace, name=new_edge_table_name, edge=True |
149 | 183 | ) |
| 184 | + if not created: |
| 185 | + table.get_arango_collection(readonly=False).truncate() |
150 | 186 |
|
151 | | - # Copy rows from original edge table to new edge table |
| 187 | + # Setup bind vars for query |
152 | 188 | bind_vars = { |
153 | | - '@ORIGINAL': source_edge_table.get_arango_collection().name, |
154 | | - '@NEW_COLL': new_edge_table.get_arango_collection().name, |
| 189 | + '@ORIGINAL': edge_table_name, |
| 190 | + '@NEW_TABLE': new_edge_table_name, |
| 191 | + 'EXCLUDED_COLS': data['edge']['table']['excluded'], |
155 | 192 | # Source |
156 | | - 'ORIGINAL_SOURCE_COLUMN': edge_table_data['source']['column'], |
157 | | - '@FOREIGN_SOURCE_TABLE': foreign_source_table, |
158 | | - 'FOREIGN_SOURCE_COLUMN': edge_table_data['source']['foreign_column']['column'], |
| 193 | + '@SOURCE_TABLE': source_table, |
| 194 | + 'SOURCE_LINK_LOCAL': data['edge']['source']['local'], |
| 195 | + 'SOURCE_LINK_FOREIGN': data['edge']['source']['foreign'], |
159 | 196 | # Target |
160 | | - 'ORIGINAL_TARGET_COLUMN': edge_table_data['target']['column'], |
161 | | - '@FOREIGN_TARGET_TABLE': foreign_target_table, |
162 | | - 'FOREIGN_TARGET_COLUMN': edge_table_data['target']['foreign_column']['column'], |
| 197 | + '@TARGET_TABLE': target_table, |
| 198 | + 'TARGET_LINK_LOCAL': data['edge']['target']['local'], |
| 199 | + 'TARGET_LINK_FOREIGN': data['edge']['target']['foreign'], |
163 | 200 | } |
| 201 | + |
| 202 | + # Make query to copy edge table docs to new edge table, inserting from/to links |
164 | 203 | query_str = """ |
165 | 204 | FOR edge_doc in @@ORIGINAL |
166 | 205 | // Find matching source doc |
167 | 206 | LET source_doc = FIRST( |
168 | | - FOR dd in @@FOREIGN_SOURCE_TABLE |
169 | | - FILTER edge_doc.@ORIGINAL_SOURCE_COLUMN == dd.@FOREIGN_SOURCE_COLUMN |
| 207 | + FOR dd in @@SOURCE_TABLE |
| 208 | + FILTER edge_doc.@SOURCE_LINK_LOCAL == dd.@SOURCE_LINK_FOREIGN |
170 | 209 | return dd |
171 | 210 | ) |
172 | 211 | // Find matching target doc |
173 | 212 | LET target_doc = FIRST( |
174 | | - FOR dd in @@FOREIGN_TARGET_TABLE |
175 | | - FILTER edge_doc.@ORIGINAL_TARGET_COLUMN == dd.@FOREIGN_TARGET_COLUMN |
| 213 | + FOR dd in @@TARGET_TABLE |
| 214 | + FILTER edge_doc.@TARGET_LINK_LOCAL == dd.@TARGET_LINK_FOREIGN |
176 | 215 | return dd |
177 | 216 | ) |
178 | | -
|
179 | 217 | // Add _from/_to to new doc, remove internal fields, insert into new coll |
180 | | - LET new_doc = MERGE(edge_doc, {'_from': source_doc._id, '_to': target_doc._id}) |
181 | | - LET fixed = UNSET(new_doc, ['_id', '_key', 'rev']) |
182 | | - INSERT fixed INTO @@NEW_COLL |
| 218 | + LET excluded = APPEND(['_id', '_key', 'rev'], @EXCLUDED_COLS) |
| 219 | + LET new_edge_doc = MERGE(edge_doc, {'_from': source_doc._id, '_to': target_doc._id}) |
| 220 | + LET new_doc = UNSET(new_edge_doc, excluded) |
| 221 | + """ |
| 222 | + |
| 223 | + # Add join statements if needed |
| 224 | + query_str, bind_vars = maybe_insert_join_statement(query_str, bind_vars, data['edge']['table']) |
| 225 | + query_str += """ |
| 226 | + INSERT final_doc INTO @@NEW_TABLE |
183 | 227 | """ |
184 | | - query = ArangoQuery( |
| 228 | + |
| 229 | + # Execute query |
| 230 | + ArangoQuery( |
185 | 231 | workspace.get_arango_db(readonly=False), |
186 | 232 | query_str=query_str, |
187 | 233 | bind_vars=bind_vars, |
188 | | - ) |
189 | | - query.execute() |
| 234 | + ).execute() |
190 | 235 |
|
191 | 236 | # Create network |
192 | 237 | return Network.create_with_edge_definition( |
193 | 238 | name=network_name, |
194 | 239 | workspace=workspace, |
195 | | - edge_table=new_edge_table.name, |
196 | | - node_tables=[foreign_source_table, foreign_target_table], |
| 240 | + edge_table=new_edge_table_name, |
| 241 | + node_tables=[source_table, target_table], |
197 | 242 | ) |
0 commit comments