Nullstone capability that runs a Cloud Run Job or invokes a Cloud Run Service when a message is published to a Pub/Sub topic.
Pub/Sub Topic ──► Eventarc ──► Cloud Workflow ──► Cloud Run Job (Job target)
Pub/Sub Topic ──► Push subscription ──────────► Cloud Run Service (Service target)
The two targets take deliberately different paths:
- Job — A Cloud Run Job can't be invoked over plain HTTP, so the message is routed through Eventarc to a small Cloud Workflow, which calls the Cloud Run Jobs API and starts an execution with the message + attributes as env-var overrides.
- Service — A Cloud Run Service is just an authenticated HTTP endpoint, so Pub/Sub delivers to it directly via a native push subscription. No Eventarc, no workflow — which removes a hop of latency and the per-event Workflows cost.
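To make the Job path concrete, the following Python sketch builds the kind of request body a workflow could send to the Cloud Run Jobs `run` method. It is illustrative only: the module's actual workflow definition is not shown here, `build_run_request` is a hypothetical name, and both the Eventarc event shape (`data.message`) and the `overrides.containerOverrides[].env` layout are assumptions to verify against the current API reference.

```python
import json


def build_run_request(event: dict) -> dict:
    """Build a request body that starts a Job execution with env-var overrides."""
    # Assumed event shape: the Pub/Sub message sits under data.message.
    message = event.get("data", {}).get("message", {})
    env = [
        # The payload is passed through still Base64-encoded.
        {"name": "PUBSUB_MESSAGE", "value": message.get("data", "")},
        # Attributes are serialized as a JSON object, "{}" when there are none.
        {
            "name": "PUBSUB_ATTRIBUTES",
            "value": json.dumps(message.get("attributes") or {}),
        },
    ]
    return {"overrides": {"containerOverrides": [{"env": env}]}}
```

Because the overrides apply to a single execution, the Job's stored configuration stays unchanged between messages.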
The capability auto-detects which path to use by looking at the connected app:
- `app_metadata.job_name` set ⇒ Job target
- `app_metadata.service_name` set ⇒ Service target
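The detection rule can be expressed as a small function. This is a Python sketch of the decision only, not the module's own code; `detect_target` is a hypothetical name, and checking the job name first when both keys are present is an assumption, since the text above does not say what happens in that case.

```python
def detect_target(app_metadata: dict) -> str:
    """Return "job" or "service" based on which name the connected app exposes."""
    if app_metadata.get("job_name"):
        return "job"
    if app_metadata.get("service_name"):
        return "service"
    # Neither key is set: the connected app is not a supported target.
    raise ValueError("app_metadata has neither job_name nor service_name")
```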
This module provisions the appropriate trigger (workflow + Eventarc trigger, or push subscription), a dedicated service account, and all the IAM bindings required to make the chain work.
This is a Nullstone capability. Attach it to either a Cloud Run Job application or a Cloud Run Service application in your stack.
The capability requires a connection named `topic` that satisfies the `datastore/gcp/pubsub` contract. In your Nullstone stack, wire the capability to a Pub/Sub topic datastore. The topic id and name are pulled automatically, so you don't pass them as variables.
The shape of the input depends on whether your app is a Job or a Service.
The job is started with two environment variables on each execution:
| Variable | Description |
|---|---|
| `PUBSUB_MESSAGE` | The message payload, Base64-encoded |
| `PUBSUB_ATTRIBUTES` | The message attributes as a JSON object (`{}` if none) |
Example (Python):
import base64, json, os
# The payload arrives Base64-encoded; decode it back to text.
payload = base64.b64decode(os.environ["PUBSUB_MESSAGE"]).decode("utf-8")
# Attributes arrive as a JSON object ("{}" when the message has none).
attributes = json.loads(os.environ["PUBSUB_ATTRIBUTES"])
print(f"Received: {payload} with attributes {attributes}")
Note: these env vars are injected as per-execution overrides. They do not show up in the Job's static configuration in the GCP console; the workflow adds them at run time.
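Because these variables only exist at run time, running the Job code locally means supplying them yourself. A minimal sketch, assuming you want to mimic the contract above for a local test; `make_job_env` is a hypothetical helper, not part of the capability.

```python
import base64
import json
from typing import Optional


def make_job_env(payload: bytes, attributes: Optional[dict] = None) -> dict:
    """Build the two env vars in the same encoding the Job receives them."""
    return {
        # Payload is Base64-encoded, as in the table above.
        "PUBSUB_MESSAGE": base64.b64encode(payload).decode("ascii"),
        # Attributes are a JSON object, "{}" when there are none.
        "PUBSUB_ATTRIBUTES": json.dumps(attributes or {}),
    }
```

For example, `os.environ.update(make_job_env(b"hello world", {"key": "value"}))` before invoking the Job's entry point lets the decoding code above run unchanged.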
The service receives a standard Pub/Sub push request: a POST / with Content-Type: application/json whose body is the Pub/Sub push envelope. The actual message data lives in message.data (Base64-encoded) — decode it yourself.
- Method + path: always `POST /`. If your service expects events at a different path, route them inside your code.
- Authentication: the request carries an OIDC ID token from the trigger's service account, with the service URL as the audience. If your service requires authentication (the default for new Cloud Run services), the trigger SA is granted `roles/run.invoker` automatically.
- Body (Pub/Sub push envelope):
{
  "message": {
    "data": "aGVsbG8gd29ybGQ=",
    "attributes": { "key": "value" },
    "messageId": "1234567890",
    "publishTime": "2026-06-02T12:00:00Z"
  },
  "subscription": "projects/<project>/subscriptions/<subscription>"
}
Note: because the Service path uses a Pub/Sub push subscription (not Eventarc), the request does not carry CloudEvents `ce-*` headers. Read everything you need from the JSON body above.
Example (Python / Flask):
import base64
from flask import Flask, request
app = Flask(__name__)
@app.post("/")
def handle_event():
    # The request body is the Pub/Sub push envelope.
    envelope = request.get_json()
    message = envelope["message"]
    # message.data is Base64-encoded.
    payload = base64.b64decode(message["data"]).decode("utf-8")
    print(f"Received: {payload} with attributes {message.get('attributes', {})}")
    # Any 2xx response acks the message.
    return ("", 204)
Example (Node.js / Express):
const express = require("express");
const app = express();
// Parse JSON bodies so req.body holds the push envelope.
app.use(express.json());
app.post("/", (req, res) => {
  const { message } = req.body;
  // message.data is Base64-encoded.
  const payload = Buffer.from(message.data, "base64").toString("utf-8");
  console.log(`Received: ${payload}`, message.attributes ?? {});
  res.status(204).end();
});
Return a 2xx to ack the message. Any other response (or a timeout past the subscription's ack deadline) causes Pub/Sub to redeliver per its retry policy.
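Since a non-2xx response leads to redelivery, a handler may see the same message more than once. The sketch below shows one way to map processing outcomes to status codes and skip duplicates by `messageId`. It is framework-independent and makes several assumptions: `handle_push` and `process` are hypothetical names, the in-memory `seen_ids` set only deduplicates within a single instance, and acking a malformed envelope (rather than letting it retry) is a design choice, not something the capability prescribes.

```python
import base64
from typing import Callable


def handle_push(
    envelope: dict,
    process: Callable[[str, dict], None],
    seen_ids: set,
) -> int:
    """Return the HTTP status a push handler could send back to Pub/Sub."""
    message = envelope.get("message")
    if not isinstance(message, dict) or "messageId" not in message:
        # Malformed envelope: a retry would fail the same way, so ack it.
        return 204
    message_id = message["messageId"]
    if message_id in seen_ids:
        # Duplicate delivery: ack without processing again.
        return 204
    payload = base64.b64decode(message.get("data", "")).decode("utf-8")
    try:
        process(payload, message.get("attributes", {}))
    except Exception:
        # Non-2xx tells Pub/Sub to redeliver per the retry policy.
        return 500
    seen_ids.add(message_id)
    return 204
```

In a Flask or Express handler, the returned integer would become the response status. A shared store would be needed in place of the set if several instances serve the same subscription.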
| Variable | Type | Default | Description |
|---|---|---|---|
| `unacked_message_age_threshold` | `number` | `600` | Threshold (in seconds) for the oldest-unacked-message-age alert on the delivery subscription (the Eventarc-managed subscription for Jobs, or the push subscription for Services). Only used when a notification datastore is connected; otherwise no alert policy is created. |
The Service path adds essentially nothing over Pub/Sub's own push delivery. The Job path goes through a workflow, which adds roughly $35 per million events (at GCP's published Workflows pricing) and about 200–600 ms of dispatch latency. The workflow is required because a Job cannot be invoked over HTTP.
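As a rough budgeting aid, the per-event figure scales linearly with volume. The sketch below uses the approximate $35 per million events quoted above as a default; `job_path_workflow_cost` is a hypothetical helper, and it ignores any free tier or pricing changes, so treat the result as an estimate only.

```python
def job_path_workflow_cost(events: int, usd_per_million: float = 35.0) -> float:
    """Estimate the added Workflows cost in USD for a number of Job-path events."""
    return events / 1_000_000 * usd_per_million
```

For example, 2,000,000 events in a month would come to about $70 of added workflow cost under this assumption.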
- The `PUBSUB_MESSAGE` / `PUBSUB_ATTRIBUTES` env-var contract (Job target) is fixed. If your app needs different names, rename them inside your code.
- Job workflow execution failures do not currently dead-letter. Eventarc will retry per its built-in policy, and persistent failures are visible in the Workflow execution history. The Service push subscription retries per its own retry policy but is not configured with a dead-letter topic.