תחילת העבודה עם gRPC-Python – סטרימינג

1. מבוא

ב-codelab הזה תשתמשו ב-gRPC-Python כדי ליצור לקוח ושרת שיהוו את הבסיס לאפליקציה למיפוי מסלולים שנכתבה ב-Python.

בסוף המדריך יהיה לכם לקוח שמתחבר לשרת מרוחק באמצעות gRPC כדי לקבל מידע על תכונות במסלול של לקוח, ליצור סיכום של המסלול של הלקוח ולהחליף מידע על המסלול, כמו עדכוני תנועה, עם השרת ועם לקוחות אחרים.

השירות מוגדר בקובץ Protocol Buffers, שישמש ליצירת קוד boilerplate ללקוח ולשרת, כדי שהם יוכלו לתקשר זה עם זה. כך תוכלו לחסוך זמן ומאמץ בהטמעת הפונקציונליות הזו.

הקוד שנוצר מטפל לא רק במורכבויות של התקשורת בין השרת ללקוח, אלא גם בסריאליזציה ובדה-סריאליזציה של הנתונים.

מה תלמדו

  • איך משתמשים ב-Protocol Buffers כדי להגדיר API של שירות.
  • איך ליצור לקוח ושרת מבוססי gRPC מהגדרה של Protocol Buffers באמצעות יצירת קוד אוטומטית.
  • הבנה של תקשורת סטרימינג בין לקוח לשרת באמצעות gRPC.

ה-codelab הזה מיועד למפתחי Python שחדשים ב-gRPC או שרוצים לרענן את הידע שלהם ב-gRPC, או לכל מי שמעוניין ליצור מערכות מבוזרות. לא נדרש ניסיון קודם ב-gRPC.

‫2. לפני שמתחילים

מה נדרש

  • Python בגרסה 3.9 ואילך. מומלץ להשתמש ב-Python 3.13. הוראות התקנה ספציפיות לפלטפורמה מופיעות במאמר Python Setup and Usage (הגדרה ושימוש ב-Python). אפשר גם להתקין Python שאינו מערכת באמצעות כלים כמו uv או pyenv.
  • pip להתקנת חבילות Python.
  • venv כדי ליצור סביבות וירטואליות של Python.

החבילות ensurepip ו-venv הן חלק מהספרייה הרגילה של Python, ובדרך כלל הן זמינות כברירת מחדל.

עם זאת, חלק מההפצות שמבוססות על Debian (כולל Ubuntu) בוחרות להחריג אותן כשמפיצים מחדש את Python. כדי להתקין את החבילות, מריצים את הפקודה:

sudo apt install python3-pip python3-venv

קבל את הקוד

כדי לייעל את הלמידה, ב-Codelab הזה מוצעת מסגרת קוד מקור מוכנה מראש שתעזור לכם להתחיל. בשלבים הבאים מוסבר איך להשלים את הבקשה, כולל יצירת קוד gRPC באמצעות התוסף grpc_tools.protoc Protocol Buffer compiler.

grpc-codelabs

קוד המקור של ה-scaffold ל-codelab הזה זמין בספרייה codelabs/grpc-python-streaming/start_here. אם אתם מעדיפים לא להטמיע את הקוד בעצמכם, קוד המקור המלא זמין בספרייה completed.

קודם יוצרים את ספריית העבודה של ה-codelab ועוברים אליה:

mkdir grpc-python-streaming && cd grpc-python-streaming

מורידים ומחלצים את ה-Codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

אפשרות אחרת היא להוריד את קובץ ה-‎ .zip שמכיל רק את ספריית ה-codelab ולבטל את הדחיסה שלו באופן ידני.

3. הגדרת הודעות ושירותים

השלב הראשון הוא להגדיר את שירות gRPC של האפליקציה, את שיטת ה-RPC ואת סוגי ההודעות של הבקשה והתגובה באמצעות Protocol Buffers. השירות שלכם יספק:

  • שיטות RPC שנקראות ListFeatures, ‏ RecordRoute ו-RouteChat, שהשרת מטמיע והלקוח קורא להן.
  • סוגי ההודעות Point,‏ Feature,‏ Rectangle,‏ RouteNote ו-RouteSummary, שהן מבני נתונים שמועברים בין הלקוח לשרת כשמפעילים את ה-RPC methods.

כל השיטות האלה של RPC וסוגי ההודעות שלהן מוגדרות בקובץ protos/route_guide.proto של קוד המקור שסופק.

פרוטוקול Buffers ידוע בדרך כלל בשם protobufs. מידע נוסף על המינוח של gRPC זמין במאמר מושגי ליבה, ארכיטקטורה ומחזור חיים של gRPC.

הגדרת סוגי הודעות

בקובץ protos/route_guide.proto של קוד המקור, מגדירים קודם את סוג ההודעה Point. הסמל Point מייצג זוג קואורדינטות של קו רוחב וקו אורך במפה. ב-codelab הזה, משתמשים במספרים שלמים לקואורדינטות:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

המספרים 1 ו-2 הם מספרי מזהים ייחודיים לכל אחד מהשדות במבנה message.

לאחר מכן, מגדירים את Feature סוג ההודעה. ‫Feature משתמש בשדה string כדי לציין את השם או הכתובת למשלוח דואר של משהו במיקום שצוין על ידי Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

כדי להזרים ללקוח כמה נקודות באזור מסוים, צריך להשתמש בהודעה Rectangle שמייצגת מלבן של קווי רוחב ואורך, שמיוצג על ידי שתי נקודות מנוגדות באלכסון lo ו-hi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

בנוסף, הודעה RouteNote שמייצגת הודעה שנשלחה בנקודה מסוימת:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

לבסוף, תצטרכו לשלוח הודעה ב-RouteSummary. ההודעה הזו מתקבלת בתגובה ל-RPC‏ RecordRoute, שמוסבר בקטע הבא. הוא מכיל את מספר הנקודות הבודדות שהתקבלו, מספר התכונות שזוהו והמרחק הכולל שעבר כסכום מצטבר של המרחק בין כל נקודה.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

הגדרת שיטות שירות

כדי להגדיר שירות, מציינים שירות עם שם בקובץ .proto. לקובץ route_guide.proto יש מבנה service בשם RouteGuide שמגדיר שיטה אחת או יותר שמוגדרות על ידי השירות של האפליקציה.

כשמגדירים שיטות RPC בהגדרת השירות, מציינים את סוגי הבקשות והתגובות שלהן. בקטע הזה של ה-codelab, נגדיר:

ListFeatures

מקבל את האובייקטים Feature שזמינים בתוך Rectangle הנתון. התוצאות מועברות בסטרימינג ולא מוחזרות בבת אחת, כי המלבן עשוי לכסות אזור גדול ולהכיל מספר עצום של תכונות.

באפליקציה הזו, תשתמשו ב-RPC של סטרימינג בצד השרת: הלקוח שולח בקשה לשרת ומקבל סטרימינג כדי לקרוא רצף של הודעות בחזרה. הלקוח קורא מהזרם שהוחזר עד שאין יותר הודעות. כפי שאפשר לראות בדוגמה שלנו, כדי לציין שיטת סטרימינג בצד השרת, צריך להציב את מילת המפתח stream לפני סוג התגובה.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

מקבל זרם של נקודות במסלול שמוגדר למעבר, ומחזיר RouteSummary כשהמעבר מסתיים.

במקרה כזה, מתאים להשתמש ב-RPC של הזרמת נתונים מצד הלקוח: הלקוח כותב רצף של הודעות ושולח אותן לשרת, שוב באמצעות זרם נתונים שסופק. אחרי שהלקוח מסיים לכתוב את ההודעות, הוא מחכה שהשרת יקרא את כולן ויחזיר את התגובה שלו. כדי לציין שיטת סטרימינג בצד הלקוח, מציבים את מילת המפתח stream לפני סוג הבקשה.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

מקבלת זרם של RouteNotes שנשלח בזמן שמתבצעת תנועה במסלול, תוך כדי קבלת RouteNotes אחרים (למשל ממשתמשים אחרים).

זה בדיוק סוג השימוש בסטרימינג דו-כיווני. בקשת RPC להזרמת נתונים דו-כיוונית, שבה שני הצדדים שולחים רצף של הודעות באמצעות זרם לקריאה ולכתיבה. שני הזרמים פועלים באופן עצמאי, כך שהלקוחות והשרתים יכולים לקרוא ולכתוב בכל סדר שירצו. לדוגמה, השרת יכול לחכות עד שיקבל את כל ההודעות מהלקוח לפני שהוא כותב את התשובות שלו, או שהוא יכול לקרוא הודעה ואז לכתוב הודעה, או כל שילוב אחר של קריאות וכתיבות. הסדר של ההודעות בכל פיד נשמר. כדי לציין את סוג ה-method הזה, צריך להוסיף את מילת המפתח stream לפני הבקשה ולפני התשובה.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. יצירת קוד הלקוח והשרת

בשלב הבא, יוצרים את קוד ה-gRPC הסטנדרטי גם ללקוח וגם לשרת מקובץ .proto באמצעות מהדר פרוטוקול ה-Buffer.

יצרנו את grpcio-tools ליצירת קוד gRPC Python. היא כוללת:

  1. הקומפיילר הרגיל protoc שיוצר קוד Python מהגדרות message.
  2. תוסף gRPC protobuf שיוצר קוד Python (stub של לקוח ושרת) מההגדרות של service.

נתקין את חבילת Python‏ grpcio-tools באמצעות pip. כדאי ליצור סביבה וירטואלית של Python (venv) כדי לבודד את התלות של הפרויקט בחבילות המערכת:

python3 -m venv --upgrade-deps .venv

כדי להפעיל את הסביבה הווירטואלית במעטפת bash/zsh:

source .venv/bin/activate

במערכות Windows ובמעטפות לא סטנדרטיות, אפשר לעיין בטבלה שבכתובת https://docs.python.org/3/library/venv.html#how-venvs-work.

בשלב הבא, מתקינים את grpcio-tools (הפעולה הזו מתקינה גם את החבילה grpcio):

pip install grpcio-tools

כדי ליצור את קוד ה-boilerplate של Python, משתמשים בפקודה הבאה:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

הפעולה הזו תיצור את הקבצים הבאים לממשקים שהגדרנו ב-route_guide.proto:

  1. route_guide_pb2.py מכיל את הקוד שיוצר באופן דינמי כיתות שנוצרו מההגדרות של message.
  2. route_guide_pb2.pyi הוא קובץ stub או קובץ רמזים לגבי סוגים שנוצר מההגדרות של message. הוא מכיל רק את החתימות ללא הטמעה. סביבות פיתוח משולבות (IDE) יכולות להשתמש בקובצי stub כדי לספק השלמה אוטומטית טובה יותר וזיהוי שגיאות.
  3. route_guide_pb2_grpc.py נוצר מההגדרות של service ומכיל מחלקות ופונקציות ספציפיות ל-gRPC.

קוד ספציפי ל-gRPC מכיל:

  1. RouteGuideStub, שאפשר להשתמש בו בלקוח gRPC כדי להפעיל קריאות RPC של RouteGuide.
  2. RouteGuideServicer, שמגדיר את הממשק להטמעות של שירות RouteGuide.
  3. הפונקציה add_RouteGuideServicer_to_server שמשמשת לרישום RouteGuideServicer ב-gRPC server.

5. יצירת השרת

קודם נראה איך יוצרים RouteGuide שרת. יצירה והפעלה של שרת RouteGuide מתחלקות לשני פריטי עבודה:

  • הטמעה של ממשק השירות שנוצר מהגדרת השירות עם פונקציות שמבצעות את ה "עבודה" בפועל של השירות.
  • הפעלת שרת gRPC להאזנה לבקשות מלקוחות ולהעברת תגובות.

בואו נסתכל על route_guide_server.py.

הטמעה של RouteGuide

route_guide_server.py יש כיתה RouteGuideServicer שמהווה מחלקת משנה של הכיתה שנוצרה route_guide_pb2_grpc.RouteGuideServicer:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer מטמיע את כל שיטות השירות של RouteGuide.

הזרמת RPC בצד השרת

ListFeatures הוא RPC של סטרימינג תגובות ששולח כמה Feature ללקוח:

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

בדוגמה הזו, הודעת הבקשה היא route_guide_pb2.Rectangle והלקוח רוצה למצוא בתוכה את המחרוזת Feature. במקום להחזיר תשובה אחת, ה-method מחזירה אפס תשובות או יותר.

Client-side streaming RPC

שיטת הזרמת הבקשות RecordRoute משתמשת באיטרטור של ערכי בקשות ומחזירה ערך תגובה יחיד.

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

RPC לסטרימינג דו-כיווני

לבסוף, נבחן את ה-RPC של הסטרימינג הדו-כיווני RouteChat():

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

הסמנטיקה של השיטה הזו היא שילוב של הסמנטיקה של השיטה request-streaming ושל השיטה response-streaming. הפונקציה מקבלת איטרטור של ערכי בקשה, והיא עצמה איטרטור של ערכי תגובה.

הפעלת השרת

אחרי שמטמיעים את כל השיטות של RouteGuide, השלב הבא הוא להפעיל שרת gRPC כדי שהלקוחות יוכלו להשתמש בשירות:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

השיטה של השרת start() לא חוסמת. ייווצר שרשור חדש כדי לטפל בבקשות. לרוב, לשרשור שקורא ל-server.start() לא תהיה עבודה אחרת לבצע בינתיים. במקרה כזה, אפשר להתקשר אל server.wait_for_termination() כדי לחסום את שרשור השיחות בצורה נקייה עד שהשרת יסיים את הפעולה.

6. יצירת הלקוח

בואו נסתכל על route_guide_client.py.

יצירת דף stub

כדי לקרוא לשיטות של שירות, קודם צריך ליצור stub.

אנחנו יוצרים מופע של המחלקה RouteGuideStub מהמודול route_guide_pb2_grpc, שנוצר מהשיטה .proto. In run():

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

שימו לב: כאן נעשה שימוש ב-channel כמנהל הקשר, והוא ייסגר אוטומטית ברגע שהמתורגמן יעזוב את הבלוק with.

הפעלת שיטות של שירות

עבור שיטות RPC שמחזירות תגובה יחידה (שיטות response-unary), ‏ gRPC Python תומך בסמנטיקה של זרימת בקרה סינכרונית (חסימה) ואסינכרונית (לא חסימה). בשיטות RPC של סטרימינג תגובות, הקריאות מחזירות מיד איטרטור של ערכי תגובה. הפעלת המתודה next() של האיטרטור חוסמת את התהליך עד שהתשובה שצריך להחזיר מהאיטרטור הופכת לזמינה.

הזרמת RPC בצד השרת

השימוש ב-response-streaming ListFeatures דומה לעבודה עם סוגי רצפים:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

Client-side streaming RPC

הקריאה ל-request-streaming RecordRoute דומה להעברת איטרטור לשיטה מקומית. בדומה ל-RPC הפשוט שלמעלה שמחזיר גם הוא תגובה אחת, אפשר לקרוא לו באופן סינכרוני:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

RPC לסטרימינג דו-כיווני

הקריאה ל-RouteChat של סטרימינג דו-כיווני (כמו במקרה של צד השרת) משלבת בין הסמנטיקה של סטרימינג של בקשות וסטרימינג של תגובות.

יוצרים את הודעות הבקשה ושולחים אותן אחת אחת באמצעות yield.

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

קבלת תגובות מהשרת ועיבוד שלהן:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

הפעלת שיטות העזר

בפונקציה run, מריצים את השיטות שיצרנו ומעבירים להן את stub.

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. רוצה לנסות?

מריצים את השרת:

python route_guide_server.py

ממסוף אחר, מפעילים שוב את הסביבה הווירטואלית (source .venv/bin/activate)), ואז מריצים את הלקוח:

python route_guide_client.py

בואו נסתכל על הפלט.

ListFeatures

קודם כל, תופיע רשימת התכונות. כל תכונה מועברת בסטרימינג מהשרת (RPC בצד השרת לסטרימינג) כשהמערכת מזהה שהיא נמצאת בתוך המלבן המבוקש:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

שנית, RecordRoute מדגים את רשימת הנקודות שנבחרו באופן אקראי ומוזרמות מהלקוח לשרת (הזרמת RPC בצד הלקוח):

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

אחרי שהלקוח יסיים להזרים את כל הנקודות שביקר בהן, הוא יקבל מהשרת תגובה שלא מועברת בסטרימינג (RPC אוניטרי). התגובה הזו תכיל סיכום של החישובים שבוצעו במסלול המלא של הלקוח.

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

לבסוף, הפלט RouteChat מדגים סטרימינג דו-כיווני. כשהלקוח 'מבקר' בנקודות home או work, הוא שולח RouteNote לשרת כדי לתעד הערה לגבי הנקודה. אם כבר ביקרתם בנקודה מסוימת, השרת ישדר בחזרה את כל ההערות הקודמות לגבי הנקודה הזו.

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. המאמרים הבאים