CREMA を使用して Pub/Sub キューの量に基づいて Cloud Run ワーカープールを自動スケーリングする

1. はじめに

概要

このチュートリアルでは、Cloud Run ワーカープール(コンシューマー)をデプロイして Pub/Sub メッセージを処理し、Cloud Run 外部指標自動スケーリング(CREMA)を使用してキューの深さに基づいてコンシューマー インスタンスを自動的にスケーリングする方法について説明します。

学習内容

この Codelab では、次のことを行います。

  • Pub/Sub トピックとサブスクリプションを作成し、そのトピックにメッセージを push します。
  • Pub/Sub からメッセージを使用する Cloud Run ワーカープール(コンシューマー)をデプロイする。
  • GitHub の CREMA プロジェクトを Cloud Run サービスとしてデプロイし、Pub/Sub サブスクリプションのメッセージ数に基づいてワーカープールを自動的にスケーリングします。
  • Python スクリプトをローカルで実行して負荷を生成し、自動スケーリングの構成をテストします。

2. 環境変数を構成する

この Codelab では多くの環境変数が使用されるため、

set -u

まだ設定されていない環境変数を使用しようとすると、警告が表示されます。この設定を元に戻すには、set +u を実行します。

まず、次の変数をプロジェクト ID に変更します。

export PROJECT_ID=<YOUR_PROJECT_ID>

この Codelab のプロジェクトとして設定します。

gcloud config set project $PROJECT_ID

次に、この Codelab で使用する環境変数を設定します。

export REGION=us-central1
export TOPIC_ID=crema-pubsub-topic
export SUBSCRIPTION_ID=crema-pubsub-sub
export CREMA_SA_NAME=crema-service-account
export CONSUMER_SA_NAME=consumer-service-account
export CONSUMER_WORKER_POOL_NAME=worker-pool-consumer
export CREMA_SERVICE_NAME=my-crema-service

この Codelab のディレクトリを作成する

mkdir crema-pubsub-codelab
cd crema-pubsub-codelab

API を有効にする

gcloud services enable \
        artifactregistry.googleapis.com \
        cloudbuild.googleapis.com \
        run.googleapis.com \
        parametermanager.googleapis.com

最後に、gcloud が最新バージョンを使用していることを確認します。

gcloud components update

3. Pub/Sub の設定

ワーカープールが処理するトピックと pull サブスクリプションを作成します。Bash

トピックを作成します。

gcloud pubsub topics create $TOPIC_ID

サブスクリプションを作成します。

gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID

4. IAM とサービス アカウント

Cloud Run リソースごとにサービス アカウントを作成することをおすすめします。この Codelab では、次のものを作成します。

  • コンシューマー SA: Pub/Sub メッセージを処理するワーカー プールの ID。
  • CREMA SA: CREMA オートスケーラー サービスの ID。

サービス アカウントの作成

ワーカープールのコンシューマー SA を作成します。

gcloud iam service-accounts create $CONSUMER_SA_NAME \
  --display-name="PubSub Consumer Service Account"

ワーカープール CREMA サービス SA を作成します。

gcloud iam service-accounts create $CREMA_SA_NAME \
  --display-name="CREMA Autoscaler Service Account"

コンシューマー SA に権限を付与する

ワーカープールのコンシューマー SA に、サブスクリプションからメッセージを pull する権限を付与します。

gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION_ID \
  --member="serviceAccount:$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/pubsub.subscriber"

CREMA SA への権限の付与

CREMA には、パラメータの読み取り、ワーカープールのスケーリング、Pub/Sub 指標のモニタリングを行う権限が必要です。

  1. Parameter Manager(構成リーダー)にアクセスする:
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/parametermanager.parameterViewer"
  1. ワーカープールをスケーリングする(Cloud Run デベロッパー):
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/run.developer"
  1. Pub/Sub のモニタリング:

モニタリング閲覧者のロールを付与します。

gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/monitoring.viewer"

CREMA サービス SA のサブスクリプションにポリシーを追加して表示する

gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION_ID \
  --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/pubsub.viewer"

CREMA SA には、インスタンス数の変更に必要なサービス アカウント ユーザーも必要です。

gcloud iam service-accounts add-iam-policy-binding \
    $CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
    --member="serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"

5. サービス アカウントの権限を確認する

この Codelab を進める前に、CREMA サービス SA に正しいプロジェクト レベルのロールがあることを確認してください。

gcloud projects get-iam-policy $PROJECT_ID \
  --flatten="bindings[].members" \
  --format="table(bindings.role)" \
  --filter="bindings.members:serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com"

次の結果が得られます。

roles/monitoring.viewer
roles/parametermanager.parameterViewer
roles/run.developer

Pub/Sub サブスクリプションに、CREMA サービス SA が表示できるポリシーがあることを確認します。

gcloud pubsub subscriptions get-iam-policy $SUBSCRIPTION_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --format="table(bindings.role)"

結果は次のようになります

roles/pubsub.viewer

CREMA SA にサービス アカウント ユーザーのロールがあることを確認します。

gcloud iam service-accounts get-iam-policy \
  $CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com"

次の結果が得られます。

bindings:
  members: serviceAccount:crema-service-account@<PROJECT_ID>.iam.gserviceaccount.com
  role: roles/iam.serviceAccountUser

ワーカー プール ユーザー SA に Pub/Sub サブスクライバー ロールがある

gcloud pubsub subscriptions get-iam-policy $SUBSCRIPTION_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --format="table(bindings.role)"

結果は次のようになります

ROLE
roles/pubsub.subscriber

6. コンシューマー ワーカープールをビルドしてデプロイする

コンシューマー コードのディレクトリを作成し、そのディレクトリに移動します。

mkdir consumer
cd consumer
  1. consumer.py ファイルを作成する
import os
import time
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# Configuration
PROJECT_ID = os.environ.get('PROJECT_ID')
SUBSCRIPTION_ID = os.environ.get('SUBSCRIPTION_ID')

subscription_path = f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}"

print(f"Worker Pool instance starting. Watching {subscription_path}...")

subscriber = pubsub_v1.SubscriberClient()

def callback(message):
    try:
        data = message.data.decode("utf-8")
        print(f"Processing job: {data}")
        time.sleep(5)  # Simulate work
        print(f"Done {data}")
        message.ack()
    except Exception as e:
        print(f"Error processing message: {e}")
        message.nack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except Exception as e:
        print(f"Streaming pull failed: {e}")
  1. Dockerfile を作成する
FROM python:3.12-slim
RUN pip install google-cloud-pubsub
COPY consumer.py .
CMD ["python", "-u", "consumer.py"]
  1. コンシューマー ワーカープールをデプロイする

この Codelab では、最初に 0 個のインスタンスでワーカープールをデプロイすることをおすすめします。これにより、サブスクリプションで Pub/Sub メッセージが検出されたときに、CREMA がワーカープールをスケーリングする様子を確認できます。

gcloud beta run worker-pools deploy $CONSUMER_WORKER_POOL_NAME \
  --source . \
  --region $REGION \
  --service-account="$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
  --instances=0 \
  --set-env-vars PROJECT_ID=$PROJECT_ID,SUBSCRIPTION_ID=$SUBSCRIPTION_ID

7. CREMA を構成する

  1. プロジェクトのルート ディレクトリに戻ります。
cd ..
  1. 構成ファイルを作成する crema-config.yaml という名前のファイルを作成します。
apiVersion: crema/v1
kind: CremaConfig
spec:
  pollingInterval: 30
  triggerAuthentications:
    - metadata:
        name: adc-trigger-auth
      spec:
        podIdentity:
          provider: gcp
  scaledObjects:
    - spec:
        scaleTargetRef:
          name: projects/PROJECT_ID_PLACEHOLDER/locations/REGION_PLACEHOLDER/workerpools/CONSUMER_WORKER_POOL_NAME_PLACEHOLDER
        triggers:
          - type: gcp-pubsub
            metadata:
              subscriptionName: "SUBSCRIPTION_ID_PLACEHOLDER"
              # Target number of undelivered messages per worker instance
              value: "10"
              mode: "SubscriptionSize"
            authenticationRef:
              name: adc-trigger-auth
  1. 変数を置換する
sed -i "s/PROJECT_ID_PLACEHOLDER/$PROJECT_ID/g" crema-config.yaml
sed -i "s/REGION_PLACEHOLDER/$REGION/g" crema-config.yaml
sed -i "s/CONSUMER_WORKER_POOL_NAME_PLACEHOLDER/$CONSUMER_WORKER_POOL_NAME/g" crema-config.yaml
sed -i "s/SUBSCRIPTION_ID_PLACEHOLDER/$SUBSCRIPTION_ID/g" crema-config.yaml
  1. crema-config.yaml が正しいことを確認する
if grep -q "_PLACEHOLDER" crema-config.yaml; then
  echo "❌ ERROR: Validations failed. '_PLACEHOLDER' was found in crema-config.yaml."
  echo "Please check your environment variables and run the 'sed' commands again."
else
  echo "✅ Config check passed: No placeholders found."
fi
  1. Parameter Manager にアップロードする

Parameter Manager の追加の環境変数を設定する

export PARAMETER_ID=crema-config
export PARAMETER_REGION=global
export PARAMETER_VERSION=1

Parameter リソースを作成する

gcloud parametermanager parameters create $PARAMETER_ID \
  --location=$PARAMETER_REGION \
  --parameter-format=YAML

パラメータ バージョン 1 を作成する

gcloud parametermanager parameters versions create $PARAMETER_VERSION \
  --parameter=crema-config \
  --project=$PROJECT_ID \
  --location=$PARAMETER_REGION \
  --payload-data-from-file=crema-config.yaml

パラメータが正常に追加されたことを確認する

gcloud parametermanager parameters versions list \
  --parameter=$PARAMETER_ID \
  --location=$PARAMETER_REGION

次のような出力が表示されます。

projects/<YOUR_PROJECT_ID>/locations/global/parameters/crema-config/versions/1

8. CREMA サービスをデプロイする

このセクションでは、CREMA オートスケーラー サービスをデプロイします。一般公開されている画像を使用します。

  1. CREMA に必要な環境変数を設定する
CREMA_CONFIG_PARAM_VERSION=projects/$PROJECT_ID/locations/$PARAMETER_REGION/parameters/$PARAMETER_ID/versions/$PARAMETER_VERSION
  1. バージョン名のパスを確認する
echo $CREMA_CONFIG_PARAM_VERSION

次のようになります。

projects/<YOUR_PROJECT>/locations/global/parameters/crema-config/versions/1
  1. CREMA イメージの環境変数を設定する
IMAGE=us-central1-docker.pkg.dev/cloud-run-oss-images/crema-v1/autoscaler:1.0
  1. CREMA サービスをデプロイする

ベースイメージは必須です。

gcloud beta run deploy $CREMA_SERVICE_NAME \
  --image=$IMAGE \
  --region=${REGION} \
  --service-account="${CREMA_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --no-allow-unauthenticated \
  --no-cpu-throttling \
  --labels=created-by=crema \
  --base-image=us-central1-docker.pkg.dev/serverless-runtimes/google-24/runtimes/java25 \
  --set-env-vars="CREMA_CONFIG=${CREMA_CONFIG_PARAM_VERSION},OUTPUT_SCALER_METRICS=True,ENABLE_CLOUD_LOGGING=True"

9. 負荷テスト

  1. Pub/Sub トピックにメッセージをパブリッシュするスクリプトを作成する
touch load-pubsub.sh
  1. load-pubsub.sh ファイルに次のコードを追加します。
#!/bin/bash
TOPIC_ID=${TOPIC_ID} 
PROJECT_ID=${PROJECT_ID}
NUM_MESSAGES=100

echo "Publishing $NUM_MESSAGES messages to topic $TOPIC_ID..."

for i in $(seq 1 $NUM_MESSAGES); do
  gcloud pubsub topics publish $TOPIC_ID --message="job-$i" --project=$PROJECT_ID &
  if (( $i % 10 == 0 )); then
    wait
    echo "Published $i messages..."
  fi
done
wait
echo "Done. All messages published."
  1. 負荷テストを実行する
chmod +x load-pubsub.sh
./load-pubsub.sh
  1. スケーリングをモニタリングします。3 ~ 4 分待ちます。CREMA ログを表示して、新しい authenticationRef 構成に基づいてインスタンスが推奨されていることを確認します。
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=$CREMA_SERVICE_NAME AND textPayload:SCALER" \
  --limit=20 \
  --format="value(textPayload)" \
  --freshness=5m
  1. [Monitor Processing] で、コンシューマー ログを表示して、スピンアップしていることを確認します。
gcloud beta run worker-pools logs tail $CONSUMER_WORKER_POOL_NAME --region=$REGION

次のようなログが表示されます。

Done job-100

10. トラブルシューティング

まず、問題が CREMA サービス構成にあるのか、PubSub コンシューマー構成にあるのかを特定します。

PubSub コンシューマーのオートスケーラーを 0 ではなく 1 に設定します。すぐに Pub/Sub メッセージの処理を開始する場合は、CREMA に問題があります。pubsub メッセージが処理されない場合は、pubsub コンシューマーに問題があります。

11. 完了

以上で、この Codelab は完了です。

Cloud Run のドキュメントを確認することをおすすめします。

学習した内容

  • Pub/Sub トピックとサブスクリプションを作成し、そのトピックにメッセージを push する方法。
  • Pub/Sub からメッセージを使用する Cloud Run ワーカープール(コンシューマー)をデプロイする方法。
  • GitHub の CREMA プロジェクトを Cloud Run サービスとしてデプロイし、Pub/Sub サブスクリプションのメッセージ数に基づいてワーカープールを自動的にスケーリングする方法。
  • Python スクリプトをローカルで実行して負荷を生成し、自動スケーリングの構成をテストする方法。

12. クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、この Codelab で作成したリソースを削除するか、プロジェクト全体を削除します。

この Codelab で使用したリソースを削除する

  1. Cloud Run CREMA サービスを削除する
gcloud run services delete $CREMA_SERVICE_NAME --region=$REGION --quiet
  1. Cloud Run ワーカープールのコンシューマーを削除する
gcloud beta run worker-pools delete $CONSUMER_WORKER_POOL_NAME --region=$REGION --quiet
  1. Pub/Sub サブスクリプションとトピックを削除します。
gcloud pubsub subscriptions delete $SUBSCRIPTION_ID --quiet
gcloud pubsub topics delete $TOPIC_ID --quiet
  1. Parameter Manager の構成を削除する

パラメータ内のバージョンを削除する

gcloud parametermanager parameters versions delete $PARAMETER_VERSION \
  --parameter=$PARAMETER_ID \
  --location=$PARAMETER_REGION \
  --quiet

空のパラメータを削除します。

gcloud parametermanager parameters delete $PARAMETER_ID \
  --location=$PARAMETER_REGION \
  --quiet
  1. サービス アカウントを削除する
gcloud iam service-accounts delete "$CREMA_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --quiet
gcloud iam service-accounts delete "$CONSUMER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --quiet

またはプロジェクト全体を削除する

プロジェクト全体を削除するには、[リソースの管理] に移動し、ステップ 2 で作成したプロジェクトを選択して、[削除] を選択します。プロジェクトを削除した場合は、Cloud SDK でプロジェクトを変更する必要があります。gcloud projects list を実行すると、使用可能なすべてのプロジェクトのリストを表示できます。