使用 CREMA 根据 Pub/Sub 队列容量自动扩缩 Cloud Run 工作器池

1. 简介

概览

本教程介绍如何部署 Cloud Run 工作器池(消费者)来处理 Pub/Sub 消息,以及如何使用 Cloud Run 外部指标自动扩缩 (CREMA) 根据队列深度自动扩缩消费者实例。

学习内容

在此 Codelab 中,您将执行以下操作:

  • 创建 Pub/Sub 主题和订阅,并将消息推送到该主题。
  • 部署一个 Cloud Run 工作器池(消费者),用于接收来自 Pub/Sub 的消息。
  • 将 GitHub 上的 CREMA 项目部署为 Cloud Run 服务,以根据 Pub/Sub 订阅中的消息数量自动扩缩工作器池。
  • 通过在本地运行 Python 脚本来生成负载,以测试自动扩缩配置。

2. 配置环境变量

由于本 Codelab 中会用到许多环境变量,因此我们建议运行

set -u

如果您尝试使用尚未设置的环境变量,该功能会向您发出警告。如需撤消此设置,请运行 set +u

首先,将以下变量更改为您的项目 ID。

export PROJECT_ID=<YOUR_PROJECT_ID>

然后将其设置为此 Codelab 的项目。

gcloud config set project $PROJECT_ID

接下来,设置此 Codelab 使用的环境变量。

export REGION=us-central1
export TOPIC_ID=crema-pubsub-topic
export SUBSCRIPTION_ID=crema-pubsub-sub
export CREMA_SA_NAME=crema-service-account
export CONSUMER_SA_NAME=consumer-service-account
export CONSUMER_WORKER_POOL_NAME=worker-pool-consumer
export CREMA_SERVICE_NAME=my-crema-service

为此 Codelab 创建一个目录

mkdir crema-pubsub-codelab
cd crema-pubsub-codelab

启用 API

gcloud services enable \
        artifactregistry.googleapis.com \
        cloudbuild.googleapis.com \
        run.googleapis.com \
        parametermanager.googleapis.com

最后,请确保您的 gcloud 使用的是最新版本

gcloud components update

3. Pub/Sub 设置

创建主题和工作器池将处理的拉取订阅。Bash

创建主题。

gcloud pubsub topics create $TOPIC_ID

创建订阅。

gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID

4. IAM 和服务账号

建议为每个 Cloud Run 资源创建一个服务账号。在此 Codelab 中,您将创建以下内容:

  • 消费者 SA:处理 Pub/Sub 消息的工作器池的身份。
  • CREMA SA:CREMA 自动扩缩器服务的身份。

Create Service Accounts

创建工作器池使用方 SA:

gcloud iam service-accounts create $CONSUMER_SA_NAME \
  --display-name="PubSub Consumer Service Account"

创建工作器池 CREMA 服务 SA:

gcloud iam service-accounts create $CREMA_SA_NAME \
  --display-name="CREMA Autoscaler Service Account"

向消费者 SA 授予权限

向工作器池使用方 SA 授予从订阅中拉取消息的权限。

gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION_ID \
  --member="serviceAccount:$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/pubsub.subscriber"

向 CREMA SA 授予权限

CREMA 需要获得读取参数、扩缩工作器池和监控 Pub/Sub 指标的权限。

  1. 访问 Parameter Manager(配置读取者):
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/parametermanager.parameterViewer"
  1. 扩缩工作器池(Cloud Run Developer):
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/run.developer"
  1. 监控 Pub/Sub:

授予监控查看者角色。

gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer"

向 CREMA 服务 SA 的订阅添加政策以查看该订阅

gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION_ID \
  --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/pubsub.viewer"

CREMA SA 还需具有“服务账号用户”角色,该角色是更改实例数量所必需的:

gcloud iam service-accounts add-iam-policy-binding \
    $CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
    --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

5. 验证服务账号权限

在继续学习此 Codelab 之前,请验证 CREMA 服务 SA 是否具有正确的项目级角色。

gcloud projects get-iam-policy $PROJECT_ID \
  --flatten="bindings[].members" \
  --format="table(bindings.role)" \
  --filter="bindings.members:serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com"

应产生以下结果:

roles/monitoring.viewer
roles/parametermanager.parameterViewer
roles/run.developer

验证 Pub/Sub 订阅是否具有允许 CREMA 服务 SA 查看的政策。

gcloud pubsub subscriptions get-iam-policy $SUBSCRIPTION_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --format="table(bindings.role)"

应导致

roles/pubsub.viewer

并验证 CREMA SA 是否具有 Service Account User 角色

gcloud iam service-accounts get-iam-policy \
  $CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com"

应生成以下结果

bindings:
  members: serviceAccount:crema-service-account@<PROJECT_ID>.iam.gserviceaccount.com
  role: roles/iam.serviceAccountUser

并且 Worker Pool Consumer SA 具有 Pub/Sub 订阅者角色

gcloud pubsub subscriptions get-iam-policy $SUBSCRIPTION_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --format="table(bindings.role)"

应导致

ROLE
roles/pubsub.subscriber

6. 构建和部署消费者工作器池

为您的消费者代码创建一个目录,然后进入该目录。

mkdir consumer
cd consumer
  1. 创建 consumer.py 文件
import os
import time
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# Configuration
PROJECT_ID = os.environ.get('PROJECT_ID')
SUBSCRIPTION_ID = os.environ.get('SUBSCRIPTION_ID')

subscription_path = f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}"

print(f"Worker Pool instance starting. Watching {subscription_path}...")

subscriber = pubsub_v1.SubscriberClient()

def callback(message):
    try:
        data = message.data.decode("utf-8")
        print(f"Processing job: {data}")
        time.sleep(5)  # Simulate work
        print(f"Done {data}")
        message.ack()
    except Exception as e:
        print(f"Error processing message: {e}")
        message.nack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except Exception as e:
        print(f"Streaming pull failed: {e}")
  1. 创建 Dockerfile
FROM python:3.12-slim
RUN pip install google-cloud-pubsub
COPY consumer.py .
CMD ["python", "-u", "consumer.py"]
  1. 部署使用方工作器池

此 Codelab 建议先部署包含 0 个实例的工作器池,这样您就可以在 CREMA 检测到订阅中的 Pub/Sub 消息时,观察它如何扩缩工作器池。

gcloud beta run worker-pools deploy $CONSUMER_WORKER_POOL_NAME \
  --source . \
  --region $REGION \
  --service-account="$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --instances=0 \
  --set-env-vars PROJECT_ID=$PROJECT_ID,SUBSCRIPTION_ID=$SUBSCRIPTION_ID

7. 配置 CREMA

  1. 导航回项目的根目录。
cd ..
  1. 创建配置文件 创建一个名为 crema-config.yaml 的文件
apiVersion: crema/v1
kind: CremaConfig
spec:
  pollingInterval: 30
  triggerAuthentications:
    - metadata:
        name: adc-trigger-auth
      spec:
        podIdentity:
          provider: gcp
  scaledObjects:
    - spec:
        scaleTargetRef:
          name: projects/PROJECT_ID_PLACEHOLDER/locations/REGION_PLACEHOLDER/workerpools/CONSUMER_WORKER_POOL_NAME_PLACEHOLDER
        triggers:
          - type: gcp-pubsub
            metadata:
              subscriptionName: "SUBSCRIPTION_ID_PLACEHOLDER"
              # Target number of undelivered messages per worker instance
              value: "10"
              mode: "SubscriptionSize"
            authenticationRef:
              name: adc-trigger-auth
  1. 替换变量
sed -i "s/PROJECT_ID_PLACEHOLDER/$PROJECT_ID/g" crema-config.yaml
sed -i "s/REGION_PLACEHOLDER/$REGION/g" crema-config.yaml
sed -i "s/CONSUMER_WORKER_POOL_NAME_PLACEHOLDER/$CONSUMER_WORKER_POOL_NAME/g" crema-config.yaml
sed -i "s/SUBSCRIPTION_ID_PLACEHOLDER/$SUBSCRIPTION_ID/g" crema-config.yaml
  1. 验证您的 crema-config.yaml 是否正确
if grep -q "_PLACEHOLDER" crema-config.yaml; then
  echo "❌ ERROR: Validations failed. '_PLACEHOLDER' was found in crema-config.yaml."
  echo "Please check your environment variables and run the 'sed' commands again."
else
  echo "✅ Config check passed: No placeholders found."
fi
  1. 上传到 Parameter Manager

为参数管理器设置其他环境变量

export PARAMETER_ID=crema-config
export PARAMETER_REGION=global
export PARAMETER_VERSION=1

创建 Parameter 资源

gcloud parametermanager parameters create $PARAMETER_ID \
  --location=$PARAMETER_REGION \
  --parameter-format=YAML

创建参数版本 1

gcloud parametermanager parameters versions create $PARAMETER_VERSION \
  --parameter=crema-config \
  --project=$PROJECT_ID \
  --location=$PARAMETER_REGION \
  --payload-data-from-file=crema-config.yaml

验证参数是否已成功添加

gcloud parametermanager parameters versions list \
  --parameter=$PARAMETER_ID \
  --location=$PARAMETER_REGION

您应该会看到类似如下内容:

projects/<YOUR_PROJECT_ID>/locations/global/parameters/crema-config/versions/1

8. 部署 CREMA 服务

在本部分中,您将部署 CREMA 自动扩缩器服务。您将使用公开提供的图片。

  1. 设置 CREMA 所需的环境变量
CREMA_CONFIG_PARAM_VERSION=projects/$PROJECT_ID/locations/$PARAMETER_REGION/parameters/$PARAMETER_ID/versions/$PARAMETER_VERSION
  1. 验证版本名称路径
echo $CREMA_CONFIG_PARAM_VERSION

它应如下所示

projects/<YOUR_PROJECT>/locations/global/parameters/crema-config/versions/1
  1. 为 CREMA 映像设置了环境变量
IMAGE=us-central1-docker.pkg.dev/cloud-run-oss-images/crema-v1/autoscaler:1.0
  1. 并部署 CREMA 服务

请注意,必须指定基础映像。

gcloud beta run deploy $CREMA_SERVICE_NAME \
  --image=$IMAGE \
  --region=${REGION} \
  --service-account="${CREMA_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --no-allow-unauthenticated \
  --no-cpu-throttling \
  --labels=created-by=crema \
  --base-image=us-central1-docker.pkg.dev/serverless-runtimes/google-24/runtimes/java25 \
  --set-env-vars="CREMA_CONFIG=${CREMA_CONFIG_PARAM_VERSION},OUTPUT_SCALER_METRICS=True,ENABLE_CLOUD_LOGGING=True"

9. 负载测试

  1. 创建用于将消息发布到 Pub/Sub 主题的脚本
touch load-pubsub.sh
  1. 将以下代码添加到 load-pubsub.sh 文件中
#!/bin/bash
TOPIC_ID=${TOPIC_ID} 
PROJECT_ID=${PROJECT_ID}
NUM_MESSAGES=100

echo "Publishing $NUM_MESSAGES messages to topic $TOPIC_ID..."

for i in $(seq 1 $NUM_MESSAGES); do
  gcloud pubsub topics publish $TOPIC_ID --message="job-$i" --project=$PROJECT_ID &
  if (( $i % 10 == 0 )); then
    wait
    echo "Published $i messages..."
  fi
done
wait
echo "Done. All messages published."
  1. 运行负载测试
chmod +x load-pubsub.sh
./load-pubsub.sh
  1. 监控扩缩等待 3-4 分钟。查看 CREMA 日志,了解它是否根据新的 authenticationRef 配置推荐了实例。
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=$CREMA_SERVICE_NAME AND textPayload:SCALER" \
  --limit=20 \
  --format="value(textPayload)" \
  --freshness=5m
  1. 监控处理查看消费者的日志,看看它是否正在启动。
gcloud beta run worker-pools logs tail $CONSUMER_WORKER_POOL_NAME --region=$REGION

您应该会看到如下所示的日志

Done job-100

10. 问题排查

首先,您需要确定问题是出在 CREMA 服务配置上,还是出在 PubSub 使用者配置上。

将 PubSub 消费者自动扩缩器设置为 1,而不是 0。如果它立即开始处理 Pub/Sub 消息,则表示 CREMA 存在问题。如果它不处理 pubsub 消息,则 pubsub 消费者存在问题。

11. 恭喜!

恭喜您完成此 Codelab!

建议您查看 Cloud Run 文档。

所学内容

  • 如何创建 Pub/Sub 主题和订阅,以及如何将消息推送到该主题。
  • 如何部署从 Pub/Sub 接收消息的 Cloud Run 工作器池(消费者)。
  • 如何将 GitHub 上的 CREMA 项目部署为 Cloud Run 服务,以根据 Pub/Sub 订阅中的消息数量自动扩缩工作器池。
  • 如何通过在本地运行 Python 脚本来生成负载,从而测试自动扩缩配置。

12. 清理

为避免系统因本教程中使用的资源向您的 Google Cloud 账号收取费用,您可以删除在此 Codelab 中创建的资源,也可以删除整个项目。

删除本 Codelab 中使用的资源

  1. 删除 Cloud Run CREMA 服务
gcloud run services delete $CREMA_SERVICE_NAME --region=$REGION --quiet
  1. 删除 Cloud Run 工作器池使用方
gcloud beta run worker-pools delete $CONSUMER_WORKER_POOL_NAME --region=$REGION --quiet
  1. 删除 Pub/Sub 订阅和主题
gcloud pubsub subscriptions delete $SUBSCRIPTION_ID --quiet
gcloud pubsub topics delete $TOPIC_ID --quiet
  1. 删除 Parameter Manager 配置

删除参数中的版本

gcloud parametermanager parameters versions delete $PARAMETER_VERSION \
  --parameter=$PARAMETER_ID \
  --location=$PARAMETER_REGION \
  --quiet

现在删除空参数

gcloud parametermanager parameters delete $PARAMETER_ID \
  --location=$PARAMETER_REGION \
  --quiet
  1. 删除服务账号
gcloud iam service-accounts delete "$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --quiet
gcloud iam service-accounts delete "$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --quiet

或者删除整个项目

如需删除整个项目,请前往管理资源,选择您在第 2 步中创建的项目,然后选择“删除”。如果您删除项目,则需要在 Cloud SDK 中更改项目。您可以运行 gcloud projects list 查看所有可用项目的列表。