Обмен сообщениями с помощью Spring Integration и Google Cloud Pub/Sub

1. Обзор

Spring Integration предоставляет механизм обмена Messages через MessageChannels . Он использует адаптеры каналов для связи с внешними системами.

В этом упражнении мы создадим два приложения, которые будут взаимодействовать, используя адаптеры каналов Spring Integration, предоставляемые Spring Cloud GCP. Эти адаптеры позволяют Spring Integration использовать Google Cloud Pub/Sub в качестве бэкенда для обмена сообщениями.

Вы научитесь использовать Cloud Shell и команду gcloud из Cloud SDK.

В этом руководстве используется пример кода из руководства по началу работы со Spring Boot .

Что вы узнаете

  • Как обмениваться сообщениями между приложениями с помощью Google Cloud Pub/Sub, используя Spring Integration и Spring Cloud GCP.

Что вам понадобится

  • Проект Google Cloud Platform
  • Браузер, например Chrome или Firefox.
  • Знание стандартных текстовых редакторов Linux, таких как Vim, EMACs или Nano.

Как вы будете использовать этот учебник?

Прочитайте только от начала до конца. Прочитайте текст и выполните упражнения.

Как бы вы оценили свой опыт разработки веб-приложений на HTML/CSS?

Новичок Средний Профессионал

Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?

Новичок Средний Профессионал

2. Настройка и требования

Настройка среды для самостоятельного обучения

  1. Войдите в консоль Google Cloud и создайте новый проект или используйте существующий. Если у вас еще нет учетной записи Gmail или Google Workspace, вам необходимо ее создать .

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • Название проекта — это отображаемое имя участников данного проекта. Это строка символов, не используемая API Google. Вы всегда можете его изменить.
  • Идентификатор проекта уникален для всех проектов Google Cloud и является неизменяемым (его нельзя изменить после установки). Консоль Cloud автоматически генерирует уникальную строку; обычно вам неважно, какая она. В большинстве практических заданий вам потребуется указать идентификатор вашего проекта (обычно обозначается как PROJECT_ID ). Если сгенерированный идентификатор вас не устраивает, вы можете сгенерировать другой случайный идентификатор. В качестве альтернативы вы можете попробовать свой собственный и посмотреть, доступен ли он. После этого шага его нельзя изменить, и он сохраняется на протяжении всего проекта.
  • К вашему сведению, существует третье значение — номер проекта , которое используется некоторыми API. Подробнее обо всех трех значениях можно узнать в документации .
  1. Далее вам потребуется включить оплату в консоли Cloud для использования ресурсов/API Cloud. Выполнение этого практического задания не потребует больших затрат, если вообще потребует. Чтобы отключить ресурсы и избежать дополнительных расходов после завершения этого урока, вы можете удалить созданные ресурсы или удалить проект. Новые пользователи Google Cloud имеют право на бесплатную пробную версию стоимостью 300 долларов США .

Google Cloud Shell

Хотя Google Cloud можно запускать удалённо с ноутбука, в этом практическом занятии мы будем использовать Google Cloud Shell — среду командной строки, работающую в облаке.

Активировать Cloud Shell

  1. В консоли Cloud нажмите «Активировать Cloud Shell» . 853e55310c205094.png .

55efc1aaa7a4d3ad.png

Если вы запускаете Cloud Shell впервые, вам будет показан промежуточный экран с описанием его возможностей. Если вам был показан промежуточный экран, нажмите «Продолжить» .

9c92662c6a846a5c.png

Подготовка и подключение к Cloud Shell займут всего несколько минут.

9f0e51b578fecce5.png

Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Она предоставляет постоянный домашний каталог объемом 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Большая часть, если не вся, ваша работа в этом практическом задании может быть выполнена с помощью браузера.

После подключения к Cloud Shell вы увидите, что прошли аутентификацию и что проект настроен на ваш идентификатор проекта.

  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить свою аутентификацию:
gcloud auth list

вывод команды

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Выполните следующую команду в Cloud Shell, чтобы убедиться, что команда gcloud знает о вашем проекте:
gcloud config list project

вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить это с помощью следующей команды:

gcloud config set project <PROJECT_ID>

вывод команды

Updated property [core/project].

3. Предоставление ресурсов для публикации/подписки.

Перейдите на страницу тем Google Cloud Pub/Sub .

Нажмите «Создать тему» .

4c938409dc7169a6.png

Введите exampleTopic в качестве имени темы и нажмите «Создать» .

e2daeec91537f672.png

После создания темы оставайтесь на странице «Темы». Найдите только что созданную тему, нажмите на три вертикальные точки в конце строки и выберите «Новая подписка» .

975efa26e5054936.png

Введите exampleSubscription в текстовое поле с названием подписки и нажмите «Создать» .

f7a91d9e1cb48009.png

4. Инициализация приложений Spring Boot.

После запуска Cloud Shell вы можете использовать командную строку для генерации двух новых приложений Spring Boot с помощью Spring Initializr:

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-sender | tar -xzvf -

$ curl https://start.spring.io/starter.tgz \
  -d bootVersion=3.0.5 \
  -d dependencies=web,integration,cloud-gcp-pubsub \
  -d type=maven-project \
  -d baseDir=spring-integration-receiver | tar -xzvf -

5. Создайте приложение для отправки сообщений.

Теперь давайте создадим наше приложение для отправки сообщений. Перейдите в каталог приложения для отправки.

$ cd spring-integration-sender

Мы хотим, чтобы наше приложение отправляло сообщения в канал. После того, как сообщение окажется в канале, его обработает адаптер исходящего канала, который преобразует его из стандартного сообщения Spring в сообщение Google Cloud Pub/Sub и опубликует в тему Google Cloud Pub/Sub.

Для того чтобы наше приложение могло записывать данные в канал, мы можем использовать шлюз обмена сообщениями Spring Integration . Используя текстовый редактор ( vim , emacs или nano , объявите интерфейс PubsubOutboundGateway внутри класса DemoApplication .

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.integration.annotation.MessagingGateway;

@SpringBootApplication
public class DemoApplication {

  ...

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway {
    void sendToPubsub(String text);
  }
}

Теперь у нас есть механизм для отправки сообщений в канал, но куда эти сообщения отправляются после того, как они попадают в канал?

Нам нужен адаптер исходящего канала для приема новых сообщений в канале и их публикации в топик Google Cloud Pub/Sub.

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
  }
}

Аннотация @ServiceActivator приводит к тому, что этот MessageHandler применяется ко всем новым сообщениям в inputChannel . В данном случае мы вызываем наш адаптер исходящего канала ` PubSubMessageHandler для публикации сообщения в тему exampleTopic Google Cloud Pub/Sub.

После установки адаптера канала мы можем автоматически подключить объект PubsubOutboundGateway и использовать его для записи сообщения в канал.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.view.RedirectView;

@SpringBootApplication
public class DemoApplication {

  ...

  @Autowired
  private PubsubOutboundGateway messagingGateway;

  @PostMapping("/postMessage")
  public RedirectView postMessage(@RequestParam("message") String message) {
    this.messagingGateway.sendToPubsub(message);
    return new RedirectView("/");
  }
}

Благодаря аннотации @PostMapping у нас теперь есть конечная точка, которая принимает HTTP POST-запросы, но для этого нам также необходимо добавить аннотацию @RestController к классу DemoApplication , чтобы пометить его как REST-контроллер.

src/main/java/com/example/demo/DemoApplication.java

import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class DemoApplication {
  ...
}

Убедитесь, что JAVA_HOME установлена ​​на правильную версию.

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

Запустите приложение отправителя.

# Set the Project ID in environmental variable
$ export GOOGLE_CLOUD_PROJECT=`gcloud config list --format 'value(core.project)'`

$ ./mvnw spring-boot:run

Приложение принимает POST-запросы, содержащие сообщение, на порту 8080 и конечной точке /postMessage , но об этом мы поговорим позже.

6. Создайте приложение для приема сообщений.

Мы только что создали приложение, которое отправляет сообщения через Google Cloud Pub/Sub. Теперь мы создадим другое приложение, которое будет получать эти сообщения и обрабатывать их.

Нажмите + , чтобы открыть новую сессию Cloud Shell.

9799bee5fea95aa6.png

Затем в новой сессии Cloud Shell перейдите в каталог приложения-получателя:

$ cd spring-integration-receiver

В предыдущем приложении объявление шлюза обмена сообщениями создавало для нас исходящий канал. Поскольку мы не используем шлюз обмена сообщениями для приема сообщений, нам необходимо объявить собственный MessageChannel , куда будут поступать входящие сообщения.

src/main/java/com/example/demo/DemoApplication.java

...
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }
}

Нам потребуется адаптер входящего канала для приема сообщений от Google Cloud Pub/Sub и их пересылки в pubsubInputChannel .

src/main/java/com/example/demo/DemoApplication.java

...
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;

import org.springframework.beans.factory.annotation.Qualifier;

@SpringBootApplication
public class DemoApplication {

  ...

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
    adapter.setOutputChannel(inputChannel);

    return adapter;
  }
}

Этот адаптер привязывается к каналу pubsubInputChannel и прослушивает новые сообщения из подписки exampleSubscription в Google Cloud Pub/Sub.

У нас есть канал, куда отправляются входящие сообщения, но что с ними делать?

Давайте обработаем их с помощью аннотации @ServiceActivator , которая срабатывает при поступлении новых сообщений в pubsubInputChannel . В данном случае мы просто выведем в лог содержимое сообщения.

src/main/java/com/example/demo/DemoApplication.java

...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class DemoApplication {

  ...

  private static final Log LOGGER = LogFactory.getLog(DemoApplication.class);

  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public void messageReceiver(String payload) {
    LOGGER.info("Message arrived! Payload: " + payload);
  }
}

Убедитесь, что JAVA_HOME установлена ​​на правильную версию.

export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64

Запустите приложение-приемник.

$ ./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="-Dserver.port=8081"

Теперь все сообщения, отправленные в приложение-отправитель, будут регистрироваться в приложении-получателе. Чтобы это проверить, откройте новую сессию Cloud Shell и отправьте HTTP POST-запрос в приложение-отправитель.

$ curl --data "message=Hello world!" localhost:8080/postMessage

Затем убедитесь, что приложение-получатель зарегистрировало отправленное вами сообщение!

INFO: Message arrived! Payload: Hello world!

7. Уборка

Удалите подписку и тему, созданные в рамках этого упражнения.

$ gcloud pubsub subscriptions delete exampleSubscription
$ gcloud pubsub topics delete exampleTopic

8. Резюме

Вы настроили два приложения Spring Boot, использующих адаптеры каналов Spring Integration для Google Cloud Pub/Sub. Они обмениваются сообщениями между собой, не взаимодействуя при этом с API Google Cloud Pub/Sub.

9. Поздравляем!

Вы научились использовать адаптеры каналов Spring Integration для Google Cloud Pub/Sub!

Узнать больше

Лицензия

Данная работа распространяется под лицензией Creative Commons Attribution 2.0 Generic.