-
Notifications
You must be signed in to change notification settings - Fork 63
Expand file tree
/
Copy pathclient.py
More file actions
191 lines (165 loc) · 6.98 KB
/
client.py
File metadata and controls
191 lines (165 loc) · 6.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import time
from urllib.parse import urlencode
import requests
from pipedrive import exceptions
from pipedrive.activities import Activities
from pipedrive.deals import Deals
from pipedrive.projects import Projects
from pipedrive.filters import Filters
from pipedrive.leads import Leads
from pipedrive.items import Items
from pipedrive.notes import Notes
from pipedrive.organizations import Organizations
from pipedrive.persons import Persons
from pipedrive.pipelines import Pipelines
from pipedrive.products import Products
from pipedrive.stages import Stages
from pipedrive.recents import Recents
from pipedrive.subscriptions import Subscriptions
from pipedrive.users import Users
from pipedrive.webhooks import Webhooks
class Client:
    """HTTP client for the Pipedrive REST API.

    The client owns a ``requests.Session``, holds the credentials (OAuth
    access token and/or API token) and exposes one attribute per resource
    group (``deals``, ``persons``, ...), each constructed with this client
    instance.
    """

    BASE_URL = "https://api.pipedrive.com/"
    OAUTH_BASE_URL = "https://oauth.pipedrive.com/oauth/"

    def __init__(self, client_id=None, client_secret=None, domain=None):
        """Create a client.

        Args:
            client_id: OAuth client id, used by the OAuth helper methods.
            client_secret: OAuth client secret, used by the OAuth helper
                methods.
            domain: Optional base URL replacing the default ``BASE_URL``.
                A trailing ``/`` is appended when missing. ``"v1/"`` is
                always appended to whichever base URL is used.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
        self.api_token = None

        # Resource endpoints; each one receives this client instance.
        self.activities = Activities(self)
        self.deals = Deals(self)
        self.projects = Projects(self)
        self.filters = Filters(self)
        self.leads = Leads(self)
        self.items = Items(self)
        self.notes = Notes(self)
        self.organizations = Organizations(self)
        self.persons = Persons(self)
        self.pipelines = Pipelines(self)
        self.products = Products(self)
        self.subscriptions = Subscriptions(self)
        self.recents = Recents(self)
        self.stages = Stages(self)
        self.users = Users(self)
        self.webhooks = Webhooks(self)

        self.requests_session = requests.Session()

        # The instance attribute shadows the class-level BASE_URL so the
        # class constant itself is never modified.
        if domain:
            if not domain.endswith("/"):
                domain += "/"
            self.BASE_URL = domain + "v1/"
        else:
            self.BASE_URL = self.BASE_URL + "v1/"

    def authorization_url(self, redirect_uri, state=None):
        """Return the OAuth authorization URL.

        Args:
            redirect_uri: Sent as the ``redirect_uri`` query parameter.
            state: Optional value sent as the ``state`` query parameter;
                omitted from the URL when ``None``.

        Returns:
            ``OAUTH_BASE_URL + "authorize?"`` followed by the URL-encoded
            query string.
        """
        params = {
            "client_id": self.client_id,
            "redirect_uri": redirect_uri,
        }
        if state is not None:
            params["state"] = state
        return self.OAUTH_BASE_URL + "authorize?" + urlencode(params)

    def exchange_code(self, redirect_uri, code):
        """POST an ``authorization_code`` grant to the OAuth token endpoint.

        Args:
            redirect_uri: Sent as the ``redirect_uri`` form field.
            code: Sent as the ``code`` form field.

        Returns:
            The parsed response of the token request (see ``_parse``).
        """
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
        }
        return self._post(
            self.OAUTH_BASE_URL + "token",
            data=data,
            auth=(self.client_id, self.client_secret),
        )

    def refresh_token(self, refresh_token):
        """POST a ``refresh_token`` grant to the OAuth token endpoint.

        Args:
            refresh_token: Sent as the ``refresh_token`` form field.

        Returns:
            The parsed response of the token request (see ``_parse``).
        """
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        return self._post(
            self.OAUTH_BASE_URL + "token",
            data=data,
            auth=(self.client_id, self.client_secret),
        )

    def set_access_token(self, access_token):
        """Store the token sent as ``Authorization: Bearer <token>``."""
        self.access_token = access_token

    def set_api_token(self, api_token):
        """Store the token sent as the ``api_token`` query parameter."""
        self.api_token = api_token

    def _get(self, url, params=None, **kwargs):
        """Issue a GET request through ``_request``."""
        return self._request("get", url, params=params, **kwargs)

    def _post(self, url, **kwargs):
        """Issue a POST request through ``_request``."""
        return self._request("post", url, **kwargs)

    def _put(self, url, **kwargs):
        """Issue a PUT request through ``_request``."""
        return self._request("put", url, **kwargs)

    def _patch(self, url, **kwargs):
        """Issue a PATCH request through ``_request``."""
        return self._request("patch", url, **kwargs)

    def _delete(self, url, **kwargs):
        """Issue a DELETE request through ``_request``."""
        return self._request("delete", url, **kwargs)

    def _request(self, method, url, headers=None, params=None, **kwargs):
        """Send a request with credentials attached and parse the response.

        Args:
            method: HTTP method name passed to ``Session.request``.
            url: Full request URL.
            headers: Optional extra headers; they override the generated
                ``Authorization`` header on key collision.
            params: Optional extra query parameters; they override the
                generated ``api_token`` parameter on key collision.
            **kwargs: Forwarded to ``Session.request``, except for:
                number_of_retries (default 5): total number of attempts
                    made when retryable errors occur. Zero, ``None`` or a
                    negative value means a single attempt.
                intervaltime (default 2500): pause between attempts, in
                    milliseconds.

        Returns:
            The value returned by ``_parse`` for the first attempt that does
            not raise.

        Raises:
            The exception raised by ``_parse``. Non-retryable errors
            propagate immediately; a retryable error propagates once all
            attempts have been used.
        """
        _headers = {}
        _params = {}
        if self.access_token:
            _headers["Authorization"] = "Bearer {}".format(self.access_token)
        if self.api_token:
            _params["api_token"] = self.api_token
        if headers:
            _headers.update(headers)
        if params:
            _params.update(params)

        # These two options are ours; they must not reach requests, which
        # would reject them as unexpected keyword arguments.
        number_of_retries = kwargs.pop("number_of_retries", 5)
        intervaltime = kwargs.pop("intervaltime", 2500)

        # Always make at least one attempt, even for 0/None/negative values.
        attempts_left = 1
        if number_of_retries and number_of_retries > 0:
            attempts_left = number_of_retries

        while True:
            try:
                return self._parse(
                    self.requests_session.request(
                        method, url, headers=_headers, params=_params, **kwargs
                    )
                )
            except (
                exceptions.ForbiddenError,
                exceptions.InternalServerError,
                exceptions.ServiceUnavailableError,
                exceptions.UnknownError,
                exceptions.TooManyRequestsError,
            ):
                # Only these errors are retried; every other exception
                # propagates to the caller untouched.
                attempts_left -= 1
                if attempts_left <= 0:
                    # Out of attempts: surface the last error instead of
                    # silently returning None.
                    raise
                time.sleep(intervaltime / 1000.0)

    def _parse(self, response):
        """Convert a ``requests`` response into data or raise an API error.

        Args:
            response: The response object returned by the session.

        Returns:
            The decoded JSON body when the ``Content-Type`` header contains
            ``application/json`` and the response is ok; otherwise, for
            non-JSON responses, the raw body text.

        Raises:
            An exception from ``pipedrive.exceptions`` selected by status
            code when a JSON response is not ok; ``UnknownError`` for status
            codes without a dedicated exception.
        """
        status_code = response.status_code
        if "application/json" not in response.headers.get("Content-Type", ""):
            # NOTE(review): non-JSON bodies are returned as text even when
            # the status code signals an error, so such failures are neither
            # raised nor retried. Kept as-is because callers may rely on it.
            return response.text

        r = response.json()
        if response.ok:
            return r

        # The body is not guaranteed to be a JSON object.
        error = r.get("error") if isinstance(r, dict) else None
        error_types = {
            400: exceptions.BadRequestError,
            401: exceptions.UnauthorizedError,
            403: exceptions.ForbiddenError,
            404: exceptions.NotFoundError,
            410: exceptions.GoneError,
            415: exceptions.UnsupportedMediaTypeError,
            422: exceptions.UnprocessableEntityError,
            429: exceptions.TooManyRequestsError,
            500: exceptions.InternalServerError,
            501: exceptions.NotImplementedError,
            503: exceptions.ServiceUnavailableError,
        }
        error_type = error_types.get(status_code, exceptions.UnknownError)
        raise error_type(error, response)