Обмен сообщениями с помощью Spring Integration и Google Cloud Pub/Sub

1. Обзор

Spring Integration предоставляет вам механизм обмена Messages через MessageChannels . Он использует адаптеры каналов для связи с внешними системами.

В этом упражнении мы создадим два приложения, которые взаимодействуют с помощью адаптеров каналов Spring Integration, предоставляемых Spring Cloud GCP. Эти адаптеры позволяют Spring Integration использовать Google Cloud Pub/Sub в качестве серверной части обмена сообщениями.

Вы узнаете, как использовать Cloud Shell и команду gcloud Cloud SDK.

В этом руководстве используется пример кода из руководства Spring Boot Getting Started .

Что вы узнаете

  • Как обмениваться сообщениями между приложениями с помощью Google Cloud Pub/Sub с использованием Spring Integration и Spring Cloud GCP

Что вам понадобится

  • Проект облачной платформы Google
  • Браузер, например Chrome или Firefox.
  • Знакомство со стандартными текстовыми редакторами Linux, такими как Vim, EMAC или Nano.

Как вы будете использовать это руководство?

Прочтите только до конца Прочитайте его и выполните упражнения.

Как бы вы оценили свой опыт создания веб-приложений HTML/CSS?

Новичок Средний Опытный

Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?

Новичок Средний Опытный

2. Настройка и требования

Самостоятельная настройка среды

  1. Войдите в Google Cloud Console и создайте новый проект или повторно используйте существующий. Если у вас еще нет учетной записи Gmail или Google Workspace, вам необходимо ее создать .

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • Имя проекта — это отображаемое имя для участников этого проекта. Это строка символов, не используемая API Google. Вы всегда можете обновить его.
  • Идентификатор проекта уникален для всех проектов Google Cloud и является неизменяемым (невозможно изменить после его установки). Cloud Console автоматически генерирует уникальную строку; обычно тебя не волнует, что это такое. В большинстве лабораторий кода вам потребуется указать идентификатор проекта (обычно идентифицируемый как PROJECT_ID ). Если вам не нравится сгенерированный идентификатор, вы можете создать другой случайный идентификатор. Альтернативно, вы можете попробовать свой собственный и посмотреть, доступен ли он. Его нельзя изменить после этого шага и он сохраняется на протяжении всего проекта.
  • К вашему сведению, есть третье значение — номер проекта , которое используют некоторые API. Подробнее обо всех трех этих значениях читайте в документации .
  1. Затем вам необходимо включить выставление счетов в Cloud Console, чтобы использовать облачные ресурсы/API. Прохождение этой кодовой лаборатории не будет стоить много, если вообще что-то стоить. Чтобы отключить ресурсы и избежать выставления счетов за пределами этого руководства, вы можете удалить созданные вами ресурсы или удалить проект. Новые пользователи Google Cloud имеют право на участие в программе бесплатной пробной версии стоимостью 300 долларов США .

Google Cloud Shell

Хотя Google Cloud можно управлять удаленно с вашего ноутбука, в этой лаборатории мы будем использовать Google Cloud Shell , среду командной строки, работающую в облаке.

Активировать Cloud Shell

  1. В Cloud Console нажмите «Активировать Cloud Shell». 853e55310c205094.png .

55efc1aaa7a4d3ad.png

Если вы запускаете Cloud Shell впервые, вы увидите промежуточный экран с описанием того, что это такое. Если вам был представлен промежуточный экран, нажмите «Продолжить» .

9c92662c6a846a5c.png

Подготовка и подключение к Cloud Shell займет всего несколько минут.

9f0e51b578fecce5.png

Эта виртуальная машина загружена всеми необходимыми инструментами разработки. Он предлагает постоянный домашний каталог объемом 5 ГБ и работает в Google Cloud, что значительно повышает производительность сети и аутентификацию. Большую часть, если не всю, работу в этой лаборатории кода можно выполнить с помощью браузера.

После подключения к Cloud Shell вы увидите, что вы прошли аутентификацию и что для проекта установлен идентификатор вашего проекта.

  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить, что вы прошли аутентификацию:
gcloud auth list

Вывод команды

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Выполните следующую команду в Cloud Shell, чтобы убедиться, что команда gcloud знает о вашем проекте:
gcloud config list project

Вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить его с помощью этой команды:

gcloud config set project <PROJECT_ID>

Вывод команды

Updated property [core/project].

3. Предоставление ресурсов Pub/Sub

Перейдите на страницу тем Google Cloud Pub/Sub .

Нажмите Создать тему .

4c938409dc7169a6.png

Введите exampleTopic в качестве названия темы и нажмите «Создать» .

e2daeec91537f672.png

После создания темы оставайтесь на странице Темы. Найдите только что созданную тему, нажмите три вертикальные точки в конце строки и нажмите «Новая подписка» .

975efa26e5054936.png

Введите exampleSubscription в текстовое поле имени подписки и нажмите « Создать» .

f7a91d9e1cb48009.png

4. Инициализируйте приложения Spring Boot.

После запуска Cloud Shell вы можете использовать командную строку для создания двух новых приложений Spring Boot с помощью Spring Initializr:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-sender | tar -xzvf -

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-receiver | tar -xzvf -

5. Создайте приложение для отправки сообщений.

Теперь давайте создадим наше приложение для отправки сообщений. Перейдите в каталог отправляющего приложения.

$ cd spring-integration-sender

Мы хотим, чтобы наше приложение писало сообщения в канал. После того как сообщение окажется в канале, оно будет подхвачено адаптером исходящего канала, который преобразует его из общего сообщения Spring в сообщение Google Cloud Pub/Sub и публикует его в теме Google Cloud Pub/Sub.

Чтобы наше приложение могло писать в канал, мы можем использовать шлюз обмена сообщениями Spring Integration . Используя текстовый редактор vim , emacs или nano , объявите интерфейс PubsubOutboundGateway внутри класса DemoApplication .

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

  ...

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway {
    void sendToPubsub(String text);
  }
}

Теперь у нас есть механизм отправки сообщений в канал, но куда эти сообщения попадают после того, как попадают в канал?

Нам нужен адаптер исходящего канала, чтобы получать новые сообщения в канале и публиковать их в теме Google Cloud Pub/Sub.

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
  }
}

Аннотация @ServiceActivator заставляет этот MessageHandler применяться к любым новым сообщениям в inputChannel . В этом случае мы вызываем наш адаптер исходящего канала PubSubMessageHandler , чтобы опубликовать сообщение в теме exampleTopic Google Cloud Pub/Sub.

Имея адаптер канала, мы можем автоматически подключить объект PubsubOutboundGateway и использовать его для записи сообщения в канал.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

  ...

  @Autowired
  private PubsubOutboundGateway messagingGateway;

  @PostMapping("/postMessage")
  public RedirectView postMessage(@RequestParam("message") String message) {
    this.messagingGateway.sendToPubsub(message);
    return new RedirectView("/");
  }
}

Благодаря аннотации @PostMapping у нас теперь есть конечная точка, которая прослушивает HTTP-запросы POST, но не без добавления аннотации @RestController к классу DemoApplication , чтобы пометить ее как контроллер REST.

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

Убедитесь, что для JAVA_HOME установлена ​​правильная версия.

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

Запустите приложение отправителя.

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list --format 'value(core.project)'`

$ ./mvnw spring-boot:run

Приложение прослушивает запросы POST, содержащие сообщение на порту 8080 и конечной точке /postMessage , но мы вернемся к этому позже.

6. Создайте приложение для получения сообщений.

Мы только что создали приложение, которое отправляет сообщения через Google Cloud Pub/Sub. Теперь мы создадим еще одно приложение, которое будет получать эти сообщения и обрабатывать их.

Нажмите + , чтобы открыть новый сеанс Cloud Shell.

9799bee5fea95aa6.png

Затем в новом сеансе Cloud Shell измените каталог на каталог приложения-получателя:

$ cd spring-integration-receiver

В предыдущем приложении объявление шлюза обмена сообщениями создало для нас исходящий канал. Поскольку мы не используем шлюз обмена сообщениями для получения сообщений, нам необходимо объявить собственный MessageChannel , куда будут поступать входящие сообщения.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }
}

Нам понадобится адаптер входящего канала для получения сообщений от Google Cloud Pub/Sub и их ретрансляции в pubsubInputChannel .

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;

import org.springframework.beans.factory.annotation.Qualifier;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
    adapter.setOutputChannel(inputChannel);

    return adapter;
  }
}

Этот адаптер привязывается к pubsubInputChannel и прослушивает новые сообщения из подписки Google Cloud Pub/Sub exampleSubscription .

У нас есть канал, куда публикуются входящие сообщения, но что с этими сообщениями делать?

Давайте обработаем их с помощью @ServiceActivator , который срабатывает, когда новые сообщения поступают в pubsubInputChannel . В этом случае мы просто зарегистрируем полезную нагрузку сообщения.

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

  ...

  private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public void messageReceiver(String payload) {
    LOGGER.info("Message arrived! Payload: " + payload);
  }
}

Убедитесь, что для JAVA_HOME установлена ​​правильная версия.

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

Запустите приложение-приемник.

$ ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="-Dserver.port=8081"

Теперь любые сообщения, которые вы отправляете в приложение-отправитель, будут регистрироваться в приложении-получателе. Чтобы проверить это, откройте новый сеанс Cloud Shell и отправьте запрос HTTP POST приложению-отправителю.

$ curl --data "message=Hello world!" localhost:8080/postMessage

Затем убедитесь, что приложение-получатель зарегистрировало отправленное вами сообщение!

INFO: Message arrived! Payload: Hello world!

7. Очистка

Удалите подписку и тему, созданные в ходе этого упражнения.

$ gcloud pubsub subscriptions delete exampleSubscription
$ gcloud pubsub topics delete exampleTopic

8. Резюме

Вы настраиваете два приложения Spring Boot, которые используют адаптеры канала интеграции Spring для Google Cloud Pub/Sub. Они обмениваются сообщениями между собой, даже не взаимодействуя с API Google Cloud Pub/Sub.

9. Поздравляем!

Вы узнали, как использовать адаптеры канала интеграции Spring для Google Cloud Pub/Sub!

Узнать больше

Лицензия

Эта работа распространяется под лицензией Creative Commons Attribution 2.0 Generic License.