Skip to content

Commit d90d19d

Browse files
authored
[DCP] Adds Terraform deployment of ingestion workflow. (#27)
* Adds support for Workflow deployment, Terraform Trigger, and the GCS bucket with permissions for reading and writing for Dataflow.
* Also deploys the GCS bucket.
* Addresses Gemini comments, mostly deletion protection + permissions fixes.
* Round 2 review cleanup.
* Passes in the ingestion service account in the Dataflow trigger.
* Fixes IAM issue for the Dataflow worker.
1 parent 6368fb3 commit d90d19d

7 files changed

Lines changed: 166 additions & 1 deletion

File tree

infra/dcp/main.tf

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ provider "google" {
1818
billing_project = var.billing_project_id != null ? var.billing_project_id : var.project_id
1919
}
2020

21+
provider "google-beta" {
22+
project = var.project_id
23+
region = var.region
24+
user_project_override = var.user_project_override
25+
billing_project = var.billing_project_id != null ? var.billing_project_id : var.project_id
26+
}
27+
2128
# Enable required APIs for both stacks
2229
resource "google_project_service" "apis" {
2330
for_each = toset(concat([
@@ -30,7 +37,10 @@ resource "google_project_service" "apis" {
3037
"vpcaccess.googleapis.com",
3138
"artifactregistry.googleapis.com",
3239
"compute.googleapis.com"
33-
], var.enable_dcp ? ["spanner.googleapis.com"] : []))
40+
], var.enable_dcp ? ["spanner.googleapis.com"] : [], var.dcp_deploy_data_ingestion_workflow ? [
41+
"workflows.googleapis.com",
42+
"workflowexecutions.googleapis.com"
43+
] : []))
3444

3545
service = each.key
3646
disable_on_destroy = false
@@ -71,6 +81,9 @@ module "dcp" {
7181
make_service_public = var.make_services_public
7282
deletion_protection = var.deletion_protection
7383

84+
deploy_data_ingestion_workflow = var.dcp_deploy_data_ingestion_workflow
85+
86+
7487
depends_on = [google_project_service.apis]
7588
}
7689

infra/dcp/modules/dcp/dataflow.tf

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
resource "google_workflows_workflow" "ingestion_orchestrator" {
2+
count = var.deploy_data_ingestion_workflow ? 1 : 0
3+
name = "${var.namespace}-ingestion-orchestrator"
4+
region = var.region
5+
description = "Triggers the Dataflow Flex Template Graph Ingestion Pipeline with runtime parameters"
6+
service_account = google_service_account.dcp_ingestion_runner[0].id
7+
deletion_protection = var.deletion_protection
8+
9+
source_contents = <<-EOF
10+
main:
11+
params: [input]
12+
steps:
13+
- init:
14+
assign:
15+
- project_id: '${var.project_id}'
16+
- launch_params:
17+
projectId: '$${project_id}'
18+
spannerInstanceId: '$${input.spannerInstanceId}'
19+
spannerDatabaseId: '$${input.spannerDatabaseId}'
20+
importList: '$${input.importList}'
21+
tempLocation: '$${input.tempLocation}'
22+
- run_flex_template:
23+
call: googleapis.dataflow.v1b3.projects.locations.flexTemplates.launch
24+
args:
25+
projectId: '$${project_id}'
26+
location: '$${input.region}'
27+
body:
28+
launchParameter:
29+
jobName: '$${"ingestion-job-" + string(int(sys.now()))}'
30+
containerSpecGcsPath: 'gs://datcom-templates/templates/flex/ingestion.json'
31+
parameters: '$${launch_params}'
32+
environment:
33+
serviceAccountEmail: '${google_service_account.dcp_ingestion_runner[0].email}'
34+
result: launch_result
35+
- return_result:
36+
return: '$${launch_result}'
37+
EOF
38+
}
39+
40+
# Automatically provision a GCS bucket for the customer's custom graph ingestion files, if enabled
41+
resource "google_storage_bucket" "data_ingestion_bucket" {
42+
count = var.deploy_data_ingestion_workflow && var.create_ingestion_bucket ? 1 : 0
43+
name = "${var.namespace}-ingestion-bucket-${var.project_id}"
44+
location = var.region
45+
uniform_bucket_level_access = true
46+
force_destroy = !var.deletion_protection
47+
}

infra/dcp/modules/dcp/iam.tf

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,61 @@ resource "google_cloud_run_service_iam_binding" "public_invoker" {
2121
"allUsers"
2222
]
2323
}
24+
25+
# Dedicated Service Account for running the Dataflow Ingestion pipeline
26+
resource "google_service_account" "dcp_ingestion_runner" {
27+
count = var.deploy_data_ingestion_workflow ? 1 : 0
28+
account_id = "${local.name_prefix}dcp-ingestion-sa"
29+
display_name = "Data Commons Platform Ingestion Runner"
30+
}
31+
32+
# Grant Spanner Database User access to the Ingestion runner
33+
resource "google_project_iam_member" "ingestion_spanner_user" {
34+
count = var.deploy_data_ingestion_workflow ? 1 : 0
35+
project = var.project_id
36+
role = "roles/spanner.databaseUser"
37+
member = "serviceAccount:${google_service_account.dcp_ingestion_runner[0].email}"
38+
}
39+
40+
# Grant Dataflow orchestration and Storage permissions exclusively to the new Ingestion runner
41+
resource "google_project_iam_member" "dataflow_admin" {
42+
count = var.deploy_data_ingestion_workflow ? 1 : 0
43+
project = var.project_id
44+
role = "roles/dataflow.admin"
45+
member = "serviceAccount:${google_service_account.dcp_ingestion_runner[0].email}"
46+
}
47+
48+
resource "google_project_iam_member" "dataflow_worker" {
49+
count = var.deploy_data_ingestion_workflow ? 1 : 0
50+
project = var.project_id
51+
role = "roles/dataflow.worker"
52+
member = "serviceAccount:${google_service_account.dcp_ingestion_runner[0].email}"
53+
}
54+
55+
56+
resource "google_service_account_iam_member" "service_account_user" {
57+
count = var.deploy_data_ingestion_workflow ? 1 : 0
58+
service_account_id = google_service_account.dcp_ingestion_runner[0].name
59+
role = "roles/iam.serviceAccountUser"
60+
member = "serviceAccount:${google_service_account.dcp_ingestion_runner[0].email}"
61+
}
62+
63+
# Fetch project number to reference the Workflows background Service Agent
64+
data "google_project" "project" {
65+
project_id = var.project_id
66+
}
67+
68+
resource "google_service_account_iam_member" "workflows_token_creator" {
69+
count = var.deploy_data_ingestion_workflow ? 1 : 0
70+
service_account_id = google_service_account.dcp_ingestion_runner[0].name
71+
role = "roles/iam.serviceAccountTokenCreator"
72+
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-workflows.iam.gserviceaccount.com"
73+
}
74+
75+
# Bind Object Admin access to either the newly created bucket or an explicitly reused external one
76+
resource "google_storage_bucket_iam_member" "dynamic_ingestion_bucket_access" {
77+
count = var.deploy_data_ingestion_workflow ? 1 : 0
78+
bucket = var.create_ingestion_bucket ? google_storage_bucket.data_ingestion_bucket[0].name : var.external_ingestion_bucket_name
79+
role = "roles/storage.objectAdmin"
80+
member = "serviceAccount:${google_service_account.dcp_ingestion_runner[0].email}"
81+
}

infra/dcp/modules/dcp/outputs.tf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,12 @@ output "spanner_instance_id" {
1313
output "spanner_database_id" {
1414
value = var.create_spanner_db ? (var.spanner_database_id != "" ? "${local.name_prefix}${var.spanner_database_id}" : "${local.name_prefix}dcp-db") : var.spanner_database_id
1515
}
16+
17+
output "ingestion_orchestrator_id" {
18+
description = "Fully qualified ID of the Cloud Workflows ingestion orchestrator"
19+
value = var.deploy_data_ingestion_workflow ? google_workflows_workflow.ingestion_orchestrator[0].id : null
20+
}
21+
output "data_ingestion_bucket_url" {
22+
description = "GCS path to the dynamically provisioned bucket for customer custom MCF datasets"
23+
value = var.deploy_data_ingestion_workflow && var.create_ingestion_bucket ? google_storage_bucket.data_ingestion_bucket[0].url : null
24+
}

infra/dcp/modules/dcp/variables.tf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,23 @@ variable "make_service_public" {
9292
description = "Whether to allow unauthenticated invocations to the service"
9393
type = bool
9494
}
95+
96+
# --- Ingestion Pipeline Config ---
97+
variable "deploy_data_ingestion_workflow" {
98+
description = "Deploy the complete end-to-end Data Commons Ingestion workflow stack"
99+
type = bool
100+
}
101+
102+
103+
104+
variable "create_ingestion_bucket" {
105+
description = "Controls whether Terraform automatically provisions a dedicated staging GCS bucket for uploading graph dataset (.mcf) files"
106+
type = bool
107+
default = true
108+
}
109+
110+
variable "external_ingestion_bucket_name" {
111+
description = "Specifies an existing, external GCS bucket name to use for datasets if automatic provisioning is disabled"
112+
type = string
113+
default = ""
114+
}

infra/dcp/outputs.tf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,12 @@ output "cdc_service_url" {
1515
output "cdc_mysql_instance_connection_name" {
1616
value = var.enable_cdc ? module.cdc[0].mysql_instance_connection_name : null
1717
}
18+
19+
output "dcp_ingestion_orchestrator_id" {
20+
description = "ID of the ingestion Cloud Workflows orchestrator"
21+
value = var.enable_dcp && var.dcp_deploy_data_ingestion_workflow ? module.dcp[0].ingestion_orchestrator_id : null
22+
}
23+
output "dcp_data_ingestion_bucket_url" {
24+
description = "GCS URL pointing directly to the dynamically provisioned bucket for your input graph MCF files"
25+
value = var.enable_dcp && var.dcp_deploy_data_ingestion_workflow ? module.dcp[0].data_ingestion_bucket_url : null
26+
}

infra/dcp/variables.tf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,3 +355,12 @@ variable "cdc_redis_replica_count" {
355355
type = number
356356
default = 1
357357
}
358+
359+
# --- Ingestion Pipeline Config ---
360+
variable "dcp_deploy_data_ingestion_workflow" {
361+
description = "Deploy the complete end-to-end Data Commons Ingestion workflow stack"
362+
type = bool
363+
default = false
364+
}
365+
366+

0 commit comments

Comments (0)