Skip to content

Commit bd8c006

Browse files
author
Lingling Peng
committed
add two async job; tested the basic of import
1 parent 4f0721e commit bd8c006

3 files changed

Lines changed: 241 additions & 2 deletions

File tree

synapseclient/core/constants/concrete_types.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,7 @@
128128
LIST_GRID_SESSIONS_RESPONSE = (
129129
"org.sagebionetworks.repo.model.grid.ListGridSessionsResponse"
130130
)
131+
# Concrete type identifying a grid CSV import request.
GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest"
# Concrete type identifying an upload-to-table preview request.
UPLOAD_TO_TABLE_PREVIEW_REQUEST = (
    "org.sagebionetworks.repo.model.table.UploadToTablePreviewRequest"
)

synapseclient/models/curation.py

Lines changed: 233 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"""
77

88
from dataclasses import dataclass, field, replace
9-
from typing import Any, AsyncGenerator, Dict, Generator, Optional, Protocol, Union
9+
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Protocol, Union
1010

1111
from opentelemetry import trace
1212

@@ -28,15 +28,22 @@
2828
from synapseclient.core.constants.concrete_types import (
2929
CREATE_GRID_REQUEST,
3030
FILE_BASED_METADATA_TASK_PROPERTIES,
31+
GRID_CSV_IMPORT_REQUEST,
3132
GRID_RECORD_SET_EXPORT_REQUEST,
3233
LIST_GRID_SESSIONS_REQUEST,
3334
LIST_GRID_SESSIONS_RESPONSE,
3435
RECORD_BASED_METADATA_TASK_PROPERTIES,
36+
UPLOAD_TO_TABLE_PREVIEW_REQUEST,
3537
)
3638
from synapseclient.core.utils import delete_none_keys, merge_dataclass_entities
3739
from synapseclient.models.mixins.asynchronous_job import AsynchronousCommunicator
3840
from synapseclient.models.recordset import ValidationSummary
39-
from synapseclient.models.table_components import Query
41+
from synapseclient.models.table_components import (
42+
Column,
43+
ColumnType,
44+
CsvTableDescriptor,
45+
Query,
46+
)
4047

4148

4249
@dataclass
@@ -989,6 +996,158 @@ def to_synapse_request(self) -> Dict[str, Any]:
989996
return request_dict
990997

991998

999+
@dataclass
class GridCsvImportRequest(AsynchronousCommunicator):
    """
    Asynchronous job request that imports the CSV stored in a file handle into
    a grid session. Currently only grids created from a record set are
    supported.

    This request is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/GridCsvImportRequest.html>
    """

    concrete_type: str = GRID_CSV_IMPORT_REQUEST
    """The concrete type sent with this request."""

    session_id: Optional[str] = None
    """The ID of the grid session that receives the CSV data."""

    file_handle_id: Optional[str] = None
    """The ID of the file handle holding the CSV data."""

    csv_descriptor: CsvTableDescriptor = field(default_factory=CsvTableDescriptor)
    """Describes the CSV format (for upload or download)."""

    schema: Optional[List[Column]] = None
    """The columns (ColumnModel) that describe the CSV file. Currently this is
    required."""

    # Response fields (populated by fill_from_dict); excluded from equality checks.
    total_count: Optional[int] = field(default=None, compare=False)
    """The total number of rows in the CSV."""

    created_count: Optional[int] = field(default=None, compare=False)
    """The number of rows that were created."""

    updated_count: Optional[int] = field(default=None, compare=False)
    """The number of rows that were updated."""

    def fill_from_dict(
        self, synapse_response: Union[Dict[str, Any], Any]
    ) -> "GridCsvImportRequest":
        """
        Populate this object from a REST API response.

        `sessionId` keeps its current value when the response omits it, while
        the three count fields are set to `None` when absent.

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            This GridCsvImportRequest, updated in place.
        """
        self.session_id = synapse_response.get("sessionId", self.session_id)

        count_fields = (
            ("total_count", "totalCount"),
            ("created_count", "createdCount"),
            ("updated_count", "updatedCount"),
        )
        for attribute, response_key in count_fields:
            setattr(self, attribute, synapse_response.get(response_key))
        return self

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Build the dictionary body for the Synapse REST API request.

        Attributes that are `None` are left out of the body.

        Returns:
            A dictionary representation of this object for API requests.
        """
        payload: Dict[str, Any] = {"concreteType": self.concrete_type}

        plain_values = (
            ("sessionId", self.session_id),
            ("fileHandleId", self.file_handle_id),
        )
        for key, value in plain_values:
            if value is not None:
                payload[key] = value

        # Nested models serialize themselves.
        if self.csv_descriptor is not None:
            payload["csvDescriptor"] = self.csv_descriptor.to_synapse_request()
        if self.schema is not None:
            payload["schema"] = [
                column.to_synapse_request() for column in self.schema
            ]
        return payload
1068+
1069+
1070+
@dataclass
class UploadToTablePreviewRequest(AsynchronousCommunicator):
    """
    Asynchronous job request for a preview of an upload to a Table.

    This request is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/UploadToTablePreviewRequest.html>
    """

    concrete_type: str = UPLOAD_TO_TABLE_PREVIEW_REQUEST
    """The concrete type sent with this request."""

    upload_file_handle_id: Optional[str] = None
    """The ID of the file handle for a type of UPLOAD."""

    line_to_skip: Optional[int] = None
    """The number of lines to skip from the start of the file (sent to the API
    as `linesToSkip`). The default value of 0 will be used if this is not
    provided by the caller."""

    csv_table_descriptor: CsvTableDescriptor = field(default_factory=CsvTableDescriptor)
    """Describes the CSV format (for upload or download)."""

    do_full_file_scan: Optional[bool] = None
    """When true, the full file is scanned for schema suggestions, which is
    more accurate but can take more time. When false, only a subset of the
    first rows is scanned, which can be faster but is less accurate. The
    default value is false."""

    # Response fields (populated by fill_from_dict); excluded from equality checks.
    suggested_columns: Optional[List[Column]] = field(default=None, compare=False)
    """The columns suggested for the table based on the file scan."""

    sample_rows: Optional[List[List[Optional[str]]]] = field(
        default=None, compare=False
    )
    """A sample of the rows in the file."""

    rows_scanned: Optional[int] = field(default=None, compare=False)
    """The number of rows scanned from the file."""

    def fill_from_dict(
        self, synapse_response: Union[Dict[str, Any], Any]
    ) -> "UploadToTablePreviewRequest":
        """
        Populate this object from a REST API response.

        `suggested_columns` and `sample_rows` are only overwritten when the
        response carries a non-empty value for them; `rows_scanned` is always
        overwritten (with `None` when absent).

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            This UploadToTablePreviewRequest, updated in place.
        """
        column_payloads = synapse_response.get("suggestedColumns")
        if column_payloads:
            parsed_columns = []
            for column_payload in column_payloads:
                parsed_columns.append(Column().fill_from_dict(column_payload))
            self.suggested_columns = parsed_columns

        row_payloads = synapse_response.get("sampleRows")
        if row_payloads:
            # Each sample row is a mapping; only its "values" list is kept.
            extracted_rows = []
            for row_payload in row_payloads:
                extracted_rows.append(row_payload.get("values", []))
            self.sample_rows = extracted_rows

        self.rows_scanned = synapse_response.get("rowsScanned")
        return self

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Build the dictionary body for the Synapse REST API request.

        Attributes that are `None` are left out of the body.

        Returns:
            A dictionary representation of this object for API requests.
        """
        payload: Dict[str, Any] = {"concreteType": self.concrete_type}

        leading_values = (
            ("uploadFileHandleId", self.upload_file_handle_id),
            ("linesToSkip", self.line_to_skip),
        )
        for key, value in leading_values:
            if value is not None:
                payload[key] = value

        descriptor = self.csv_table_descriptor
        if descriptor is not None:
            payload["csvTableDescriptor"] = descriptor.to_synapse_request()

        full_scan = self.do_full_file_scan
        if full_scan is not None:
            payload["doFullFileScan"] = full_scan
        return payload
1149+
1150+
9921151
@dataclass
9931152
class GridRecordSetExportRequest(AsynchronousCommunicator):
9941153
"""
@@ -1838,3 +1997,75 @@ async def main():
18381997
await delete_grid_session(
18391998
session_id=self.session_id, synapse_client=synapse_client
18401999
)
2000+
2001+
async def import_csv_async(
    self,
    file_handle_id: str,
    csv_table_descriptor: Optional[CsvTableDescriptor] = None,
    *,
    timeout: int = 120,
    synapse_client: Optional[Synapse] = None,
) -> "Grid":
    """
    Import a CSV file into this grid session. Previews the file to determine
    the column schema, then imports the data. Currently supports only grids
    created from a record set.

    Arguments:
        file_handle_id: The id of the file handle that contains the CSV data.
        csv_table_descriptor: The description of the CSV format (delimiter,
            quote character, etc.). If not provided, a default
            `CsvTableDescriptor()` is used for both the preview and the
            import so the two jobs read the file the same way.
        timeout: The number of seconds to wait for each async job to complete
            or progress before raising a SynapseTimeoutError. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        The Grid object.

    Raises:
        ValueError: If `session_id` is not set on this Grid, or if the preview
            job returned no suggested columns to build the import schema from.

    Example: Import a CSV file into a grid session asynchronously
        &nbsp;

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        async def main():
            grid = Grid(session_id="abc-123-def")
            grid = await grid.import_csv_async(file_handle_id="123456")
            print(f"Import complete for session: {grid.session_id}")

        asyncio.run(main())
        ```
    """
    # Function-scope import so the logger lookup below works even if the
    # module only imports Synapse for type checking.
    from synapseclient import Synapse

    # Fail fast before starting any remote job.
    if not self.session_id:
        raise ValueError("session_id is required to import a CSV into a grid")

    # Resolve the descriptor once. The original passed None into the preview
    # request (overriding its default) but a default descriptor into the
    # import request, so the two jobs could be sent different CSV settings.
    descriptor = (
        csv_table_descriptor
        if csv_table_descriptor is not None
        else CsvTableDescriptor()
    )

    # Step 1: ask Synapse to scan the file and suggest a column schema.
    # synapse_client is forwarded so an explicitly supplied client is used
    # for this job too (it was previously omitted here).
    preview_response = await UploadToTablePreviewRequest(
        upload_file_handle_id=file_handle_id,
        csv_table_descriptor=descriptor,
    ).send_job_and_wait_async(synapse_client=synapse_client, timeout=timeout)

    suggested_columns = preview_response.suggested_columns
    if not suggested_columns:
        raise ValueError(
            f"The preview of file handle {file_handle_id} returned no suggested "
            "columns, so a schema for the CSV import could not be determined."
        )

    # NOTE(review): ROW_ID and ROW_VERSION are prepended to the suggested
    # schema exactly as before; presumably the CSV is expected to carry these
    # two leading columns. Confirm against the grid import API.
    all_columns = [
        Column(name="ROW_ID", column_type=ColumnType.STRING),
        Column(name="ROW_VERSION", column_type=ColumnType.STRING),
        *suggested_columns,
    ]

    # Step 2: run the import with the schema derived from the preview.
    import_response = await GridCsvImportRequest(
        session_id=self.session_id,
        file_handle_id=file_handle_id,
        csv_descriptor=descriptor,
        schema=all_columns,
    ).send_job_and_wait_async(synapse_client=synapse_client, timeout=timeout)

    # Report through the client's logger rather than printing to stdout.
    Synapse.get_client(synapse_client=synapse_client).logger.info(
        f"CSV import to grid session {self.session_id} completed successfully, "
        f"total count: {import_response.total_count}, "
        f"total created: {import_response.created_count}, "
        f"total updated: {import_response.updated_count}"
    )
    return self

synapseclient/models/mixins/asynchronous_job.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
CREATE_GRID_REQUEST,
1616
CREATE_SCHEMA_REQUEST,
1717
GET_VALIDATION_SCHEMA_REQUEST,
18+
GRID_CSV_IMPORT_REQUEST,
1819
GRID_RECORD_SET_EXPORT_REQUEST,
1920
QUERY_BUNDLE_REQUEST,
2021
QUERY_TABLE_CSV_REQUEST,
2122
TABLE_UPDATE_TRANSACTION_REQUEST,
23+
UPLOAD_TO_TABLE_PREVIEW_REQUEST,
2224
)
2325
from synapseclient.core.exceptions import (
2426
SynapseError,
@@ -35,6 +37,8 @@
3537
CREATE_SCHEMA_REQUEST: "/schema/type/create/async",
3638
QUERY_TABLE_CSV_REQUEST: "/entity/{entityId}/table/download/csv/async",
3739
QUERY_BUNDLE_REQUEST: "/entity/{entityId}/table/query/async",
40+
GRID_CSV_IMPORT_REQUEST: "/grid/import/csv/async/",
41+
UPLOAD_TO_TABLE_PREVIEW_REQUEST: "/table/upload/csv/preview/async",
3842
}
3943

4044

0 commit comments

Comments
 (0)