11import psycopg
2+ import time
23
34from nypl_py_utils .functions .log_helper import create_log
45
56
67class PostgreSQLClient :
78 """Client for managing individual connections to a PostgreSQL database"""
89
9- def __init__ (self , host , port , db_name , user , password ):
10+ def __init__ (self , host , port , database , user , password ):
1011 self .logger = create_log ('postgresql_client' )
1112 self .conn = None
1213 self .conn_info = ('postgresql://{user}:{password}@{host}:{port}/'
13- '{db_name}' ).format (user = user , password = password ,
14- host = host , port = port ,
15- db_name = db_name )
14+ '{database}' ).format (user = user , password = password ,
15+ host = host , port = port ,
16+ database = database )
17+ self .database = database
1618
17- self .db_name = db_name
18-
19- def connect (self , ** kwargs ):
19+ def connect (self , retry_count = 0 , backoff_factor = 5 , ** kwargs ):
2020 """
2121 Connects to a PostgreSQL database using the given credentials.
2222
23- Keyword args can be passed into the connection to set certain options.
24- All possible arguments can be found here:
25- https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect.
26-
27- Common arguments include:
28- autocommit: bool
29- Whether to automatically commit each query rather than running
30- them as part of a transaction. By default False.
31- row_factory: RowFactory
32- A psycopg RowFactory that determines how the data will be
33- returned. Defaults to tuple_row, which returns the rows as a
34- list of tuples.
23+ Parameters
24+ ----------
25+ retry_count: int, optional
26+ The number of times to retry connecting before throwing an error.
27+ By default no retry occurs.
28+ backoff_factor: int, optional
29+ The backoff factor when retrying. The amount of time to wait before
30+ retrying is backoff_factor ** number_of_retries_made.
31+ kwargs:
32+ All possible arguments (such as the row_factory) can be found here:
33+ https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect
3534 """
36- self .logger .info ('Connecting to {} database' .format (self .db_name ))
37- try :
38- self .conn = psycopg .connect (self .conn_info , ** kwargs )
39- except psycopg .Error as e :
40- self .logger .error (
41- 'Error connecting to {name} database: {error}' .format (
42- name = self .db_name , error = e ))
43- raise PostgreSQLClientError (
44- 'Error connecting to {name} database: {error}' .format (
45- name = self .db_name , error = e )) from None
35+ self .logger .info ('Connecting to {} database' .format (self .database ))
36+ attempt_count = 0
37+ while attempt_count <= retry_count :
38+ try :
39+ try :
40+ self .conn = psycopg .connect (self .conn_info , ** kwargs )
41+ except (psycopg .OperationalError ,
42+ psycopg .errors .ConnectionTimeout ):
43+ if attempt_count < retry_count :
44+ self .logger .info ('Failed to connect -- retrying' )
45+ time .sleep (backoff_factor ** attempt_count )
46+ attempt_count += 1
47+ else :
48+ raise
49+ else :
50+ break
51+ except Exception as e :
52+ self .logger .error (
53+ 'Error connecting to {name} database: {error}' .format (
54+ name = self .database , error = e ))
55+ raise PostgreSQLClientError (
56+ 'Error connecting to {name} database: {error}' .format (
57+ name = self .database , error = e )) from None
4658
4759 def execute_query (self , query , query_params = None , ** kwargs ):
4860 """
@@ -53,7 +65,11 @@ def execute_query(self, query, query_params=None, **kwargs):
5365 query: str
5466 The query to execute
5567 query_params: sequence, optional
56- The values to be used in a parameterized query
68+ The values to be used in a parameterized query. The values can be
69+ for a single insert query -- e.g. execute_query(
70+ "INSERT INTO x VALUES (%s, %s)", (1, "a"))
71+ or for multiple -- e.g execute_transaction(
72+ "INSERT INTO x VALUES (%s, %s)", [(1, "a"), (2, "b")])
5773 kwargs:
5874 All possible arguments can be found here:
5975 https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.Cursor.execute
@@ -65,30 +81,38 @@ def execute_query(self, query, query_params=None, **kwargs):
6581 based on the connection's row_factory if there's something to
6682 return (even if the result set is empty).
6783 """
68- self .logger .info ('Querying {} database' .format (self .db_name ))
84+ self .logger .info ('Querying {} database' .format (self .database ))
6985 self .logger .debug ('Executing query {}' .format (query ))
7086 try :
7187 cursor = self .conn .cursor ()
72- cursor .execute (query , query_params , ** kwargs )
88+ if query_params is not None and all (
89+ isinstance (param , tuple ) or isinstance (param , list )
90+ for param in query_params
91+ ):
92+ cursor .executemany (query , query_params , ** kwargs )
93+ else :
94+ cursor .execute (query , query_params , ** kwargs )
7395 self .conn .commit ()
7496 return None if cursor .description is None else cursor .fetchall ()
7597 except Exception as e :
7698 self .conn .rollback ()
99+ cursor .close ()
100+ self .close_connection ()
77101 self .logger .error (
78102 ('Error executing {name} database query \' {query}\' : '
79103 '{error}' ).format (
80- name = self .db_name , query = query , error = e ))
104+ name = self .database , query = query , error = e ))
81105 raise PostgreSQLClientError (
82106 ('Error executing {name} database query \' {query}\' : '
83107 '{error}' ).format (
84- name = self .db_name , query = query , error = e )) from None
108+ name = self .database , query = query , error = e )) from None
85109 finally :
86110 cursor .close ()
87111
88112 def close_connection (self ):
89113 """Closes the database connection"""
90114 self .logger .debug ('Closing {} database connection' .format (
91- self .db_name ))
115+ self .database ))
92116 self .conn .close ()
93117
94118
0 commit comments