Skip to content

Commit bf2951f

Browse files
authored
[SC-49206] Add run and stop commands to the cli (#323)
Changes: Adds run/stop commands which underneath just call the REST API. Tested: Ran tox tests.
1 parent d602b54 commit bf2951f

5 files changed

Lines changed: 88 additions & 9 deletions

File tree

databricks_cli/pipelines/api.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ def get(self, pipeline_id, headers=None):
6363
def reset(self, pipeline_id, headers=None):
6464
self.client.reset(pipeline_id, headers)
6565

66+
def run(self, pipeline_id, headers=None):
67+
self.client.run(pipeline_id, headers)
68+
69+
def stop(self, pipeline_id, headers=None):
70+
self.client.stop(pipeline_id, headers)
71+
6672
def _upload_libraries_and_update_spec(self, spec):
6773
spec = copy.deepcopy(spec)
6874
lib_objects = LibraryObject.from_json(spec.get('libraries', []))

databricks_cli/pipelines/cli.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,49 @@ def reset_cli(api_client, pipeline_id):
187187
click.echo("Reset triggered for pipeline {}".format(pipeline_id))
188188

189189

190+
@click.command(context_settings=CONTEXT_SETTINGS,
191+
short_help='Starts the execution of a delta pipeline run')
192+
@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
193+
help=PipelineIdClickType.help)
194+
@debug_option
195+
@profile_option
196+
@pipelines_exception_eater
197+
@provide_api_client
198+
def run_cli(api_client, pipeline_id):
199+
"""
200+
Starts the execution of a delta pipelines run by starting the cluster and processing data.
201+
202+
Usage:
203+
204+
databricks pipelines run --pipeline-id 1234
205+
"""
206+
_validate_pipeline_id(pipeline_id)
207+
PipelinesApi(api_client).run(pipeline_id)
208+
click.echo("Run triggered for pipeline {}".format(pipeline_id))
209+
210+
211+
@click.command(context_settings=CONTEXT_SETTINGS,
212+
short_help='Stops the execution of a delta pipeline run')
213+
@click.option('--pipeline-id', default=None, type=PipelineIdClickType(),
214+
help=PipelineIdClickType.help)
215+
@debug_option
216+
@profile_option
217+
@pipelines_exception_eater
218+
@provide_api_client
219+
def stop_cli(api_client, pipeline_id):
220+
"""
221+
Stops the execution of a delta pipelines run by terminating the cluster. Processing of data can
222+
be resumed by calling `run`.
223+
224+
Usage:
225+
226+
databricks pipelines stop --pipeline-id 1234
227+
"""
228+
_validate_pipeline_id(pipeline_id)
229+
PipelinesApi(api_client).stop(pipeline_id)
230+
click.echo("Stopped pipeline {}".format(pipeline_id))
231+
232+
190233
def _read_spec(src):
191234
"""
192235
Reads the spec at src as a JSON if no file extension is provided, or if in the extension format
@@ -276,3 +319,5 @@ def pipelines_group(): # pragma: no cover
276319
pipelines_group.add_command(delete_cli, name='delete')
277320
pipelines_group.add_command(get_cli, name='get')
278321
pipelines_group.add_command(reset_cli, name='reset')
322+
pipelines_group.add_command(run_cli, name='run')
323+
pipelines_group.add_command(stop_cli, name='stop')

databricks_cli/sdk/service.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,3 +938,13 @@ def reset(self, pipeline_id=None, headers=None):
938938
_data = {}
939939

940940
return self.client.perform_query('POST', '/pipelines/{pipeline_id}/reset'.format(pipeline_id=pipeline_id), data=_data, headers=headers)
941+
942+
def run(self, pipeline_id=None, headers=None):
943+
_data = {}
944+
945+
return self.client.perform_query('POST', '/pipelines/{pipeline_id}/run'.format(pipeline_id=pipeline_id), data=_data, headers=headers)
946+
947+
def stop(self, pipeline_id=None, headers=None):
948+
_data = {}
949+
950+
return self.client.perform_query('POST', '/pipelines/{pipeline_id}/stop'.format(pipeline_id=pipeline_id), data=_data, headers=headers)

tests/pipelines/test_api.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,22 @@ def test_reset(pipelines_api):
221221
data={}, headers=None)
222222

223223

224+
def test_run(pipelines_api):
225+
pipelines_api.run(PIPELINE_ID)
226+
client_mock = pipelines_api.client.client.perform_query
227+
assert client_mock.call_count == 1
228+
client_mock.assert_called_with('POST', '/pipelines/{}/run'.format(PIPELINE_ID),
229+
data={}, headers=None)
230+
231+
232+
def test_stop(pipelines_api):
233+
pipelines_api.stop(PIPELINE_ID)
234+
client_mock = pipelines_api.client.client.perform_query
235+
assert client_mock.call_count == 1
236+
client_mock.assert_called_with('POST', '/pipelines/{}/stop'.format(PIPELINE_ID),
237+
data={}, headers=None)
238+
239+
224240
def test_partition_local_remote(pipelines_api):
225241
libraries = [
226242
# local files

tests/pipelines/test_cli.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -186,18 +186,20 @@ def test_deploy_update_delete_cli_correct_spec_extensions(pipelines_api_mock, tm
186186

187187

188188
@provide_conf
189-
def test_reset_cli_id(pipelines_api_mock):
190-
runner = CliRunner()
191-
runner.invoke(cli.reset_cli, ['--pipeline-id', PIPELINE_ID])
192-
assert pipelines_api_mock.reset.call_args[0][0] == PIPELINE_ID
189+
def test_cli_id(pipelines_api_mock):
190+
for command in [cli.reset_cli, cli.stop_cli, cli.run_cli]:
191+
runner = CliRunner()
192+
runner.invoke(command, ['--pipeline-id', PIPELINE_ID])
193+
assert pipelines_api_mock.reset.call_args[0][0] == PIPELINE_ID
193194

194195

195196
@provide_conf
196-
def test_reset_cli_no_id(pipelines_api_mock):
197-
runner = CliRunner()
198-
result = runner.invoke(cli.reset_cli, [])
199-
assert result.exit_code == 1
200-
assert pipelines_api_mock.reset.call_count == 0
197+
def test_cli_no_id(pipelines_api_mock):
198+
for command in [cli.reset_cli, cli.stop_cli, cli.run_cli]:
199+
runner = CliRunner()
200+
result = runner.invoke(command, [])
201+
assert result.exit_code == 1
202+
assert pipelines_api_mock.reset.call_count == 0
201203

202204

203205
@provide_conf

0 commit comments

Comments
 (0)