Skip to content

Commit c1099d0

Browse files
author
Mohan Kishore
committed
integration tested
1 parent 9cd3ed3 commit c1099d0

4 files changed

Lines changed: 203 additions & 93 deletions

File tree

python_dynamodb_lock/python_dynamodb_lock.py

Lines changed: 131 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from botocore.exceptions import ClientError
2828
from concurrent.futures import ThreadPoolExecutor
2929
import datetime
30+
from decimal import Decimal
3031
import logging
3132
import socket
3233
import time
@@ -53,12 +54,22 @@ class DynamoDBLockClient:
5354
_DEFAULT_EXPIRY_PERIOD = datetime.timedelta(hours=1)
5455
_DEFAULT_HEARTBEAT_TPS = 5
5556
_DEFAULT_APP_CALLBACK_THREADPOOL_SIZE = 5
57+
# for optional create-table method
58+
_DEFAULT_READ_CAPACITY = 5
59+
_DEFAULT_WRITE_CAPACITY = 5
5660

61+
# to help make the sort-key optional
5762
_DEFAULT_SORT_KEY_VALUE = '-'
5863

64+
# DynamoDB "hard-coded" column names
65+
_COL_OWNER_NAME = 'owner_name'
66+
_COL_LEASE_DURATION = 'lease_duration'
67+
_COL_RECORD_VERSION_NUMBER = 'record_version_number'
68+
_COL_EXPIRY_TIME = 'expiry_time'
69+
5970

6071
def __init__(self,
61-
dynamodb,
72+
dynamodb_resource,
6273
table_name=_DEFAULT_TABLE_NAME,
6374
partition_key_name=_DEFAULT_PARTITION_KEY_NAME,
6475
sort_key_name=_DEFAULT_SORT_KEY_NAME,
@@ -71,7 +82,7 @@ def __init__(self,
7182
app_callback_executor=None
7283
):
7384
"""
74-
:param boto3.ServiceResource dynamodb: mandatory argument
85+
:param boto3.ServiceResource dynamodb_resource: mandatory argument
7586
:param str table_name: defaults to 'DynamoDBLockTable'
7687
:param str partition_key_name: defaults to 'lock_key'
7788
:param str sort_key_name: defaults to 'sort_key'
@@ -96,7 +107,7 @@ def __init__(self,
96107
maximum of 5 threads.
97108
"""
98109
self.uuid = uuid.uuid4().hex
99-
self.dynamodb = dynamodb
110+
self.dynamodb_resource = dynamodb_resource
100111
self.table_name = table_name
101112
self.partition_key_name = partition_key_name
102113
self.sort_key_name = sort_key_name
@@ -113,7 +124,7 @@ def __init__(self,
113124
# additional properties
114125
self._locks = {}
115126
self._shutting_down = False
116-
self.dynamodb_table = dynamodb.Table(table_name)
127+
self._dynamodb_table = dynamodb_resource.Table(table_name)
117128
# and, initialization
118129
self._start_background_thread()
119130
logger.info('Created: %s', str(self))
@@ -200,10 +211,10 @@ def send_heartbeat(self, lock):
200211

201212
old_record_version_number = lock.record_version_number
202213
new_record_version_number = str(uuid.uuid4())
203-
new_expiry_time = time.time() + self.expiry_period.total_seconds()
214+
new_expiry_time = int(time.time() + self.expiry_period.total_seconds())
204215

205216
# first, try to update the database
206-
self.dynamodb_table.update_item(
217+
self._dynamodb_table.update_item(
207218
Key={
208219
self.partition_key_name: lock.partition_key,
209220
self.sort_key_name: lock.sort_key
@@ -213,19 +224,20 @@ def send_heartbeat(self, lock):
213224
ExpressionAttributeNames={
214225
'#pk': self.partition_key_name,
215226
'#sk': self.sort_key_name,
216-
'#rvn': 'record_version_number',
217-
'#et': 'expiry_time',
227+
'#rvn': self._COL_RECORD_VERSION_NUMBER,
228+
'#et': self._COL_EXPIRY_TIME,
218229
},
219230
ExpressionAttributeValues={
220-
':old_rvn': {'S': old_record_version_number},
221-
':new_rvn': {'S': new_record_version_number},
222-
':new_et': {'N': new_expiry_time}
231+
':old_rvn': old_record_version_number,
232+
':new_rvn': new_record_version_number,
233+
':new_et': new_expiry_time,
223234
}
224235
)
225236

226237
# if successful, update the in-memory lock representations
227238
lock.record_version_number = new_record_version_number
228239
lock.expiry_time = new_expiry_time
240+
lock.last_updated_time = time.time()
229241
logger.debug('Successfully sent the heartbeat: %s', lock.unique_identifier)
230242
except ClientError as e:
231243
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
@@ -243,9 +255,8 @@ def send_heartbeat(self, lock):
243255
# Note: the lock might have been released locally by the conditional-exception above
244256
if lock.unique_identifier not in self._locks: return
245257
# if the lock is in danger, invoke the app-callback
246-
last_update_time = lock.expiry_time - self.expiry_period.total_seconds()
247-
is_lock_safe = time.time() < (last_update_time + self.safe_period.total_seconds())
248-
if not is_lock_safe:
258+
safe_period_end_time = lock.last_updated_time + self.safe_period.total_seconds()
259+
if not time.time() < safe_period_end_time:
249260
logger.warning('LockInDanger while sending heartbeat: %s', lock.unique_identifier)
250261
# callback - the app should abort its processing, and release the lock
251262
self._call_app_callback(lock, DynamoDBLockError.LOCK_IN_DANGER)
@@ -311,9 +322,9 @@ def acquire_lock(self,
311322
partition_key=partition_key,
312323
sort_key=sort_key,
313324
owner_name=self.owner_name,
314-
lease_duration_in_seconds=self.lease_duration.total_seconds(),
325+
lease_duration=self.lease_duration.total_seconds(),
315326
record_version_number=str( uuid.uuid4() ),
316-
expiry_time=time.time() + self.expiry_period.total_seconds(),
327+
expiry_time=int(time.time() + self.expiry_period.total_seconds()),
317328
additional_attributes=additional_attributes,
318329
app_callback=app_callback,
319330
lock_client=self,
@@ -330,7 +341,8 @@ def acquire_lock(self,
330341

331342
try:
332343
# need to bump up the expiry time - to account for the sleep between tries
333-
new_lock.expiry_time = time.time() + self.expiry_period.total_seconds()
344+
new_lock.last_updated_time = time.time()
345+
new_lock.expiry_time = int(time.time() + self.expiry_period.total_seconds())
334346

335347
logger.debug('Checking the database for existing owner: %s', new_lock.unique_identifier)
336348
existing_lock = self._get_lock_from_dynamodb(partition_key, sort_key)
@@ -359,7 +371,7 @@ def acquire_lock(self,
359371
# if the record_version_number has not changed for more than lease_duration period,
360372
# it basically means that the owner thread/process has died.
361373
last_version_elapsed_time = time.time() - last_version_fetch_time
362-
if last_version_elapsed_time > existing_lock.lease_duration.total_seconds():
374+
if last_version_elapsed_time > existing_lock.lease_duration:
363375
logger.warning('Existing lock\'s lease has expired: %s', str(existing_lock))
364376
self._overwrite_existing_lock_in_dynamodb(new_lock, last_record_version_number)
365377
logger.debug('Added to the DDB. Adding to in-memory map: %s', new_lock.unique_identifier)
@@ -380,7 +392,7 @@ def acquire_lock(self,
380392
next_loop_start_time = start_time + retry_count * retry_period.total_seconds()
381393
if next_loop_start_time > retry_timeout_time:
382394
raise DynamoDBLockError(DynamoDBLockError.ACQUIRE_TIMEOUT, 'acquire_lock() timed out: ' + new_lock.unique_identifier)
383-
elif curr_loop_end_time < next_loop_start_time:
395+
elif next_loop_start_time > curr_loop_end_time:
384396
logger.info('Sleeping before a retry: %s', new_lock.unique_identifier)
385397
time.sleep(next_loop_start_time - curr_loop_end_time)
386398

@@ -417,7 +429,7 @@ def release_lock(self, lock, best_effort=True):
417429
del self._locks[lock.unique_identifier]
418430

419431
# then, remove it from the database
420-
self.dynamodb_table.delete_item(
432+
self._dynamodb_table.delete_item(
421433
Key={
422434
self.partition_key_name: lock.partition_key,
423435
self.sort_key_name: lock.sort_key
@@ -426,10 +438,10 @@ def release_lock(self, lock, best_effort=True):
426438
ExpressionAttributeNames={
427439
'#pk': self.partition_key_name,
428440
'#sk': self.sort_key_name,
429-
'#rvn': 'record_version_number',
441+
'#rvn': self._COL_RECORD_VERSION_NUMBER,
430442
},
431443
ExpressionAttributeValues={
432-
':rvn': {'S': lock.record_version_number},
444+
':rvn': lock.record_version_number,
433445
}
434446
)
435447

@@ -456,7 +468,7 @@ def _get_lock_from_dynamodb(self, partition_key, sort_key):
456468
:rtype: BaseDynamoDBLock
457469
"""
458470
logger.debug('Getting the lock from dynamodb for: %s, %s', partition_key, sort_key)
459-
result = self.dynamodb_table.get_item(
471+
result = self._dynamodb_table.get_item(
460472
Key={
461473
self.partition_key_name: partition_key,
462474
self.sort_key_name: sort_key
@@ -476,7 +488,7 @@ def _add_new_lock_to_dynamodb(self, lock):
476488
:param DynamoDBLock lock: The lock instance that needs to be added to the database.
477489
"""
478490
logger.debug('Adding a new lock: %s', str(lock))
479-
self.dynamodb_table.put_item(
491+
self._dynamodb_table.put_item(
480492
Item=self._get_item_from_lock(lock),
481493
ConditionExpression='NOT(attribute_exists(#pk) AND attribute_exists(#sk))',
482494
ExpressionAttributeNames={
@@ -494,16 +506,16 @@ def _overwrite_existing_lock_in_dynamodb(self, lock, record_version_number):
494506
:param str record_version_number: The version-number for the old lock instance in the database.
495507
"""
496508
logger.debug('Overwriting existing-rvn: %s with new lock: %s', record_version_number, str(lock))
497-
self.dynamodb_table.put_item(
509+
self._dynamodb_table.put_item(
498510
Item=self._get_item_from_lock(lock),
499511
ConditionExpression='attribute_exists(#pk) AND attribute_exists(#sk) AND #rvn = :old_rvn',
500512
ExpressionAttributeNames={
501513
'#pk': self.partition_key_name,
502514
'#sk': self.sort_key_name,
503-
'#rvn': 'record_version_number',
515+
'#rvn': self._COL_RECORD_VERSION_NUMBER,
504516
},
505517
ExpressionAttributeValues={
506-
':old_rvn': {'S': record_version_number},
518+
':old_rvn': record_version_number,
507519
}
508520
)
509521

@@ -519,10 +531,10 @@ def _get_lock_from_item(self, item):
519531
lock = BaseDynamoDBLock(
520532
partition_key=item.pop(self.partition_key_name),
521533
sort_key=item.pop(self.sort_key_name),
522-
owner_name=item.pop('owner_name'),
523-
lease_duration_in_seconds=item.pop('lease_duration_in_seconds'),
524-
record_version_number=item.pop('record_version_number'),
525-
expiry_time=item.pop('expiry_time'),
534+
owner_name=item.pop(self._COL_OWNER_NAME),
535+
lease_duration=float(item.pop(self._COL_LEASE_DURATION)),
536+
record_version_number=item.pop(self._COL_RECORD_VERSION_NUMBER),
537+
expiry_time=int(item.pop(self._COL_EXPIRY_TIME)),
526538
additional_attributes=item
527539
)
528540
return lock
@@ -540,10 +552,10 @@ def _get_item_from_lock(self, lock):
540552
item.update({
541553
self.partition_key_name: lock.partition_key,
542554
self.sort_key_name: lock.sort_key,
543-
'owner_name': lock.owner_name,
544-
'lease_duration_in_seconds': lock.lease_duration_in_seconds,
545-
'record_version_number': lock.record_version_number,
546-
'expiry_time': lock.expiry_time
555+
self._COL_OWNER_NAME: lock.owner_name,
556+
self._COL_LEASE_DURATION: Decimal.from_float(lock.lease_duration),
557+
self._COL_RECORD_VERSION_NUMBER: lock.record_version_number,
558+
self._COL_EXPIRY_TIME: lock.expiry_time
547559
})
548560
return item
549561

@@ -591,6 +603,80 @@ def __str__(self):
591603
return '%s::%s' % (self.__class__.__name__, self.__dict__)
592604

593605

606+
@classmethod
607+
def create_dynamodb_table(cls,
608+
dynamodb_client,
609+
table_name=_DEFAULT_TABLE_NAME,
610+
partition_key_name=_DEFAULT_PARTITION_KEY_NAME,
611+
sort_key_name=_DEFAULT_SORT_KEY_NAME,
612+
read_capacity=_DEFAULT_READ_CAPACITY,
613+
write_capacity=_DEFAULT_WRITE_CAPACITY):
614+
615+
"""
616+
Helper method to create the DynamoDB table
617+
618+
:param boto3.DynamoDB.Client dynamodb_client: mandatory argument
619+
:param str table_name: defaults to 'DynamoDBLockTable'
620+
:param str partition_key_name: defaults to 'lock_key'
621+
:param str sort_key_name: defaults to 'sort_key'
622+
:param int read_capacity: the max TPS for strongly-consistent reads; defaults to 5
623+
:param int write_capacity: the max TPS for write operations; defaults to 5
624+
:return:
625+
"""
626+
logger.info("Creating the lock table: %s", table_name)
627+
dynamodb_client.create_table(
628+
TableName=table_name,
629+
KeySchema=[
630+
{
631+
'AttributeName': partition_key_name,
632+
'KeyType': 'HASH'
633+
},
634+
{
635+
'AttributeName': sort_key_name,
636+
'KeyType': 'RANGE'
637+
},
638+
],
639+
AttributeDefinitions=[
640+
{
641+
'AttributeName': partition_key_name,
642+
'AttributeType': 'S'
643+
},
644+
{
645+
'AttributeName': sort_key_name,
646+
'AttributeType': 'S'
647+
},
648+
],
649+
ProvisionedThroughput={
650+
'ReadCapacityUnits': read_capacity,
651+
'WriteCapacityUnits': write_capacity
652+
},
653+
)
654+
cls._wait_for_table_to_be_active(dynamodb_client, table_name)
655+
656+
logger.info("Updating the table with time_to_live configuration")
657+
dynamodb_client.update_time_to_live(
658+
TableName=table_name,
659+
TimeToLiveSpecification={
660+
'Enabled': True,
661+
'AttributeName': cls._COL_EXPIRY_TIME
662+
}
663+
)
664+
cls._wait_for_table_to_be_active(dynamodb_client, table_name)
665+
666+
667+
@classmethod
668+
def _wait_for_table_to_be_active(cls, dynamodb_client, table_name):
669+
logger.info("Waiting till the table becomes ACTIVE")
670+
while True:
671+
response = dynamodb_client.describe_table(TableName=table_name)
672+
status = response.get('Table', {}).get('TableStatus', 'UNKNOWN')
673+
logger.info("Table status: %s", status)
674+
if status == 'ACTIVE':
675+
break
676+
else:
677+
time.sleep(2)
678+
679+
594680

595681
class BaseDynamoDBLock:
596682
"""
@@ -603,7 +689,7 @@ def __init__(self,
603689
partition_key,
604690
sort_key,
605691
owner_name,
606-
lease_duration_in_seconds,
692+
lease_duration,
607693
record_version_number,
608694
expiry_time,
609695
additional_attributes
@@ -612,21 +698,20 @@ def __init__(self,
612698
:param str partition_key: The primary lock identifier
613699
:param str sort_key: If present, forms a "composite identifier" along with the partition_key
614700
:param str owner_name: The owner name - typically from the lock_client
615-
:param float lease_duration_in_seconds: The lease duration - typically from the lock_client
616-
:param str record_version_number: Changes with every heartbeat - the "liveness" indicator
617-
:param float expiry_time: Epoch timestamp in seconds after which DynamoDB will auto-delete the record
701+
:param float lease_duration: The lease duration in seconds - typically from the lock_client
702+
:param str record_version_number: A "liveness" indicating GUID - changes with every heartbeat
703+
:param int expiry_time: Epoch timestamp in seconds after which DynamoDB will auto-delete the record
618704
:param dict additional_attributes: Arbitrary application metadata to be stored with the lock
619705
"""
620706
self.partition_key = partition_key
621707
self.sort_key = sort_key
622708
self.owner_name = owner_name
623-
self.lease_duration_in_seconds = lease_duration_in_seconds
709+
self.lease_duration = lease_duration
624710
self.record_version_number = record_version_number
625711
self.expiry_time = expiry_time
626712
self.additional_attributes = additional_attributes or {}
627713
# additional properties
628714
self.unique_identifier = quote(partition_key) + '|' + quote(sort_key)
629-
self.lease_duration = datetime.timedelta(seconds=lease_duration_in_seconds)
630715

631716

632717
def __str__(self):
@@ -646,7 +731,7 @@ def __init__(self,
646731
partition_key,
647732
sort_key,
648733
owner_name,
649-
lease_duration_in_seconds,
734+
lease_duration,
650735
record_version_number,
651736
expiry_time,
652737
additional_attributes,
@@ -657,9 +742,9 @@ def __init__(self,
657742
:param str partition_key: The primary lock identifier
658743
:param str sort_key: If present, forms a "composite identifier" along with the partition_key
659744
:param str owner_name: The owner name - typically from the lock_client
660-
:param float lease_duration_in_seconds: The lease duration - typically from the lock_client
745+
:param float lease_duration: The lease duration - typically from the lock_client
661746
:param str record_version_number: Changes with every heartbeat - the "liveness" indicator
662-
:param float expiry_time: Epoch timestamp in seconds after which DynamoDB will auto-delete the record
747+
:param int expiry_time: Epoch timestamp in seconds after which DynamoDB will auto-delete the record
663748
:param dict additional_attributes: Arbitrary application metadata to be stored with the lock
664749
665750
:param Callable app_callback: Callback function that can be used to notify the app of lock entering
@@ -670,14 +755,15 @@ def __init__(self,
670755
partition_key,
671756
sort_key,
672757
owner_name,
673-
lease_duration_in_seconds,
758+
lease_duration,
674759
record_version_number,
675760
expiry_time,
676761
additional_attributes
677762
)
678763
self.app_callback = app_callback
679764
self.lock_client = lock_client
680765
# additional properties
766+
self.last_updated_time = time.time()
681767
self.thread_lock = threading.RLock()
682768

683769

requirements_dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ coverage==4.5.1
88
Sphinx==1.8.0
99
twine==1.11.0
1010
sphinx_rtd_theme==0.4.1
11+
pytest==3.8.1

0 commit comments

Comments
 (0)