Spring Integration と Google Cloud Pub/Sub を使用したメッセージング

1. 概要

Spring Integration は、MessageChannels を介して Messages を交換するためのメッセージ メカニズムを提供します。チャネル アダプタを使用して外部システムと通信します。

この演習では、Spring Cloud GCP で提供される Spring 統合チャネル アダプタを使用して通信する 2 つのアプリを作成します。これらのアダプタにより、Spring 統合はメッセージ交換バックエンドとして Google Cloud Pub/Sub を使用します。

Cloud Shell と Cloud SDK の gcloud コマンドを使用する方法について学習します。

このチュートリアルでは、Spring Boot スタートガイドのサンプルコードを使用します。

学習内容

  • Spring Integration と Spring Cloud GCP を使用して Google Cloud Pub/Sub でアプリ間でメッセージを交換する方法

必要なもの

  • Google Cloud Platform プロジェクト
  • ChromeFirefox などのブラウザ
  • Linux の標準的なテキスト エディタ(vim、emacs、nano など)を使い慣れていること

このチュートリアルをどのように使用されますか?

通読するのみ 通読し、演習を行う

HTML/CSS ウェブアプリの作成経験についてお答えください。

初心者 中級者 上級者

Google Cloud Platform サービスのご利用経験についてどのように評価されますか?

初心者 中級者 上級者

2. 設定と要件

セルフペース型の環境設定

  1. Google Cloud Console にログインして、プロジェクトを新規作成するか、既存のプロジェクトを再利用します。Gmail アカウントも Google Workspace アカウントもまだお持ちでない場合は、アカウントを作成してください。

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • プロジェクト名は、このプロジェクトの参加者に表示される名称です。Google API では使用されない文字列です。いつでも更新できます。
  • プロジェクト ID は、すべての Google Cloud プロジェクトにおいて一意でなければならず、不変です(設定後は変更できません)。Cloud コンソールでは一意の文字列が自動生成されます。通常は、この内容を意識する必要はありません。ほとんどの Codelab では、プロジェクト ID(通常は PROJECT_ID と識別されます)を参照する必要があります。生成された ID が好みではない場合は、ランダムに別の ID を生成できます。または、ご自身で試して、利用可能かどうかを確認することもできます。このステップ以降は変更できず、プロジェクトを通して同じ ID になります。
  • なお、3 つ目の値として、一部の API が使用するプロジェクト番号があります。これら 3 つの値について詳しくは、こちらのドキュメントをご覧ください。
  1. 次に、Cloud のリソースや API を使用するために、Cloud コンソールで課金を有効にする必要があります。この Codelab の操作をすべて行って、費用が生じたとしても、少額です。このチュートリアルの終了後に請求が発生しないようにリソースをシャットダウンするには、作成したリソースを削除するか、プロジェクトを削除します。Google Cloud の新規ユーザーは、300 米ドル分の無料トライアル プログラムをご利用いただけます。

Google Cloud Shell

Google Cloud はノートパソコンからリモートで操作できますが、この Codelab では、Google Cloud Shell(Cloud 上で動作するコマンドライン環境)を使用します。

Cloud Shell をアクティブにする

  1. Cloud Console で、[Cloud Shell をアクティブにする] 853e55310c205094.png をクリックします。

55efc1aaa7a4d3ad.png

Cloud Shell を初めて起動する場合は、その内容を説明する中間画面が表示されます。中間画面が表示された場合は、[続行] をクリックします。

9c92662c6a846a5c.png

すぐにプロビジョニングが実行され、Cloud Shell に接続されます。

9f0e51b578fecce5.png

この仮想マシンには、必要な開発ツールがすべて用意されています。仮想マシンは Google Cloud で稼働し、永続的なホーム ディレクトリが 5 GB 用意されているため、ネットワークのパフォーマンスと認証が大幅に向上しています。このコードラボで行う作業のほとんどはブラウザから実行できます。

Cloud Shell に接続すると、認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。

  1. Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list

コマンド出力

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。
gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

3. Pub/Sub リソースをプロビジョニングする

Google Cloud Pub/Sub の [トピック] ページに移動します。

[トピックを作成] をクリックします。

4c938409dc7169a6.png

トピックの名前として「exampleTopic」と入力し、[作成] をクリックします。

e2daeec91537f672.png

トピックを作成したら、[トピック] ページにとどまります。作成したトピックを探し、行の末尾にあるその他アイコンをクリックして、[新しいサブスクリプション] をクリックします。

975efa26e5054936.png

サブスクリプション名のテキスト ボックスに「exampleSubscription」と入力し、[作成] をクリックします。

f7a91d9e1cb48009.png

4. Spring Boot アプリケーションを初期化する

Cloud Shell の起動後に以下のコマンドラインを使用すると、Spring Initializr を使用して 2 つの新しい Spring Boot アプリケーションを生成できます。

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-sender | tar -xzvf -

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-receiver | tar -xzvf -

5. メッセージを送信するアプリケーションを作成する

次に、メッセージ送信アプリを作成します。送信アプリのディレクトリに移動します。

$ cd spring-integration-sender

アプリがチャンネルにメッセージを書き込めるようにします。メッセージがチャネルに入ると、送信チャネル アダプタによって取得され、汎用 Spring メッセージから Google Cloud Pub/Sub メッセージに変換されて、Google Cloud Pub/Sub トピックにパブリッシュされます。

アプリがチャネルに書き込むには、Spring 統合メッセージング ゲートウェイを使用します。vimemacsnano のテキスト エディタを使用して、DemoApplication クラス内に PubsubOutboundGateway インターフェースを宣言します。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

  ...

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway {
    void sendToPubsub(String text);
  }
}

これで、チャンネルにメッセージを送信するメカニズムができました。しかし、チャンネルに送信されたメッセージはその後どうなるのでしょうか?

チャネル内の新しいメッセージを消費して Google Cloud Pub/Sub トピックにパブリッシュするには、送信チャネル アダプタが必要です。

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
  }
}

@ServiceActivator アノテーションにより、この MessageHandlerinputChannel の新しいメッセージに適用されます。この例では、送信チャネル アダプタ PubSubMessageHandler を呼び出して、Google Cloud Pub/Sub の exampleTopic トピックにメッセージをパブリッシュしています。

チャネル アダプタを配置したので、PubsubOutboundGateway オブジェクトを自動配線して、チャネルにメッセージを書き込むために使用できるようになりました。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

  ...

  @Autowired
  private PubsubOutboundGateway messagingGateway;

  @PostMapping("/postMessage")
  public RedirectView postMessage(@RequestParam("message") String message) {
    this.messagingGateway.sendToPubsub(message);
    return new RedirectView("/");
  }
}

@PostMapping アノテーションにより、HTTP POST リクエストをリッスンするエンドポイントが作成されました。ただし、DemoApplication クラスに @RestController アノテーションを追加して REST コントローラとしてマークすることも必要です。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

JAVA_HOME が正しいバージョンに設定されていることを確認します。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

送信側アプリを実行します。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list --format 'value(core.project)'`

$ ./mvnw spring-boot:run

このアプリは、ポート 8080 とエンドポイント /postMessage でメッセージを含む POST リクエストをリッスンしています。これについては後で説明します。

6. メッセージを受信するアプリケーションを作成する

Google Cloud Pub/Sub を介してメッセージを送信するアプリを作成しました。次に、これらのメッセージを受信して処理する別のアプリを作成します。

[+] をクリックして、新しい Cloud Shell セッションを開きます。

9799bee5fea95aa6.png

次に、新しい Cloud Shell セッションで、ディレクトリを受信側アプリのディレクトリに変更します。

$ cd spring-integration-receiver

前のアプリでは、メッセージング ゲートウェイの宣言によって送信チャンネルが作成されました。メッセージを受信するためにメッセージング ゲートウェイを使用しないため、受信メッセージが届く独自の MessageChannel を宣言する必要があります。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }
}

Google Cloud Pub/Sub からメッセージを受信して pubsubInputChannel にリレーするには、インバウンド チャネル アダプタが必要です。

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;

import org.springframework.beans.factory.annotation.Qualifier;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
    adapter.setOutputChannel(inputChannel);

    return adapter;
  }
}

このアダプターは pubsubInputChannel にバインドされ、Google Cloud Pub/Sub exampleSubscription サブスクリプションからの新しいメッセージをリッスンします。

受信メッセージが投稿されるチャネルがありますが、これらのメッセージをどうすればよいでしょうか?

pubsubInputChannel に新しいメッセージが届いたときにトリガーされる @ServiceActivator で処理しましょう。この場合、メッセージ ペイロードをログに記録するだけです。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

  ...

  private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public void messageReceiver(String payload) {
    LOGGER.info("Message arrived! Payload: " + payload);
  }
}

JAVA_HOME が正しいバージョンに設定されていることを確認します。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

レシーバー アプリを実行します。

$ ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="-Dserver.port=8081"

これで、送信側アプリに送信したメッセージは受信側アプリに記録されます。これをテストするには、新しい Cloud Shell セッションを開き、送信側アプリに HTTP POST リクエストを行います。

$ curl --data "message=Hello world!" localhost:8080/postMessage

次に、受信側アプリが送信したメッセージをログに記録したことを確認します。

INFO: Message arrived! Payload: Hello world!

7. クリーンアップ

この演習で作成したサブスクリプションとトピックを削除します。

$ gcloud pubsub subscriptions delete exampleSubscription
$ gcloud pubsub topics delete exampleTopic

8. まとめ

Google Cloud Pub/Sub 用 Spring 統合チャネル アダプタを使用する 2 つの Spring Boot アプリを設定します。これらの関数は、Google Cloud Pub/Sub API とやり取りすることなく、相互にメッセージを交換します。

9. 完了

Google Cloud Pub/Sub 用の Spring 統合チャネル アダプタの使用方法を学習しました。

詳細

ライセンス

この作業はクリエイティブ・コモンズの表示 2.0 汎用ライセンスにより使用許諾されています。