
Commit 9ba93aa

Improve performance of session management, retry logic, response parsing, cursor pagination, and batch operations
1 parent bc7dac2 commit 9ba93aa

2 files changed

Lines changed: 105 additions & 57 deletions


jupiterone/client.py

Lines changed: 103 additions & 56 deletions
@@ -1,18 +1,16 @@
 """ Python SDK for JupiterOne GraphQL API """
-
-# pylint: disable=W0212,no-name-in-module
-# see https://github.com/PyCQA/pylint/issues/409
-
 import json
 from warnings import warn
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Optional
 from datetime import datetime
 import time
-
 import re
 import requests
 from requests.adapters import HTTPAdapter, Retry
 from retrying import retry
+import concurrent.futures
+import aiohttp
+import asyncio
 
 from jupiterone.errors import (
     JupiterOneClientError,
@@ -60,7 +58,6 @@
     UPSERT_PARAMETER,
 )
 
-
 def retry_on_429(exc):
     """Used to trigger retry on rate limit"""
     return isinstance(exc, JupiterOneApiRetryError)
@@ -74,13 +71,6 @@ class JupiterOneClient:
     DEFAULT_URL = "https://graphql.us.jupiterone.io"
     SYNC_API_URL = "https://api.us.jupiterone.io"
 
-    RETRY_OPTS = {
-        "wait_exponential_multiplier": 1000,
-        "wait_exponential_max": 10000,
-        "stop_max_delay": 300000,
-        "retry_on_exception": retry_on_429,
-    }
-
     def __init__(
         self,
         account: str = None,
@@ -97,6 +87,16 @@ def __init__(
             "JupiterOne-Account": self.account,
             "Content-Type": "application/json",
         }
+
+        # Initialize session with retry logic
+        self.session = requests.Session()
+        retries = Retry(
+            total=5,
+            backoff_factor=1,
+            status_forcelist=[429, 502, 503, 504],
+            allowed_methods=["POST", "GET"]
+        )
+        self.session.mount("https://", HTTPAdapter(max_retries=retries))
 
     @property
     def account(self):
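
With the retry policy moved onto a shared requests.Session, transport-level failures (HTTP 429, 502, 503, 504) are now retried by urllib3 before _execute_query ever sees a response, replacing both the per-call @retry(**RETRY_OPTS) decorator removed below and the throwaway session previously built inside _execute_query. A minimal standalone sketch of the same pattern; the endpoint URL is a placeholder:

import requests
from requests.adapters import HTTPAdapter, Retry

session = requests.Session()
retries = Retry(
    total=5,                                # at most 5 retries per request
    backoff_factor=1,                       # sleeps grow roughly 1s, 2s, 4s, ... (exact schedule varies by urllib3 version)
    status_forcelist=[429, 502, 503, 504],  # HTTP statuses that trigger a retry
    allowed_methods=["POST", "GET"],        # POST is not retried by default and must be opted in
)
session.mount("https://", HTTPAdapter(max_retries=retries))

# Every request made through this session now retries automatically, e.g.:
# response = session.post("https://graphql.example.invalid", json={}, timeout=60)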
@@ -123,7 +123,6 @@ def token(self, value: str):
         self._token = value
 
     # pylint: disable=R1710
-    @retry(**RETRY_OPTS)
     def _execute_query(self, query: str, variables: Dict = None) -> Dict:
         """Executes query against graphql endpoint"""
 
@@ -134,28 +133,26 @@ def _execute_query(self, query: str, variables: Dict = None) -> Dict:
         # Always ask for variableResultSize
         data.update(flags={"variableResultSize": True})
 
-        # initiate requests session and implement retry logic of 5 request retries with 1 second between
-        s = requests.Session()
-        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
-        s.mount("https://", HTTPAdapter(max_retries=retries))
-
-        response = s.post(self.graphql_url, headers=self.headers, json=data, timeout=60)
+        response = self.session.post(
+            self.graphql_url,
+            headers=self.headers,
+            json=data,
+            timeout=60
+        )
 
         # It is still unclear if all responses will have a status
         # code of 200 or if 429 will eventually be used to
         # indicate rate limits being hit. J1 devs are aware.
         if response.status_code == 200:
-            if response._content:
-                content = json.loads(response._content)
-                if "errors" in content:
-                    errors = content["errors"]
-                    if len(errors) == 1:
-                        if "429" in errors[0]["message"]:
-                            raise JupiterOneApiRetryError(
-                                "JupiterOne API rate limit exceeded"
-                            )
-                    raise JupiterOneApiError(content.get("errors"))
-            return response.json()
+            content = response.json()
+            if "errors" in content:
+                errors = content["errors"]
+                if len(errors) == 1 and "429" in errors[0]["message"]:
+                    raise JupiterOneApiRetryError(
+                        "JupiterOne API rate limit exceeded"
+                    )
+                raise JupiterOneApiError(content.get("errors"))
+            return content
 
         elif response.status_code == 401:
             raise JupiterOneApiError(
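
Two things change in the 200-status branch: the private response._content attribute gives way to the public response.json(), and the nested 429 check flattens into one condition. Note that with the @retry decorator gone, a rate-limit error embedded in a 200 body now raises JupiterOneApiRetryError straight to the caller (at least within the hunks shown), while HTTP-level 429s are handled by the session adapter. For illustration only, a hypothetical error payload the new branch would react to:

# Hypothetical GraphQL error body; real message text may differ.
content = {
    "errors": [
        {"message": "429 Too Many Requests: rate limited"}
    ]
}

errors = content["errors"]
if len(errors) == 1 and "429" in errors[0]["message"]:
    print("would raise JupiterOneApiRetryError")  # _execute_query raises here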
@@ -176,18 +173,24 @@ def _execute_query(self, query: str, variables: Dict = None) -> Dict:
         if isinstance(content, (bytes, bytearray)):
             content = content.decode("utf-8")
         if "application/json" in response.headers.get("Content-Type", "text/plain"):
-            data = json.loads(content)
+            data = response.json()
             content = data.get("error", data.get("errors", content))
         raise JupiterOneApiError("{}:{}".format(response.status_code, content))
 
     def _cursor_query(
-        self, query: str, cursor: str = None, include_deleted: bool = False
+        self,
+        query: str,
+        cursor: str = None,
+        include_deleted: bool = False,
+        max_workers: Optional[int] = None
     ) -> Dict:
-        """Performs a V1 graph query using cursor pagination
+        """Performs a V1 graph query using cursor pagination with optional parallel processing
+
         args:
             query (str): Query text
             cursor (str): A pagination cursor for the initial query
             include_deleted (bool): Include recently deleted entities in query/search
+            max_workers (int, optional): Maximum number of parallel workers for fetching pages
         """
 
         # If the query itself includes a LIMIT then we must parse that and check if we've reached
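
For callers, the new parameter is opt-in and the default path is unchanged. A usage sketch, assuming the package exposes JupiterOneClient at the top level; the account, token, and J1QL query string are placeholders:

from jupiterone import JupiterOneClient

j1 = JupiterOneClient(account="<account-id>", token="<api-token>")

# Sequential cursor pagination, the default behavior:
hosts = j1._cursor_query("FIND Host")

# Opt in to thread-pooled page fetching:
hosts = j1._cursor_query("FIND Host", max_workers=4)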
@@ -200,33 +203,77 @@ def _cursor_query(
             result_limit = False
 
         results: List = []
-        while True:
+
+        def fetch_page(cursor: Optional[str] = None) -> Dict:
             variables = {"query": query, "includeDeleted": include_deleted}
-
             if cursor is not None:
                 variables["cursor"] = cursor
+            return self._execute_query(query=CURSOR_QUERY_V1, variables=variables)
 
-            response = self._execute_query(query=CURSOR_QUERY_V1, variables=variables)
-            data = response["data"]["queryV1"]["data"]
+        # First page to get initial cursor and data
+        response = fetch_page(cursor)
+        data = response["data"]["queryV1"]["data"]
 
-            # This means it's a "TREE" query and we have everything
-            if "vertices" in data and "edges" in data:
-                return data
+        # This means it's a "TREE" query and we have everything
+        if "vertices" in data and "edges" in data:
+            return data
 
-            results.extend(data)
-
-            if result_limit and len(results) >= result_limit:
-                # We can stop paginating if we've collected enough results based on the requested limit
-                break
-            elif (
-                "cursor" in response["data"]["queryV1"]
-                and response["data"]["queryV1"]["cursor"] is not None
-            ):
-                # We got a cursor and haven't collected enough results
+        results.extend(data)
+
+        # If no cursor or we've hit the limit, return early
+        if not response["data"]["queryV1"].get("cursor") or (result_limit and len(results) >= result_limit):
+            return {"data": results[:result_limit] if result_limit else results}
+
+        # If parallel processing is enabled and we have more pages to fetch
+        if max_workers and max_workers > 1:
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+                future_to_cursor = {
+                    executor.submit(fetch_page, response["data"]["queryV1"]["cursor"]):
+                        response["data"]["queryV1"]["cursor"]
+                }
+
+                while future_to_cursor:
+                    # Wait for the next future to complete
+                    done, _ = concurrent.futures.wait(
+                        future_to_cursor,
+                        return_when=concurrent.futures.FIRST_COMPLETED
+                    )
+
+                    for future in done:
+                        cursor = future_to_cursor.pop(future)
+                        try:
+                            response = future.result()
+                            page_data = response["data"]["queryV1"]["data"]
+                            results.extend(page_data)
+
+                            # Check if we need to fetch more pages
+                            if (result_limit and len(results) >= result_limit) or \
+                                    not response["data"]["queryV1"].get("cursor"):
+                                # Cancel remaining futures
+                                for f in future_to_cursor:
+                                    f.cancel()
+                                future_to_cursor.clear()
+                                break
+
+                            # Schedule next page fetch
+                            next_cursor = response["data"]["queryV1"]["cursor"]
+                            future_to_cursor[executor.submit(fetch_page, next_cursor)] = next_cursor
+
+                        except Exception as e:
+                            # Log error but continue with other pages
+                            print(f"Error fetching page with cursor {cursor}: {str(e)}")
+        else:
+            # Sequential processing
+            while True:
                 cursor = response["data"]["queryV1"]["cursor"]
-            else:
-                # No cursor returned so we're done
-                break
+                response = fetch_page(cursor)
+                data = response["data"]["queryV1"]["data"]
+                results.extend(data)
+
+                if result_limit and len(results) >= result_limit:
+                    break
+                elif not response["data"]["queryV1"].get("cursor"):
+                    break
 
         # If we detected an inline LIMIT make sure we only return that many results
         if result_limit:
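
One property of the parallel branch worth noticing: each page's cursor only becomes known when the previous page returns, so future_to_cursor never holds more than one in-flight future and pages still form a serial chain. A self-contained sketch of the same wait(..., FIRST_COMPLETED) loop with a stubbed fetcher; all names here are illustrative, not part of the SDK:

import concurrent.futures
import time

def fake_page(cursor: int):
    """Stand-in for fetch_page(): returns (page_data, next_cursor)."""
    time.sleep(0.1)
    next_cursor = cursor + 1 if cursor < 3 else None
    return list(range(cursor * 2, cursor * 2 + 2)), next_cursor

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    pending = {executor.submit(fake_page, 0): 0}
    while pending:
        done, _ = concurrent.futures.wait(
            pending, return_when=concurrent.futures.FIRST_COMPLETED
        )
        for future in done:
            pending.pop(future)
            data, next_cursor = future.result()
            results.extend(data)
            if next_cursor is not None:
                # The next cursor is only known now, so at most one fetch is ever in flight.
                pending[executor.submit(fake_page, next_cursor)] = next_cursor

print(results)  # [0, 1, 2, 3, 4, 5, 6, 7]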
@@ -343,7 +390,7 @@ def query_with_deferred_response(self, query, cursor=None):
                     break
 
             else:
-                print(f"Request failed after {max_retries} attempts. Status: {response.status_code}")
+                print(f"Request failed after {max_retries} attempts. Status: {url_response.status_code}")
 
         return all_query_results
 
requirements.txt

Lines changed: 2 additions & 1 deletion
@@ -1,2 +1,3 @@
 retrying>=1.3.4,<2
-requests>=2,<3
+requests>=2.31.0,<3
+aiohttp>=3.9.1,<4
