-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpublisher_client.py
More file actions
63 lines (50 loc) · 2.72 KB
/
publisher_client.py
File metadata and controls
63 lines (50 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""Usage
>>> from uid2_client import Uid2PublisherClient
"""
import datetime as dt
from datetime import timezone
from .encryption import _decrypt_gcm
from .request_response_util import *
from .token_generate_response import TokenGenerateResponse
from .token_refresh_response import TokenRefreshResponse
from .input_util import base64_to_byte_array
class Uid2PublisherClient:
    """Client for interacting with UID2 publisher services.

    You will need to have the base URL of the endpoint and a client key pair (auth/secret)
    to consume web services.

    Methods:
        generate_token: generate an advertising token from an email, phone #, or hash
        refresh_token: refresh an advertising token

    Examples:
        Connect to the UID2 service, generate a token, then refresh it:
        >>> from uid2_client import *
        >>> client = Uid2PublisherClient('https://prod.uidapi.com', 'my-authorization-key', 'my-secret-key')
        >>> response = client.generate_token(TokenGenerateInput.from_email("test@email.com"))
        >>> new_token = client.refresh_token(response.get_identity())
    """

    def __init__(self, base_url, auth_key, secret_key):
        """Create a new Uid2PublisherClient client.

        Args:
            base_url (str): base URL for all requests to UID2 services (e.g. 'https://prod.uidapi.com')
            auth_key (str): authorization key for consuming the UID2 services
            secret_key (str): base64-encoded secret key for consuming the UID2 services;
                it is decoded once here and stored as raw bytes

        Raises:
            binascii.Error: if secret_key is not correctly padded base64.

        Note:
            Your authorization key will determine which UID2 services you are allowed to use.
        """
        # Imported explicitly: this module otherwise only sees `base64` as a
        # side effect of the wildcard import from request_response_util, which
        # would break if that module ever defined __all__ or dropped its import.
        import base64

        self._base_url = base_url
        self._auth_key = auth_key
        self._secret_key = base64.b64decode(secret_key)

    def generate_token(self, token_generate_input):
        """Request a new token from the '/v2/token/generate' endpoint.

        Args:
            token_generate_input: object exposing get_as_json_string(); its JSON
                text is encoded to bytes and used as the request payload.

        Returns:
            TokenGenerateResponse: wrapper built from the parsed response body.

        Raises:
            Whatever resp.raise_for_status() raises when the HTTP call did not
            succeed (exact exception type depends on the object returned by post()).
        """
        # The nonce returned with the request envelope is needed again to parse
        # the matching response.
        req, nonce = make_v2_request(self._secret_key, dt.datetime.now(tz=timezone.utc),
                                     token_generate_input.get_as_json_string().encode())
        resp = post(self._base_url, '/v2/token/generate', headers=auth_headers(self._auth_key), data=req)
        resp.raise_for_status()
        resp_body = parse_v2_response(self._secret_key, resp.text, nonce)
        return TokenGenerateResponse(resp_body)

    def refresh_token(self, current_identity):
        """Exchange the identity's refresh token at the '/v2/token/refresh' endpoint.

        Args:
            current_identity: object exposing get_refresh_token() and
                get_refresh_response_key() (e.g. the value returned by
                get_identity() on a generate_token() response).

        Returns:
            TokenRefreshResponse: built from the decrypted response text and the
            current UTC time.

        Raises:
            Whatever resp.raise_for_status() raises when the HTTP call did not
            succeed (exact exception type depends on the object returned by post()).
        """
        # Unlike generate_token, the refresh token is posted as-is (only encoded
        # to bytes); it is not wrapped with make_v2_request.
        resp = post(self._base_url, '/v2/token/refresh', headers=auth_headers(self._auth_key),
                    data=current_identity.get_refresh_token().encode())
        resp.raise_for_status()
        resp_bytes = base64_to_byte_array(resp.text)
        # The response is decrypted with the identity's own refresh response
        # key, not with the client secret.
        decrypted = _decrypt_gcm(resp_bytes, base64_to_byte_array(current_identity.get_refresh_response_key()))
        return TokenRefreshResponse(decrypted.decode(), dt.datetime.now(tz=timezone.utc))