-
Notifications
You must be signed in to change notification settings - Fork 348
Expand file tree
/
Copy path__init__.py
More file actions
90 lines (87 loc) · 2.34 KB
/
__init__.py
File metadata and controls
90 lines (87 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from .aggs import AggsClient
from .futures import FuturesClient
from .financials import FinancialsClient
from .benzinga import BenzingaClient
from .economy import EconomyClient
from .etf_global import EtfGlobalClient
from .tmx import TmxClient
from .trades import TradesClient
from .quotes import QuotesClient
from .snapshot import SnapshotClient
from .indicators import IndicatorsClient
from .summaries import SummariesClient
from .reference import (
MarketsClient,
TickersClient,
SplitsClient,
DividendsClient,
ConditionsClient,
ExchangesClient,
ContractsClient,
)
from .vX import VXClient
from typing import Optional, Any, Dict
import os
# Default API root; used as the default value of RESTClient's ``base`` argument.
BASE: str = "https://api.massive.com"

# Name of the environment variable consulted for the API key when the caller
# does not pass ``api_key`` to RESTClient.
ENV_KEY: str = "MASSIVE_API_KEY"
class RESTClient(
    AggsClient,
    FuturesClient,
    FinancialsClient,
    BenzingaClient,
    EconomyClient,
    EtfGlobalClient,
    TmxClient,
    TradesClient,
    QuotesClient,
    SnapshotClient,
    MarketsClient,
    TickersClient,
    SplitsClient,
    DividendsClient,
    ConditionsClient,
    ExchangesClient,
    ContractsClient,
    IndicatorsClient,
    SummariesClient,
):
    """Single entry point that combines every endpoint-group client.

    The class inherits from each per-domain client, so the methods of all of
    them are available on one object. A separately constructed ``VXClient``
    is exposed as the ``vx`` attribute and receives the same configuration.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        connect_timeout: float = 10.0,
        read_timeout: float = 10.0,
        num_pools: int = 10,
        retries: int = 3,
        base: str = BASE,
        pagination: bool = True,
        verbose: bool = False,
        trace: bool = False,
        custom_json: Optional[Any] = None,
        connection_pool_kw: Optional[Dict[str, Any]] = None,
    ):
        """Configure the combined client and its ``vx`` sub-client.

        Every argument is forwarded unchanged to the parent initializer and
        to ``VXClient``; their exact interpretation is defined there.

        Args:
            api_key: API key. When ``None`` (the default) the value of the
                environment variable named by ``ENV_KEY`` is used instead,
                looked up at the moment the client is constructed.
            connect_timeout: Connection timeout (presumably seconds; confirm
                against the base client).
            read_timeout: Read timeout (presumably seconds; confirm against
                the base client).
            num_pools: Pool-count setting passed to the base client.
            retries: Retry-count setting passed to the base client.
            base: API root URL; defaults to ``BASE``.
            pagination: Pagination flag passed to the base client.
            verbose: Verbosity flag passed to the base client.
            trace: Tracing flag passed to the base client.
            custom_json: Optional JSON implementation override passed to the
                base client.
            connection_pool_kw: Optional extra keyword arguments for the
                connection pool, passed to the base client.
        """
        # Resolve the key here rather than in the signature: a default of
        # ``os.getenv(...)`` is evaluated only once, at import time, and would
        # miss a variable that is exported after the package was imported.
        if api_key is None:
            api_key = os.getenv(ENV_KEY)

        super().__init__(
            api_key=api_key,
            connect_timeout=connect_timeout,
            read_timeout=read_timeout,
            num_pools=num_pools,
            retries=retries,
            base=base,
            pagination=pagination,
            verbose=verbose,
            trace=trace,
            custom_json=custom_json,
            connection_pool_kw=connection_pool_kw,
        )
        # The vX endpoints live on their own client; give it the same settings.
        self.vx = VXClient(
            api_key=api_key,
            connect_timeout=connect_timeout,
            read_timeout=read_timeout,
            num_pools=num_pools,
            retries=retries,
            base=base,
            pagination=pagination,
            verbose=verbose,
            trace=trace,
            custom_json=custom_json,
            connection_pool_kw=connection_pool_kw,
        )