Skip to content

Commit 3c80465

Browse files
Merge branch 'qa' into production
2 parents c3114dd + d191a78 commit 3c80465

6 files changed

Lines changed: 35 additions & 3 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
.DS_Store
22
dist/
33
__pycache__/
4+
.vscode/
45
*env/
56
*.py[cod]
67
*$py.class

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## v1.0.4 - 6/28/23
4+
- Enforce Kinesis stream 1000 records/second write limit
5+
36
## v1.0.3 - 5/19/23
47
- Add research_catalog_identifier_helper function
58

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ kinesis_client = KinesisClient(...)
3535
# Do not use any version below 1.0.0
3636
# All available optional dependencies can be found in pyproject.toml.
3737
# See the "Managing dependencies" section below for more details.
38-
nypl-py-utils[kinesis-client,config-helper]==1.0.1
38+
nypl-py-utils[kinesis-client,config-helper]==1.0.4
3939
```
4040

4141
## Developing locally

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "nypl_py_utils"
7-
version = "1.0.3"
7+
version = "1.0.4"
88
authors = [
99
{ name="Aaron Friedman", email="aaronfriedman@nypl.org" },
1010
]

src/nypl_py_utils/classes/kinesis_client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,22 @@ def close(self):
3939
def send_records(self, records):
4040
"""
4141
Sends list of records (usually represented as Avro-encoded byte
42-
strings) to Kinesis in batches of size self.batch_size.
42+
strings) to Kinesis in batches of size self.batch_size. Kinesis can
43+
only handle 1000 records per second, so this method waits a second
44+
between each group of up to 1000 records.
4345
"""
46+
records_sent_since_pause = 0
4447
for i in range(0, len(records), self.batch_size):
4548
encoded_batch = records[i:i + self.batch_size]
4649
kinesis_records = [{'Data': record, 'PartitionKey':
4750
str(int(time.time() * 1000000000))}
4851
for record in encoded_batch]
52+
53+
if records_sent_since_pause + len(encoded_batch) > 1000:
54+
records_sent_since_pause = 0
55+
time.sleep(1)
4956
self._send_kinesis_format_records(kinesis_records, 1)
57+
records_sent_since_pause += len(encoded_batch)
5058

5159
def _send_kinesis_format_records(self, kinesis_records, call_count):
5260
"""

tests/test_kinesis_client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def test_send_records(self, test_instance, mocker):
2626
MOCK_RECORDS = [b'a', b'b', b'c', b'd', b'e']
2727
mocked_send_method = mocker.patch(
2828
'nypl_py_utils.classes.kinesis_client.KinesisClient._send_kinesis_format_records') # noqa: E501
29+
mock_sleep = mocker.patch('time.sleep', return_value=None)
2930

3031
test_instance.send_records(MOCK_RECORDS)
3132
mocked_send_method.assert_has_calls([
@@ -34,6 +35,25 @@ def test_send_records(self, test_instance, mocker):
3435
mocker.call([_TEST_KINESIS_RECORDS[2],
3536
_TEST_KINESIS_RECORDS[3]], 1),
3637
mocker.call([_TEST_KINESIS_RECORDS[4]], 1)])
38+
mock_sleep.assert_not_called()
39+
40+
def test_send_records_with_pause(self, mocker):
41+
mocker.patch('boto3.client')
42+
test_instance = KinesisClient('test_stream_arn', 500)
43+
44+
MOCK_RECORDS = [b'a'] * 2200
45+
mocked_send_method = mocker.patch(
46+
'nypl_py_utils.classes.kinesis_client.KinesisClient._send_kinesis_format_records') # noqa: E501
47+
mock_sleep = mocker.patch('time.sleep', return_value=None)
48+
49+
test_instance.send_records(MOCK_RECORDS)
50+
mocked_send_method.assert_has_calls([
51+
mocker.call([_TEST_KINESIS_RECORDS[0]]*500, 1),
52+
mocker.call([_TEST_KINESIS_RECORDS[0]]*500, 1),
53+
mocker.call([_TEST_KINESIS_RECORDS[0]]*500, 1),
54+
mocker.call([_TEST_KINESIS_RECORDS[0]]*500, 1),
55+
mocker.call([_TEST_KINESIS_RECORDS[0]]*200, 1)])
56+
assert mock_sleep.call_count == 2
3757

3858
def test_send_kinesis_format_records(self, test_instance):
3959
test_instance.kinesis_client.put_records.return_value = {

0 commit comments

Comments
 (0)