Начало работы с gRPC-Python — Потоковая передача

1. Введение

В этой лабораторной работе вы будете использовать gRPC-Python для создания клиента и сервера, которые составят основу приложения для отображения маршрутов, написанного на Python.

К концу руководства у вас будет клиент, который подключается к удаленному серверу с помощью gRPC для получения информации об объектах на маршруте клиента, создания сводки маршрута клиента и обмена информацией о маршруте, такой как обновления трафика, с сервером и другими клиентами.

Служба определена в файле Protocol Buffers, который будет использоваться для генерации шаблонного кода для клиента и сервера, чтобы они могли взаимодействовать друг с другом, экономя ваше время и усилия при реализации этой функциональности.

Сгенерированный код учитывает не только сложности взаимодействия между сервером и клиентом, но также сериализацию и десериализацию данных.

Чему вы научитесь

  • Как использовать Protocol Buffers для определения API сервиса.
  • Как создать клиент и сервер на основе gRPC из определения Protocol Buffers с использованием автоматической генерации кода.
  • Понимание потокового взаимодействия клиент-сервер с помощью gRPC.

Эта практическая работа предназначена для разработчиков Python, впервые использующих gRPC или желающих освежить свои знания, а также для всех, кто интересуется разработкой распределённых систем. Опыт работы с gRPC не требуется.

2. Прежде чем начать

Что вам понадобится

  • Python 3.9 или выше. Мы рекомендуем Python 3.13. Инструкции по установке для конкретных платформ см. в разделе «Настройка и использование Python» . Также можно установить несистемный Python с помощью таких инструментов, как uv или pyenv .
  • pip для установки пакетов Python.
  • venv для создания виртуальных сред Python.

Пакеты ensurepip и venv являются частью стандартной библиотеки Python и обычно доступны по умолчанию.

Однако некоторые дистрибутивы на базе Debian (включая Ubuntu) исключают их при распространении Python. Чтобы установить пакеты, выполните:

sudo apt install python3-pip python3-venv

Получить код

Для упрощения обучения эта лабораторная работа предлагает готовый исходный код, который поможет вам начать работу. Следующие шаги помогут вам завершить приложение, включая генерацию кода gRPC с помощью плагина компилятора буфера протокола grpc_tools.protoc .

grpc-codelabs

Исходный код для этой лабораторной работы доступен в каталоге codelabs/grpc-python-streaming/start_here . Если вы предпочитаете не реализовывать код самостоятельно, готовый исходный код доступен в каталоге completed .

Сначала создайте рабочий каталог codelab и перейдите в него:

mkdir grpc-python-streaming && cd grpc-python-streaming

Загрузите и распакуйте кодовую лабораторию:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

Кроме того, вы можете загрузить .zip-файл, содержащий только каталог codelab, и вручную распаковать его.

3. Определите сообщения и услуги

Первым шагом будет определение службы gRPC приложения, её метода RPC и типов сообщений запросов и ответов с помощью Protocol Buffers . Ваша служба будет предоставлять:

  • Методы RPC, называемые ListFeatures , RecordRoute и RouteChat , которые реализует сервер и вызывает клиент.
  • Типы сообщений Point , Feature , Rectangle , RouteNote и RouteSummary представляют собой структуры данных, которыми обмениваются клиент и сервер при вызове методов RPC.

Все эти методы RPC и типы их сообщений будут определены в файле protos/route_guide.proto предоставленного исходного кода.

Буферы протоколов обычно называются protobuf. Подробнее о терминологии gRPC см. в разделе «Основные концепции, архитектура и жизненный цикл gRPC».

Определить типы сообщений

В файле protos/route_guide.proto исходного кода сначала определите тип сообщения Point . Point представляет собой пару координат (широта-долгота) на карте. В этой практической работе используйте целые числа в качестве координат:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Цифры 1 и 2 — это уникальные идентификационные номера для каждого из полей в структуре message .

Затем определите тип сообщения Feature . Feature использует string поле для имени или почтового адреса объекта, расположенного в месте, указанном Point :

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Чтобы передать клиенту несколько точек в пределах области, вам понадобится сообщение Rectangle , представляющее прямоугольник широты и долготы, представленный в виде двух диагонально противоположных точек lo и hi :

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Также сообщение RouteNote , представляющее собой сообщение, отправленное в заданной точке:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Наконец, вам понадобится сообщение RouteSummary . Это сообщение приходит в ответ на RPC-запрос RecordRoute , который описан в следующем разделе. Оно содержит количество полученных отдельных точек, количество обнаруженных объектов и общее пройденное расстояние, представляющее собой сумму расстояний между точками.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Определить методы обслуживания

Чтобы определить службу, укажите её название в файле .proto . Файл route_guide.proto содержит структуру service RouteGuide , которая определяет один или несколько методов, предоставляемых службой приложения.

При определении методов RPC в определении сервиса вы указываете их типы запросов и ответов. В этом разделе практикума давайте определим:

СписокОсобенности

Получает объекты Feature доступные в заданном Rectangle . Результаты передаются потоком, а не возвращаются сразу, поскольку прямоугольник может охватывать большую площадь и содержать огромное количество объектов.

В этом приложении вы будете использовать серверный потоковый RPC-вызов: клиент отправляет запрос серверу и получает поток для чтения последовательности сообщений. Клиент читает данные из возвращаемого потока до тех пор, пока сообщения не закончатся. Как видно из нашего примера, вы указываете серверный потоковый метод, добавляя ключевое слово stream перед типом ответа.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Принимает поток точек на проходимом маршруте и возвращает RouteSummary после завершения обхода.

В этом случае целесообразно использовать потоковый RPC-вызов на стороне клиента : клиент записывает последовательность сообщений и отправляет их на сервер, снова используя предоставленный поток. После завершения записи сообщений клиент ожидает, пока сервер прочитает их все и вернет ответ. Для указания метода потоковой передачи на стороне клиента необходимо добавить ключевое слово stream перед типом запроса.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Принимает поток RouteNotes отправленных во время прохождения маршрута, одновременно получая другие RouteNotes (например, от других пользователей).

Именно такой вариант использования используется для двунаправленной потоковой передачи . Двунаправленный потоковый RPC-вызов, при котором обе стороны отправляют последовательность сообщений, используя поток чтения-записи. Два потока работают независимо, поэтому клиенты и серверы могут читать и писать в любом порядке: например, сервер может дождаться получения всех сообщений от клиентов, прежде чем отправлять свои ответы, или поочередно читать сообщение, а затем записывать его, или использовать другую комбинацию чтения и записи. Порядок сообщений в каждом потоке сохраняется. Этот тип метода указывается ключевым словом stream перед запросом и ответом.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Сгенерируйте клиентский и серверный код.

Затем сгенерируйте шаблонный код gRPC для клиента и сервера из файла .proto , используя компилятор буфера протокола.

Для генерации кода gRPC Python мы создали пакет grpcio-tools . Он включает в себя:

  1. Обычный компилятор протоколов , который генерирует код Python из определений message .
  2. Плагин gRPC protobuf, который генерирует код Python (клиентские и серверные заглушки) из определений service .

Мы установим пакет Python grpcio-tools с помощью pip. Создадим новую виртуальную среду Python (venv), чтобы изолировать зависимости вашего проекта от системных пакетов:

python3 -m venv --upgrade-deps .venv

Чтобы активировать виртуальную среду в оболочке bash/zsh:

source .venv/bin/activate

Для Windows и нестандартных оболочек см. таблицу по адресу https://docs.python.org/3/library/venv.html#how-venvs-work .

Далее установите grpcio-tools (это также установит пакет grpcio ):

pip install grpcio-tools

Используйте следующую команду для генерации шаблонного кода Python:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

Это приведет к созданию следующих файлов для интерфейсов, которые мы определили в route_guide.proto :

  1. route_guide_pb2.py содержит код , который динамически создает классы , генерируемые из определений message .
  2. route_guide_pb2.pyi — это «файл-заглушка» или «файл подсказок типа», созданный на основе определений message . Он содержит только сигнатуры без реализации. Файлы-заглушки могут использоваться IDE для улучшения автодополнения и обнаружения ошибок.
  3. route_guide_pb2_grpc.py создается на основе определений service и содержит специфичные для gRPC классы и функции.

Специфический для gRPC код содержит:

  1. RouteGuideStub , который может использоваться клиентом gRPC для вызова RPC RouteGuide.
  2. RouteGuideServicer , который определяет интерфейс для реализаций службы RouteGuide .
  3. Функция add_RouteGuideServicer_to_server , которая используется для регистрации RouteGuideServicer на сервере gRPC .

5. Создайте сервер

Сначала давайте рассмотрим, как создать сервер RouteGuide . Создание и запуск сервера RouteGuide делится на два этапа:

  • Реализация интерфейса сервиса, созданного на основе определения нашего сервиса, с функциями, которые выполняют фактическую «работу» сервиса.
  • Запуск сервера gRPC для прослушивания запросов от клиентов и передачи ответов.

Давайте посмотрим на route_guide_server.py .

Реализовать RouteGuide

route_guide_server.py имеет класс RouteGuideServicer , который является подклассом сгенерированного класса route_guide_pb2_grpc.RouteGuideServicer :

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer реализует все методы службы RouteGuide .

Потоковая передача RPC на стороне сервера

ListFeatures — это RPC-потоковый ответ, который отправляет клиенту несколько Feature :

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

В данном случае запрос представляет собой route_guide_pb2.Rectangle , внутри которого клиент хочет найти Feature . Вместо того, чтобы вернуть один ответ, метод возвращает ноль или более ответов.

Клиентская потоковая передача RPC

Метод потоковой передачи запросов RecordRoute использует итератор значений запроса и возвращает одно значение ответа.

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

Двунаправленный потоковый RPC

Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat() :

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

Семантика этого метода представляет собой комбинацию семантики методов «запрос-поток» и «ответ-поток». Он получает итератор значений запроса и сам является итератором значений ответа.

Запустить сервер

После реализации всех методов RouteGuide следующим шагом будет запуск сервера gRPC, чтобы клиенты могли использовать вашу службу:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

Метод server.start start() не блокирует выполнение. Для обработки запросов будет создан новый поток. Поток, вызывающий server.start() часто не будет занят другими делами. В этом случае можно вызвать server.wait_for_termination() , чтобы аккуратно заблокировать вызывающий поток до завершения работы сервера.

6. Создайте клиента

Давайте посмотрим на route_guide_client.py .

Создать заглушку

Для вызова методов службы сначала нужно создать заглушку .

Мы создаём экземпляр класса RouteGuideStub модуля route_guide_pb2_grpc , сгенерированного из нашего .proto. В методе run() :

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

Обратите внимание, что здесь channel используется как менеджер контекста и будет автоматически закрыт, как только интерпретатор покинет блок with .

Методы обслуживания вызовов

Для методов RPC, возвращающих один ответ (методы с унарной обработкой ответа), gRPC Python поддерживает как синхронную (блокирующую), так и асинхронную (неблокирующую) семантику потока управления. Для методов RPC с потоковой обработкой ответа вызовы немедленно возвращают итератор значений ответа. Вызовы метода next() этого итератора блокируются до тех пор, пока не станет доступен ответ, который должен быть получен от итератора.

Потоковая передача RPC на стороне сервера

Вызов функции потоковой передачи ответов ListFeatures аналогичен работе с типами последовательностей:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

Клиентская потоковая передача RPC

Вызов метода RecordRoute для потоковой передачи запросов аналогичен передаче итератора локальному методу. Как и простой RPC-вызов выше, который также возвращает один ответ, его можно вызывать синхронно:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

Двунаправленный потоковый RPC

Вызов двунаправленного потокового RouteChat (как и в случае со стороны сервиса) представляет собой комбинацию семантики потокового запроса и потокового ответа.

Сгенерируйте сообщения-запросы и отправьте их по одному с помощью yield .

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

Получение и обработка ответов сервера:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

Вызов вспомогательных методов

В run выполните методы, которые мы только что создали, и передайте им stub .

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. Попробуйте

Запускаем сервер:

python route_guide_server.py

С другого терминала снова активируйте виртуальную среду ( source .venv/bin/activate) , затем запустите клиент:

python route_guide_client.py

Давайте посмотрим на результат.

СписокОсобенности

Сначала вы увидите список объектов. Каждый объект передается с сервера ( серверный потоковый RPC ) по мере того, как он обнаруживает их в запрошенном прямоугольнике:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

Во-вторых, RecordRoute демонстрирует список случайно посещённых точек, передаваемый от клиента к серверу (потоковый RPC на стороне клиента) :

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

После того, как клиент завершит потоковую передачу всех посещённых точек, он получит от сервера ответ без потоковой передачи ( унарный RPC ). Этот ответ будет содержать сводку вычислений, выполненных по всему маршруту клиента.

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

Наконец, вывод RouteChat демонстрирует двунаправленную потоковую передачу . Когда клиент «посещает» home или work точку, он записывает заметку для этой точки, отправляя RouteNote на сервер. Если точка уже посещена, сервер возвращает все предыдущие заметки для этой точки.

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. Что дальше?