Skip to content

Commit fddead5

Browse files
arulajmani authored and Mukul Murthy committed
Add DeltaPipelines Deploy/Delete commands to the CLI
Add the pipelines deploy and pipelines delete commands to deploy and delete Delta Pipelines.
1 parent 9450202 commit fddead5

9 files changed

Lines changed: 725 additions & 0 deletions

File tree

databricks_cli/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from databricks_cli.stack.cli import stack_group
3838
from databricks_cli.groups.cli import groups_group
3939
from databricks_cli.instance_pools.cli import instance_pools_group
40+
from databricks_cli.pipelines.cli import pipelines_group
4041

4142

4243
@click.group(context_settings=CONTEXT_SETTINGS)
@@ -59,6 +60,7 @@ def cli():
5960
cli.add_command(stack_group, name='stack')
6061
cli.add_command(groups_group, name='groups')
6162
cli.add_command(instance_pools_group, name="instance-pools")
63+
cli.add_command(pipelines_group, name='pipelines')
6264

6365
if __name__ == "__main__":
6466
cli()

databricks_cli/click_types.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ class SecretPrincipalClickType(ParamType):
9090
help = 'The name of the principal.'
9191

9292

93+
class PipelineSpecClickType(ParamType):
    # Click parameter type for the path to a Delta Pipelines deployment spec
    # file; `help` is reused by the pipelines CLI commands for their --spec
    # option text.
    name = 'SPEC'
    help = 'The path to the pipelines deployment spec file'
96+
97+
9398
class OneOfOption(Option):
9499
def __init__(self, *args, **kwargs):
95100
self.one_of = kwargs.pop('one_of')

databricks_cli/pipelines/__init__.py

Whitespace-only changes.

databricks_cli/pipelines/api.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
# Databricks CLI
2+
# Copyright 2017 Databricks, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"), except
5+
# that the use of services to which certain application programming
6+
# interfaces (each, an "API") connect requires that the user first obtain
7+
# a license for the use of the APIs from Databricks, Inc. ("Databricks"),
8+
# by creating an account at www.databricks.com and agreeing to either (a)
9+
# the Community Edition Terms of Service, (b) the Databricks Terms of
10+
# Service, or (c) another written agreement between Licensee and Databricks
11+
# for the use of the APIs.
12+
#
13+
# You may not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# http://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing, software
19+
# distributed under the License is distributed on an "AS IS" BASIS,
20+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21+
# See the License for the specific language governing permissions and
22+
# limitations under the License.
23+
24+
25+
from hashlib import sha1
26+
import os
27+
28+
from six.moves import urllib
29+
30+
from databricks_cli.sdk import DeltaPipelinesService
31+
from databricks_cli.dbfs.api import DbfsApi
32+
from databricks_cli.dbfs.dbfs_path import DbfsPath
33+
34+
# These imports are specific to the credentials part
35+
from databricks_cli.configure.config import get_profile_from_context
36+
from databricks_cli.configure.provider import get_config, ProfileConfigProvider
37+
from databricks_cli.utils import InvalidConfigurationError
38+
39+
BUFFER_SIZE = 1024 * 64
40+
base_pipelines_dir = 'dbfs:/pipelines/code'
41+
42+
43+
class PipelinesApi(object):
    """High-level client for deploying and deleting Delta Pipelines.

    Wraps DeltaPipelinesService for the REST calls and DbfsApi for uploading
    local libraries referenced by a pipeline deployment spec.
    """

    def __init__(self, api_client):
        self.client = DeltaPipelinesService(api_client)
        self.dbfs_client = DbfsApi(api_client)

    def deploy(self, spec, headers=None):
        """Deploy (create or update) the pipeline described by ``spec``.

        Local jar libraries listed in the spec are uploaded to DBFS and the
        spec's 'libraries' entries are rewritten to point at the uploaded
        copies before the spec is sent to the server.

        :param spec: dict parsed from the deployment spec file; must contain 'id'
        :param headers: optional extra HTTP headers
        """
        lib_objects = LibraryObject.from_json(spec.get('libraries', []))
        local_lib_objects, external_lib_objects = \
            self._identify_local_libraries(lib_objects)

        spec['libraries'] = LibraryObject.to_json(external_lib_objects +
                                                  self._upload_local_libraries(local_lib_objects))
        spec['credentials'] = self._get_credentials_for_request()
        # NOTE(review): issues the PUT directly through the underlying API
        # client (rather than DeltaPipelinesService.deploy), presumably so the
        # full spec dict is forwarded as the request body unchanged — confirm.
        self.client.client.perform_query('PUT',
                                         '/pipelines/{}'.format(spec['id']),
                                         data=spec,
                                         headers=headers)

    def delete(self, pipeline_id, headers=None):
        # Credentials are sent in the request body as well as the header; see
        # _get_credentials_for_request for why.
        self.client.delete(pipeline_id, self._get_credentials_for_request(), headers)

    @staticmethod
    def _identify_local_libraries(lib_objects):
        """
        Partitions the given set of libraries into local and those already present in dbfs/s3 etc.
        Local libraries are (currently) jar files with a file scheme or no scheme at all.
        All other libraries should be present in a supported external source.
        :param lib_objects: List[LibraryObject]
        :return: List[List[LibraryObject], List[LibraryObject]] ([Local, External])
        """
        local_lib_objects, external_lib_objects = [], []
        for lib_object in lib_objects:
            parsed_uri = urllib.parse.urlparse(lib_object.path)
            if lib_object.lib_type == 'jar' and parsed_uri.scheme == '':
                local_lib_objects.append(lib_object)
            elif lib_object.lib_type == 'jar' and parsed_uri.scheme.lower() == 'file':
                # A file URI must have exactly 1 or 3 leading slashes
                # (file:/path or file:///path); file://host/path carries a
                # netloc, which is rejected here.
                if parsed_uri.path.startswith('//') or parsed_uri.netloc != '':
                    raise RuntimeError('invalid file uri scheme, '
                                       'did you mean to use file:/ or file:///')
                local_lib_objects.append(LibraryObject(lib_object.lib_type, parsed_uri.path))
            else:
                external_lib_objects.append(lib_object)
        return local_lib_objects, external_lib_objects

    def _upload_local_libraries(self, local_lib_objects):
        """Upload local libraries to DBFS, skipping files already present.

        The remote location is content-addressed (see _get_hashed_path), so a
        file whose hash already exists in DBFS need not be re-uploaded.
        :param local_lib_objects: List[LibraryObject] with local filesystem paths
        :return: List[LibraryObject] with dbfs:/ string paths
        """
        remote_lib_objects = [LibraryObject(llo.lib_type, self._get_hashed_path(llo.path))
                              for llo in local_lib_objects]

        # DbfsApi wants DbfsPath objects; keep the string-path versions around
        # because they are what gets returned (and serialized into the spec).
        transformed_remote_lib_objects = [LibraryObject(rlo.lib_type, DbfsPath(rlo.path))
                                          for rlo in remote_lib_objects]
        upload_files = [llo_tuple for llo_tuple in
                        zip(local_lib_objects, transformed_remote_lib_objects)
                        if not self.dbfs_client.file_exists(llo_tuple[1].path)]

        for llo, rlo in upload_files:
            # overwrite=False: the remote name is derived from the content
            # hash, so an existing file is already the right content.
            self.dbfs_client.put_file(llo.path, rlo.path, False)

        return remote_lib_objects

    @staticmethod
    def _get_hashed_path(path):
        """
        Finds the corresponding dbfs file path for the file located at the supplied path by
        calculating its hash using SHA1.
        :param path: Local File Path
        :return: Remote Path (pipeline_base_dir + file_hash (dot) file_extension)
        """
        hash_buffer = sha1()
        # Hash in fixed-size chunks so large jars don't get read into memory
        # all at once.
        with open(path, 'rb') as f:
            while True:
                data = f.read(BUFFER_SIZE)
                if not data:
                    break
                hash_buffer.update(data)

        file_hash = hash_buffer.hexdigest()
        # splitext includes the period in the extension
        path = '{}/{}{}'.format(base_pipelines_dir, file_hash, os.path.splitext(path)[1])
        return path

    @staticmethod
    def _get_credentials_for_request():
        """
        Only required while the deploy/delete APIs require credentials in the body as well
        as the header. Once the API requirement is relaxed, we can remove this function.
        :return: dict with either a 'token' key or 'user'/'password' keys
        :raises InvalidConfigurationError: if no valid config is found
        """
        profile = get_profile_from_context()
        if profile:
            # NOTE(review): get_config is invoked on the class with the profile
            # as argument rather than on a ProfileConfigProvider(profile)
            # instance — confirm this matches the provider's API.
            config = ProfileConfigProvider.get_config(profile)
        else:
            config = get_config()
        if not config or not config.is_valid:
            raise InvalidConfigurationError.for_profile(profile)

        # Token auth takes precedence over username/password.
        if config.is_valid_with_token:
            return {'token': config.token}
        else:
            return {'user': config.username, 'password': config.password}
142+
143+
144+
class LibraryObject(object):
    """A single pipeline library: a (lib_type, path) pair, e.g. ('jar', 'dbfs:/x.jar')."""

    def __init__(self, lib_type, lib_path):
        self.path = lib_path
        self.lib_type = lib_type

    @classmethod
    def from_json(cls, libraries):
        """
        Deserialize library dicts into LibraryObjects.

        Fixed: the original docstring said "Serialize" here (and "Deserialize"
        on to_json) — the two descriptions were swapped.
        :param libraries: List[Dictionary{String, String}]
        :return: List[LibraryObject], one per (type, path) entry
        """
        return [cls(lib_type, path)
                for library in libraries
                for lib_type, path in library.items()]

    @classmethod
    def to_json(cls, lib_objects):
        """
        Serialize LibraryObjects back into library dicts.
        :param lib_objects: List[LibraryObject]
        :return: List[Dictionary{String, String}], one single-entry dict each
        """
        return [{lib_object.lib_type: lib_object.path} for lib_object in lib_objects]

    def __eq__(self, other):
        # NotImplemented (not False) lets Python try the reflected comparison.
        if not isinstance(other, LibraryObject):
            return NotImplemented
        return self.path == other.path and self.lib_type == other.lib_type

    def __repr__(self):
        # Debug aid; does not affect serialization.
        return 'LibraryObject({!r}, {!r})'.format(self.lib_type, self.path)

databricks_cli/pipelines/cli.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# Databricks CLI
2+
# Copyright 2017 Databricks, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"), except
5+
# that the use of services to which certain application programming
6+
# interfaces (each, an "API") connect requires that the user first obtain
7+
# a license for the use of the APIs from Databricks, Inc. ("Databricks"),
8+
# by creating an account at www.databricks.com and agreeing to either (a)
9+
# the Community Edition Terms of Service, (b) the Databricks Terms of
10+
# Service, or (c) another written agreement between Licensee and Databricks
11+
# for the use of the APIs.
12+
#
13+
# You may not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# http://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing, software
19+
# distributed under the License is distributed on an "AS IS" BASIS,
20+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21+
# See the License for the specific language governing permissions and
22+
# limitations under the License.
23+
24+
from json import loads as json_loads
25+
import os
26+
27+
import click
28+
29+
from databricks_cli.click_types import PipelineSpecClickType
30+
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS
31+
from databricks_cli.version import print_version_callback, version
32+
from databricks_cli.pipelines.api import PipelinesApi
33+
from databricks_cli.configure.config import provide_api_client, profile_option, debug_option
34+
35+
36+
@click.command(context_settings=CONTEXT_SETTINGS,
               short_help='Deploys a delta pipeline according to the pipeline specification')
@click.argument('spec_arg', default=None, required=False)
@click.option('--spec', default=None, help=PipelineSpecClickType.help)
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def deploy_cli(api_client, spec_arg, spec):
    """
    Deploys a delta pipeline according to the pipeline specification.

    * The pipeline spec is a deployment specification that explains how to run a
    Delta Pipeline on Databricks.

    * The CLI simply forwards the spec to Databricks.

    * All the local libraries referenced in the spec are uploaded to DBFS.
    """
    # Exactly one of the positional argument and the --spec option must be set;
    # both truthy or both empty is an error.
    if bool(spec_arg) == bool(spec):
        raise RuntimeError('The spec should be provided either by an option or argument')
    spec_path = spec_arg if spec_arg else spec
    PipelinesApi(api_client).deploy(_read_spec(spec_path))
57+
58+
59+
@click.command(context_settings=CONTEXT_SETTINGS,
               short_help='Stops a delta pipeline and cleans '
                          'up Databricks resources associated with it')
@click.argument('spec_arg', default=None, required=False)
@click.option('--spec', default=None, help=PipelineSpecClickType.help)
@click.option('--pipeline-id', default=None,
              help='id associated with the pipeline to be stopped')
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def delete_cli(api_client, spec_arg, spec, pipeline_id):
    """
    Stops a delta pipeline and cleans up Databricks resources associated with it
    """
    # Exactly one source for the pipeline id is allowed: the positional spec
    # argument, the --spec option, or the --pipeline-id option.
    sources_supplied = bool(spec_arg) + bool(spec) + bool(pipeline_id)
    if sources_supplied != 1:
        raise RuntimeError('Either spec should be provided as an argument '
                           'or option, or the pipeline-id should be provided')
    if spec_arg or spec:
        # Pull the pipeline id out of the spec file.
        pipeline_id = _read_spec(spec_arg if spec_arg else spec)["id"]
    PipelinesApi(api_client).delete(pipeline_id)
82+
83+
84+
def _read_spec(src):
85+
"""
86+
Reads the spec at src as a JSON if no file extension is provided, or if in the extension format
87+
if the format is supported.
88+
"""
89+
extension = os.path.splitext(src)[1]
90+
if extension.lower() == '.json':
91+
with open(src, 'r') as f:
92+
json = f.read()
93+
return json_loads(json)
94+
else:
95+
raise RuntimeError('The provided file extension for the spec is not supported')
96+
97+
98+
@click.group(context_settings=CONTEXT_SETTINGS,
             short_help='Utility to interact with the Databricks Delta Pipelines.')
@click.option('--version', '-v', is_flag=True, callback=print_version_callback,
              expose_value=False, is_eager=True, help=version)
@debug_option
@profile_option
def pipelines_group():
    """
    Utility to interact with the Databricks pipelines.
    """
    # Group body is intentionally empty; subcommands are registered below.
    pass


# Register the subcommands: `databricks pipelines deploy` and
# `databricks pipelines delete`.
pipelines_group.add_command(deploy_cli, name='deploy')
pipelines_group.add_command(delete_cli, name='delete')

databricks_cli/sdk/service.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,3 +794,46 @@ def get_instance_pool(self, instance_pool_id=None, headers=None):
794794
def list_instance_pools(self, headers=None):
795795
_data = {}
796796
return self.client.perform_query('GET', '/instance-pools/list', data=_data, headers=headers)
797+
798+
799+
class DeltaPipelinesService(object):
    """Low-level client for the Delta Pipelines REST API endpoints."""

    def __init__(self, client):
        self.client = client

    def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=None,
               clusters=None, libraries=None, transformations=None, credentials=None,
               headers=None):
        """Create or update a pipeline via PUT /pipelines/{pipeline_id}.

        Only arguments that are not None are included in the request body.
        Fixed: the isinstance validation for `filters` and `credentials` is
        nested inside the corresponding `is not None` guard — as rendered it
        sat outside the guard, so omitting the argument raised TypeError.

        :raises TypeError: if filters or credentials is supplied but not a dict
        """
        _data = {}
        if pipeline_id is not None:
            _data['pipeline_id'] = pipeline_id
        if id is not None:
            _data['id'] = id
        if name is not None:
            _data['name'] = name
        if storage is not None:
            _data['storage'] = storage
        if filters is not None:
            _data['filters'] = filters
            if not isinstance(filters, dict):
                raise TypeError('Expected databricks.Filters() or dict for field filters')
        if clusters is not None:
            _data['clusters'] = clusters
        if libraries is not None:
            _data['libraries'] = libraries
        if transformations is not None:
            _data['transformations'] = transformations
        if credentials is not None:
            _data['credentials'] = credentials
            if not isinstance(credentials, dict):
                raise TypeError('Expected databricks.Credentials() or dict for field credentials')
        return self.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id),
                                         data=_data, headers=headers)

    def delete(self, pipeline_id=None, credentials=None, headers=None):
        """Delete a pipeline via DELETE /pipelines/{pipeline_id}.

        :raises TypeError: if credentials is supplied but not a dict
        """
        _data = {}
        if pipeline_id is not None:
            _data['pipeline_id'] = pipeline_id
        if credentials is not None:
            _data['credentials'] = credentials
            if not isinstance(credentials, dict):
                raise TypeError('Expected databricks.Credentials() or dict for field credentials')
        return self.client.perform_query('DELETE', '/pipelines/{}'.format(pipeline_id),
                                         data=_data, headers=headers)

tests/pipelines/__init__.py

Whitespace-only changes.

0 commit comments

Comments (0)