-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtelegram.py
More file actions
186 lines (160 loc) · 7.97 KB
/
telegram.py
File metadata and controls
186 lines (160 loc) · 7.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import hashlib
import hmac
import logging
from typing import Any
from urllib.parse import unquote
import jwt
from event_schema.auth import UserLogin
from fastapi import Depends
from fastapi.background import BackgroundTasks
from fastapi_sqlalchemy import db
from pydantic import BaseModel
from auth_backend.auth_method import AuthPluginMeta, OauthMeta, Session
from auth_backend.exceptions import AlreadyExists, OauthAuthFailed
from auth_backend.kafka.kafka import get_kafka_producer
from auth_backend.models.db import User, UserSession
from auth_backend.schemas.types.scopes import Scope
from auth_backend.settings import Settings
from auth_backend.utils.security import UnionAuth
from auth_backend.utils.string import concantenate_strings
logger = logging.getLogger(__name__)
class TelegramSettings(Settings):
TELEGRAM_REDIRECT_URL: str = "https://app.test.profcomff.com/auth"
TELEGRAM_BOT_TOKEN: str | None = None # Сделал так для тестов, однако токен нужен для работы авторизации!
class TelegramAuth(OauthMeta):
"""Вход в приложение 'Твой ФФ' через Telegram Login Widget."""
prefix = '/telegram'
tags = ['Telegram']
settings = TelegramSettings()
class TGAuthResponseSchema(BaseModel):
id: str
first_name: str
last_name: str | None = None
username: str | None = None
photo_url: str | None = None
auth_date: str
hash: str
scopes: list[Scope] | None = None # Телеграм не передает это поле. Осталось "исторически".
session_name: str | None = None # Телеграм не передает это поле. Осталось "исторически".
@classmethod
async def _register(
cls,
user_inp: TGAuthResponseSchema,
background_tasks: BackgroundTasks,
user_session: UserSession = Depends(UnionAuth(auto_error=True, scopes=[], allow_none=True)),
) -> Session:
"""Добавление метода аутентификации через (виджет) Телеграма."""
old_user = None
new_user = {}
# Проверяем получение корректных данных
userinfo = await cls._check(user_inp)
tg_user_id = userinfo['id']
user = await cls._get_user('user_id', tg_user_id, db_session=db.session)
if user is not None:
raise AlreadyExists(User, user.id)
if user_session is None:
user = await cls._create_user(db_session=db.session)
else:
user = user_session.user
old_user = {'user_id': user.id}
new_user["user_id"] = user.id
tg_auth = cls.create_auth_method_param('user_id', tg_user_id, user.id, db_session=db.session)
new_user[cls.get_name()] = {"user_id": tg_auth.value}
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
TelegramAuth.generate_kafka_key(user.id),
userdata,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
user,
user_inp.scopes,
db_session=db.session,
session_name=user_inp.session_name,
)
@classmethod
async def _login(cls, user_inp: TGAuthResponseSchema, background_tasks: BackgroundTasks) -> Session:
"""Вход в пользователя с помощью аккаунта ТГ.
Производит вход, если находит пользователя по id (из Телеграма). Если аккаунт не
найден, возвращает ошибка.
"""
userinfo = await cls._check(user_inp)
telegram_user_id = user_inp.id
logger.debug(userinfo)
user = await cls._get_user('user_id', telegram_user_id, db_session=db.session)
if not user:
id_token = jwt.encode(userinfo, cls.settings.ENCRYPTION_KEY, algorithm="HS256")
raise OauthAuthFailed(
'No users found for Telegram account', 'Не найдено пользователей с таким ТГ аккаунтом', id_token
)
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
TelegramAuth.generate_kafka_key(user.id),
userdata,
)
return await cls._create_session(
user,
user_inp.scopes,
db_session=db.session,
session_name=user_inp.session_name,
)
@classmethod
async def _redirect_url(cls):
"""URL на который происходит редирект после завершения входа на стороне провайдера.
В данном случае не предполагается к использованию, т.к. данный URL вшит в виджет.
"""
return OauthMeta.UrlSchema(url=cls.settings.TELEGRAM_REDIRECT_URL)
@classmethod
async def _auth_url(cls):
"""URL на который происходит редирект из приложения, чтобы авторизоваться на стороне провайдера.
В данном случае не предполагается, т.к. URL вшит в виджет. Отдается атрибут src виджета.
"""
return OauthMeta.UrlSchema(url='https://telegram.org/js/telegram-widget.js?22')
@classmethod
async def _check(cls, user_inp):
"""Проверка данных пользователя.
https://core.telegram.org/widgets/login#checking-authorization
"""
data_check = {
'id': user_inp.id,
'first_name': user_inp.first_name,
'last_name': user_inp.last_name,
'username': user_inp.username,
'photo_url': user_inp.photo_url,
'auth_date': user_inp.auth_date,
}
check_hash = user_inp.hash
data_check_string = ''
for k, v in sorted(data_check.items()):
if v is None:
continue
data_check_string += f'{unquote(k)}={unquote(v)}\n'
data_check_string = data_check_string.rstrip('\n')
secret_key = hashlib.sha256(str.encode(cls.settings.TELEGRAM_BOT_TOKEN)).digest()
signing = hmac.new(secret_key, msg=str.encode(data_check_string), digestmod=hashlib.sha256).hexdigest()
if signing == check_hash:
return data_check
else:
raise OauthAuthFailed('Invalid user data from Telegram', 'Неправильные учетные данные')
@classmethod
async def _convert_data_to_userdata_format(cls, data: dict[str, Any]) -> UserLogin:
"""Конвертация данных в формат для userdata-api."""
first_name, last_name = '', ''
if 'first_name' in data.keys() and data['first_name'] is not None:
first_name = data['first_name']
if 'last_name' in data.keys() and data['last_name'] is not None:
last_name = data['last_name']
full_name = concantenate_strings([first_name, last_name])
if not full_name:
full_name = None
items = [
{"category": "Личная информация", "param": "Полное имя", "value": full_name},
{"category": "Контакты", "param": "Имя пользователя Telegram", "value": data.get("username")},
{"category": "Личная информация", "param": "Фото", "value": data.get("photo_url")},
]
result = {"items": items, "source": cls.get_name()}
return cls.userdata_process_empty_strings(UserLogin.model_validate(result))