Spring Integration と Google Cloud Pub/Sub を使用したメッセージング

1. 概要

Spring Integration には、MessageChannels を介して Messages を交換するメッセージ メカニズムが用意されています。チャネル アダプターを使用して外部システムと通信します。

この演習では、Spring Cloud GCP が提供する Spring Integration チャネル アダプタを使用して通信する 2 つのアプリを作成します。これらのアダプタにより、Spring Integration では Google Cloud Pub/Sub がメッセージ交換バックエンドとして使用されます。

Cloud Shell と Cloud SDK の gcloud コマンドを使用する方法について学習します。

このチュートリアルでは、Spring Boot スタートガイドのサンプルコードを使用します。

学習内容

  • Spring Integration と Spring Cloud GCP を使用して、Google Cloud Pub/Sub とアプリ間でメッセージを交換する方法

必要なもの

  • Google Cloud Platform プロジェクト
  • ChromeFirefox などのブラウザ
  • Linux の標準的なテキスト エディタ(vim、emacs、nano など)を使い慣れていること

このチュートリアルをどのように使用しますか?

通読するのみ 通読し、演習を行う

HTML/CSS ウェブアプリの作成経験をどのように評価されますか。

初心者 中級者 上級者

Google Cloud Platform サービスのご利用経験についてどのように評価されますか?

<ph type="x-smartling-placeholder"></ph> 初心者 中級 上達 をご覧ください。

2. 設定と要件

セルフペース型の環境設定

  1. Google Cloud Console にログインして、プロジェクトを新規作成するか、既存のプロジェクトを再利用します。Gmail アカウントも Google Workspace アカウントもまだお持ちでない場合は、アカウントを作成してください。

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • プロジェクト名は、このプロジェクトの参加者に表示される名称です。Google API では使用されない文字列です。いつでも更新できます。
  • プロジェクト ID は、すべての Google Cloud プロジェクトにおいて一意でなければならず、不変です(設定後は変更できません)。Cloud コンソールでは一意の文字列が自動生成されます。通常は、この内容を意識する必要はありません。ほとんどの Codelab では、プロジェクト ID(通常は PROJECT_ID と識別されます)を参照する必要があります。生成された ID が好みではない場合は、ランダムに別の ID を生成できます。または、ご自身で試して、利用可能かどうかを確認することもできます。このステップ以降は変更できず、プロジェクトを通して同じ ID になります。
  • なお、3 つ目の値として、一部の API が使用するプロジェクト番号があります。これら 3 つの値について詳しくは、こちらのドキュメントをご覧ください。
  1. 次に、Cloud のリソースや API を使用するために、Cloud コンソールで課金を有効にする必要があります。この Codelab の操作をすべて行って、費用が生じたとしても、少額です。このチュートリアルの終了後に請求が発生しないようにリソースをシャットダウンするには、作成したリソースを削除するか、プロジェクトを削除します。Google Cloud の新規ユーザーは、300 米ドル分の無料トライアル プログラムをご利用いただけます。

Google Cloud Shell

Google Cloud はノートパソコンからリモートで操作できますが、この Codelab では Cloud 上で動作するコマンドライン環境である Google Cloud Shell を使用します。

Cloud Shell をアクティブにする

  1. Cloud Console で、[Cloud Shell をアクティブにする] 853e55310c205094.png をクリックします。

55efc1aaa7a4d3ad.png

Cloud Shell を初めて起動する場合は、内容を説明する中間画面が表示されます。中間画面が表示されたら、[続行] をクリックします。

9c92662c6a846a5c.png

Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。

9f0e51b578fecce5.png

この仮想マシンには、必要なすべての開発ツールが読み込まれます。5 GB の永続的なホーム ディレクトリが用意されており、Google Cloud で稼働するため、ネットワークのパフォーマンスと認証が大幅に向上しています。この Codelab での作業のほとんどはブラウザを使って行うことができます。

Cloud Shell に接続すると、認証が完了し、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。

  1. Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list

コマンド出力

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。
gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

3. Pub/Sub リソースをプロビジョニングする

Google Cloud Pub/Sub の [トピック] ページに移動します。

[トピックを作成] をクリックします。

4c938409dc7169a6.png

トピックの名前として「exampleTopic」と入力し、[作成] をクリックします。

e2daeec91537f672.png

トピックの作成後は、トピックページを開いたままにします。先ほど作成したトピックを見つけ、行の末尾にあるその他アイコンを押して [新規サブスクリプション] をクリックします。

975efa26e5054936.png

サブスクリプション名のテキスト ボックスに「exampleSubscription」と入力し、[作成] をクリックします。

f7a91d9e1cb48009.png

4. Spring Boot アプリケーションを初期化する

Cloud Shell が起動したら、コマンドラインを使用して、Spring Initializr で 2 つの新しい Spring Boot アプリケーションを生成できます。

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-sender | tar -xzvf -

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-receiver | tar -xzvf -

5. メッセージを送信するアプリケーションを作成する

次に、メッセージ送信アプリを作成します。送信側のアプリのディレクトリに移動します。

$ cd spring-integration-sender

アプリがチャンネルにメッセージを書き込む必要があります。メッセージがチャネルに送られると、アウトバウンド チャネル アダプタが取得します。このアダプタは、メッセージを汎用の Spring メッセージから Google Cloud Pub/Sub メッセージに変換し、Google Cloud Pub/Sub トピックにパブリッシュします。

アプリをチャネルに書き込むには、Spring Integration メッセージング ゲートウェイを使用できます。vimemacs、または nano のテキスト エディタを使用して、DemoApplication クラス内で PubsubOutboundGateway インターフェースを宣言します。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

  ...

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway {
    void sendToPubsub(String text);
  }
}

チャンネルにメッセージを送信するメカニズムができましたが、メッセージがチャンネルに追加された後、どこに行くのでしょうか?

チャネル内の新しいメッセージを消費して Google Cloud Pub/Sub トピックにパブリッシュするには、アウトバウンド チャネル アダプタが必要です。

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
  }
}

@ServiceActivator アノテーションにより、この MessageHandlerinputChannel 内のすべての新しいメッセージに適用されます。この例では、アウトバウンド チャネル アダプタ PubSubMessageHandler を呼び出して、メッセージを Google Cloud Pub/Sub の exampleTopic トピックにパブリッシュします。

チャネル アダプタを使用すると、PubsubOutboundGateway オブジェクトを自動接続し、それを使用してチャネルにメッセージを書き込むことができます。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

  ...

  @Autowired
  private PubsubOutboundGateway messagingGateway;

  @PostMapping("/postMessage")
  public RedirectView postMessage(@RequestParam("message") String message) {
    this.messagingGateway.sendToPubsub(message);
    return new RedirectView("/");
  }
}

@PostMapping アノテーションによって、HTTP POST リクエストをリッスンするエンドポイントができましたが、DemoApplication クラスに @RestController アノテーションを追加して REST コントローラとしてマークすることも必要です。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

JAVA_HOME が正しいバージョンに設定されていることを確認してください。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

送信側アプリを実行します。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list --format 'value(core.project)'`

$ ./mvnw spring-boot:run

このアプリは、ポート 8080 とエンドポイント /postMessage でメッセージを含む POST リクエストをリッスンしていますが、これについては後で説明します。

6. メッセージを受信するアプリケーションの作成

Google Cloud Pub/Sub を介してメッセージを送信するアプリを作成しました。次に、これらのメッセージを受信して処理する別のアプリを作成します。

[+] をクリックして、新しい Cloud Shell セッションを開きます。

9799bee5fea95aa6.png

次に、新しい Cloud Shell セッションで、レシーバーアプリのディレクトリに移動します。

$ cd spring-integration-receiver

以前のアプリでは、メッセージ ゲートウェイの宣言によって送信チャネルが作成されていました。メッセージを受信するためにメッセージング ゲートウェイを使用しないため、受信メッセージを受信する独自の MessageChannel を宣言する必要があります。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }
}

Google Cloud Pub/Sub からメッセージを受信して pubsubInputChannel に中継するには、受信チャネル アダプタが必要です。

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;

import org.springframework.beans.factory.annotation.Qualifier;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
    adapter.setOutputChannel(inputChannel);

    return adapter;
  }
}

このアダプタは自身を pubsubInputChannel にバインドし、Google Cloud Pub/Sub exampleSubscription サブスクリプションからの新しいメッセージをリッスンします。

受信メッセージが投稿されるチャンネルがありますが、それらのメッセージはどうすればよいでしょうか?

新しいメッセージが pubsubInputChannel に到着したときにトリガーされる @ServiceActivator を使用して処理してみましょう。この例では、メッセージ ペイロードのみをログに記録します。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

  ...

  private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public void messageReceiver(String payload) {
    LOGGER.info("Message arrived! Payload: " + payload);
  }
}

JAVA_HOME が正しいバージョンに設定されていることを確認してください。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

レシーバーアプリを実行します。

$ ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="-Dserver.port=8081"

これで、送信者アプリに送信されたすべてのメッセージが受信アプリに記録されるようになりました。これをテストするには、新しい Cloud Shell セッションを開き、送信側アプリに HTTP POST リクエストを行います。

$ curl --data "message=Hello world!" localhost:8080/postMessage

次に、送信したメッセージが受信側のアプリでログに記録されたことを確認します。

INFO: Message arrived! Payload: Hello world!

7. クリーンアップ

この演習で作成したサブスクリプションとトピックを削除します。

$ gcloud pubsub subscriptions delete exampleSubscription
$ gcloud pubsub topics delete exampleTopic

8. まとめ

Google Cloud Pub/Sub 用の Spring Integration チャネル アダプタを使用する 2 つの Spring Boot アプリを設定します。開発者は Google Cloud Pub/Sub API とやり取りすることなく、メッセージを交換します。

9. 完了

Google Cloud Pub/Sub 用の Spring Integration チャネル アダプタの使用方法を学びました。

詳細

ライセンス

この作業はクリエイティブ・コモンズの表示 2.0 汎用ライセンスにより使用許諾されています。