-
Notifications
You must be signed in to change notification settings - Fork 254
Expand file tree
/
Copy pathcredentials.py
More file actions
210 lines (176 loc) · 7.46 KB
/
credentials.py
File metadata and controls
210 lines (176 loc) · 7.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# -*- coding: utf-8 -*-
# Copyright 2017 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
"""This module handles credential management and parsing for the API. As we
have multiple Search products with different authentication schemes, we try to
provide some flexibility to make this process easier. We suggest putting your
credentials in a YAML file, but the main function in this module,
``load_credentials``, will parse environment variables as well.
"""
import os
import logging
import yaml
import requests
import base64
from .utils import merge_dicts
# URL that _generate_bearer_token POSTs to when exchanging a consumer
# key/secret pair for a bearer token.
OAUTH_ENDPOINT = 'https://api.twitter.com/oauth2/token'
# Only ``load_credentials`` is exported by ``from <module> import *``; the
# underscore-prefixed helpers below are internal.
__all__ = ["load_credentials"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _load_yaml_credentials(filename=None, yaml_key=None):
    """Load the credential mapping stored under ``yaml_key`` in a YAML file.

    Common failures (file not found, empty or non-mapping document, missing
    key) are logged and reported as an empty dict, which the caller treats
    as "no YAML credentials available".

    Args:
        filename (str): path of the YAML file; a leading ``~`` is expanded.
        yaml_key (str): top-level key whose value holds the credentials.

    Returns:
        dict: parsed credentials or {}
    """
    try:
        with open(os.path.expanduser(filename)) as f:
            document = yaml.safe_load(f)
    except FileNotFoundError:
        logger.error("cannot read file {}".format(filename))
        return {}

    # ``safe_load`` returns None for an empty file and can return a list or
    # scalar for other documents; indexing those with the key would raise
    # TypeError instead of producing the promised empty dict.
    if not isinstance(document, dict) or yaml_key not in document:
        logger.error("{} is missing the provided key: {}"
                     .format(filename, yaml_key))
        return {}

    search_creds = document[yaml_key]
    # A key written with no value (``search_tweets_api:``) parses as None;
    # normalise it so the return value is always a dict-like object.
    return {} if search_creds is None else search_creds
def _load_env_credentials():
    """Collect credentials from ``SEARCHTWEETS_*`` environment variables.

    Each recognised variable that is set is returned under its lower-cased
    name with the ``SEARCHTWEETS_`` prefix removed (for example
    ``SEARCHTWEETS_BEARER_TOKEN`` becomes ``bearer_token``). Variables that
    are not set are simply left out.

    Returns:
        dict: the credentials found in the environment, possibly empty.
    """
    prefix = "SEARCHTWEETS_"
    suffixes = (
        "ENDPOINT",
        "ACCOUNT",
        "USERNAME",
        "PASSWORD",
        "BEARER_TOKEN",
        "ACCOUNT_TYPE",
        "CONSUMER_KEY",
        "CONSUMER_SECRET",
    )
    return {
        suffix.lower(): os.environ[prefix + suffix]
        for suffix in suffixes
        if prefix + suffix in os.environ
    }
def _parse_credentials(search_creds, account_type):
    """Validate merged credentials and build the arguments for a search.

    The account type is taken from the ``account_type`` argument, then from
    ``search_creds["account_type"]``, and is otherwise inferred: a
    ``bearer_token`` implies "premium", a ``password`` implies "enterprise".

    Args:
        search_creds (dict): merged credential values. It is not modified.
        account_type (str): "premium", "enterprise", or None to look it up
            or infer it as described above.

    Returns:
        dict: for "enterprise", the keys ``username``, ``password`` and
        ``endpoint``; for "premium", the keys ``bearer_token``, ``endpoint``
        and ``extra_headers_dict``.

    Raises:
        KeyError: if the account type cannot be determined or a required
            field is missing. The exception carries the explanatory message.
    """
    if account_type is None:
        account_type = search_creds.get("account_type")
    # attempt to infer account type
    if account_type is None:
        if search_creds.get("bearer_token") is not None:
            account_type = "premium"
        elif search_creds.get("password") is not None:
            account_type = "enterprise"

    if account_type not in {"premium", "enterprise"}:
        msg = ("Account type is not specified and cannot be inferred.\n"
               "Please check your credential file, arguments, or "
               "environment variables\n"
               "for issues. The account type must be 'premium' or "
               "'enterprise'.\n")
        logger.error(msg)
        raise KeyError(msg)

    # Only dictionary lookups live inside the ``try`` so that a KeyError
    # here always means "a required credential field is absent".
    try:
        if account_type == "enterprise":
            return {"username": search_creds["username"],
                    "password": search_creds["password"],
                    "endpoint": search_creds["endpoint"]}

        endpoint = search_creds["endpoint"]
        if "bearer_token" in search_creds:
            bearer_token = search_creds["bearer_token"]
            consumer_pair = None
        else:
            bearer_token = None
            consumer_pair = (search_creds["consumer_key"],
                             search_creds["consumer_secret"])
    except KeyError as err:
        msg = ("Your credentials are not configured correctly and "
               "you are missing a required field ({}). Please see the "
               "readme for proper configuration".format(err.args[0]))
        logger.error(msg)
        raise KeyError(msg) from err

    # Request a token only after every required field has been validated,
    # and outside the ``try`` so failures in the token exchange are not
    # mislabelled as a configuration problem.
    if consumer_pair is not None:
        bearer_token = _generate_bearer_token(*consumer_pair)

    return {"bearer_token": bearer_token,
            "endpoint": endpoint,
            "extra_headers_dict": search_creds.get("extra_headers")}
def load_credentials(filename=None, account_type=None,
                     yaml_key=None, env_overwrite=True):
    """
    Load API credentials from a YAML file and/or environment variables.

    Values from both sources are merged and then validated. A YAML
    credential file is expected to look like this:

    .. code:: yaml

        <KEY>:
          endpoint: <FULL_URL_OF_ENDPOINT>
          username: <USERNAME>
          password: <PW>
          consumer_key: <KEY>
          consumer_secret: <SECRET>
          bearer_token: <TOKEN>
          account_type: <enterprise OR premium>
          extra_headers:
            <MY_HEADER_KEY>: <MY_HEADER_VALUE>

    Fill in only the fields that apply to your account. When the file
    cannot be read or does not contain ``yaml_key``, a warning is logged and
    only the environment is used. The environment variables consulted are::

        SEARCHTWEETS_ENDPOINT
        SEARCHTWEETS_ACCOUNT
        SEARCHTWEETS_USERNAME
        SEARCHTWEETS_PASSWORD
        SEARCHTWEETS_BEARER_TOKEN
        SEARCHTWEETS_ACCOUNT_TYPE
        SEARCHTWEETS_CONSUMER_KEY
        SEARCHTWEETS_CONSUMER_SECRET

    Args:
        filename (str): path to the YAML file. Defaults to
            ``~/.twitter_keys.yaml``.
        account_type (str): "premium" or "enterprise". When omitted, it is
            read from the credentials or inferred from the fields present.
        yaml_key (str): top-level key in the YAML file that holds the
            credentials. Defaults to ``search_tweets_api``.
        env_overwrite: when truthy (the default), environment variables
            take precedence over values from the YAML file; otherwise the
            YAML values take precedence.

    Returns:
        dict: the validated credentials. Enterprise accounts get
        ``username``, ``password`` and ``endpoint``; premium accounts get
        ``bearer_token``, ``endpoint`` and ``extra_headers_dict``.

    Raises:
        KeyError: if the account type cannot be determined or a required
            field is missing.

    Example:
        >>> import os
        >>> os.environ["SEARCHTWEETS_ENDPOINT"] = "https://endpoint"
        >>> os.environ["SEARCHTWEETS_USERNAME"] = "<USERNAME>"
        >>> os.environ["SEARCHTWEETS_PASSWORD"] = "<PW>"
        >>> load_credentials()
        {'username': '<USERNAME>',
         'password': '<PW>',
         'endpoint': 'https://endpoint'}
    """
    if yaml_key is None:
        yaml_key = "search_tweets_api"
    if filename is None:
        filename = "~/.twitter_keys.yaml"

    from_yaml = _load_yaml_credentials(filename=filename, yaml_key=yaml_key)
    if not from_yaml:
        logger.warning("Error parsing YAML file; searching for "
                       "valid environment variables")
    from_env = _load_env_credentials()

    # merge_dicts lets later arguments win, so the preferred source goes last.
    if env_overwrite:
        layers = (from_yaml, from_env)
    else:
        layers = (from_env, from_yaml)
    combined = merge_dicts(*layers)
    return _parse_credentials(combined, account_type=account_type)
def _generate_bearer_token(consumer_key, consumer_secret, timeout=30):
    """
    Return the bearer token for a given pair of consumer key and secret values.

    Performs a ``client_credentials`` POST to ``OAUTH_ENDPOINT`` using HTTP
    basic auth built from the key/secret pair.

    Args:
        consumer_key (str): the application's consumer key.
        consumer_secret (str): the application's consumer secret.
        timeout (float): seconds to wait for the server before giving up.
            Without a timeout, ``requests`` can block indefinitely.

    Returns:
        str: the ``access_token`` field of the JSON response.

    Raises:
        requests.exceptions.HTTPError: if the response status is 400 or
            higher; the response body is logged first.
        requests.exceptions.Timeout: if the server does not answer within
            ``timeout`` seconds.
    """
    # Announce the request before it is made so a hang or failure can be
    # attributed to the token exchange.
    logger.warning("Grabbing bearer token from OAUTH")
    resp = requests.post(OAUTH_ENDPOINT,
                         data=[('grant_type', 'client_credentials')],
                         auth=(consumer_key, consumer_secret),
                         timeout=timeout)
    if resp.status_code >= 400:
        logger.error(resp.text)
        resp.raise_for_status()

    return resp.json()['access_token']