-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathconn.py
More file actions
369 lines (293 loc) · 11.6 KB
/
conn.py
File metadata and controls
369 lines (293 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# btrdb.conn
# Connection related objects for the BTrDB library
#
# Author: PingThings
# Created: Fri Dec 21 14:57:30 2018 -0500
#
# For license information, see LICENSE.txt
# ID: conn.py [] allen@pingthings.io $
"""
Connection related objects for the BTrDB library
"""
##########################################################################
## Imports
##########################################################################
import os
import re
import json
import certifi
import uuid as uuidlib
import grpc
from grpc._cython.cygrpc import CompressionAlgorithm
from btrdb.stream import Stream, StreamSet
from btrdb.utils.general import unpack_stream_descriptor
from btrdb.utils.conversion import to_uuid
from btrdb.exceptions import StreamNotFoundError, InvalidOperation
##########################################################################
## Module Variables
##########################################################################
MIN_TIME = -(16 << 56)
MAX_TIME = 48 << 56
MAX_POINTWIDTH = 63
##########################################################################
## Classes
##########################################################################
class Connection(object):
def __init__(self, addrportstr, apikey=None):
"""
Connects to a BTrDB server
Parameters
----------
addrportstr: str
The address of the cluster to connect to, e.g 123.123.123:4411
apikey: str
The option API key to authenticate requests
"""
addrport = addrportstr.split(":", 2)
chan_ops = [('grpc.default_compression_algorithm', CompressionAlgorithm.gzip)]
if len(addrport) != 2:
raise ValueError("expecting address:port")
if addrport[1] == "4411":
# grpc bundles its own CA certs which will work for all normal SSL
# certificates but will fail for custom CA certs. Allow the user
# to specify a CA bundle via env var to overcome this
env_bundle = os.getenv("BTRDB_CA_BUNDLE", "")
# certifi certs are provided as part of this package install
# https://github.com/certifi/python-certifi
lib_certs = certifi.where()
ca_bundle = env_bundle
if ca_bundle == "":
ca_bundle = lib_certs
try:
with open(ca_bundle, "rb") as f:
contents = f.read()
except Exception:
if env_bundle != "":
# The user has given us something but we can't use it, we need to make noise
raise Exception("BTRDB_CA_BUNDLE(%s) env is defined but could not read file" % ca_bundle)
else:
contents = None
if apikey is None:
self.channel = grpc.secure_channel(
addrportstr,
grpc.ssl_channel_credentials(contents),
options=chan_ops
)
else:
self.channel = grpc.secure_channel(
addrportstr,
grpc.composite_channel_credentials(
grpc.ssl_channel_credentials(contents),
grpc.access_token_call_credentials(apikey)
),
options=chan_ops
)
else:
if apikey is not None:
raise ValueError("cannot use an API key with an insecure (port 4410) BTrDB API. Try port 4411")
self.channel = grpc.insecure_channel(addrportstr, chan_ops)
class BTrDB(object):
"""
The primary server connection object for communicating with a BTrDB server.
"""
def __init__(self, endpoint):
self.ep = endpoint
def query(self, stmt, params=[]):
"""
Performs a SQL query on the database metadata and returns a list of
dictionaries from the resulting cursor.
Parameters
----------
stmt: str
a SQL statement to be executed on the BTrDB metadata. Available
tables are noted below. To sanitize inputs use a `$1` style parameter such as
`select * from streams where name = $1 or name = $2`.
params: list or tuple
a list of parameter values to be sanitized and interpolated into the
SQL statement. Using parameters forces value/type checking and is
considered a best practice at the very least.
Returns
-------
list
a list of dictionary object representing the cursor results.
Notes
-------
Parameters will be inserted into the SQL statement as noted by the
paramter number such as `$1`, `$2`, or `$3`. The `streams` table is
available for `SELECT` statements only.
See https://btrdb.readthedocs.io/en/latest/ for more info.
"""
return [
json.loads(row.decode("utf-8"))
for page in self.ep.sql_query(stmt, params)
for row in page
]
def streams(self, *identifiers, versions=None, is_collection_prefix=False):
"""
Returns a StreamSet object with BTrDB streams from the supplied
identifiers. If any streams cannot be found matching the identifier
than StreamNotFoundError will be returned.
Parameters
----------
identifiers: str or UUID
a single item or iterable of items which can be used to query for
streams. identiers are expected to be UUID as string, UUID as UUID,
or collection/name string.
versions: list[int]
a single or iterable of version numbers to match the identifiers
"""
if versions is not None and not isinstance(versions, list):
raise TypeError("versions argument must be of type list")
if versions and len(versions) != len(identifiers):
raise ValueError("number of versions does not match identifiers")
streams = []
for ident in identifiers:
if isinstance(ident, uuidlib.UUID):
streams.append(self.stream_from_uuid(ident))
continue
if isinstance(ident, str):
# attempt UUID lookup
pattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
if re.match(pattern, ident):
streams.append(self.stream_from_uuid(ident))
continue
# attempt collection/name lookup
if "/" in ident:
parts = ident.split("/")
found = self.streams_in_collection(
"/".join(parts[:-1]),
is_collection_prefix=is_collection_prefix,
tags={"name": parts[-1]}
)
if len(found) == 1:
streams.append(found[0])
continue
raise StreamNotFoundError(f"Could not identify stream `{ident}`")
raise ValueError(f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name.")
obj = StreamSet(streams)
if versions:
version_dict = {streams[idx].uuid: versions[idx] for idx in range(len(versions))}
obj.pin_versions(version_dict)
return obj
def stream_from_uuid(self, uuid):
"""
Creates a stream handle to the BTrDB stream with the UUID `uuid`. This
method does not check whether a stream with the specified UUID exists.
It is always good form to check whether the stream existed using
`stream.exists()`.
Parameters
----------
uuid: UUID
The uuid of the requested stream.
Returns
-------
Stream
instance of Stream class or None
"""
return Stream(self, to_uuid(uuid))
def create(self, uuid, collection, tags=None, annotations=None):
"""
Tells BTrDB to create a new stream with UUID `uuid` in `collection` with specified `tags` and `annotations`.
Parameters
----------
uuid: UUID
The uuid of the requested stream.
Returns
-------
Stream
instance of Stream class
"""
if tags is None:
tags = {}
if annotations is None:
annotations = {}
self.ep.create(uuid, collection, tags, annotations)
return Stream(self, uuid,
known_to_exist=True,
collection=collection,
tags=tags.copy(),
annotations=annotations.copy(),
property_version=0
)
def info(self):
"""
Returns information about the connected BTrDB srerver.
Returns
-------
dict
server connection and status information
"""
info = self.ep.info()
return {
"majorVersion": info.majorVersion,
"build": info.build,
"proxy": { "proxyEndpoints": [ep for ep in info.proxy.proxyEndpoints] },
}
def list_collections(self, starts_with=""):
"""
Returns a list of collection paths using the `starts_with` argument for
filtering.
Returns
-------
collection paths: list[str]
"""
return [c for some in self.ep.listCollections(starts_with) for c in some]
def streams_in_collection(self, *collection, is_collection_prefix=True, tags=None, annotations=None):
"""
Search for streams matching given parameters
This function allows for searching
Parameters
----------
collection: str
collections to use when searching for streams, case sensitive.
is_collection_prefix: bool
Whether the collection is a prefix.
tags: Dict[str, str]
The tags to identify the stream.
annotations: Dict[str, str]
The annotations to identify the stream.
Returns
------
list
A list of stream objects found with the provided search arguments.
"""
result = []
if tags is None:
tags = {}
if annotations is None:
annotations = {}
if not collection:
collection = [None]
for item in collection:
streams = self.ep.lookupStreams(item, is_collection_prefix, tags, annotations)
for desclist in streams:
for desc in desclist:
tagsanns = unpack_stream_descriptor(desc)
result.append(Stream(
self, uuidlib.UUID(bytes = desc.uuid),
known_to_exist=True, collection=desc.collection,
tags=tagsanns[0], annotations=tagsanns[1],
property_version=desc.propertyVersion
))
return result
def collection_metadata(self, prefix):
"""
Gives statistics about metadata for collections that match a
prefix.
Parameters
----------
prefix: str
A prefix of the collection names to look at
Returns
-------
tuple
A tuple of dictionaries containing metadata on the streams in the
provided collection.
"""
ep = self.ep
tags, annotations = ep.getMetadataUsage(prefix)
pyTags = {tag.key: tag.count for tag in tags}
pyAnn = {ann.key: ann.count for ann in annotations}
return pyTags, pyAnn
def __reduce__(self):
raise InvalidOperation("BTrDB object cannot be reduced.")