Skip to content

Commit 01932ec

Browse files
mukulmurthy (Mukul Murthy)
authored and committed
Add pipelines 'get' and 'reset' commands (#273)
* Add the commands `databricks pipelines get` and `databricks pipelines reset` to get and reset Delta Pipelines
* Stop double-passing credentials to the Delta Pipelines service
* Improve CLI documentation

Tested with new and additional unit tests.
1 parent f6acc22 commit 01932ec

6 files changed

Lines changed: 235 additions & 77 deletions

File tree

databricks_cli/click_types.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ class PipelineSpecClickType(ParamType):
9595
help = 'The path to the pipelines deployment spec file'
9696

9797

98+
class PipelineIdClickType(ParamType):
99+
name = 'PIPELINE_ID'
100+
help = 'Delta Pipeline ID'
101+
102+
98103
class OneOfOption(Option):
99104
def __init__(self, *args, **kwargs):
100105
self.one_of = kwargs.pop('one_of')

databricks_cli/pipelines/api.py

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,6 @@
3131
from databricks_cli.dbfs.api import DbfsApi
3232
from databricks_cli.dbfs.dbfs_path import DbfsPath
3333

34-
# These imports are specific to the credentials part
35-
from databricks_cli.configure.config import get_profile_from_context
36-
from databricks_cli.configure.provider import get_config, ProfileConfigProvider
37-
from databricks_cli.utils import InvalidConfigurationError
38-
3934
BUFFER_SIZE = 1024 * 64
4035
base_pipelines_dir = 'dbfs:/pipelines/code'
4136
supported_lib_types = {'jar', 'whl'}
@@ -53,14 +48,19 @@ def deploy(self, spec, headers=None):
5348

5449
spec['libraries'] = LibraryObject.to_json(external_lib_objects +
5550
self._upload_local_libraries(local_lib_objects))
56-
spec['credentials'] = self._get_credentials_for_request()
5751
self.client.client.perform_query('PUT',
5852
'/pipelines/{}'.format(spec['id']),
5953
data=spec,
6054
headers=headers)
6155

6256
def delete(self, pipeline_id, headers=None):
63-
self.client.delete(pipeline_id, self._get_credentials_for_request(), headers)
57+
self.client.delete(pipeline_id, headers)
58+
59+
def get(self, pipeline_id, headers=None):
60+
self.client.get(pipeline_id, headers)
61+
62+
def reset(self, pipeline_id, headers=None):
63+
self.client.reset(pipeline_id, headers)
6464

6565
@staticmethod
6666
def _identify_local_libraries(lib_objects):
@@ -130,25 +130,6 @@ def _get_hashed_path(path):
130130
path = '{}/{}.{}'.format(base_pipelines_dir, file_hash, extension)
131131
return path
132132

133-
@staticmethod
134-
def _get_credentials_for_request():
135-
"""
136-
Only required while the deploy/delete APIs require credentials in the body as well
137-
as the header. Once the API requirement is relaxed, we can remove this function"
138-
"""
139-
profile = get_profile_from_context()
140-
if profile:
141-
config = ProfileConfigProvider.get_config(profile)
142-
else:
143-
config = get_config()
144-
if not config or not config.is_valid:
145-
raise InvalidConfigurationError.for_profile(profile)
146-
147-
if config.is_valid_with_token:
148-
return {'token': config.token}
149-
else:
150-
return {'user': config.username, 'password': config.password}
151-
152133

153134
class LibraryObject(object):
154135
def __init__(self, lib_type, lib_path):

databricks_cli/pipelines/cli.py

Lines changed: 110 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
import click
2828

29-
from databricks_cli.click_types import PipelineSpecClickType
29+
from databricks_cli.click_types import PipelineSpecClickType, PipelineIdClickType
3030
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS
3131
from databricks_cli.version import print_version_callback, version
3232
from databricks_cli.pipelines.api import PipelinesApi
@@ -36,18 +36,24 @@
3636
@click.command(context_settings=CONTEXT_SETTINGS,
3737
short_help='Deploys a delta pipeline according to the pipeline specification')
3838
@click.argument('spec_arg', default=None, required=False)
39-
@click.option('--spec', default=None, help=PipelineSpecClickType.help)
39+
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
4040
@debug_option
4141
@profile_option
4242
@eat_exceptions
4343
@provide_api_client
4444
def deploy_cli(api_client, spec_arg, spec):
4545
"""
46-
Deploys a delta pipeline according to the pipeline specification.
47-
* The pipeline spec is a deployment specification that explains how to run a
48-
Delta Pipeline on Databricks.
49-
* The CLI simply forwards the spec to Databricks.
50-
* All the local libraries referenced in the spec are uploaded to DBFS.
46+
Deploys a delta pipeline according to the pipeline specification. The pipeline spec is a
47+
specification that explains how to run a Delta Pipeline on Databricks. All local libraries
48+
referenced in the spec are uploaded to DBFS.
49+
50+
Usage:
51+
52+
databricks pipelines deploy example.json
53+
54+
OR
55+
56+
databricks pipelines deploy --spec example.json
5157
"""
5258
if bool(spec_arg) == bool(spec):
5359
raise RuntimeError('The spec should be provided either by an option or argument')
@@ -57,30 +63,97 @@ def deploy_cli(api_client, spec_arg, spec):
5763

5864

5965
@click.command(context_settings=CONTEXT_SETTINGS,
60-
short_help='Stops a delta pipeline and cleans '
61-
'up Databricks resources associated with it')
66+
short_help='Stops a delta pipeline and deletes its associated Databricks resources')
6267
@click.argument('spec_arg', default=None, required=False)
63-
@click.option('--spec', default=None, help=PipelineSpecClickType.help)
64-
@click.option('--pipeline-id', default=None,
65-
help='id associated with the pipeline to be stopped')
68+
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
69+
@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
70+
help=PipelineIdClickType.help)
6671
@debug_option
6772
@profile_option
6873
@eat_exceptions
6974
@provide_api_client
7075
def delete_cli(api_client, spec_arg, spec, pipeline_id):
7176
"""
72-
Stops a delta pipeline and cleans up Databricks resources associated with it
77+
Stops a delta pipeline and deletes its associated Databricks resources. The pipeline can be
78+
resumed by deploying it again.
79+
80+
Usage:
81+
82+
databricks pipelines delete example.json
83+
84+
OR
85+
86+
databricks pipelines delete --spec example.json
87+
88+
OR
89+
90+
databricks pipelines delete --pipeline-id 1234
7391
"""
74-
# Only one out of spec/pipeline_id/spec_arg should be supplied
75-
if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1:
76-
raise RuntimeError('Either spec should be provided as an argument '
77-
'or option, or the pipeline-id should be provided')
78-
if bool(spec_arg) or bool(spec):
79-
src = spec_arg if bool(spec_arg) else spec
80-
pipeline_id = _read_spec(src)["id"]
92+
pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
8193
PipelinesApi(api_client).delete(pipeline_id)
8294

8395

96+
@click.command(context_settings=CONTEXT_SETTINGS,
97+
short_help='Gets a delta pipeline\'s current spec and status')
98+
@click.argument('spec_arg', default=None, required=False)
99+
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
100+
@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
101+
help=PipelineIdClickType.help)
102+
@debug_option
103+
@profile_option
104+
@eat_exceptions
105+
@provide_api_client
106+
def get_cli(api_client, spec_arg, spec, pipeline_id):
107+
"""
108+
Gets a delta pipeline's current spec and status.
109+
110+
Usage:
111+
112+
databricks pipelines get example.json
113+
114+
OR
115+
116+
databricks pipelines get --spec example.json
117+
118+
OR
119+
120+
databricks pipelines get --pipeline-id 1234
121+
"""
122+
pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
123+
PipelinesApi(api_client).get(pipeline_id)
124+
125+
126+
@click.command(context_settings=CONTEXT_SETTINGS,
127+
short_help='Resets a delta pipeline so data can be reprocessed from scratch')
128+
@click.argument('spec_arg', default=None, required=False)
129+
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
130+
@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
131+
help=PipelineIdClickType.help)
132+
@debug_option
133+
@profile_option
134+
@eat_exceptions
135+
@provide_api_client
136+
def reset_cli(api_client, spec_arg, spec, pipeline_id):
137+
"""
138+
Resets a delta pipeline by truncating tables and creating new checkpoint folders so data is
139+
reprocessed from scratch.
140+
141+
Usage:
142+
143+
databricks pipelines reset example.json
144+
145+
OR
146+
147+
databricks pipelines reset --spec example.json
148+
149+
OR
150+
151+
databricks pipelines reset --pipeline-id 1234
152+
"""
153+
pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
154+
PipelinesApi(api_client).reset(pipeline_id)
155+
156+
84157
def _read_spec(src):
85158
"""
86159
Reads the spec at src as a JSON if no file extension is provided, or if in the extension format
@@ -95,6 +168,21 @@ def _read_spec(src):
95168
raise RuntimeError('The provided file extension for the spec is not supported')
96169

97170

171+
def _get_pipeline_id(spec_arg, spec, pipeline_id):
172+
"""
173+
Ensures that the user has either specified a spec (either through argument or option) or a
174+
pipeline ID directly, and returns the pipeline id to use.
175+
"""
176+
# Only one out of spec/pipeline_id/spec_arg should be supplied
177+
if bool(spec_arg) + bool(spec) + bool(pipeline_id) != 1:
178+
raise RuntimeError('Either spec should be provided as an argument '
179+
'or option, or the pipeline-id should be provided')
180+
if bool(spec_arg) or bool(spec):
181+
src = spec_arg if bool(spec_arg) else spec
182+
pipeline_id = _read_spec(src)["id"]
183+
return pipeline_id
184+
185+
98186
@click.group(context_settings=CONTEXT_SETTINGS,
99187
short_help='Utility to interact with the Databricks Delta Pipelines.')
100188
@click.option('--version', '-v', is_flag=True, callback=print_version_callback,
@@ -110,3 +198,5 @@ def pipelines_group():
110198

111199
pipelines_group.add_command(deploy_cli, name='deploy')
112200
pipelines_group.add_command(delete_cli, name='delete')
201+
pipelines_group.add_command(get_cli, name='get')
202+
pipelines_group.add_command(reset_cli, name='reset')

databricks_cli/sdk/service.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -800,9 +800,8 @@ class DeltaPipelinesService(object):
800800
def __init__(self, client):
801801
self.client = client
802802

803-
def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=None,
804-
clusters=None, libraries=None, transformations=None, credentials=None,
805-
headers=None):
803+
def deploy(self, pipeline_id=None, id=None, name=None, storage=None, configuration=None,
804+
clusters=None, libraries=None, transformations=None, filters=None, headers=None):
806805
_data = {}
807806
if pipeline_id is not None:
808807
_data['pipeline_id'] = pipeline_id
@@ -812,28 +811,35 @@ def deploy(self, pipeline_id=None, id=None, name=None, storage=None, filters=Non
812811
_data['name'] = name
813812
if storage is not None:
814813
_data['storage'] = storage
815-
if filters is not None:
816-
_data['filters'] = filters
817-
if not isinstance(filters, dict):
818-
raise TypeError('Expected databricks.Filters() or dict for field filters')
814+
if configuration is not None:
815+
_data['configuration'] = configuration
819816
if clusters is not None:
820817
_data['clusters'] = clusters
821818
if libraries is not None:
822819
_data['libraries'] = libraries
823820
if transformations is not None:
824821
_data['transformations'] = transformations
825-
if credentials is not None:
826-
_data['credentials'] = credentials
827-
if not isinstance(credentials, dict):
828-
raise TypeError('Expected databricks.Credentials() or dict for field credentials')
829-
return self.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers)
822+
if filters is not None:
823+
_data['filters'] = filters
824+
if not isinstance(filters, dict):
825+
raise TypeError('Expected databricks.Filters() or dict for field filters')
826+
return self.client.perform_query('PUT', '/pipelines/{pipeline_id}', data=_data, headers=headers)
830827

831-
def delete(self, pipeline_id=None, credentials=None, headers=None):
828+
def delete(self, pipeline_id=None, headers=None):
832829
_data = {}
833830
if pipeline_id is not None:
834831
_data['pipeline_id'] = pipeline_id
835-
if credentials is not None:
836-
_data['credentials'] = credentials
837-
if not isinstance(credentials, dict):
838-
raise TypeError('Expected databricks.Credentials() or dict for field credentials')
839-
return self.client.perform_query('DELETE', '/pipelines/{}'.format(pipeline_id), data=_data, headers=headers)
832+
return self.client.perform_query('DELETE', '/pipelines/{pipeline_id}', data=_data, headers=headers)
833+
834+
def get(self, pipeline_id=None, headers=None):
835+
_data = {}
836+
if pipeline_id is not None:
837+
_data['pipeline_id'] = pipeline_id
838+
return self.client.perform_query('GET', '/pipelines/{pipeline_id}', data=_data, headers=headers)
839+
840+
def reset(self, pipeline_id=None, headers=None):
841+
_data = {}
842+
if pipeline_id is not None:
843+
_data['pipeline_id'] = pipeline_id
844+
return self.client.perform_query('POST', '/pipelines/{pipeline_id}/reset', data=_data, headers=headers)
845+

tests/pipelines/test_api.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
'id': PIPELINE_ID,
4040
'name': 'test_pipeline'
4141
}
42-
CREDENTIALS = 'dummy_credentials'
4342
HEADERS = 'dummy_headers'
4443

4544

@@ -64,9 +63,8 @@ def file_exists_stub(_, dbfs_path):
6463

6564
@mock.patch('databricks_cli.dbfs.api.DbfsApi.file_exists', file_exists_stub)
6665
@mock.patch('databricks_cli.dbfs.dbfs_path.DbfsPath.validate')
67-
@mock.patch('databricks_cli.pipelines.api.PipelinesApi._get_credentials_for_request')
6866
@mock.patch('databricks_cli.dbfs.api.DbfsApi.put_file')
69-
def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelines_api, tmpdir):
67+
def test_deploy(put_file_mock, dbfs_path_validate, pipelines_api, tmpdir):
7068
"""
7169
Scenarios Tested:
7270
1. All three types of local file paths (absolute, relative, file: scheme)
@@ -77,7 +75,6 @@ def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelin
7775
A test local file which has '456' written to it is not present in Dbfs and therefore must be.
7876
uploaded to dbfs.
7977
"""
80-
get_credentials_mock.return_value = CREDENTIALS
8178
deploy_mock = pipelines_api.client.client.perform_query
8279
# set-up the test
8380
jar1 = tmpdir.join('jar1.jar').strpath
@@ -115,7 +112,6 @@ def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelin
115112
{'whl':
116113
'dbfs:/pipelines/code/51eac6b471a284d3341d8c0c63d0f1a286262a18/wheel-name-conv.whl'}
117114
]
118-
expected_spec['credentials'] = CREDENTIALS
119115

120116
pipelines_api.deploy(spec)
121117
assert dbfs_path_validate.call_count == 5
@@ -134,21 +130,31 @@ def test_deploy(put_file_mock, get_credentials_mock, dbfs_path_validate, pipelin
134130
data=expected_spec, headers=HEADERS)
135131

136132

137-
@mock.patch('databricks_cli.pipelines.api.PipelinesApi._get_credentials_for_request')
138-
def test_delete(get_credentials_mock, pipelines_api):
139-
get_credentials_mock.return_value = CREDENTIALS
133+
def test_delete(pipelines_api):
140134
pipelines_api.delete(PIPELINE_ID)
141135
delete_mock = pipelines_api.client.delete
142-
assert get_credentials_mock.call_count == 1
143136
assert delete_mock.call_count == 1
144137
assert delete_mock.call_args[0][0] == PIPELINE_ID
145-
assert delete_mock.call_args[0][1] == CREDENTIALS
146-
assert delete_mock.call_args[0][2] is None
138+
assert delete_mock.call_args[0][1] is None
147139

148140
pipelines_api.delete(PIPELINE_ID, HEADERS)
149141
assert delete_mock.call_args[0][0] == PIPELINE_ID
150-
assert delete_mock.call_args[0][1] == CREDENTIALS
151-
assert delete_mock.call_args[0][2] == HEADERS
142+
assert delete_mock.call_args[0][1] == HEADERS
143+
144+
145+
def test_get(pipelines_api):
146+
pipelines_api.get(PIPELINE_ID)
147+
get_mock = pipelines_api.client.get
148+
assert get_mock.call_count == 1
149+
assert get_mock.call_args[0][0] == PIPELINE_ID
150+
151+
152+
def test_reset(pipelines_api):
153+
pipelines_api.reset(PIPELINE_ID)
154+
reset_mock = pipelines_api.client.reset
155+
assert reset_mock.call_count == 1
156+
assert reset_mock.call_args[0][0] == PIPELINE_ID
157+
assert reset_mock.call_args[0][1] is None
152158

153159

154160
def test_partition_local_remote(pipelines_api):

0 commit comments

Comments (0)