为基于拉取的 Pub/Sub 订阅创建 Cloud Run 工作器池

1. 简介

概览

工作器池是一种 Cloud Run 资源,专门为非请求工作负载(例如拉取队列)而设计。请注意,工作器池不具备以下功能:

  • 没有端点/网址
  • 对已部署的容器在端口上侦听请求没有要求
  • 不自动扩缩

在此 Codelab 中,您将创建一个工作器池,该工作器池将从 Pub/Sub 拉取订阅中持续检索消息。您可以参阅文档此代码示例,详细了解 Pub/Sub 拉取订阅。

学习内容

  • 如何创建应用并将其部署到 Cloud Run 工作器池
  • 如何从基于拉取的 Pub/Sub 订阅中检索消息

2. 设置和要求

首先,为此 Codelab 设置环境变量:

export PROJECT_ID=<your_project_id>
export REGION=<your_region>

export WORKER_POOL_NAME=codelab-workers-pubsub
export SERVICE_ACCOUNT=worker-pools-sa
export SERVICE_ACCOUNT_EMAIL=${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
export TOPIC=pull-pubsub-topic

接下来,配置 gcloud 以使用您的项目 ID

gcloud config set project $PROJECT_ID

在开始使用此 Codelab 之前,请运行以下命令来启用以下 API:

gcloud services enable run.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com \
    pubsub.googleapis.com

运行以下命令来创建服务账号:

gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
  --display-name="Service account for worker pool codelab"

最后,为 Cloud Run 服务账号授予对 PubSub 的访问权限:

gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member serviceAccount:$SERVICE_ACCOUNT_EMAIL \
    --role='roles/pubsub.admin'

3. 创建 Cloud Run 工作器池

首先,为工作器池代码创建一个目录:

mkdir codelab-worker-pools
cd codelab-worker-pools

接下来,创建一个名为 package.json 的文件

{
    "name": "codelab-worker-pools",
    "version": "1.0.0",
    "description": "A codelab example of a Cloud Run worker pool retrieving messages from a Pull-based PubSub subscription",
    "main": "index.js",
    "scripts": {
        "start": "node index.js"
    },
    "engines": {
        "node": ">=22.0.0"
    },
    "keywords": [],
    "author": "",
    "license": "ISC",
    "dependencies": {
        "@google-cloud/pubsub": "^5.1.0"
    }
}

现在,创建一个名为 index.js 的文件:

'use strict';

const subscriptionNameOrId = 'pull-pubsub-topic';

const { PubSub } = require('@google-cloud/pubsub');

// Creates a Pub/Sub client; cache this for further use.
const pubSubClient = new PubSub();

// References an existing subscription.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// This function is called when a shutdown signal is received.
const handleShutdown = async (signal) => {
  console.log(`\n${signal} signal caught. Shutting down gracefully...`);

  try {
    // 1. Stop listening for new messages. The `close()` method returns a Promise.
    console.log('Closing Pub/Sub subscription...');
    await subscription.close();
    console.log('Pub/Sub subscription closed.');

    // 2. Add any other cleanup logic here, like closing database connections.

  } catch (err) {
    console.error('Error during graceful shutdown:', err);
  } finally {
    console.log('Worker Pool exited.');
    process.exit(0);
  }
};

// Listen for termination signals.
// SIGINT handles Ctrl+C locally.
// SIGTERM handles signals from services like Cloud Run.
process.on('SIGINT', () => handleShutdown('SIGINT'));
process.on('SIGTERM', () => handleShutdown('SIGTERM'));

// ------------------ Pub/Sub Message Handling ------------------

// Create an event handler to process incoming messages.
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);

  // Ack the message so it is not sent again.
  message.ack();
};

// Register the message handler and listen for messages.
subscription.on('message', messageHandler);

console.log(
  `Worker started. Listening for messages on "${subscriptionNameOrId}".`
);
console.log('If running locally, press Ctrl+C to quit.');

// The application will now listen for messages indefinitely until a shutdown
// signal is received.

4. 部署工作器池

运行以下命令,创建 Cloud Run 工作器池:

gcloud beta run worker-pools deploy $WORKER_POOL_NAME --region=$REGION --source .

此命令会从源代码构建映像并部署作业。这需要几分钟才能完成。

5. 向 PubSub 发布消息

创建 Pub/Sub 主题

gcloud pubsub topics create $TOPIC

创建 PubSub 拉取订阅

gcloud pubsub subscriptions create codelab-subscription --topic=$TOPIC

运行以下命令,将消息发布到您的 PubSub 主题。

gcloud pubsub topics publish $TOPIC --message "Hello Worker Pools"

检查工作器池的日志

gcloud logging read 'resource.type="cloud_run_worker_pool" AND resource.labels.worker_pool_name="'$WORKER_POOL_NAME'" AND resource.labels.location="'$REGION'"' --limit 10

您应该会在日志中看到 Hello Worker Pools

6. 删除工作器池

由于工作器池会持续运行,因此您应删除该工作器池。

gcloud beta run worker-pools delete $WORKER_POOL_NAME --region $REGION

7. 恭喜!

恭喜您完成此 Codelab!

建议您查看 Cloud Run 文档。

所学内容

  • 如何创建应用并将其部署到 Cloud Run 工作器池
  • 如何从基于拉取的 Pub/Sub 订阅中检索消息

8. 清理

如需删除 Cloud Run 工作器池,请前往 Cloud Run Cloud 控制台 (https://console.cloud.google.com/run),然后删除 codelab-workers-pubsub 工作器池。

如需删除整个项目,请前往管理资源,选择您在第 2 步中创建的项目,然后选择“删除”。如果您删除项目,则需要在 Cloud SDK 中更改项目。您可以运行 gcloud projects list 查看所有可用项目的列表。