Skip to content

Commit 7cfb3c0

Browse files
authored
Update submit_spark_job_to_driver_node_group_cluster.py
1 parent 8b1a438 commit 7cfb3c0

1 file changed

Lines changed: 98 additions & 13 deletions

File tree

dataproc/snippets/submit_spark_job_to_driver_node_group_cluster.py

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,107 @@ def submit_job(project_id: str, region: str, cluster_name: str) -> None:
7474
print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
7575
raise ValueError
7676

77-
try:
78-
with storage.Client() as storage_client:
79-
bucket_name = matches.group(1)
80-
blob_name = f"{matches.group(2)}.000000000"
81-
output = (
82-
storage_client.get_bucket(bucket_name)
83-
.blob(blob_name)
84-
.download_as_bytes()
85-
.decode("utf-8")
86-
)
87-
except Exception as e:
88-
print(f"Error downloading job output: {e}")
89-
raise
77+
#!/usr/bin/env python
9078

79+
# Copyright 2025 Google LLC
80+
#
81+
# Licensed under the Apache License, Version 2.0 (the "License");
82+
# you may not use this file except in compliance with the License.
83+
# You may obtain a copy of the License at
84+
#
85+
# http://www.apache.org/licenses/LICENSE-2.0
86+
#
87+
# Unless required by applicable law or agreed to in writing, software
88+
# distributed under the License is distributed on an "AS IS" BASIS,
89+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
90+
# See the License for the specific language governing permissions and
91+
# limitations under the License.
92+
93+
# This sample walks a user through submitting a Spark job to a
94+
# Dataproc driver node group cluster using the Dataproc
95+
# client library.
96+
97+
# Usage:
98+
# python submit_spark_job_to_driver_node_group_cluster.py \
99+
# --project_id <PROJECT_ID> --region <REGION> \
100+
# --cluster_name <CLUSTER_NAME>
101+
102+
# [START dataproc_submit_spark_job_to_driver_node_group_cluster]
103+
104+
import re
105+
106+
from google.cloud import dataproc_v1 as dataproc
107+
from google.cloud import storage
108+
109+
def submit_job(project_id: str, region: str, cluster_name: str) -> None:
110+
"""Submits a Spark job to the specified Dataproc cluster with a driver node group and prints the output.
111+
112+
Args:
113+
project_id: The Google Cloud project ID.
114+
region: The Dataproc region where the cluster is located.
115+
cluster_name: The name of the Dataproc cluster.
116+
"""
117+
# Create the job client.
118+
with dataproc.JobControllerClient(
119+
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
120+
) as job_client:
121+
122+
driver_scheduling_config = dataproc.DriverSchedulingConfig(
123+
memory_mb=2048, # Example memory in MB
124+
vcores=2, # Example number of vcores
125+
)
126+
127+
# Create the job config. 'main_jar_file_uri' can also be a
128+
# Google Cloud Storage URL.
129+
job = {
130+
"placement": {"cluster_name": cluster_name},
131+
"spark_job": {
132+
"main_class": "org.apache.spark.examples.SparkPi",
133+
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
134+
"args": ["1000"],
135+
},
136+
"driver_scheduling_config": driver_scheduling_config
137+
}
138+
139+
operation = job_client.submit_job_as_operation(
140+
request={"project_id": project_id, "region": region, "job": job}
141+
)
142+
143+
response = operation.result()
144+
145+
# Dataproc job output gets saved to the Cloud Storage bucket
146+
# allocated to the job. Use a regex to obtain the bucket and blob info.
147+
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
148+
if not matches:
149+
print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
150+
raise ValueError
151+
152+
output = (
153+
storage.Client()
154+
.get_bucket(matches.group(1))
155+
.blob(f"{matches.group(2)}.000000000")
156+
.download_as_bytes()
157+
.decode("utf-8")
158+
)
159+
91160
print(f"Job finished successfully: {output}")
92161

162+
# [END dataproc_submit_spark_job_to_driver_node_group_cluster]
163+
164+
165+
if __name__ == "__main__":
166+
import argparse
167+
168+
parser = argparse.ArgumentParser(
169+
description="Submits a Spark job to a Dataproc driver node group cluster."
170+
)
171+
parser.add_argument("--project_id", help="The Google Cloud project ID.", required=True)
172+
parser.add_argument("--region", help="The Dataproc region where the cluster is located.", required=True)
173+
parser.add_argument("--cluster_name", help="The name of the Dataproc cluster.", required=True)
174+
175+
args = parser.parse_args()
176+
submit_job(args.project_id, args.region, args.cluster_name)
177+
93178

94179
# [END dataproc_submit_spark_job_to_driver_node_group_cluster]
95180

0 commit comments

Comments
 (0)