使用 Spring Integration 和 Google Cloud Pub/Sub 进行消息传递

1. 概览

Spring Integration 为您提供了通过 MessageChannels 交换 Messages 的消息传递机制。它使用通道适配器与外部系统进行通信。

在本练习中,我们将创建两个应用,它们使用 Spring Cloud GCP 提供的 Spring Integration 通道适配器进行通信。Spring Integration 通过这些适配器使用 Google Cloud Pub/Sub 作为消息交换后端。

您将学习如何使用 Cloud Shell 和 Cloud SDK gcloud 命令。

本教程使用 Spring Boot 入门指南中的示例代码。

学习内容

  • 如何使用 Spring Integration 和 Spring Cloud GCP 在应用与 Google Cloud Pub/Sub 之间交换消息

所需条件

  • 一个 Google Cloud Platform 项目
  • 一个浏览器,例如 ChromeFirefox
  • 熟悉标准的 Linux 文本编辑器,例如 Vim、EMACs 或 Nano

您将如何使用本教程?

仅阅读教程内容 阅读并完成练习

您如何评价自己在构建 HTML/CSS Web 应用方面的经验水平?

新手水平 中等水平 熟练水平

您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?

<ph type="x-smartling-placeholder"></ph> 新手 中级 熟练

2. 设置和要求

自定进度的环境设置

  1. 登录 Google Cloud 控制台,然后创建一个新项目或重复使用现有项目。如果您还没有 Gmail 或 Google Workspace 账号,则必须创建一个

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • 项目名称是此项目参与者的显示名称。它是 Google API 尚未使用的字符串。您可以随时对其进行更新。
  • 项目 ID 在所有 Google Cloud 项目中是唯一的,并且是不可变的(一经设置便无法更改)。Cloud 控制台会自动生成一个唯一字符串;通常情况下,您无需关注该字符串。在大多数 Codelab 中,您都需要引用项目 ID(通常用 PROJECT_ID 标识)。如果您不喜欢生成的 ID,可以再随机生成一个 ID。或者,您也可以尝试自己的项目 ID,看看是否可用。完成此步骤后便无法更改该 ID,并且此 ID 在项目期间会一直保留。
  • 此外,还有第三个值,即部分 API 使用的项目编号,供您参考。如需详细了解所有这三个值,请参阅文档
  1. 接下来,您需要在 Cloud 控制台中启用结算功能,以便使用 Cloud 资源/API。运行此 Codelab 应该不会产生太多的费用(如果有的话)。若要关闭资源以避免产生超出本教程范围的结算费用,您可以删除自己创建的资源或删除项目。Google Cloud 新用户符合参与 300 美元免费试用计划的条件。

Google Cloud Shell

虽然 Google Cloud 可以通过笔记本电脑远程操作,但在此 Codelab 中,我们将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。

激活 Cloud Shell

  1. 在 Cloud Console 中,点击激活 Cloud Shell853e55310c205094

55efc1aaa7a4d3ad.png

如果这是您第一次启动 Cloud Shell,系统会显示一个中间屏幕,说明它是什么。如果您看到中间屏幕,请点击继续

9c92662c6a846a5c

预配和连接到 Cloud Shell 只需花几分钟时间。

9f0e51b578fecce5

这个虚拟机装有所需的所有开发工具。它提供了一个持久的 5 GB 主目录,并在 Google Cloud 中运行,大大增强了网络性能和身份验证功能。您在此 Codelab 中的大部分(即使不是全部)工作都可以通过浏览器完成。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设为您的项目 ID。

  1. 在 Cloud Shell 中运行以下命令以确认您已通过身份验证:
gcloud auth list

命令输出

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. 在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目:
gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

3. 预配 Pub/Sub 资源

导航到 Google Cloud Pub/Sub 主题页面

点击创建主题

4c938409dc7169a6

输入 exampleTopic 作为主题名称,然后点击创建

e2daeec91537f672.png

创建主题后,留在“主题”页面中。找到您刚刚创建的主题,按行末的三个垂直点,然后点击新建订阅

975efa26e5054936

在订阅名称文本框中输入 exampleSubscription,然后点击创建

f7a91d9e1cb48009.png

4. 初始化 Spring Boot 应用

在 Cloud Shell 启动后,您可以使用 Spring Initializr 通过命令行生成两个新的 Spring Boot 应用:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-sender | tar -xzvf -

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-receiver | tar -xzvf -

5. 创建用于发送消息的应用

现在,我们来创建消息发送应用。切换到正在发送的应用的目录。

$ cd spring-integration-sender

我们希望应用向通道写入消息。消息进入通道后,将由出站通道适配器提取。该适配器将消息从常规 Spring 消息转换为 Google Cloud Pub/Sub 消息,并将其发布到 Google Cloud Pub/Sub 主题。

为了让应用写入通道,我们可以使用 Spring Integration 消息传递网关。使用来自 vimemacsnano 的文本编辑器在 DemoApplication 类中声明 PubsubOutboundGateway 接口。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

  ...

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway {
    void sendToPubsub(String text);
  }
}

我们现在有一种向通道发送消息的机制,但是这些消息在进入通道后去了哪里?

我们需要一个出站通道适配器来接收通道中的新消息,并将其发布到 Google Cloud Pub/Sub 主题。

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
  }
}

@ServiceActivator 注释可将此 MessageHandler 应用于 inputChannel 中的任何新消息。在本例中,我们将调用出站通道适配器 PubSubMessageHandler,将消息发布到 Google Cloud Pub/Sub 的 exampleTopic 主题。

设置好通道适配器后,我们现在可以自动连接 PubsubOutboundGateway 对象,并使用它向通道写入消息。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

  ...

  @Autowired
  private PubsubOutboundGateway messagingGateway;

  @PostMapping("/postMessage")
  public RedirectView postMessage(@RequestParam("message") String message) {
    this.messagingGateway.sendToPubsub(message);
    return new RedirectView("/");
  }
}

得益于 @PostMapping 注释,我们现在有了一个端点来侦听 HTTP POST 请求,但还要向 DemoApplication 类添加 @RestController 注释来将其标记为 REST 控制器。

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

确保将 JAVA_HOME 设置为正确的版本。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

运行发送者应用。

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list --format 'value(core.project)'`

$ ./mvnw spring-boot:run

该应用正在侦听包含端口 8080 和端点 /postMessage 上消息的 POST 请求,但我们稍后会对此进行说明。

6. 创建用于接收消息的应用

我们刚刚创建了一个通过 Google Cloud Pub/Sub 发送消息的应用。现在,我们将再创建一个接收这些消息并进行处理的应用。

点击 + 打开新的 Cloud Shell 会话。

9799bee5fea95aa6

然后,在新的 Cloud Shell 会话中,将目录更改为接收者应用的目录:

$ cd spring-integration-receiver

在上一个应用中,消息传递网关声明为我们创建了出站通道。由于我们不使用消息传递网关来接收消息,因此需要声明自己的 MessageChannel(收到的消息将到达此处)。

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }
}

我们需要入站通道适配器接收来自 Google Cloud Pub/Sub 的消息,并将其中继到 pubsubInputChannel

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;

import org.springframework.beans.factory.annotation.Qualifier;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
    adapter.setOutputChannel(inputChannel);

    return adapter;
  }
}

此适配器会将其自身绑定到 pubsubInputChannel,并侦听 Google Cloud Pub/Sub exampleSubscription 订阅中的新消息。

我们有一个通道,收到的消息会发布到其中,但该如何处理这些消息呢?

让我们使用在 pubsubInputChannel 收到新消息时触发的 @ServiceActivator 来处理这些消息。在本例中,我们只需记录消息载荷。

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

  ...

  private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public void messageReceiver(String payload) {
    LOGGER.info("Message arrived! Payload: " + payload);
  }
}

确保将 JAVA_HOME 设置为正确的版本。

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

运行接收者应用。

$ ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="-Dserver.port=8081"

现在,您发送到发送者应用的任何消息都会记录在接收者应用中。如需进行测试,请打开新的 Cloud Shell 会话,并向发送者应用发出 HTTP POST 请求。

$ curl --data "message=Hello world!" localhost:8080/postMessage

然后,验证接收者应用是否记录了您发送的消息!

INFO: Message arrived! Payload: Hello world!

7. 清理

删除在此练习中创建的订阅和主题。

$ gcloud pubsub subscriptions delete exampleSubscription
$ gcloud pubsub topics delete exampleTopic

8. 总结

您设置了两个 Spring Boot 应用,它们使用适用于 Google Cloud Pub/Sub 的 Spring Integration 通道适配器。它们彼此之间交换消息,而无需与 Google Cloud Pub/Sub API 进行交互。

9. 恭喜!

您已经了解如何使用适用于 Google Cloud Pub/Sub 的 Spring Integration 通道适配器!

了解详情

许可

此作品已获得 Creative Commons Attribution 2.0 通用许可授权。