Inizia a utilizzare gRPC-Python - Streaming

1. Introduzione

In questo codelab utilizzerai gRPC-Python per creare un client e un server che costituiscono la base di un'applicazione di mappatura di itinerari scritta in Python.

Al termine del tutorial, avrai un client che si connette a un server remoto utilizzando gRPC per ottenere informazioni sulle funzionalità di un percorso del client, creare un riepilogo di un percorso del client e scambiare informazioni sul percorso, come gli aggiornamenti sul traffico, con il server e altri client.

Il servizio è definito in un file Protocol Buffers, che verrà utilizzato per generare il codice boilerplate per il client e il server in modo che possano comunicare tra loro, risparmiando tempo e fatica nell'implementazione di questa funzionalità.

Questo codice generato si occupa non solo delle complessità della comunicazione tra il server e il client, ma anche della serializzazione e della deserializzazione dei dati.

Obiettivi didattici

  • Come utilizzare i buffer di protocollo per definire un'API di servizio.
  • Come creare un client e un server basati su gRPC da una definizione di Protocol Buffers utilizzando la generazione automatica del codice.
  • Comprensione della comunicazione di streaming client-server con gRPC.

Questo codelab è rivolto agli sviluppatori Python che non hanno mai utilizzato gRPC o che vogliono ripassare gRPC, nonché a chiunque sia interessato a creare sistemi distribuiti. Non è richiesta alcuna esperienza precedente con gRPC.

2. Prima di iniziare

Che cosa ti serve

  • Python 3.9 o versioni successive. Ti consigliamo Python 3.13. Per istruzioni di installazione specifiche per la piattaforma, vedi Configurazione e utilizzo di Python. In alternativa, installa una versione di Python non di sistema utilizzando strumenti come uv o pyenv.
  • pip per installare i pacchetti Python.
  • venv per creare ambienti virtuali Python.

I pacchetti ensurepip e venv fanno parte della libreria standard di Python e sono in genere disponibili per impostazione predefinita.

Tuttavia, alcune distribuzioni basate su Debian (inclusa Ubuntu) scelgono di escluderli durante la ridistribuzione di Python. Per installare i pacchetti, esegui:

sudo apt install python3-pip python3-venv

Ottieni il codice

Per semplificare l'apprendimento, questo codelab offre uno scaffold di codice sorgente predefinito per aiutarti a iniziare. I passaggi seguenti ti guideranno nel completamento dell'applicazione, inclusa la generazione di codice gRPC utilizzando il plug-in del compilatore del buffer di protocollo grpc_tools.protoc.

grpc-codelabs

Il codice sorgente dello scaffold per questo codelab è disponibile nella directory codelabs/grpc-python-streaming/start_here. Se preferisci non implementare il codice autonomamente, il codice sorgente completato è disponibile nella directory completed.

Innanzitutto, crea la directory di lavoro del codelab e accedi tramite cd:

mkdir grpc-python-streaming && cd grpc-python-streaming

Scarica ed estrai il codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

In alternativa, puoi scaricare il file .zip contenente solo la directory del codelab e decomprimerlo manualmente.

3. Definisci messaggi e servizi

Il primo passaggio consiste nel definire il servizio gRPC dell'applicazione, il relativo metodo RPC e i tipi di messaggi di richiesta e risposta utilizzando Protocol Buffers. Il tuo servizio fornirà:

  • Metodi RPC chiamati ListFeatures, RecordRoute e RouteChat che il server implementa e il client chiama.
  • I tipi di messaggi Point, Feature, Rectangle, RouteNote e RouteSummary, che sono strutture di dati scambiate tra il client e il server quando vengono chiamati i metodi RPC.

Questi metodi RPC e i relativi tipi di messaggio verranno tutti definiti nel file protos/route_guide.proto del codice sorgente fornito.

Protocol Buffers sono comunemente noti come protobuf. Per ulteriori informazioni sulla terminologia gRPC, consulta Concetti fondamentali, architettura e ciclo di vita di gRPC.

Definisci i tipi di messaggi

Nel file protos/route_guide.proto del codice sorgente, definisci innanzitutto il tipo di messaggio Point. Un Point rappresenta una coppia di coordinate di latitudine e longitudine su una mappa. Per questo codelab, utilizza numeri interi per le coordinate:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

I numeri 1 e 2 sono numeri ID univoci per ciascuno dei campi nella struttura message.

Successivamente, definisci il tipo di messaggio Feature. Un Feature utilizza un campo string per il nome o l'indirizzo postale di un elemento in una località specificata da un Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Affinché più punti all'interno di un'area possano essere trasmessi in streaming a un client, è necessario un messaggio Rectangle che rappresenti un rettangolo di latitudine e longitudine, rappresentato da due punti diagonalmente opposti lo e hi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Inoltre, un messaggio RouteNote che rappresenta un messaggio inviato in un determinato momento:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Infine, ti servirà un messaggio RouteSummary. Questo messaggio viene ricevuto in risposta a una RPC RecordRoute, che viene spiegata nella sezione successiva. Contiene il numero di punti individuali ricevuti, il numero di caratteristiche rilevate e la distanza totale percorsa come somma cumulativa della distanza tra ogni punto.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Definisci i metodi di servizio

Per definire un servizio, specifica un servizio denominato nel file .proto. Il file route_guide.proto ha una struttura service denominata RouteGuide che definisce uno o più metodi forniti dal servizio dell'applicazione.

Quando definisci i metodi RPC all'interno della definizione del servizio, specifichi i tipi di richiesta e risposta. In questa sezione del codelab, definiamo:

ListFeatures

Ottiene gli oggetti Feature disponibili all'interno del Rectangle specificato. I risultati vengono trasmessi in streaming anziché restituiti contemporaneamente, poiché il rettangolo potrebbe coprire un'area vasta e contenere un numero elevato di caratteristiche.

Per questa applicazione, utilizzerai una RPC di streaming lato server: il client invia una richiesta al server e riceve un flusso per leggere una sequenza di messaggi. Il client legge dallo stream restituito finché non ci sono più messaggi. Come puoi vedere nel nostro esempio, devi specificare un metodo di streaming lato server inserendo la parola chiave stream prima del tipo di risposta.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Accetta un flusso di punti su un percorso in fase di attraversamento, restituendo un RouteSummary al termine dell'attraversamento.

In questo caso è appropriata una RPC di streaming lato client: il client scrive una sequenza di messaggi e li invia al server, sempre utilizzando un flusso fornito. Una volta che il client ha finito di scrivere i messaggi, attende che il server li legga tutti e restituisca la risposta. Specifichi un metodo di streaming lato client inserendo la parola chiave stream prima del tipo di richiesta.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Accetta un flusso di RouteNotes inviati durante il percorso, mentre riceve altri RouteNotes (ad esempio da altri utenti).

Questo è esattamente il tipo di caso d'uso per lo streaming bidirezionale. Una RPC di streaming bidirezionale in cui entrambe le parti inviano una sequenza di messaggi utilizzando uno stream di lettura/scrittura. I due flussi operano in modo indipendente, quindi client e server possono leggere e scrivere nell'ordine che preferiscono: ad esempio, il server potrebbe attendere di ricevere tutti i messaggi del client prima di scrivere le risposte oppure potrebbe leggere un messaggio e poi scriverne un altro o una combinazione diversa di letture e scritture. L'ordine dei messaggi in ogni stream viene mantenuto. Specifichi questo tipo di metodo inserendo la parola chiave stream prima della richiesta e della risposta.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Genera il codice client e server

A questo punto, genera il codice gRPC boilerplate sia per il client che per il server dal file .proto utilizzando il compilatore del buffer del protocollo.

Per la generazione di codice Python gRPC, abbiamo creato grpcio-tools. Comprende:

  1. Il compilatore protoc standard che genera codice Python dalle definizioni message.
  2. Plug-in protobuf gRPC che genera codice Python (stub client e server) dalle definizioni service.

Installeremo il pacchetto Python grpcio-tools utilizzando pip. Creiamo un nuovo ambiente virtuale Python (venv) per isolare le dipendenze del progetto dai pacchetti di sistema:

python3 -m venv --upgrade-deps .venv

Per attivare l'ambiente virtuale nella shell bash/zsh:

source .venv/bin/activate

Per Windows e shell non standard, consulta la tabella all'indirizzo https://docs.python.org/3/library/venv.html#how-venvs-work.

Poi, installa grpcio-tools (in questo modo viene installato anche il pacchetto grpcio):

pip install grpcio-tools

Utilizza il seguente comando per generare il codice boilerplate Python:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

Verranno generati i seguenti file per le interfacce definite in route_guide.proto:

  1. route_guide_pb2.py contiene il codice che crea dinamicamente le classi generate dalle definizioni di message.
  2. route_guide_pb2.pyi è un "file stub" o "file di suggerimenti sul tipo" generato dalle definizioni message. Contiene solo le firme senza implementazione. I file stub possono essere utilizzati dagli IDE per fornire un completamento automatico e un rilevamento degli errori migliori.
  3. route_guide_pb2_grpc.py viene generato dalle definizioni di service e contiene classi e funzioni specifiche di gRPC.

Il codice specifico di gRPC contiene:

  1. RouteGuideStub, che può essere utilizzato da un client gRPC per richiamare RPC RouteGuide.
  2. RouteGuideServicer, che definisce l'interfaccia per le implementazioni del servizio RouteGuide.
  3. add_RouteGuideServicer_to_server, che viene utilizzata per registrare un RouteGuideServicer in un server gRPC.

5. Crea il server

Per prima cosa, vediamo come creare un server RouteGuide. La creazione e l'esecuzione di un server RouteGuide si suddivide in due elementi di lavoro:

  • Implementazione dell'interfaccia del servizio generata dalla nostra definizione del servizio con funzioni che eseguono il "lavoro" effettivo del servizio.
  • Esecuzione di un server gRPC per ascoltare le richieste dei client e trasmettere le risposte.

Diamo un'occhiata a route_guide_server.py.

Implementa RouteGuide

route_guide_server.py ha una classe RouteGuideServicer che crea sottoclassi della classe generata route_guide_pb2_grpc.RouteGuideServicer:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer implementa tutti i metodi di servizio RouteGuide.

RPC di streaming lato server

ListFeatures è una RPC di streaming delle risposte che invia più Feature al client:

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

In questo caso, il messaggio di richiesta è un route_guide_pb2.Rectangle all'interno del quale il client vuole trovare Feature. Anziché restituire una singola risposta, il metodo produce zero o più risposte.

RPC di streaming lato client

Il metodo di streaming delle richieste RecordRoute utilizza un iteratore di valori di richiesta e restituisce un singolo valore di risposta.

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

RPC di streaming bidirezionale

Infine, diamo un'occhiata alla nostra RPC di streaming bidirezionale RouteChat():

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

La semantica di questo metodo è una combinazione di quella del metodo di streaming delle richieste e del metodo di streaming delle risposte. Riceve un iteratore di valori di richiesta ed è a sua volta un iteratore di valori di risposta.

Avviare il server

Dopo aver implementato tutti i metodi RouteGuide, il passaggio successivo consiste nell'avviare un server gRPC in modo che i client possano effettivamente utilizzare il tuo servizio:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

Il metodo start() del server non è bloccante. Verrà creata una nuova istanza di thread per gestire le richieste. Il thread che chiama server.start() spesso non avrà altro lavoro da fare nel frattempo. In questo caso, puoi chiamare server.wait_for_termination() per bloccare in modo pulito il thread di chiamata finché il server non termina.

6. Crea il client

Diamo un'occhiata a route_guide_client.py.

Creare uno stub

Per chiamare i metodi di servizio, dobbiamo prima creare uno stub.

Istanziamo la classe RouteGuideStub del modulo route_guide_pb2_grpc, generata dal nostro metodo .proto. In run():

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

Tieni presente che qui channel viene utilizzato come gestore del contesto e verrà chiuso automaticamente quando l'interprete esce dal blocco with.

Metodi di servizio di chiamata

Per i metodi RPC che restituiscono una singola risposta ("response-unary"), gRPC Python supporta la semantica del flusso di controllo sincrono (bloccante) e asincrono (non bloccante). Per i metodi RPC di streaming delle risposte, le chiamate restituiscono immediatamente un iteratore di valori di risposta. Le chiamate al blocco del metodo next() dell'iteratore vengono bloccate finché la risposta da generare dall'iteratore non diventa disponibile.

RPC di streaming lato server

La chiamata a ListFeatures per lo streaming delle risposte è simile all'utilizzo dei tipi di sequenza:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

RPC di streaming lato client

Chiamare RecordRoute per lo streaming delle richieste è simile a passare un iteratore a un metodo locale. Come la semplice RPC precedente che restituisce anche una singola risposta, può essere chiamata in modo sincrono:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

RPC di streaming bidirezionale

La chiamata dello streaming bidirezionale RouteChat ha (come nel caso del lato servizio) una combinazione di semantica di streaming delle richieste e di streaming delle risposte.

Genera i messaggi di richiesta e inviali uno alla volta utilizzando yield.

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

Ricevi ed elabora le risposte del server:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

Chiama i metodi helper

In run, esegui i metodi appena creati e trasmetti loro stub.

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. Prova

Esegui il server:

python route_guide_server.py

Da un altro terminale, attiva di nuovo l'ambiente virtuale (source .venv/bin/activate)), quindi esegui il client:

python route_guide_client.py

Diamo un'occhiata all'output.

ListFeatures

Innanzitutto, troverai l'elenco delle funzionalità. Ogni funzionalità viene trasmessa in streaming dal server (RPC di streaming lato server) man mano che vengono rilevate all'interno del rettangolo richiesto:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

In secondo luogo, RecordRoute mostra l'elenco dei punti visitati in modo casuale trasmessi in streaming dal client al server (RPC di streaming lato client):

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

Dopo che il client ha terminato lo streaming di tutti i punti visitati, riceverà una risposta non in streaming (una RPC unaria) dal server. Questa risposta conterrà un riepilogo dei calcoli eseguiti sull'intero percorso del cliente.

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

Infine, l'output RouteChat mostra lo streaming bidirezionale. Quando il client "visita" i punti home o work, registra una nota per il punto inviando un RouteNote al server. Quando un punto è già stato visitato, il server trasmette in streaming tutte le note precedenti per questo punto.

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. Passaggi successivi