-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsnowflake_connector.py
More file actions
46 lines (39 loc) · 1.29 KB
/
Copy pathsnowflake_connector.py
File metadata and controls
46 lines (39 loc) · 1.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from sqlalchemy import create_engine
import snowflake.connector
import pandas as pd
import os
# from airflow.hooks.base_hook import BaseHook
from sqlalchemy.exc import OperationalError
import json
class SnoflakeAPI:
    """Small helper around a single Snowflake connection.

    The connection is opened in the constructor and kept on ``self.con``;
    the query helpers below each use a short-lived cursor on it.
    """

    def __init__(self):
        """Open the connection using settings read from the environment.

        Reads ``DB_ACCOUNT``, ``USER_DB``, ``DB_WAREHOUSE`` and ``DB`` with
        ``os.environ.get``, so an unset variable reaches the connector as
        ``None``.
        """
        # credentials setup for local use
        account = os.environ.get('DB_ACCOUNT')
        user = os.environ.get('USER_DB')
        warehouse = os.environ.get('DB_WAREHOUSE')
        database = os.environ.get('DB')
        self.con = snowflake.connector.connect(
            user=user,
            account=account,
            # NOTE(review): 'externalbrowser' is presumably the connector's
            # browser-based SSO flow, i.e. interactive use only - confirm.
            authenticator='externalbrowser',
            warehouse=warehouse,
            database=database,
        )

    def __execute_snowflake_query__(self, query, with_cursor=False):
        """Run ``query`` and return every row it produced.

        Args:
            query: SQL text passed unchanged to ``cursor.execute``.
            with_cursor: When true, return ``(rows, cursor)`` instead of
                just ``rows``.

        Returns:
            The result of ``cursor.fetchall()``, or a ``(rows, cursor)``
            tuple when ``with_cursor`` is true. The cursor is always closed
            before this method returns, so a returned cursor should not be
            used to fetch further data.
        """
        cursor = self.con.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
        finally:
            # Release the cursor whether or not the query succeeded.
            cursor.close()
        if with_cursor:
            return (rows, cursor)
        return rows

    def pandas_df_from_snowflake_query(self, query):
        """Run ``query`` and return its result set as a pandas DataFrame.

        Column labels are taken from the first field of each entry in
        ``cursor.description``. A query that yields no rows produces an
        empty DataFrame that still carries those column labels.

        Args:
            query: SQL text passed unchanged to ``cursor.execute``.

        Returns:
            A ``pandas.DataFrame`` with one row per fetched record.
        """
        cursor = self.con.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            # Read the column metadata while the cursor is still open, rather
            # than relying on it surviving close(). ``description`` may be
            # None when the statement produced no result set.
            headers = [column[0] for column in (cursor.description or ())]
        finally:
            cursor.close()
        # Passing ``columns=`` up front keeps the labels even when ``rows`` is
        # empty; assigning ``df.columns`` afterwards raises a length-mismatch
        # ValueError on an empty frame.
        return pd.DataFrame(rows, columns=headers)