Erste Schritte mit gRPC-Python – Streaming

1. Einführung

In diesem Codelab verwenden Sie gRPC-Python, um einen Client und einen Server zu erstellen, die die Grundlage einer in Python geschriebenen Routenplanungsanwendung bilden.

Am Ende des Tutorials haben Sie einen Client, der über gRPC eine Verbindung zu einem Remote-Server herstellt, um Informationen zu Funktionen auf der Route eines Clients abzurufen, eine Zusammenfassung der Route eines Clients zu erstellen und Routeninformationen wie Verkehrsaktualisierungen mit dem Server und anderen Clients auszutauschen.

Der Dienst wird in einer Protocol Buffers-Datei definiert, die zum Generieren von Boilerplate-Code für den Client und den Server verwendet wird, damit sie miteinander kommunizieren können. So sparen Sie Zeit und Aufwand bei der Implementierung dieser Funktion.

Dieser generierte Code kümmert sich nicht nur um die Komplexität der Kommunikation zwischen Server und Client, sondern auch um die Serialisierung und Deserialisierung von Daten.

Lerninhalte

  • Wie Sie Protocol Buffers zum Definieren einer Dienst-API verwenden.
  • Hier erfahren Sie, wie Sie einen gRPC-basierten Client und Server aus einer Protocol Buffers-Definition mithilfe der automatischen Codegenerierung erstellen.
  • Sie haben ein grundlegendes Verständnis der Client-Server-Streaming-Kommunikation mit gRPC.

Dieses Codelab richtet sich an Python-Entwickler, die neu in gRPC sind oder eine Auffrischung von gRPC wünschen, sowie an alle anderen, die sich für die Entwicklung verteilter Systeme interessieren. Es sind keine Vorkenntnisse in gRPC erforderlich.

2. Hinweis

Voraussetzungen

  • Python 3.9 oder höher. Wir empfehlen Python 3.13. Plattformspezifische Installationsanleitungen finden Sie unter Python-Einrichtung und ‑Verwendung. Alternativ können Sie mit Tools wie uv oder pyenv eine Nicht-System-Python-Version installieren.
  • pip zum Installieren von Python-Paketen.
  • venv zum Erstellen virtueller Python-Umgebungen.

Die Pakete ensurepip und venv sind Teil der Python-Standardbibliothek und in der Regel standardmäßig verfügbar.

Einige Debian-basierte Distributionen (einschließlich Ubuntu) schließen sie jedoch bei der Weitergabe von Python aus. Führen Sie Folgendes aus, um die Pakete zu installieren:

sudo apt install python3-pip python3-venv

Code abrufen

Um Ihnen den Einstieg zu erleichtern, wird in diesem Codelab ein vorgefertigtes Quellcode-Gerüst bereitgestellt. In den folgenden Schritten wird beschrieben, wie Sie die Anwendung fertigstellen, einschließlich der gRPC-Code-Generierung mit dem grpc_tools.protoc-Compiler-Plug-in für Protocol Buffer.

grpc-codelabs

Der Gerüstquellcode für dieses Codelab ist im Verzeichnis codelabs/grpc-python-streaming/start_here verfügbar. Wenn Sie den Code nicht selbst implementieren möchten, finden Sie den vollständigen Quellcode im Verzeichnis completed.

Erstellen Sie zuerst das Arbeitsverzeichnis für das Codelab und wechseln Sie in dieses Verzeichnis:

mkdir grpc-python-streaming && cd grpc-python-streaming

Laden Sie das Codelab herunter und extrahieren Sie es:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

Alternativ können Sie die ZIP-Datei herunterladen, die nur das Codelab-Verzeichnis enthält, und sie manuell entpacken.

3. Nachrichten und Dienste definieren

Als Erstes müssen Sie den gRPC-Dienst der Anwendung, die RPC-Methode sowie die Anfrage- und Antwortnachrichtentypen mit Protokollpuffern definieren. Ihr Dienst bietet Folgendes:

  • RPC-Methoden mit den Namen ListFeatures, RecordRoute und RouteChat, die der Server implementiert und der Client aufruft.
  • Die Nachrichtentypen Point, Feature, Rectangle, RouteNote und RouteSummary, die Datenstrukturen sind, die beim Aufrufen der RPC-Methoden zwischen Client und Server ausgetauscht werden.

Diese RPC-Methoden und ihre Nachrichtentypen werden alle in der Datei protos/route_guide.proto des bereitgestellten Quellcodes definiert.

Protocol Buffers werden allgemein als Protobufs bezeichnet. Weitere Informationen zur gRPC-Terminologie finden Sie unter Core concepts, architecture, and lifecycle.

Nachrichtentypen definieren

Definieren Sie zuerst den Nachrichtentyp Point in der Datei protos/route_guide.proto des Quellcodes. Ein Point stellt ein Paar aus Breiten- und Längengrad auf einer Karte dar. Verwenden Sie für dieses Codelab Ganzzahlen für die Koordinaten:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Die Zahlen 1 und 2 sind eindeutige ID-Nummern für die einzelnen Felder in der message-Struktur.

Als Nächstes definieren Sie den Nachrichtentyp Feature. Bei einem Feature wird ein string-Feld für den Namen oder die Postanschrift von etwas an einem Standort verwendet, der durch ein Point angegeben wird:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Damit mehrere Punkte in einem Gebiet an einen Client gestreamt werden können, benötigen Sie eine Rectangle-Nachricht, die ein Rechteck aus Breiten- und Längengrad darstellt, das durch zwei diagonal gegenüberliegende Punkte lo und hi dargestellt wird:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Außerdem eine RouteNote-Nachricht, die eine Nachricht darstellt, die an einem bestimmten Punkt gesendet wurde:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Schließlich benötigen Sie eine RouteSummary-Meldung. Diese Nachricht wird als Antwort auf einen RecordRoute-RPC empfangen, der im nächsten Abschnitt erläutert wird. Sie enthält die Anzahl der einzelnen empfangenen Punkte, die Anzahl der erkannten Features und die zurückgelegte Gesamtstrecke als kumulative Summe der Distanz zwischen den einzelnen Punkten.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Dienstmethoden definieren

Wenn Sie einen Dienst definieren möchten, geben Sie einen benannten Dienst in der Datei .proto an. Die Datei route_guide.proto hat eine service-Struktur mit dem Namen RouteGuide, die eine oder mehrere Methoden definiert, die vom Dienst der Anwendung bereitgestellt werden.

Wenn Sie RPC-Methoden in Ihrer Dienstdefinition definieren, geben Sie die zugehörigen Anfrage- und Antworttypen an. In diesem Abschnitt des Codelabs definieren wir Folgendes:

ListFeatures

Ruft die im angegebenen Rectangle verfügbaren Feature-Objekte ab. Die Ergebnisse werden gestreamt und nicht auf einmal zurückgegeben, da das Rechteck ein großes Gebiet abdecken und eine große Anzahl von Features enthalten kann.

Für diese Anwendung verwenden Sie einen serverseitigen Streaming-RPC: Der Client sendet eine Anfrage an den Server und erhält einen Stream, um eine Reihe von Nachrichten zurückzulesen. Der Client liest den zurückgegebenen Stream, bis keine Nachrichten mehr vorhanden sind. Wie Sie in unserem Beispiel sehen, geben Sie eine serverseitige Streaming-Methode an, indem Sie das Stream-Keyword vor den Antworttyp setzen.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Akzeptiert einen Stream von Punkten auf einer zurückgelegten Route und gibt ein RouteSummary zurück, wenn die Route zurückgelegt wurde.

In diesem Fall ist ein clientseitiger Streaming-RPC angebracht: Der Client schreibt eine Reihe von Nachrichten und sendet sie an den Server, wiederum über einen bereitgestellten Stream. Nachdem der Client mit dem Schreiben der Nachrichten fertig ist, wartet er darauf, dass der Server sie alle liest und seine Antwort zurückgibt. Sie geben eine clientseitige Streamingmethode an, indem Sie das Stream-Keyword vor den Anfragetyp setzen.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Akzeptiert einen Stream von RouteNotes, der während der Fahrt auf einer Route gesendet wird, und empfängt gleichzeitig andere RouteNotes (z.B. von anderen Nutzern).

Genau das ist ein Anwendungsfall für bidirektionales Streaming. Eine bidirektionale Streaming-RPC, bei der beide Seiten eine Reihe von Nachrichten über einen Lese-/Schreib-Stream senden. Die beiden Streams funktionieren unabhängig voneinander. Clients und Server können also in beliebiger Reihenfolge lesen und schreiben. Der Server kann beispielsweise warten, bis er alle Clientnachrichten empfangen hat, bevor er seine Antworten schreibt. Er kann aber auch abwechselnd eine Nachricht lesen und dann eine Nachricht schreiben oder eine andere Kombination aus Lese- und Schreibvorgängen ausführen. Die Reihenfolge der Nachrichten in jedem Stream wird beibehalten. Sie geben diese Art von Methode an, indem Sie das Stream-Keyword sowohl vor die Anfrage als auch vor die Antwort setzen.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Client- und Servercode generieren

Als Nächstes generieren Sie den Boilerplate-gRPC-Code für Client und Server aus der Datei .proto mit dem Protokollpuffer-Compiler.

Für die Generierung von gRPC-Python-Code haben wir grpcio-tools erstellt. Dazu gehören:

  1. Der reguläre protoc-Compiler, der Python-Code aus message-Definitionen generiert.
  2. gRPC-Protobuf-Plug-in, das Python-Code (Client- und Server-Stubs) aus den service-Definitionen generiert.

Wir installieren das Python-Paket grpcio-tools mit pip. Erstellen wir eine neue virtuelle Python-Umgebung (venv), um die Abhängigkeiten Ihres Projekts von den Systempaketen zu isolieren:

python3 -m venv --upgrade-deps .venv

So aktivieren Sie die virtuelle Umgebung in der Bash-/Zsh-Shell:

source .venv/bin/activate

Informationen zu Windows und nicht standardmäßigen Shells finden Sie in der Tabelle unter https://docs.python.org/3/library/venv.html#how-venvs-work.

Installieren Sie als Nächstes grpcio-tools (dadurch wird auch das Paket grpcio installiert):

pip install grpcio-tools

Verwenden Sie den folgenden Befehl, um den Python-Boilerplate-Code zu generieren:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

Dadurch werden die folgenden Dateien für die Schnittstellen generiert, die wir in route_guide.proto definiert haben:

  1. route_guide_pb2.py enthält den Code, der dynamisch Klassen erstellt, die aus den message-Definitionen generiert werden.
  2. route_guide_pb2.pyi ist eine Stub-Datei oder „Type Hint“-Datei, die aus den message-Definitionen generiert wird. Sie enthält nur die Signaturen ohne Implementierung. Stub-Dateien können von IDEs verwendet werden, um eine bessere automatische Vervollständigung und Fehlererkennung zu ermöglichen.
  3. route_guide_pb2_grpc.py wird aus den service-Definitionen generiert und enthält gRPC-spezifische Klassen und Funktionen.

Der gRPC-spezifische Code enthält:

  1. RouteGuideStub, die von einem gRPC-Client zum Aufrufen von RouteGuide-RPCs verwendet werden kann.
  2. RouteGuideServicer, das die Schnittstelle für Implementierungen des RouteGuide-Dienstes definiert.
  3. Die add_RouteGuideServicer_to_server-Funktion wird verwendet, um einen RouteGuideServicer auf einem gRPC-Server zu registrieren.

5. Server erstellen

Sehen wir uns zuerst an, wie Sie einen RouteGuide-Server erstellen. Das Erstellen und Ausführen eines RouteGuide-Servers lässt sich in zwei Aufgaben unterteilen:

  • Implementieren der Dienstschnittstelle, die aus unserer Dienstdefinition generiert wurde, mit Funktionen, die die eigentliche „Arbeit“ des Dienstes ausführen.
  • Ausführen eines gRPC-Servers, der auf Anfragen von Clients wartet und Antworten überträgt.

Sehen wir uns route_guide_server.py an.

RouteGuide implementieren

route_guide_server.py hat eine RouteGuideServicer-Klasse, die von der generierten Klasse route_guide_pb2_grpc.RouteGuideServicer abgeleitet wird:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer implementiert alle RouteGuide-Dienstmethoden.

Serverseitiger Streaming-RPC

ListFeatures ist ein RPC für das Streaming von Antworten, bei dem mehrere Features an den Client gesendet werden:

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

Die Anfragenachricht ist hier ein route_guide_pb2.Rectangle, in dem der Client Features finden möchte. Anstatt eine einzelne Antwort zurückzugeben, liefert die Methode null oder mehr Antworten.

Clientseitiger Streaming-RPC

Bei der Methode RecordRoute für das Streaming von Anfragen wird ein Iterator mit Anfrage-Werten verwendet und ein einzelner Antwortwert zurückgegeben.

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

Bidirektionale Streaming-RPC

Sehen wir uns zum Schluss noch unseren bidirektionalen Streaming-RPC RouteChat() an:

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

Die Semantik dieser Methode ist eine Kombination aus der der Anfrage-Streaming-Methode und der Antwort-Streaming-Methode. Es wird ein Iterator mit Anfragevariablen übergeben und es ist selbst ein Iterator mit Antwortvariablen.

Server starten

Nachdem Sie alle RouteGuide-Methoden implementiert haben, müssen Sie einen gRPC-Server starten, damit Clients Ihren Dienst verwenden können:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

Die Servermethode start() ist nicht blockierend. Ein neuer Thread wird instanziiert, um Anfragen zu verarbeiten. Der Thread, der server.start() aufruft, hat in der Zwischenzeit oft nichts anderes zu tun. In diesem Fall können Sie server.wait_for_termination() aufrufen, um den aufrufenden Thread sauber zu blockieren, bis der Server beendet wird.

6. Client erstellen

Sehen wir uns route_guide_client.py an.

Stub erstellen

Um Dienstmethoden aufzurufen, müssen wir zuerst einen Stub erstellen.

Wir instanziieren die RouteGuideStub-Klasse des route_guide_pb2_grpc-Moduls, die aus unserer .proto.-Methode run() generiert wurde:

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

channel wird hier als Kontextmanager verwendet und automatisch geschlossen, sobald der Interpreter den with-Block verlässt.

Dienstmethoden aufrufen

Für RPC-Methoden, die eine einzelne Antwort zurückgeben („response-unary“-Methoden), unterstützt gRPC Python sowohl synchrone (blockierende) als auch asynchrone (nicht blockierende) Kontrollflusssemantik. Bei RPC-Methoden für das Streaming von Antworten wird bei Aufrufen sofort ein Iterator von Antwortwerten zurückgegeben. Aufrufe der next()-Methode dieses Iterators werden blockiert, bis die Antwort, die vom Iterator zurückgegeben werden soll, verfügbar ist.

Serverseitiger Streaming-RPC

Das Aufrufen des Antwort-Streamings ListFeatures ähnelt der Arbeit mit Sequenztypen:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

Clientseitiger Streaming-RPC

Das Aufrufen des Request-Streamings RecordRoute ähnelt dem Übergeben eines Iterators an eine lokale Methode. Wie der einfache RPC oben, der auch eine einzelne Antwort zurückgibt, kann er synchron aufgerufen werden:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

Bidirektionale Streaming-RPC

Der Aufruf von RouteChat mit bidirektionalem Streaming hat (wie auf der Dienstseite) eine Kombination aus Anfrage- und Antwortstreaming.

Generieren Sie die Anfragenachrichten und senden Sie sie einzeln mit yield.

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

Serverantworten empfangen und verarbeiten:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

Hilfsmethoden aufrufen

Führen Sie im Lauf die Methoden aus, die wir gerade erstellt haben, und übergeben Sie ihnen die stub.

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. Jetzt ausprobieren

Führen Sie den Server aus:

python route_guide_server.py

Aktivieren Sie in einem anderen Terminal die virtuelle Umgebung noch einmal (source .venv/bin/activate)) und führen Sie den Client aus:

python route_guide_client.py

Sehen wir uns die Ausgabe an.

ListFeatures

Zuerst finden Sie die Liste der Funktionen. Jedes Feature wird vom Server gestreamt (serverseitiges Streaming-RPC), sobald es sich im angeforderten Rechteck befindet:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

Zweitens zeigt RecordRoute die Liste der zufällig besuchten Punkte, die vom Client an den Server gestreamt werden (Client-Streaming-RPC):

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

Nachdem der Client alle besuchten Punkte gestreamt hat, erhält er eine Nicht-Streamingantwort (unärer RPC) vom Server. Diese Antwort enthält eine Zusammenfassung der Berechnungen, die für die vollständige Route des Kunden durchgeführt wurden.

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

Schließlich zeigt die RouteChat-Ausgabe bidirektionales Streaming. Wenn der Client home- oder work-Punkte „besucht“, wird eine Notiz für den Punkt aufgezeichnet, indem eine RouteNote an den Server gesendet wird. Wenn ein Ort bereits besucht wurde, streamt der Server alle vorherigen Notizen für diesen Ort zurück.

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. Nächste Schritte