1212import time
1313import json
1414import logging
15+ import base64
1516
1617
1718class STATUS :
@@ -22,6 +23,7 @@ class STATUS:
2223
2324
2425def handle_error (method ):
26+
2527 def _try_except (self , * args , ** kwargs ):
2628 try :
2729 return method (self , * args , ** kwargs )
@@ -39,9 +41,10 @@ def _try_except(self, *args, **kwargs):
3941
4042
4143class Broker :
44+
4245 def __init__ (self , config ):
4346 self .config = config
44- logging . info ( f'config: { json . dumps ( config , indent = 2 ) } ' )
47+ utils . log_dict ( self . config , prefix = '⚙️ ' )
4548
4649 self .partitions_by_stream_lock = Lock ()
4750 self .partitions_by_stream = {}
@@ -58,12 +61,44 @@ def __init__(self, config):
5861 Thread (target = self ._rebalance_loop , daemon = True ).start ()
5962 Thread (target = self ._prune_loop , daemon = True ).start ()
6063
64+ def check_authenticated (self , headers ):
65+ if 'authorization' in headers :
66+ return True
67+ return False
68+
69+ def check_authorized (self , headers ):
70+ token = headers ['authorization' ].split ('Basic ' )[1 ]
71+ client_id , client_secret = base64 .b64decode (token ) \
72+ .decode ('ascii' ).split (':' )
73+ return (
74+ client_id in self .config ['auth' ]
75+ and client_secret == self .config ['auth' ][client_id ]
76+ )
77+
6178 @staticmethod
6279 def on_get (request , response ):
6380 response .content_type = 'text/html; charset=utf-8'
6481 response .body = f'Labteral Stopover { __version__ } '
6582
6683 def on_post (self , request , response ):
84+ headers = {
85+ key .lower (): value
86+ for (key , value ) in request .headers .items ()
87+ }
88+
89+ if 'auth' in self .config :
90+ is_authenticated = self .check_authenticated (headers )
91+ if not is_authenticated :
92+ response .status = falcon .status_codes .HTTP_401
93+ return
94+
95+ is_authorized = self .check_authorized (
96+ headers
97+ ) if is_authenticated else False
98+ if not is_authorized :
99+ response .status = falcon .status_codes .HTTP_403
100+ return
101+
67102 bin_data = request .stream .read ()
68103
69104 plain_response = False
@@ -290,7 +325,8 @@ def _get_partition(self, stream: str, partition_number: int):
290325 self .partitions [stream ][partition_number ] = Partition (
291326 stream = stream ,
292327 number = partition_number ,
293- data_dir = self .config ['global' ]['data_dir' ])
328+ data_dir = self .config ['global' ]['data_dir' ]
329+ )
294330 return self .partitions [stream ][partition_number ]
295331
296332 def _get_receiver_partition_numbers (
@@ -306,12 +342,13 @@ def _get_receiver_partition_numbers(
306342 if receiver_group not in self .partitions_by_group [stream ]:
307343 self .partitions_by_group [stream ][receiver_group ] = {}
308344
309- if receiver not in self .partitions_by_group [stream ][
310- receiver_group ]:
345+ if receiver not in self .partitions_by_group [stream ][receiver_group
346+ ]:
311347 self .partitions_by_group [stream ][receiver_group ][receiver ] = []
312348
313349 return list (
314- self .partitions_by_group [stream ][receiver_group ][receiver ])
350+ self .partitions_by_group [stream ][receiver_group ][receiver ]
351+ )
315352
316353 def _get_stream_path (self , stream : str ) -> str :
317354 return f"{ self .config ['global' ]['data_dir' ]} /streams/{ stream } /"
@@ -325,8 +362,8 @@ def _get_stream_partition_numbers(self, stream: str):
325362 self .partitions_by_stream [stream ] = partition_numbers
326363
327364 try :
328- partitions_target = self .config ['streams' ][stream ][
329- 'partitions' ]
365+ partitions_target = self .config ['streams' ][stream ]['partitions'
366+ ]
330367 except KeyError :
331368 partitions_target = self .config ['global' ]['partitions' ]
332369
@@ -344,12 +381,15 @@ def _get_stream_partition_numbers(self, stream: str):
344381 partitions_target ):
345382 if partition_number in partition_numbers :
346383 raise FileNotFoundError (
347- f'missing partitions among { partition_numbers } ' )
348-
349- Partition (stream = stream ,
350- number = partition_number ,
351- data_dir = self .config ['global' ]['data_dir' ],
352- create_if_missing = True )
384+ f'missing partitions among { partition_numbers } '
385+ )
386+
387+ Partition (
388+ stream = stream ,
389+ number = partition_number ,
390+ data_dir = self .config ['global' ]['data_dir' ],
391+ create_if_missing = True
392+ )
353393 partition_numbers .append (partition_number )
354394
355395 return self .partitions_by_stream [stream ]
@@ -359,16 +399,15 @@ def _rebalance_loop(self):
359399 self ._rebalance ()
360400 remaining_seconds = self .config ['global' ]['rebalance_interval' ]
361401 logging .debug (
362- f"next rebalance will hapen in { remaining_seconds } seconds" )
402+ f"next rebalance will hapen in { remaining_seconds } seconds"
403+ )
363404 time .sleep (self .config ['global' ]['rebalance_interval' ])
364405
365406 def _rebalance (self ):
366407 with self .partitions_by_group_lock :
367408 logging .debug ('rebalancing...' )
368409 if self .partitions_by_group :
369- logging .info (
370- 'assignments: '
371- f'{ json .dumps (self .partitions_by_group , indent = 4 )} ' )
410+ utils .log_dict (self .partitions_by_group )
372411
373412 receivers_to_remove = []
374413 for stream in self .partitions_by_group :
@@ -389,7 +428,8 @@ def _rebalance(self):
389428
390429 else :
391430 receivers_to_remove .append (
392- (stream , receiver_group , receiver ))
431+ (stream , receiver_group , receiver )
432+ )
393433
394434 stream_partition_numbers = \
395435 self ._get_stream_partition_numbers (stream )
@@ -410,22 +450,22 @@ def _rebalance(self):
410450 step ):
411451 receiver_index = index // step
412452 self .partitions_by_group [stream ][receiver_group ][
413- stream_receiver_group_receivers [
414- receiver_index ]] = stream_partition_numbers [
415- index :index + step ]
453+ stream_receiver_group_receivers [receiver_index ]
454+ ] = stream_partition_numbers [index :index + step ]
416455
417456 for index in range (number_of_partitions - remainder ,
418457 number_of_partitions ):
419458 receiver_index = index - number_of_partitions + 1
420459 self .partitions_by_group [stream ][receiver_group ][
421- stream_receiver_group_receivers [
422- receiver_index ]].append (
423- stream_partition_numbers [index ])
460+ stream_receiver_group_receivers [receiver_index ]
461+ ].append (stream_partition_numbers [index ])
424462
425463 for stream , receiver_group , receiver in receivers_to_remove :
426- logging .info (f'receiver "{ receiver } " kicked from the '
427- f'receiver_group "{ receiver_group } " '
428- f'for the stream "{ stream } "' )
464+ logging .info (
465+ f'receiver "{ receiver } " kicked from the '
466+ f'receiver_group "{ receiver_group } " '
467+ f'for the stream "{ stream } "'
468+ )
429469 del self .partitions_by_group [stream ][receiver_group ][receiver ]
430470 if receiver in self .last_seen_by_group [receiver_group ]:
431471 del self .last_seen_by_group [receiver_group ][receiver ]
@@ -473,8 +513,10 @@ def _prune_loop(self):
473513
474514 for partition_number in partition_numbers :
475515 logging .info (
476- f'pruning stream { stream } ({ partition_number } )' )
516+ f'pruning stream { stream } ({ partition_number } )'
517+ )
477518
478519 partition = self ._get_partition (
479- stream , partition_number )
520+ stream , partition_number
521+ )
480522 partition .prune (int (ttl ))
0 commit comments