Skip to content

Commit 904b2b0

Browse files
authored
Merge pull request #8 from labteral/develop
v2
2 parents c37b37f + bbb1c15 commit 904b2b0

4 files changed

Lines changed: 36 additions & 32 deletions

File tree

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ RUN \
2828
pip install --upgrade pip \
2929
&& pip install \
3030
"cython==0.29.20" \
31-
"easyrocks==0.0.16a0" \
31+
"easyrocks>=2.214.0" \
3232
"msgpack==1.0.2" \
3333
"python-snappy==0.5.4" \
3434
"pyyaml==5.3.1" \

stopover/broker.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ def put_message(self, params: dict) -> dict:
9898

9999
partition_numbers = self._get_stream_partition_numbers(stream)
100100
if partition_number is None:
101-
partition_number = utils.get_partition_number(
102-
partition_numbers, key)
101+
partition_number = \
102+
utils.get_partition_number(partition_numbers, key)
103103
elif partition_number not in partition_numbers:
104104
raise ValueError('partition does not exist')
105105

@@ -276,7 +276,6 @@ def _get_stream_partition_numbers(self, stream: str):
276276
raise FileNotFoundError(
277277
f'missing partitions among {partition_numbers}')
278278

279-
# Initialize new partitions
280279
Partition(stream=stream,
281280
number=partition_number,
282281
data_dir=self.config['global']['data_dir'],
@@ -313,8 +312,8 @@ def _rebalance(self):
313312
self.last_seen_by_group[receiver_group][receiver]
314313
) / 1000
315314

316-
if receiver_unseen_time < self.config['global'][
317-
'receiver_timeout']:
315+
if receiver_unseen_time \
316+
< self.config['global']['receiver_timeout']:
318317
stream_receiver_group_receivers.append(receiver)
319318

320319
else:
@@ -353,9 +352,9 @@ def _rebalance(self):
353352
stream_partition_numbers[index])
354353

355354
for stream, receiver_group, receiver in receivers_to_remove:
356-
logging.info(
357-
f'receiver "{receiver}" kicked from the receiver_group \
358-
"{receiver_group}" for the stream "{stream}"')
355+
logging.info(f'receiver "{receiver}" kicked from the ' \
356+
f'receiver_group "{receiver_group}" ' \
357+
f'for the stream "{stream}"')
359358
del self.partitions_by_group[stream][receiver_group][receiver]
360359
if receiver in self.last_seen_by_group[receiver_group]:
361360
del self.last_seen_by_group[receiver_group][receiver]

stopover/partition.py

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
import utils
44
from os import makedirs
55
from easyrocks import RocksDB, WriteBatch, CompressionType
6+
from easyrocks.utils import int_to_padded_bytes
67
from threading import Lock
78
import logging
89
from typing import Dict
910

11+
UINT_BYTES = 8
12+
MAX_UINT = 2**(UINT_BYTES * 8) - 1
13+
1014

1115
class PartitionItem:
1216
def __init__(self,
@@ -39,6 +43,10 @@ def _load_from_dict(self, value: Dict):
3943

4044

4145
class Partition:
46+
MESSAGE = b'\x00'
47+
INDEX = b'\x01'
48+
OFFSET = b'\x02'
49+
4250
def __init__(self,
4351
stream: str,
4452
number: int,
@@ -114,7 +122,7 @@ def set_offset(self, receiver: str, offset: int):
114122
offset_key = self._get_offset_key(receiver)
115123
self._store.put(offset_key, offset)
116124

117-
def prune(self, ttl):
125+
def prune(self, ttl: int):
118126
ttl *= 1000 # milliseconds
119127

120128
current_timestamp = utils.get_timestamp_ms()
@@ -149,43 +157,40 @@ def _get_by_index(self, index: int) -> bytes:
149157
partition_item = PartitionItem(item_dict=value)
150158
return partition_item
151159

152-
def _get_index(self):
153-
index_key = self._get_index_key()
160+
def _get_index(self) -> int:
161+
index_key = Partition.INDEX
154162
index = self._store.get(index_key)
155163
if index is None:
156164
index = -1
157165
return index
158166

159-
def _increase_index(self, write_batch):
160-
index_key = self._get_index_key()
161-
index = self._get_index()
162-
index += 1
163-
self._store.put(index_key, index, write_batch=write_batch)
164-
165-
def _get_offset(self, receiver: str):
167+
def _get_offset(self, receiver: str) -> int:
166168
offset_key = self._get_offset_key(receiver)
167169
offset = self._store.get(offset_key)
168170
if offset is None:
169171
offset = -1
170172
return offset
171173

174+
def _increase_index(self, write_batch: WriteBatch):
175+
next_index = self._get_index() + 1
176+
if next_index > MAX_UINT:
177+
raise ValueError(next_index)
178+
index_key = Partition.INDEX
179+
self._store.put(index_key, next_index, write_batch=write_batch)
180+
172181
def _increase_offset(self, receiver: str):
182+
next_offset = self._get_offset(receiver) + 1
183+
if next_offset > MAX_UINT:
184+
raise ValueError(next_offset)
173185
offset_key = self._get_offset_key(receiver)
174-
offset = self._get_offset(receiver)
175-
offset += 1
176-
self._store.put(offset_key, offset)
177-
178-
@staticmethod
179-
def _get_index_key():
180-
index_key = utils.get_padded_string('', prefix='_index:')
181-
return index_key
186+
self._store.put(offset_key, next_offset)
182187

183188
@staticmethod
184-
def _get_offset_key(receiver):
185-
offset_key = utils.get_padded_string(receiver, prefix='_offset:')
189+
def _get_offset_key(receiver: str) -> bytes:
190+
offset_key = Partition.OFFSET + bytes(receiver, 'utf-8')
186191
return offset_key
187192

188193
@staticmethod
189-
def _get_message_key(index):
190-
message_key = utils.get_padded_string(str(index), prefix='message:')
194+
def _get_message_key(index: int) -> bytes:
195+
message_key = Partition.MESSAGE + int_to_padded_bytes(index, UINT_BYTES)
191196
return message_key

stopover/stopover.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4-
version = '1.214.0'
4+
version = '2.214.0'
55

66
banner = f"""
77
███████████ ███████████

0 commit comments

Comments
 (0)