|
| 1 | +import snowflake.connector as sc |
| 2 | + |
| 3 | +from nypl_py_utils.functions.log_helper import create_log |
| 4 | + |
| 5 | + |
| 6 | +class SnowflakeClient: |
| 7 | + """Client for managing connections to Snowflake""" |
| 8 | + |
| 9 | + def __init__(self, account, user, private_key=None, password=None): |
| 10 | + self.logger = create_log('snowflake_client') |
| 11 | + if (password is None) == (private_key is None): |
| 12 | + raise SnowflakeClientError( |
| 13 | + 'Either password or private key must be set (but not both)', |
| 14 | + self.logger |
| 15 | + ) from None |
| 16 | + |
| 17 | + self.conn = None |
| 18 | + self.account = account |
| 19 | + self.user = user |
| 20 | + self.private_key = private_key |
| 21 | + self.password = password |
| 22 | + |
| 23 | + def connect(self, mfa_code=None, **kwargs): |
| 24 | + """ |
| 25 | + Connects to Snowflake using the given credentials. If you're connecting |
| 26 | + locally, you should be using the password and mfa_code. If the |
| 27 | + connection is for production code, a private_key should be set up. |
| 28 | +
|
| 29 | + Parameters |
| 30 | + ---------- |
| 31 | + mfa_code: str, optional |
| 32 | + The six-digit MFA code. Only necessary for connecting as a human |
| 33 | + user. |
| 34 | + kwargs: |
| 35 | + All possible arguments (such as which warehouse to use or how |
| 36 | + long to wait before timing out) can be found here: |
| 37 | + https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#connect |
| 38 | + """ |
| 39 | + self.logger.info('Connecting to Snowflake') |
| 40 | + if self.private_key is not None: |
| 41 | + try: |
| 42 | + self.conn = sc.connect( |
| 43 | + account=self.account, |
| 44 | + user=self.user, |
| 45 | + private_key=self.private_key, |
| 46 | + **kwargs) |
| 47 | + except Exception as e: |
| 48 | + raise SnowflakeClientError( |
| 49 | + f'Error connecting to Snowflake: {e}', self.logger |
| 50 | + ) from None |
| 51 | + else: |
| 52 | + if mfa_code is None: |
| 53 | + raise SnowflakeClientError( |
| 54 | + 'When using a password, an MFA code must also be provided', |
| 55 | + self.logger |
| 56 | + ) from None |
| 57 | + |
| 58 | + pw = self.password + mfa_code |
| 59 | + try: |
| 60 | + self.conn = sc.connect( |
| 61 | + account=self.account, |
| 62 | + user=self.user, |
| 63 | + password=pw, |
| 64 | + passcode_in_password=True, |
| 65 | + **kwargs) |
| 66 | + except Exception as e: |
| 67 | + raise SnowflakeClientError( |
| 68 | + f'Error connecting to Snowflake: {e}', self.logger |
| 69 | + ) from None |
| 70 | + |
| 71 | + def execute_query(self, query, **kwargs): |
| 72 | + """ |
| 73 | + Executes an arbitrary query against the given connection. |
| 74 | +
|
| 75 | + Note that: |
| 76 | + 1) All results will be fetched by default, so this method is not |
| 77 | + suitable if you do not want to load all rows into memory |
| 78 | + 2) AUTOCOMMIT is on by default, so this method is not suitable if |
| 79 | + you want to execute multiple queries in a single transaction |
| 80 | + 3) This method can be used for both read and write queries, but |
| 81 | + it's not optimized for writing -- there is no parameter binding |
| 82 | + or executemany support, and the return value for write queries |
| 83 | + can be unpredictable. |
| 84 | +
|
| 85 | + Parameters |
| 86 | + ---------- |
| 87 | + query: str |
| 88 | + The SQL query to execute |
| 89 | + kwargs: |
| 90 | + All possible arguments (such as timeouts) can be found here: |
| 91 | + https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#execute |
| 92 | +
|
| 93 | + Returns |
| 94 | + ------- |
| 95 | + sequence |
| 96 | + A list of tuples |
| 97 | + """ |
| 98 | + self.logger.info('Querying Snowflake') |
| 99 | + cursor = self.conn.cursor() |
| 100 | + try: |
| 101 | + try: |
| 102 | + cursor.execute(query, **kwargs) |
| 103 | + return cursor.fetchall() |
| 104 | + except Exception: |
| 105 | + raise |
| 106 | + finally: |
| 107 | + cursor.close() |
| 108 | + except Exception as e: |
| 109 | + # If there was an error, also close the connection |
| 110 | + self.close_connection() |
| 111 | + |
| 112 | + short_q = str(query) |
| 113 | + if len(short_q) > 2500: |
| 114 | + short_q = short_q[:2497] + '...' |
| 115 | + raise SnowflakeClientError( |
| 116 | + f'Error executing Snowflake query {short_q}: {e}', self.logger |
| 117 | + ) from None |
| 118 | + |
| 119 | + def close_connection(self): |
| 120 | + """Closes the connection""" |
| 121 | + self.logger.info('Closing Snowflake connection') |
| 122 | + self.conn.close() |
| 123 | + |
| 124 | + |
| 125 | +class SnowflakeClientError(Exception): |
| 126 | + def __init__(self, message='', logger=None): |
| 127 | + self.message = message |
| 128 | + if logger is not None: |
| 129 | + logger.error(message) |
| 130 | + |
| 131 | + def __str__(self): |
| 132 | + return self.message |
0 commit comments