# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through submitting a Spark job to a
# Dataproc driver node group cluster using the Dataproc
# client library.

# [START dataproc_submit_spark_job_to_driver_node_group_cluster]
2828import re
29-
29+
3030from google .cloud import dataproc_v1 as dataproc
3131from google .cloud import storage
32-
32+
3333def submit_job (project_id : str , region : str , cluster_name : str ) -> None :
3434 """Submits a Spark job to the specified Dataproc cluster with a driver node group and prints the output.
3535
@@ -42,12 +42,12 @@ def submit_job(project_id: str, region: str, cluster_name: str) -> None:
4242 with dataproc .JobControllerClient (
4343 client_options = {"api_endpoint" : f"{ region } -dataproc.googleapis.com:443" }
4444 ) as job_client :
45-
45+
4646 driver_scheduling_config = dataproc .DriverSchedulingConfig (
4747 memory_mb = 2048 , # Example memory in MB
4848 vcores = 2 , # Example number of vcores
4949 )
50-
50+
5151 # Create the job config. 'main_jar_file_uri' can also be a
5252 # Google Cloud Storage URL.
5353 job = {
@@ -59,17 +59,18 @@ def submit_job(project_id: str, region: str, cluster_name: str) -> None:
5959 },
6060 "driver_scheduling_config" : driver_scheduling_config
6161 }
62-
62+
63+
6364 operation = job_client .submit_job_as_operation (
6465 request = {"project_id" : project_id , "region" : region , "job" : job }
6566 )
66-
67+
6768 try :
6869 response = operation .result ()
6970 except Exception as e :
7071 print (f"Error submitting job or waiting for completion: { e } " )
7172 raise
72-
73+
7374 # Dataproc job output gets saved to the Cloud Storage bucket
7475 # allocated to the job. Use a regex to obtain the bucket and blob info.
7576 matches = re .match ("gs://(.*?)/(.*)" , response .driver_output_resource_uri )
@@ -90,7 +91,7 @@ def submit_job(project_id: str, region: str, cluster_name: str) -> None:
9091 except Exception as e :
9192 print (f"Error downloading job output: { e } " )
9293 raise
93-
94+
9495 print (f"Job finished successfully: { output } " )
9596
9697