使用 Spring Integration 和 Google Cloud Pub/Sub 傳送訊息

1. 總覽

Spring 整合提供透過 MessageChannels 交換 Messages 的訊息機制。並使用管道轉接器與外部系統通訊。

在本練習中,我們將使用 Spring Cloud GCP 提供的 Spring Integration 管道轉接程式建立兩個應用程式。這些轉接器讓 Spring Integration 將 Google Cloud Pub/Sub 做為訊息交換後端。

您會瞭解如何使用 Cloud Shell 和 Cloud SDK gcloud 指令。

本教學課程使用 Spring Boot 入門指南的程式碼範例。

課程內容

  • 如何使用 Spring Integration 和 Spring Cloud GCP,使用 Google Cloud Pub/Sub 在應用程式之間交換訊息

軟硬體需求

  • Google Cloud Platform 專案
  • 瀏覽器,例如 ChromeFirefox
  • 熟悉 Vim、EMACs 或 Nano 等標準 Linux 文字編輯器

您會如何使用這個教學課程?

僅供閱讀 閱讀並完成練習

針對建立 HTML/CSS 網頁應用程式的經驗,您會給予什麼評價?

新手 中級 還算容易

根據您使用 Google Cloud Platform 服務的經驗,您會給予什麼評價?

新手 中級 還算容易

2. 設定和需求

自修環境設定

  1. 登入 Google Cloud 控制台,建立新專案或重複使用現有專案。如果您還沒有 Gmail 或 Google Workspace 帳戶,請先建立帳戶

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • 「專案名稱」是這項專案參與者的顯示名稱。這是 Google API 未使用的字元字串。你隨時可以更新。
  • 所有 Google Cloud 專案的專案 ID 均不得重複,而且設定後即無法變更。Cloud 控制台會自動產生一個不重複的字串。但通常是在乎它何在在大部分的程式碼研究室中,您必須參照專案 ID (通常為 PROJECT_ID)。如果您對產生的 ID 不滿意,可以隨機產生一個 ID。或者,您也可以自行嘗試,看看是否支援。在這個步驟後,這個名稱即無法變更,而且在專案期間內仍會保持有效。
  • 資訊中的第三個值是專案編號,部分 API 會使用這個編號。如要進一步瞭解這三個值,請參閱說明文件
  1. 接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Cloud 資源/API。執行本程式碼研究室不會產生任何費用 (如果有的話)。如要關閉資源,以免產生本教學課程結束後產生的費用,您可以刪除自己建立的資源或刪除專案。新使用者符合 $300 美元免費試用計畫的資格。

Google Cloud Shell

雖然 Google Cloud 可以在筆電上遠端操作,但在本程式碼研究室中,我們會使用 Google Cloud Shell,這是 Cloud 中運作的指令列環境。

啟用 Cloud Shell

  1. 在 Cloud 控制台中,按一下「啟用 Cloud Shell」圖示 853e55310c205094.png

55efc1aaa7a4d3ad.png

如果您是第一次啟動 Cloud Shell,系統會顯示中繼畫面,說明這項服務的內容。如果系統顯示中繼畫面,請按一下「繼續」

9c92662c6a846a5c.png

佈建並連線至 Cloud Shell 只需幾分鐘的時間。

9f0e51b578fecce5.png

這個虛擬機器已載入所有必要的開發工具。提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,大幅提高網路效能和驗證能力。在本程式碼研究室中,您的大部分作業都可透過瀏覽器完成。

連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。

  1. 在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list

指令輸出

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. 在 Cloud Shell 中執行下列指令,確認 gcloud 指令知道您的專案:
gcloud config list project

指令輸出

[core]
project = <PROJECT_ID>

如果尚未設定,請使用下列指令進行設定:

gcloud config set project <PROJECT_ID>

指令輸出

Updated property [core/project].

3. 佈建 Pub/Sub 資源

前往 Google Cloud Pub/Sub 主題頁面

按一下 [Create Topic] (建立主題)

4c938409dc7169a6.png

輸入 exampleTopic 做為主題名稱,然後按一下 [建立]

e2daeec91537f672.png

建立主題後,請留在「主題」頁面。找到您剛剛建立的主題,按下行結尾的三個垂直圓點,然後按一下「新增訂閱」

975efa26e5054936.png

在訂閱項目名稱文字方塊中輸入 exampleSubscription,然後按一下「建立」

f7a91d9e1cb48009.png

4. 初始化 Spring Boot 應用程式

Cloud Shell 啟動後,您可以使用指令列,透過 Spring Initializr 產生兩個新的 Spring Boot 應用程式:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-sender | tar -xzvf -

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-receiver | tar -xzvf -

5. 建立應用程式以傳送訊息

現在,請建立訊息傳送應用程式。變更為傳送端應用程式的目錄。

$ cd spring-integration-sender

我們希望應用程式能將訊息寫入頻道。訊息進入管道後,傳出管道轉接器會擷取訊息,從一般 Spring 訊息將訊息轉換成 Google Cloud Pub/Sub 訊息,並發布至 Google Cloud Pub/Sub 主題。

我們可使用 Spring Integration 訊息閘道,在應用程式中寫入頻道。使用 vimemacsnano 的文字編輯器,在 DemoApplication 類別內宣告 PubsubOutboundGateway 介面。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

  ...

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway {
    void sendToPubsub(String text);
  }
}

我們現在有傳送頻道訊息的機制,但是訊息張貼到頻道之後,要如何移動呢?

我們需要傳出管道轉接器,以便使用管道中的新訊息,並將這些訊息發布至 Google Cloud Pub/Sub 主題。

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
  }
}

@ServiceActivator 註解會使此 MessageHandler 套用至 inputChannel 中的任何新訊息。在這個範例中,我們將呼叫傳出管道轉接器 PubSubMessageHandler,以將訊息發布至 Google Cloud Pub/Sub 的 exampleTopic 主題。

完成頻道轉接程式後,我們現在可以自動連接 PubsubOutboundGateway 物件,並使用該物件為頻道撰寫訊息。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

  ...

  @Autowired
  private PubsubOutboundGateway messagingGateway;

  @PostMapping("/postMessage")
  public RedirectView postMessage(@RequestParam("message") String message) {
    this.messagingGateway.sendToPubsub(message);
    return new RedirectView("/");
  }
}

由於有 @PostMapping 註解,我們現在有會監聽 HTTP POST 要求的端點,但不會在 DemoApplication 類別中新增 @RestController 註解,將其標示為 REST 控制器。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

確認 JAVA_HOME 已設為正確的版本。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

執行寄件者應用程式。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list --format 'value(core.project)'`

$ ./mvnw spring-boot:run

應用程式正在監聽 POST 要求,其中包含來自通訊埠 8080 和端點 /postMessage 的訊息,但我們稍後會執行這項操作。

6. 建立應用程式以接收訊息

我們剛剛建立了一個透過 Google Cloud Pub/Sub 傳送訊息的應用程式。現在,我們要建立另一個應用程式,以便接收並處理這些訊息。

按一下「+」,開啟新的 Cloud Shell 工作階段。

9799bee5fea95aa6.png

然後在新的 Cloud Shell 工作階段中,將目錄變更為接收端應用程式的目錄:

$ cd spring-integration-receiver

在上一個應用程式中,訊息閘道宣告建立了我們的外送管道。由於我們不使用訊息閘道接收訊息,因此我們需要宣告自己的 MessageChannel,用來接收外來訊息。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }
}

我們需要傳入通道轉接器,才能接收來自 Google Cloud Pub/Sub 的訊息,並將其轉發到 pubsubInputChannel

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;

import org.springframework.beans.factory.annotation.Qualifier;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
    adapter.setOutputChannel(inputChannel);

    return adapter;
  }
}

這個轉接器本身會繫結至 pubsubInputChannel,並監聽 Google Cloud Pub/Sub exampleSubscription 訂閱項目的新訊息。

我們有管道可以發布外來訊息,但該如何處理這些訊息?

讓我們以 @ServiceActivator (會在新訊息送達 pubsubInputChannel 時觸發) 處理郵件。在這個案例中,我們只會記錄訊息酬載。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

  ...

  private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public void messageReceiver(String payload) {
    LOGGER.info("Message arrived! Payload: " + payload);
  }
}

確認 JAVA_HOME 已設為正確的版本。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

執行接收器應用程式。

$ ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="-Dserver.port=8081"

現在,你傳送到傳送者應用程式的所有訊息都會記錄在接收端應用程式中。如要進行測試,請開啟新的 Cloud Shell 工作階段,並向傳送端應用程式發出 HTTP POST 要求。

$ curl --data "message=Hello world!" localhost:8080/postMessage

接著,確認接收端應用程式是否記錄您傳送的訊息!

INFO: Message arrived! Payload: Hello world!

7. 清除

刪除此練習中建立的訂閱項目和主題。

$ gcloud pubsub subscriptions delete exampleSubscription
$ gcloud pubsub topics delete exampleTopic

8. 摘要

您設定了兩個使用 Google Cloud Pub/Sub 專用 Spring Integration Channel Adapters 的 Spring Boot 應用程式。不需要與 Google Cloud Pub/Sub API 互動,就能自我交換訊息。

9. 恭喜!

您已瞭解如何使用 Google Cloud Pub/Sub 專用的 Spring Integration Channel Adapters!

瞭解詳情

授權

這項內容採用的是創用 CC 姓名標示 2.0 通用授權。