-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathuserdata_mixin.py
More file actions
53 lines (42 loc) · 1.85 KB
/
userdata_mixin.py
File metadata and controls
53 lines (42 loc) · 1.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from abc import ABCMeta, abstractmethod
from typing import Any, final
from event_schema.auth import UserLogin, UserLoginKey
from .base import AuthPluginMeta
class UserdataMixin(AuthPluginMeta, metaclass=ABCMeta):
"""Включает поддержку отправки данных о пользователе в сервис Userdata API
Подробнее о Userdata API: https://github.com/profcomff/userdata-api
"""
@staticmethod
@final
def generate_kafka_key(user_id: int) -> UserLoginKey:
"""
Мы генерируем ключи так как для сообщений с одинаковыми ключами
Kafka гарантирует последовательность чтений
Args:
user_id: Айди пользователя
Returns:
Ничего
"""
return UserLoginKey.model_validate({"user_id": user_id})
@classmethod
@abstractmethod
async def _convert_data_to_userdata_format(cls, data: Any) -> UserLogin:
raise NotImplementedError()
@classmethod
def userdata_process_empty_strings(cls, userdata: UserLogin) -> UserLogin:
"""Изменяет значения с пустыми строками в параметре категории юзердаты на None"""
for item in userdata.items:
if item.value == '':
item.value = None
return userdata
@classmethod
@abstractmethod
async def _delete_userdata(cls, user: User, *, db_session: DbSession) -> None:
"""Удаление данных пользователя
Args:
user (User): Объект пользователя
db_session (DbSession): Сессия базы данных
Returns:
Ничего?
"""
raise NotImplementedError()