Skip to content

Commit 6c429d4

Browse files
authored
Add secure tunneling command to cli (#402)
Add new command for secure tunneling to Databricks clusters on select environments.
1 parent 4e74b10 commit 6c429d4

25 files changed

Lines changed: 1176 additions & 24 deletions

databricks_cli/clusters/cli.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
2121
# See the License for the specific language governing permissions and
2222
# limitations under the License.
23-
23+
import sys
2424
import time
2525
from datetime import datetime
2626
from json import loads as json_loads
@@ -331,6 +331,41 @@ def cluster_events_cli(api_client, cluster_id, start_time, end_time, order, even
331331
click.echo(tabulate(_cluster_events_to_table(events_json), tablefmt='plain'))
332332

333333

334+
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--cluster-id', cls=OneOfOption, one_of=CLUSTER_OPTIONS,
              type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help)
@click.option('--cluster-name', cls=OneOfOption, one_of=CLUSTER_OPTIONS,
              type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help)
@click.option('--local-port', type=click.INT,
              help="The local port to use for the local tunneling server")
@click.option('--debug', '-d', is_flag=True, help="Run the tunnel in debug mode")
@profile_option
@eat_exceptions
@provide_api_client
def tunnel_cli(api_client, cluster_id, cluster_name, local_port, debug):
    """
    [Alpha] Start a secure TCP tunnel to a cluster over Databricks' identity proxy.
    """
    # NOTE: click displays the docstring above as the command's help text, so
    # implementation notes are kept in comments rather than in the docstring.

    # Preconditions: a new enough interpreter and token-based authentication.
    interpreter_too_old = sys.version_info < (3, 6)
    if interpreter_too_old:
        raise RuntimeError("The tunneling command is not supported on Python version < 3.6")
    if not api_client.token:
        raise RuntimeError("The tunneling cli only supports personal token authentication.")

    # An explicitly supplied cluster id is used as-is; otherwise the id is
    # looked up from the cluster name. Having neither is an error.
    if not cluster_id:
        if not cluster_name:
            raise RuntimeError('cluster_name and cluster_id must not be empty!')
        found = ClusterApi(api_client).get_cluster_by_name(cluster_name)
        cluster_id = found["cluster_id"]

    # TODO(tunneling-cli): move this up once we support python3 only
    from databricks_cli.tunnel.api import TunnelApi

    banner = "Starting a secure tunnel to cluster with ID: {}...".format(cluster_id)
    click.echo(banner)
    tunnel = TunnelApi(api_client, cluster_id, debug=debug)
    tunnel.start_tunneling(local_port=local_port)
367+
368+
334369
@click.group(context_settings=CONTEXT_SETTINGS,
335370
short_help='Utility to interact with Databricks clusters.')
336371
@click.option('--version', '-v', is_flag=True, callback=print_version_callback,
@@ -358,3 +393,4 @@ def clusters_group(): # pragma: no cover
358393
clusters_group.add_command(spark_versions_cli, name='spark-versions')
359394
clusters_group.add_command(permanent_delete_cli, name='permanent-delete')
360395
clusters_group.add_command(cluster_events_cli, name='events')
396+
clusters_group.add_command(tunnel_cli, name='tunnel')

databricks_cli/commands/__init__.py

Whitespace-only changes.

databricks_cli/commands/api.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Databricks CLI
2+
# Copyright 2021 Databricks, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"), except
5+
# that the use of services to which certain application programming
6+
# interfaces (each, an "API") connect requires that the user first obtain
7+
# a license for the use of the APIs from Databricks, Inc. ("Databricks"),
8+
# by creating an account at www.databricks.com and agreeing to either (a)
9+
# the Community Edition Terms of Service, (b) the Databricks Terms of
10+
# Service, or (c) another written agreement between Licensee and Databricks
11+
# for the use of the APIs.
12+
#
13+
# You may not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# http://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing, software
19+
# distributed under the License is distributed on an "AS IS" BASIS,
20+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21+
# See the License for the specific language governing permissions and
22+
# limitations under the License.
23+
from tenacity import stop_after_attempt, retry, wait_random_exponential, retry_if_result
24+
25+
from databricks_cli.sdk.v1_service import CommandService, ExecutionContextService
26+
27+
28+
def command_is_not_terminated(resp):
    """Tell whether a command-status response is still in a non-final state.

    :param resp: mapping indexed with ``"status"``.
    :return: ``False`` when the status is "Finished", "Cancelled" or "Error",
        ``True`` for any other status value.
    """
    final_states = {"Finished", "Cancelled", "Error"}
    current = resp["status"]
    if current in final_states:
        return False
    return True
30+
31+
32+
class CommandApi(CommandService):
    """Command service extended with helpers that block until a command ends."""

    # Re-query the status for as long as `command_is_not_terminated` says the
    # result is not final, waiting a randomized exponential backoff between
    # polls (tenacity `max=30`). No `stop=` is configured, so there is no
    # attempt limit on the polling.
    @retry(wait=wait_random_exponential(multiplier=1, max=30),
           retry=retry_if_result(command_is_not_terminated))
    def wait_command_until_terminated(self, cluster_id, context_id, command_id):
        """Poll the status of a command and return the first final status response."""
        status_response = self.get_command_status(cluster_id, context_id, command_id)
        return status_response

    def execute_command_until_terminated(self, language, cluster_id, context_id, command):
        """Submit ``command`` and wait for it to reach a final state.

        :return: the status response produced by `wait_command_until_terminated`.
        """
        submission = self.execute_command(
            language=language,
            cluster_id=cluster_id,
            context_id=context_id,
            command=command,
        )
        # The execute response carries the id used to poll for the status.
        return self.wait_command_until_terminated(
            cluster_id=cluster_id,
            context_id=context_id,
            command_id=submission["id"],
        )
44+
45+
46+
class ExecutionContextApi(ExecutionContextService):
    """Execution-context service whose context creation is retried."""

    # A cluster can already report status="RUNNING" while still being unable to
    # provide an execution context, which makes an immediate create/execute
    # unreliable. Creation is therefore attempted up to 10 times with a
    # randomized exponential backoff between attempts (tenacity `max=30`).
    @retry(stop=stop_after_attempt(10), wait=wait_random_exponential(multiplier=5, max=30))
    def create_context(self, language, cluster_id):
        """Create an execution context on ``cluster_id`` for ``language``.

        :return: whatever the parent service's ``create_context`` returns.
        :raises: the error tenacity raises once all attempts have failed.
        """
        # Explicit two-argument super(): the zero-argument form only exists on
        # Python 3, while this code base is still written to be importable and
        # callable from Python 2 (new-style `object` base classes, runtime
        # interpreter-version checks). Behavior on Python 3 is identical.
        return super(ExecutionContextApi, self).create_context(language, cluster_id)

databricks_cli/sdk/api_client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,17 @@ def __init__(self, user=None, password=None, host=None, token=None,
104104
self.verify = verify
105105
self.api_version = api_version
106106
self.jobs_api_version = jobs_api_version
107+
self.host = host
108+
self.token = token
107109

108110
def close(self):
    """Close the client (currently a no-op)."""
    return None
111113

112114
# helper functions starting here
113115

114-
def perform_query(self, method, path, data = {}, headers = None, files=None, version=None):
116+
def perform_query(self, method, path, data={}, headers=None, files=None, version=None,
117+
return_raw_response=False):
115118
"""set up connection and perform query"""
116119
if headers is None:
117120
headers = self.default_headers
@@ -144,14 +147,16 @@ def perform_query(self, method, path, data = {}, headers = None, files=None, ver
144147
except ValueError:
145148
pass
146149
raise requests.exceptions.HTTPError(message, response=e.response)
150+
if return_raw_response:
151+
return resp
147152
return resp.json()
148153

149154

150155
def get_url(self, path, version=None):
    """Build the request URL for ``path``.

    The version segment is chosen in this order: an explicit ``version``
    argument, then ``self.jobs_api_version`` when it is set and ``path``
    starts with ``/jobs``, and finally ``self.api_version``.

    :return: ``self.url`` + chosen version + ``path``.
    """
    if version:
        version_segment = version
    elif self.jobs_api_version and path and path.startswith('/jobs'):
        version_segment = self.jobs_api_version
    else:
        version_segment = self.api_version
    return self.url + version_segment + path
156161

157162

databricks_cli/sdk/v1_service.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Databricks CLI
2+
# Copyright 2021 Databricks, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"), except
5+
# that the use of services to which certain application programming
6+
# interfaces (each, an "API") connect requires that the user first obtain
7+
# a license for the use of the APIs from Databricks, Inc. ("Databricks"),
8+
# by creating an account at www.databricks.com and agreeing to either (a)
9+
# the Community Edition Terms of Service, (b) the Databricks Terms of
10+
# Service, or (c) another written agreement between Licensee and Databricks
11+
# for the use of the APIs.
12+
#
13+
# You may not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# http://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing, software
19+
# distributed under the License is distributed on an "AS IS" BASIS,
20+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21+
# See the License for the specific language governing permissions and
22+
# limitations under the License.
23+
import copy
24+
25+
from databricks_cli.sdk.version import OLD_API_VERSION
26+
27+
28+
class APIV1Client(object):
    """Base class for services that must call the API at ``OLD_API_VERSION``.

    The client passed in is deep-copied, so switching the API version here
    does not modify the caller's own client object.
    """

    def __init__(self, client):
        """Store a private copy of ``client`` pinned to ``OLD_API_VERSION``."""
        pinned_client = copy.deepcopy(client)
        pinned_client.api_version = OLD_API_VERSION
        self.v1_client = pinned_client
32+
33+
34+
class CommandService(APIV1Client):
    """Wrappers for the ``/commands/*`` endpoints, sent through ``self.v1_client``."""

    def get_command_status(self, cluster_id, context_id, command_id):
        """GET ``/commands/status`` for one command and return the query result."""
        payload = {
            "clusterId": cluster_id,
            "contextId": context_id,
            "commandId": command_id,
        }
        return self.v1_client.perform_query(method="GET", path="/commands/status", data=payload)

    def cancel_command(self, cluster_id, context_id, command_id):
        """POST ``/commands/cancel`` for one command and return the query result."""
        payload = {
            "clusterId": cluster_id,
            "contextId": context_id,
            "commandId": command_id,
        }
        return self.v1_client.perform_query(method="POST", path="/commands/cancel", data=payload)

    def execute_command(self, language, cluster_id, context_id, command):
        """POST ``/commands/execute`` with the command text and return the query result."""
        payload = {
            "language": language,
            "clusterId": cluster_id,
            "contextId": context_id,
            "command": command
        }
        return self.v1_client.perform_query(method="POST", path="/commands/execute", data=payload)
62+
63+
64+
class ExecutionContextService(APIV1Client):
    """Wrappers for the ``/contexts/*`` endpoints, sent through ``self.v1_client``."""

    def get_context_status(self, cluster_id, context_id):
        """GET ``/contexts/status`` for one context and return the query result."""
        payload = {
            "clusterId": cluster_id,
            "contextId": context_id,
        }
        return self.v1_client.perform_query(method="GET", path="/contexts/status", data=payload)

    def create_context(self, language, cluster_id):
        """POST ``/contexts/create`` for a language on a cluster and return the query result."""
        payload = {
            "language": language,
            "clusterId": cluster_id,
        }
        return self.v1_client.perform_query(method="POST", path="/contexts/create", data=payload)

    def destroy_context(self, cluster_id, context_id):
        """POST ``/contexts/destroy`` for one context and return the query result."""
        payload = {
            "contextId": context_id,
            "clusterId": cluster_id,
        }
        return self.v1_client.perform_query(method="POST", path="/contexts/destroy", data=payload)

databricks_cli/sdk/version.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
# See the License for the specific language governing permissions and
2222
# limitations under the License.
2323

24-
API_VERSION='2.0'
24+
API_VERSION = '2.0'
25+
OLD_API_VERSION = '1.2'
2526

2627
# Available API versions
27-
API_VERSIONS=['2.0', '2.1']
28+
API_VERSIONS = ['2.0', '2.1']

databricks_cli/tunnel/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)