-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathget_data.py
More file actions
126 lines (102 loc) · 3.59 KB
/
get_data.py
File metadata and controls
126 lines (102 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Contains functions used to aquire the data from external sources"""
from typing import Any, Literal
import zipfile
import shutil
import os
import io
import pathlib
import requests
import pathlib
from ..utils import data_connections
# Relative path ".db".
# NOTE(review): not referenced anywhere else in this module - presumably the
# database location used by the data_connections helpers; confirm before relying on it.
DB_PATH = pathlib.Path(".db")

# "data" directory inside the third ancestor of this file (file -> its directory
# -> two more levels up).
DATA_PATH = pathlib.Path(__file__).parent.parent.parent / "data"

# Default parent directory for extracted archives; also the directory scanned
# for CSV files when this module is run as a script.
INPUT_PATH = DATA_PATH / "input"

# Common URL prefix for the artificial HES archives; the archive file name is
# appended after a "/". Plain string literal: there is nothing to interpolate.
ARTIFICIAL_HES_BASE_URL = "https://s3.eu-west-2.amazonaws.com/files.digital.nhs.uk/assets/Services/Artificial+data/Artificial+HES+final"
def download_zip_from_url(
    zip_file_url: str,
    overwrite: bool = False,
    output_path: "pathlib.Path | None" = None,
) -> pathlib.Path:
    """Download a zip archive from a URL and extract it into a directory.

    Parameters
    ----------
    zip_file_url : str
        The URL the zip file is served from.
    overwrite : bool
        If True, anything already present at ``output_path`` is removed and
        replaced by the freshly extracted archive. If False (the default), an
        existing ``output_path`` raises ``FileExistsError`` and nothing is
        downloaded.
    output_path : pathlib.Path, optional
        Directory to extract the archive into. If left as None, the archive is
        extracted to ``INPUT_PATH / <zip file name without its extension>``.

    Returns
    -------
    pathlib.Path
        The directory the archive was extracted into (``output_path`` as
        given, or the default described above).

    Raises
    ------
    FileExistsError
        If ``output_path`` already exists and ``overwrite`` is False.
    requests.HTTPError
        If the server responds with an error status code.
    zipfile.BadZipFile
        If the downloaded payload is not a valid zip archive.
    """
    if output_path is None:
        # Name the extraction directory after the archive, minus ".zip".
        output_path = INPUT_PATH / pathlib.Path(zip_file_url).stem

    already_exists = os.path.exists(output_path)
    if already_exists and not overwrite:
        # Fail before spending time on the download.
        raise FileExistsError(f"The zipfile already exists at: {output_path}")

    # The whole payload is needed in memory to open it as a zip, so there is
    # no benefit in a streamed request.
    response = requests.get(zip_file_url, timeout=3600)
    # Surface HTTP failures directly instead of as a confusing BadZipFile.
    response.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(response.content)) as downloaded_zip:
        # Only discard the previous contents once the new archive has been
        # fetched and opened successfully, so a failed download loses nothing.
        if already_exists:
            if os.path.isdir(output_path) and not os.path.islink(output_path):
                shutil.rmtree(output_path)
            else:
                # A plain file or a symlink: rmtree would refuse to remove it.
                os.remove(output_path)
        downloaded_zip.extractall(output_path)

    return output_path
def download_artificial_hes_zip(
    dataset_name: Literal["ae", "apc", "op"],
    version: str = "202302_v1",
    size: Literal["sample", "full"] = "sample",
) -> pathlib.Path:
    """
    Fetch one artificial HES archive and unpack it locally.

    The archive name is assembled as
    ``artificial_hes_{dataset_name}_{version}_{size}.zip`` and requested from
    beneath ``ARTIFICIAL_HES_BASE_URL``. The download is always performed with
    ``overwrite=True``, so a previous extraction of the same archive is
    replaced.

    Parameters
    ----------
    dataset_name : Literal["ae", "apc", "op"]
        Name of dataset to download.
    version : str, optional
        Version to download, by default "202302_v1"
    size : Literal["sample", "full"], optional
        Size to download, by default "sample"

    Returns
    -------
    pathlib.Path
        The value returned by ``download_zip_from_url`` for this archive.
    """
    archive_name = "artificial_hes_{}_{}_{}.zip".format(dataset_name, version, size)
    archive_url = "/".join((ARTIFICIAL_HES_BASE_URL, archive_name))
    return download_zip_from_url(archive_url, overwrite=True)
def get_user_inputs() -> dict[str, Any]:
    """
    Get user inputs to configure the main function.

    Answers are matched after stripping surrounding whitespace and lowering
    the case, so "Y", " y " and "yes" all count as yes. Any other answer to
    the first question means no. For the second question an empty answer
    selects the advertised default (yes) and any other unrecognised answer
    means no.

    Returns
    -------
    dict[str, Any]
        User input variables:

        ``"replace"`` : bool
            True if the user asked for existing tables to be replaced.
        ``"exists_ok"`` : bool
            True if already-existing tables should not be treated as an
            error. Always True when ``"replace"`` is True, in which case the
            second question is not asked.
    """
    affirmative = ("y", "yes")

    replace_answer = input("Replace tables if they already exist? (y/n, default=n): ")
    replace = replace_answer.strip().lower() in affirmative

    if replace:
        # Replacing implies existing tables are acceptable; skip the question.
        exists_ok = True
    else:
        exists_ok_answer = input(
            "Continue without error if tables already exist? (y/n, default=y): "
        )
        # The empty answer is accepted as "yes" to honour the stated default.
        exists_ok = exists_ok_answer.strip().lower() in ("",) + affirmative

    return {
        "replace": replace,
        "exists_ok": exists_ok,
    }
if __name__ == "__main__":
    # Script entry point: ask how existing tables should be handled, open the
    # DuckDB connection, fetch the "ae" sample archive, then load and print
    # every CSV found beneath INPUT_PATH.
    options = get_user_inputs()
    connection = data_connections.get_duckdb_connection()
    extracted_to = download_artificial_hes_zip("ae")

    # The search covers the whole input directory (recursively), not only the
    # archive that was just extracted.
    for source_csv in INPUT_PATH.glob("**/*.csv"):
        loaded_table = data_connections.create_table_from_csv(
            connection,
            source_csv,
            replace=options["replace"],
            exists_ok=options["exists_ok"],
        )
        # Read the table back before announcing it, so nothing is printed if
        # the read fails.
        frame = data_connections.read_table_to_df(connection, loaded_table)
        print(f"Printing results from table '{loaded_table}'")
        print(frame)