11import psycopg
2+ import time
23
34from nypl_py_utils .functions .log_helper import create_log
45
56
67class PostgreSQLClient :
78 """Client for managing individual connections to a PostgreSQL database"""
89
9- def __init__ (self , host , port , db_name , user , password ):
10+ def __init__ (self , host , port , database , user , password ):
1011 self .logger = create_log ('postgresql_client' )
1112 self .conn = None
1213 self .conn_info = ('postgresql://{user}:{password}@{host}:{port}/'
13- '{db_name}' ).format (user = user , password = password ,
14- host = host , port = port ,
15- db_name = db_name )
14+ '{database}' ).format (user = user , password = password ,
15+ host = host , port = port ,
16+ database = database )
17+ self .database = database
1618
17- self .db_name = db_name
18-
19- def connect (self , ** kwargs ):
19+ def connect (self , retry_count = 0 , backoff_factor = 5 , ** kwargs ):
2020 """
2121 Connects to a PostgreSQL database using the given credentials.
2222
23- Keyword args can be passed into the connection to set certain options.
24- All possible arguments can be found here:
25- https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect.
26-
27- Common arguments include:
28- autocommit: bool
29- Whether to automatically commit each query rather than running
30- them as part of a transaction. By default False.
31- row_factory: RowFactory
32- A psycopg RowFactory that determines how the data will be
33- returned. Defaults to tuple_row, which returns the rows as a
34- list of tuples.
23+ Parameters
24+ ----------
25+ retry_count: int, optional
26+ The number of times to retry connecting before throwing an error.
27+ By default no retry occurs.
28+ backoff_factor: int, optional
29+ The backoff factor when retrying. The amount of time to wait before
30+ retrying is backoff_factor ** number_of_retries_made.
31+ kwargs:
32+ All possible arguments (such as the row_factory) can be found here:
33+ https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect
3534 """
36- self .logger .info ('Connecting to {} database' .format (self .db_name ))
37- try :
38- self .conn = psycopg .connect (self .conn_info , ** kwargs )
39- except psycopg .Error as e :
40- self .logger .error (
41- 'Error connecting to {name} database: {error}' .format (
42- name = self .db_name , error = e ))
43- raise PostgreSQLClientError (
44- 'Error connecting to {name} database: {error}' .format (
45- name = self .db_name , error = e )) from None
35+ self .logger .info ('Connecting to {} database' .format (self .database ))
36+ attempt_count = 0
37+ while attempt_count <= retry_count :
38+ try :
39+ try :
40+ self .conn = psycopg .connect (self .conn_info , ** kwargs )
41+ break
42+ except (psycopg .OperationalError ,
43+ psycopg .errors .ConnectionTimeout ):
44+ if attempt_count < retry_count :
45+ self .logger .info ('Failed to connect -- retrying' )
46+ time .sleep (backoff_factor ** attempt_count )
47+ attempt_count += 1
48+ else :
49+ raise
50+ except Exception as e :
51+ self .logger .error (
52+ 'Error connecting to {name} database: {error}' .format (
53+ name = self .database , error = e ))
54+ raise PostgreSQLClientError (
55+ 'Error connecting to {name} database: {error}' .format (
56+ name = self .database , error = e )) from None
4657
4758 def execute_query (self , query , query_params = None , ** kwargs ):
4859 """
@@ -53,7 +64,11 @@ def execute_query(self, query, query_params=None, **kwargs):
5364 query: str
5465 The query to execute
5566 query_params: sequence, optional
56- The values to be used in a parameterized query
67+ The values to be used in a parameterized query. The values can be
68+ for a single insert query -- e.g. execute_query(
69+ "INSERT INTO x VALUES (%s, %s)", (1, "a"))
70+ or for multiple -- e.g execute_transaction(
71+ "INSERT INTO x VALUES (%s, %s)", [(1, "a"), (2, "b")])
5772 kwargs:
5873 All possible arguments can be found here:
5974 https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.Cursor.execute
@@ -65,30 +80,38 @@ def execute_query(self, query, query_params=None, **kwargs):
6580 based on the connection's row_factory if there's something to
6681 return (even if the result set is empty).
6782 """
68- self .logger .info ('Querying {} database' .format (self .db_name ))
83+ self .logger .info ('Querying {} database' .format (self .database ))
6984 self .logger .debug ('Executing query {}' .format (query ))
7085 try :
7186 cursor = self .conn .cursor ()
72- cursor .execute (query , query_params , ** kwargs )
87+ if query_params is not None and all (
88+ isinstance (param , tuple ) or isinstance (param , list )
89+ for param in query_params
90+ ):
91+ cursor .executemany (query , query_params , ** kwargs )
92+ else :
93+ cursor .execute (query , query_params , ** kwargs )
7394 self .conn .commit ()
7495 return None if cursor .description is None else cursor .fetchall ()
7596 except Exception as e :
7697 self .conn .rollback ()
98+ cursor .close ()
99+ self .close_connection ()
77100 self .logger .error (
78101 ('Error executing {name} database query \' {query}\' : '
79102 '{error}' ).format (
80- name = self .db_name , query = query , error = e ))
103+ name = self .database , query = query , error = e ))
81104 raise PostgreSQLClientError (
82105 ('Error executing {name} database query \' {query}\' : '
83106 '{error}' ).format (
84- name = self .db_name , query = query , error = e )) from None
107+ name = self .database , query = query , error = e )) from None
85108 finally :
86109 cursor .close ()
87110
88111 def close_connection (self ):
89112 """Closes the database connection"""
90113 self .logger .debug ('Closing {} database connection' .format (
91- self .db_name ))
114+ self .database ))
92115 self .conn .close ()
93116
94117
0 commit comments