Skip to content

Commit 3cb6ae8

Browse files
authored
Update submit_spark_job_to_driver_node_group_cluster.py
1 parent 7d1e599 commit 3cb6ae8

1 file changed

Lines changed: 43 additions & 43 deletions

File tree

dataproc/snippets/submit_spark_job_to_driver_node_group_cluster.py

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -43,55 +43,55 @@ def submit_job(project_id: str, region: str, cluster_name: str) -> None:
4343
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
4444
) as job_client:
4545

46-
driver_scheduling_config = dataproc.DriverSchedulingConfig(
47-
memory_mb=2048, # Example memory in MB
48-
vcores=2, # Example number of vcores
49-
)
46+
driver_scheduling_config = dataproc.DriverSchedulingConfig(
47+
memory_mb=2048, # Example memory in MB
48+
vcores=2, # Example number of vcores
49+
)
5050

51-
# Create the job config. 'main_jar_file_uri' can also be a
52-
# Google Cloud Storage URL.
53-
job = {
54-
"placement": {"cluster_name": cluster_name},
55-
"spark_job": {
56-
"main_class": "org.apache.spark.examples.SparkPi",
57-
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
58-
"args": ["1000"],
59-
},
60-
"driver_scheduling_config": driver_scheduling_config
61-
}
51+
# Create the job config. 'main_jar_file_uri' can also be a
52+
# Google Cloud Storage URL.
53+
job = {
54+
"placement": {"cluster_name": cluster_name},
55+
"spark_job": {
56+
"main_class": "org.apache.spark.examples.SparkPi",
57+
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
58+
"args": ["1000"],
59+
},
60+
"driver_scheduling_config": driver_scheduling_config
61+
}
6262

63-
operation = job_client.submit_job_as_operation(
64-
request={"project_id": project_id, "region": region, "job": job}
65-
)
63+
operation = job_client.submit_job_as_operation(
64+
request={"project_id": project_id, "region": region, "job": job}
65+
)
6666

67-
try:
68-
response = operation.result()
69-
except Exception as e:
70-
print(f"Error submitting job or waiting for completion: {e}")
71-
raise
67+
try:
68+
response = operation.result()
69+
except Exception as e:
70+
print(f"Error submitting job or waiting for completion: {e}")
71+
raise
7272

73-
# Dataproc job output gets saved to the Cloud Storage bucket
74-
# allocated to the job. Use a regex to obtain the bucket and blob info.
75-
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
76-
if not matches:
77-
print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
78-
raise ValueError
73+
# Dataproc job output gets saved to the Cloud Storage bucket
74+
# allocated to the job. Use a regex to obtain the bucket and blob info.
75+
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
76+
if not matches:
77+
print(f"Error: Could not parse driver output URI: {response.driver_output_resource_uri}")
78+
raise ValueError
7979

80-
try:
81-
with storage.Client() as storage_client:
82-
bucket_name = matches.group(1)
83-
blob_name = f"{matches.group(2)}.000000000"
84-
output = (
85-
storage_client.get_bucket(bucket_name)
86-
.blob(blob_name)
87-
.download_as_bytes()
88-
.decode("utf-8")
89-
)
90-
except Exception as e:
91-
print(f"Error downloading job output: {e}")
92-
raise
80+
try:
81+
with storage.Client() as storage_client:
82+
bucket_name = matches.group(1)
83+
blob_name = f"{matches.group(2)}.000000000"
84+
output = (
85+
storage_client.get_bucket(bucket_name)
86+
.blob(blob_name)
87+
.download_as_bytes()
88+
.decode("utf-8")
89+
)
90+
except Exception as e:
91+
print(f"Error downloading job output: {e}")
92+
raise
9393

94-
print(f"Job finished successfully: {output}")
94+
print(f"Job finished successfully: {output}")
9595

9696

9797
# [END dataproc_submit_spark_job_to_driver_node_group_cluster]

0 commit comments

Comments (0)