Skip to content

Commit f050f5a

Browse files
authored
Merge pull request #88 from customerio/option_to_control_connection_pooling
Adds an additional named parameter to the CustomerIO and APIClient in…
2 parents 907a4a9 + 610e673 commit f050f5a

5 files changed

Lines changed: 89 additions & 20 deletions

File tree

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,17 @@ response = client.send_email(request)
226226
print(response)
227227
```
228228

229+
## Notes
230+
- The Customer.io Python SDK depends on the [`Requests`](https://pypi.org/project/requests/) library which includes [`urllib3`](https://pypi.org/project/urllib3/) as a transitive dependency. The [`Requests`](https://pypi.org/project/requests/) library leverages connection pooling defined in [`urllib3`](https://pypi.org/project/urllib3/). [`urllib3`](https://pypi.org/project/urllib3/) only attempts to retry invocations of `HTTP` methods which are understood to be idempotent (See: [`Retry.DEFAULT_ALLOWED_METHODS`](https://github.com/urllib3/urllib3/blob/main/src/urllib3/util/retry.py#L184)). Since the `POST` method is not considered to be idempotent, any invocations which require `POST` are not retried.
231+
232+
- It is possible to have the Customer.io Python SDK effectively *disable* connection pooling by passing a named initialization parameter `use_connection_pooling` to either the `APIClient` class or `CustomerIO` class. Setting this parameter to `False` (default: `True`) causes the [`Session`](https://github.com/psf/requests/blob/main/requests/sessions.py#L355) to be initialized and discarded after each request. If you are experiencing integration issues where the cause is reported as `Connection Reset by Peer`, this may correct the problem. It will, however, impose a slight performance penalty as the TCP connection set-up and tear-down will now occur for each request.
233+
234+
### Usage Example Disabling Connection Pooling
235+
```python
236+
from customerio import CustomerIO, Regions
237+
cio = CustomerIO(site_id, api_key, region=Regions.US, use_connection_pooling=False)
238+
```
239+
229240
## Running tests
230241

231242
Changes to the library can be tested by running `make test` from the parent directory.

customerio/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
VERSION = (1, 5, 0, 'final', 0)
1+
VERSION = (1, 6, 0, 'final', 0)
22

33
def get_version():
44
version = '%s.%s' % (VERSION[0], VERSION[1])

customerio/api.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,27 @@
77
from .regions import Regions, Region
88

99
class APIClient(ClientBase):
10-
def __init__(self, key, url=None, region=Regions.US, retries=3, timeout=10, backoff_factor=0.02):
10+
def __init__(self, key, url=None, region=Regions.US, retries=3, timeout=10, backoff_factor=0.02, use_connection_pooling=True):
1111
if not isinstance(region, Region):
1212
raise CustomerIOException('invalid region provided')
1313

14+
self.key = key
1415
self.url = url or 'https://{host}'.format(host=region.api_host)
1516
ClientBase.__init__(self, retries=retries,
16-
timeout=timeout, backoff_factor=backoff_factor)
17-
18-
self.http.headers['Authorization'] = "Bearer {key}".format(key=key)
17+
timeout=timeout, backoff_factor=backoff_factor, use_connection_pooling=use_connection_pooling)
1918

2019
def send_email(self, request):
2120
if isinstance(request, SendEmailRequest):
2221
request = request._to_dict()
2322
resp = self.send_request('POST', self.url + "/v1/send/email", request)
2423
return json.loads(resp)
2524

25+
# builds the session.
26+
def _build_session(self):
27+
session = super()._build_session()
28+
session.headers['Authorization'] = "Bearer {key}".format(key=self.key)
2629

30+
return session
2731

2832
class SendEmailRequest(object):
2933
'''An object with all the options avaiable for triggering a transactional message'''

customerio/client_base.py

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
from __future__ import division
55
from datetime import datetime, timezone
6+
import logging
67
import math
78

89
from requests import Session
@@ -15,24 +16,32 @@ class CustomerIOException(Exception):
1516
pass
1617

1718
class ClientBase(object):
18-
def __init__(self, retries=3, timeout=10, backoff_factor=0.02):
19+
def __init__(self, retries=3, timeout=10, backoff_factor=0.02, use_connection_pooling=True):
1920
self.timeout = timeout
2021
self.retries = retries
22+
self.backoff_factor = backoff_factor
23+
self.use_connection_pooling = use_connection_pooling
24+
self._current_session = None
2125

22-
self.http = Session()
23-
self.http.headers['User-Agent'] = "Customer.io Python Client/{version}".format(version=ClientVersion)
26+
@property
27+
def http(self):
28+
if self._current_session is None:
29+
self._current_session = self._get_session()
2430

25-
# Retry request a number of times before raising an exception
26-
# also define backoff_factor to delay each retry
27-
self.http.mount('https://', HTTPAdapter(max_retries=Retry(
28-
total=retries, backoff_factor=backoff_factor)))
31+
return self._current_session
2932

3033
def send_request(self, method, url, data):
3134
'''Dispatches the request and returns a response'''
3235

3336
try:
3437
response = self.http.request(
3538
method, url=url, json=self._sanitize(data), timeout=self.timeout)
39+
40+
result_status = response.status_code
41+
if result_status != 200:
42+
raise CustomerIOException('%s: %s %s %s' % (result_status, url, data, response.text))
43+
return response.text
44+
3645
except Exception as e:
3746
# Raise exception alerting user that the system might be
3847
# experiencing an outage and refer them to system status page.
@@ -42,10 +51,8 @@ def send_request(self, method, url, data):
4251
'''.format(klass=type(e), message=e, count=self.retries)
4352
raise CustomerIOException(message)
4453

45-
result_status = response.status_code
46-
if result_status != 200:
47-
raise CustomerIOException('%s: %s %s %s' % (result_status, url, data, response.text))
48-
return response.text
54+
finally:
55+
self._close()
4956

5057
def _sanitize(self, data):
5158
for k, v in data.items():
@@ -69,3 +76,38 @@ def _stringify_list(self, customer_ids):
6976
raise CustomerIOException(
7077
'customer_ids cannot be {type}'.format(type=type(v)))
7178
return customer_string_ids
79+
80+
# gets a session based on whether we want pooling or not. If no pooling is desired, we create a new session each time.
81+
def _get_session(self):
82+
if (self.use_connection_pooling):
83+
if (self._current_session is None):
84+
self._current_session = self._build_session()
85+
86+
# if we're using pooling, return the existing session.
87+
logging.debug("Using existing session...")
88+
return self._current_session
89+
else:
90+
# if we're not using pooling, build a new session.
91+
logging.debug("Creating new session...")
92+
self._current_session = self._build_session()
93+
return self._current_session
94+
95+
# builds the session.
96+
def _build_session(self):
97+
session = Session()
98+
session.headers['User-Agent'] = "Customer.io Python Client/{version}".format(version=ClientVersion)
99+
100+
# Retry request a number of times before raising an exception
101+
# also define backoff_factor to delay each retry
102+
session.mount(
103+
'https://',
104+
HTTPAdapter(max_retries=Retry(total=self.retries, backoff_factor=self.backoff_factor)))
105+
106+
return session
107+
108+
# closes the session if we're not using connection pooling.
109+
def _close(self):
110+
# if we're not using pooling; clean up the resources.
111+
if (not self.use_connection_pooling):
112+
self._current_session.close()
113+
self._current_session = None

customerio/track.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,27 @@
1010
from customerio.constants import CIOID, EMAIL, ID
1111

1212
class CustomerIO(ClientBase):
13-
def __init__(self, site_id=None, api_key=None, host=None, region=Regions.US, port=None, url_prefix=None, json_encoder=None, retries=3, timeout=10, backoff_factor=0.02):
13+
def __init__(self, site_id=None, api_key=None, host=None, region=Regions.US, port=None, url_prefix=None, json_encoder=None, retries=3, timeout=10, backoff_factor=0.02, use_connection_pooling=True):
1414
if not isinstance(region, Region):
1515
raise CustomerIOException('invalid region provided')
1616

1717
self.host = host or region.track_host
1818
self.port = port or 443
1919
self.url_prefix = url_prefix or '/api/v1'
20+
self.api_key = api_key
21+
self.site_id = site_id
2022

2123
if json_encoder is not None:
2224
warnings.warn(
2325
"With the switch to using requests library the `json_encoder` param is no longer used.", DeprecationWarning)
2426

2527
self.setup_base_url()
26-
ClientBase.__init__(self, retries=retries,
27-
timeout=timeout, backoff_factor=backoff_factor)
28-
self.http.auth = (site_id, api_key)
28+
ClientBase.__init__(
29+
self,
30+
retries=retries,
31+
timeout=timeout,
32+
backoff_factor=backoff_factor,
33+
use_connection_pooling=use_connection_pooling)
2934

3035
def _url_encode(self, id):
3136
return quote(str(id), safe='')
@@ -209,3 +214,10 @@ def merge_customers(self, primary_id_type, primary_id, secondary_id_type, second
209214
}
210215
}
211216
self.send_request('POST', url, post_data)
217+
218+
# builds the session.
219+
def _build_session(self):
220+
session = super()._build_session()
221+
session.auth = (self.site_id, self.api_key)
222+
223+
return session

0 commit comments

Comments
 (0)