תחילת העבודה עם gRPC-Go – סטרימינג

1. מבוא

ב-codelab הזה תשתמשו ב-gRPC-Go כדי ליצור לקוח ושרת שמהווים את הבסיס לאפליקציה למיפוי מסלולים שנכתבה ב-Go.

בסוף המדריך יהיה לכם לקוח שמתחבר לשרת מרוחק באמצעות gRPC כדי לקבל מידע על תכונות במסלול של לקוח, ליצור סיכום של המסלול של הלקוח ולהחליף מידע על המסלול, כמו עדכוני תנועה, עם השרת ועם לקוחות אחרים.

השירות מוגדר בקובץ Protocol Buffers, שישמש ליצירת קוד boilerplate ללקוח ולשרת, כדי שהם יוכלו לתקשר זה עם זה. כך תוכלו לחסוך זמן ומאמץ בהטמעת הפונקציונליות הזו.

הקוד שנוצר מטפל לא רק במורכבויות של התקשורת בין השרת ללקוח, אלא גם בסריאליזציה ובדה-סריאליזציה של הנתונים.

מה תלמדו

  • איך משתמשים ב-Protocol Buffers כדי להגדיר API של שירות.
  • איך ליצור לקוח ושרת מבוססי gRPC מהגדרה של Protocol Buffers באמצעות יצירת קוד אוטומטית.
  • הבנה של תקשורת סטרימינג בין לקוח לשרת באמצעות gRPC.

ה-codelab הזה מיועד למפתחי Go שחדשים ב-gRPC או שרוצים לרענן את הידע שלהם ב-gRPC, או לכל מי שמתעניין בפיתוח מערכות מבוזרות. לא נדרש ניסיון קודם ב-gRPC.

‫2. לפני שמתחילים

דרישות מוקדמות

ודאו שהתקנתם את הפריטים הבאים:

  • ערכת הכלים של Go בגרסה 1.24.5 ואילך. הוראות להתקנה מופיעות במאמר תחילת העבודה של Go.
  • הקומפיילר של מאגר אחסון לפרוטוקולים, protoc, גרסה 3.27.1 ואילך. הוראות ההתקנה מופיעות במדריך ההתקנה של הקומפיילר.
  • תוספים של קומפיילר מאגר אחסון לפרוטוקולים ל-Go ול-gRPC. כדי להתקין את הפלאגינים האלה, מריצים את הפקודות הבאות:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

מעדכנים את המשתנה PATH כדי שהקומפיילר של מאגר אחסון לפרוטוקולים יוכל למצוא את הפלאגינים:

export PATH="$PATH:$(go env GOPATH)/bin"

קבל את הקוד

כדי שלא תצטרכו להתחיל מאפס, ב-codelab הזה מופיע סקאפולד של קוד המקור של האפליקציה שתוכלו להשלים. בשלבים הבאים נסביר איך לסיים את האפליקציה, כולל שימוש בתוספים של קומפיילר מאגר אחסון לפרוטוקולים כדי ליצור את קוד ה-boilerplate של gRPC.

קודם יוצרים את ספריית העבודה של ה-codelab ומנווטים אליה באמצעות הפקודה cd:

mkdir streaming-grpc-go-getting-started && cd streaming-grpc-go-getting-started

מורידים ומחלצים את ה-codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-go-streaming/start_here

אפשרות אחרת היא להוריד את קובץ ה-‎ .zip שמכיל רק את ספריית ה-codelab ולבטל את הדחיסה שלו באופן ידני.

קוד המקור המלא זמין ב-GitHub אם אתם רוצים לדלג על הקלדת ההטמעה.

3. הגדרת הודעות ושירותים

השלב הראשון הוא להגדיר את שירות ה-gRPC של האפליקציה, את שיטות ה-RPC שלה ואת סוגי ההודעות של הבקשות והתגובות באמצעות Protocol Buffers. השירות שלכם יספק:

  • שיטות RPC שנקראות ListFeatures, ‏ RecordRoute ו-RouteChat, שהשרת מטמיע והלקוח קורא להן.
  • סוגי ההודעות Point,‏ Feature,‏ Rectangle,‏ RouteNote ו-RouteSummary, שהן מבני נתונים שמועברים בין הלקוח לשרת כשמפעילים את השיטות שלמעלה.

כל השיטות האלה של RPC וסוגי ההודעות שלהן מוגדרות בקובץ routeguide/route_guide.proto של קוד המקור שסופק.

פרוטוקול Buffers ידוע בדרך כלל בשם protobufs. מידע נוסף על המינוח של gRPC זמין במאמר מושגים מרכזיים, ארכיטקטורה ומחזור חיים של gRPC.

הגדרת סוגי הודעות

בקובץ routeguide/route_guide.proto של קוד המקור, מגדירים קודם את סוג ההודעה Point. הסמל Point מייצג זוג קואורדינטות של קו רוחב וקו אורך במפה. ב-codelab הזה, משתמשים במספרים שלמים לקואורדינטות:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

המספרים 1 ו-2 הם מספרי מזהים ייחודיים לכל אחד מהשדות במבנה message.

לאחר מכן, מגדירים את Feature סוג ההודעה. ‫Feature משתמש בשדה string כדי לציין את השם או הכתובת למשלוח דואר של משהו במיקום שצוין על ידי Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

אחריו מופיעה הודעת Rectangle שמייצגת מלבן של קווי רוחב ואורך, שמיוצג כשתי נקודות מנוגדות באלכסון, lo ו-hi.

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

גם הודעה של RouteNote שמייצגת הודעה שנשלחה בנקודה מסוימת.

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

נצטרך גם הודעה RouteSummary. ההודעה הזו מתקבלת בתגובה ל-RPC‏ RecordRoute, שמוסבר בקטע הבא. הוא מכיל את מספר הנקודות הבודדות שהתקבלו, את מספר התכונות שזוהו ואת המרחק הכולל שעבר, כסכום מצטבר של המרחק בין כל נקודה.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

הגדרת שיטות שירות

כדי להגדיר שירות, מציינים שירות עם שם בקובץ ‎ .proto. לקובץ route_guide.proto יש מבנה service בשם RouteGuide שמגדיר שיטה אחת או יותר שמוגדרות בשירות של האפליקציה.

מגדירים RPC שיטות בתוך הגדרת השירות, ומציינים את סוגי הבקשות והתשובות שלהן. בקטע הזה של ה-codelab, נגדיר:

ListFeatures

מקבל את Features שזמינים בתוך Rectangle שצוין. התוצאות מועברות בסטרימינג ולא מוחזרות בבת אחת (למשל בהודעת תגובה עם שדה חוזר), כי יכול להיות שהמלבן יכסה אזור גדול ויכיל מספר עצום של תכונות.

סוג ה-RPC המתאים במקרה הזה הוא RPC של סטרימינג בצד השרת: הלקוח שולח בקשה לשרת ומקבל סטרימינג כדי לקרוא רצף של הודעות בחזרה. הלקוח קורא מהזרם שהוחזר עד שאין יותר הודעות. כפי שאפשר לראות בדוגמה שלנו, כדי לציין שיטת סטרימינג בצד השרת, צריך להציב את מילת המפתח stream לפני סוג התגובה.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

מקבל זרם של Points במסלול שמועבר, ומחזיר RouteSummary כשהמעבר מסתיים.

במקרה הזה, נראה ש-RPC של סטרימינג בצד הלקוח הוא המתאים: הלקוח כותב רצף של הודעות ושולח אותן לשרת, שוב באמצעות סטרימינג שסופק. אחרי שהלקוח מסיים לכתוב את ההודעות, הוא מחכה שהשרת יקרא את כולן ויחזיר את התגובה שלו. כדי לציין שיטת סטרימינג בצד הלקוח, מציבים את מילת המפתח stream לפני סוג הבקשה.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

מקבלת זרם של RouteNotes שנשלחים בזמן שמתבצעת תנועה במסלול, תוך כדי קבלת RouteNotes אחרים (למשל ממשתמשים אחרים).

זה בדיוק סוג השימוש בשידור דו-כיווני. ב-RPC של סטרימינג דו-כיווני, שני הצדדים שולחים רצף של הודעות באמצעות סטרימינג לקריאה ולכתיבה. שני הזרמים פועלים באופן עצמאי, כך שהלקוחות והשרתים יכולים לקרוא ולכתוב בכל סדר שירצו.

לדוגמה, השרת יכול לחכות לקבלת כל ההודעות מהלקוח לפני שהוא כותב את התשובות שלו, או שהוא יכול לקרוא הודעה ואז לכתוב הודעה, או שילוב אחר של קריאות וכתיבות.

הסדר של ההודעות בכל זרם נשמר. כדי לציין את סוג ה-method הזה, צריך להוסיף את מילת המפתח stream לפני הבקשה ולפני התשובה.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. יצירת קוד לקוח וקוד שרת

לאחר מכן, יוצרים את קוד ה-boilerplate של gRPC גם ללקוח וגם לשרת מקובץ .proto באמצעות מהדר מאגר אחסון לפרוטוקולים. בספרייה routeguide, מריצים את הפקודה:

protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       route_guide.proto

הפקודה הזו יוצרת את הקבצים הבאים:

  • route_guide.pb.go, שמכיל פונקציות ליצירת סוגי ההודעות של האפליקציה ולגישה לנתונים שלהן ולהגדרה של הסוגים שמייצגים את ההודעות.
  • route_guide_grpc.pb.go, שמכיל פונקציות שהלקוח משתמש בהן כדי לקרוא לשיטת gRPC מרוחקת של השירות, ופונקציות שהשרת משתמש בהן כדי לספק את השירות המרוחק הזה.

בשלב הבא, נטמיע את השיטות בצד השרת, כך שכשהלקוח ישלח בקשה, השרת יוכל להשיב עם תשובה.

5. הטמעה של השירות

קודם נראה איך יוצרים RouteGuide שרת. יש שני חלקים בתהליך שבו שירות RouteGuide מבצע את העבודה שלו:

  • הטמעה של ממשק השירות שנוצר מהגדרת השירות שלנו: ביצוע ה "עבודה" בפועל של השירות שלנו.
  • הפעלת שרת gRPC להאזנה לבקשות מלקוחות ולשליחתן להטמעה הנכונה של השירות.

בואו נטמיע את RouteGuide ב-server/server.go.

הטמעה של RouteGuide

צריך להטמיע את הממשק RouteGuideService שנוצר. כך ייראה היישום.

type routeGuideServer struct {
        ...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
        ...
}

בואו נבחן כל הטמעה של RPC בפירוט.

RPC של סטרימינג בצד השרת

מתחילים עם אחד מ-RPCs של הסטרימינג. ‫ListFeatures הוא RPC של סטרימינג מצד השרת, ולכן אנחנו צריכים לשלוח בחזרה כמה Feature ללקוח שלנו.

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

כפי שאפשר לראות, במקום לקבל אובייקטים פשוטים של בקשה ותגובה בפרמטרים של השיטה, הפעם אנחנו מקבלים אובייקט בקשה (Rectangle שבו הלקוח רוצה למצוא את Features) ואובייקט מיוחד RouteGuide_ListFeaturesServer לכתיבת התגובות. בשיטה הזו, מאכלסים כמה אובייקטים של Feature שצריך להחזיר, וכותבים אותם ל-RouteGuide_ListFeaturesServer באמצעות השיטה Send() שלו. לבסוף, כמו ב-RPC הפשוט שלנו, אנחנו מחזירים שגיאת nil כדי לציין ל-gRPC שסיימנו לכתוב תשובות. אם מתרחשת שגיאה כלשהי בקריאה הזו, הפונקציה מחזירה שגיאה שאינה nil. שכבת ה-gRPC תתרגם אותה לסטטוס RPC מתאים שיישלח בחיבור.

RPC של סטרימינג מצד הלקוח

עכשיו נסתכל על משהו קצת יותר מסובך: שיטת הסטרימינג בצד הלקוח RecordRoute, שבה אנחנו מקבלים סטרימינג של Points מהלקוח ומחזירים RouteSummary יחיד עם מידע על הנסיעה שלו. כפי שאפשר לראות, הפעם לשיטה אין פרמטר בקשה בכלל. במקום זאת, הוא מקבל זרם RouteGuide_RecordRouteServer, שהשרת יכול להשתמש בו כדי לקרוא ולכתוב הודעות – הוא יכול לקבל הודעות מהלקוח באמצעות השיטה Recv() שלו ולהחזיר את התגובה היחידה שלו באמצעות השיטה SendAndClose() שלו.

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

בגוף השיטה אנחנו משתמשים בשיטה RouteGuide_RecordRouteServer של Recv() כדי לקרוא שוב ושוב את הבקשות של הלקוח לאובייקט בקשה (במקרה הזה Point) עד שלא נשארו עוד הודעות: השרת צריך לבדוק את השגיאה שמוחזרת מ-Recv() אחרי כל קריאה. אם הערך הוא nil, הסטרים עדיין תקין והשרת יכול להמשיך לקרוא אותו. אם הערך הוא io.EOF, הסטרים של ההודעה הסתיים והשרת יכול להחזיר את RouteSummary. אם יש לו ערך אחר, אנחנו מחזירים את השגיאה 'כמו שהיא' כדי שהיא תתורגם לסטטוס RPC על ידי שכבת gRPC.

Bidirectional streaming RPC

לבסוף, נבחן את ה-RPC של הסטרימינג הדו-כיווני RouteChat().

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)

    s.mu.Lock()
    s.routeNotes[key] = append(s.routeNotes[key], in)
    // Note: this copy prevents blocking other clients while serving this one.
    // We don't need to do a deep copy, because elements in the slice are
    // insert-only and never modified.
    rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
    copy(rn, s.routeNotes[key])
    s.mu.Unlock()

    for _, note := range rn {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

הפעם אנחנו מקבלים RouteGuide_RouteChatServer זרם שאפשר להשתמש בו כדי לקרוא ולכתוב הודעות, כמו בדוגמה של סטרימינג בצד הלקוח. לעומת זאת, הפעם נחזיר ערכים דרך הזרם של השיטה שלנו בזמן שהלקוח עדיין כותב הודעות לזרם ההודעות שלו. התחביר לקריאה ולכתיבה כאן דומה מאוד לשיטת הזרמת הנתונים של הלקוח, אבל השרת משתמש בשיטה send() של הזרם ולא בשיטה SendAndClose(), כי הוא כותב כמה תשובות. למרות שכל צד תמיד יקבל את ההודעות של הצד השני בסדר שבו הן נכתבו, גם הלקוח וגם השרת יכולים לקרוא ולכתוב בכל סדר – הזרמים פועלים באופן עצמאי לחלוטין.

הפעלת השרת

אחרי שמטמיעים את כל השיטות, צריך גם להפעיל שרת gRPC כדי שהלקוחות יוכלו להשתמש בשירות. בקטע הקוד הבא אפשר לראות איך אנחנו עושים את זה בשירות RouteGuide:

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}

grpcServer := grpc.NewServer()

s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures()
pb.RegisterRouteGuideServer(grpcServer, s)
grpcServer.Serve(lis)

זה מה שקורה באזור main(), שלב אחר שלב:

  1. מציינים את יציאת ה-TCP שבה יש להשתמש כדי להאזין לבקשות של לקוחות מרוחקים, באמצעות lis, err := net.Listen(...). כברירת מחדל, האפליקציה משתמשת ביציאת TCP‏ 50051 כפי שמצוין במשתנה port או בהעברת המתג --port בשורת הפקודה כשמריצים את השרת. אם אי אפשר לפתוח את יציאת ה-TCP, האפליקציה מסתיימת עם שגיאה חמורה.
  2. יוצרים מופע של שרת gRPC באמצעות grpc.NewServer(...) ונותנים למופע הזה את השם grpcServer.
  3. יוצרים מצביע ל-routeGuideServer, מבנה שמייצג את שירות ה-API של האפליקציה, ונותנים למצביע את השם s.
  4. משתמשים ב-s.loadFeatures() כדי לאכלס את המערך s.savedFeatures.
  5. רושמים את הטמעת השירות בשרת gRPC.
  6. מתקשרים אל Serve() בשרת עם פרטי ההעברה כדי לבצע המתנה חוסמת לבקשות לקוח. הפעולה הזו נמשכת עד שהתהליך מופסק או עד שמתקשרים אל Stop().

הפונקציה loadFeatures() מקבלת את המיפויים של קואורדינטות למיקום מ-server/testdata.go.

6. יצירת הלקוח

עכשיו עורכים את client/client.go, שבו מטמיעים את קוד הלקוח.

כדי להפעיל את השיטות של השירות המרוחק, קודם צריך ליצור ערוץ gRPC כדי לתקשר עם השרת. אנחנו יוצרים את זה על ידי העברת מחרוזת ה-URI של יעד השרת (שבמקרה הזה היא פשוט הכתובת ומספר היציאה) אל grpc.NewClient() בפונקציה main() של הלקוח באופן הבא:

// Set up a connection to the gRPC server.
conn, err := grpc.NewClient("dns:///"+*serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
  log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()

כתובת השרת, שמוגדרת על ידי המשתנה serverAddr, היא localhost:50051 כברירת מחדל, ואפשר לבטל את ברירת המחדל באמצעות המתג --addr בשורת הפקודה כשמריצים את הלקוח.

אם הלקוח צריך להתחבר לשירות שדורש פרטי אימות, כמו פרטי אימות של TLS או JWT, הלקוח יכול להעביר אובייקט DialOptions כפרמטר ל-grpc.NewClient שמכיל את פרטי האימות הנדרשים. לא נדרשים פרטי כניסה לשירות RouteGuide.

אחרי שמגדירים את הערוץ של gRPC, צריך stub של לקוח כדי לבצע RPC באמצעות קריאות לפונקציות Go. אנחנו מקבלים את ה-stub באמצעות ה-method‏ NewRouteGuideClient שמופיעה בקובץ route_guide_grpc.pb.go שנוצר מקובץ .proto של האפליקציה.

import (pb "github.com/grpc-ecosystem/codelabs/getting_started_streaming/routeguide")

client := pb.NewRouteGuideClient(conn)

קריאה לשיטות של שירותים

עכשיו נראה איך קוראים לשיטות של השירות. ב-gRPC-Go, קריאות לשירות מרוחק (RPC) פועלות במצב חסימה/סינכרוני, כלומר הקריאה לשירות מרוחק מחכה שהשרת יגיב, ותחזיר תגובה או שגיאה.

RPC של סטרימינג בצד השרת

כאן אנחנו קוראים לשיטת הסטרימינג בצד השרת ListFeatures, שמחזירה זרם של אובייקטים גיאוגרפיים Feature.

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
log.Printf("Looking for features within %v", rect)
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  log.Fatalf("client.ListFeatures failed: %v", err)
}
for {
  // For server-to-client streaming RPCs, you call stream.Recv() until it
  // returns io.EOF.
  feature, err := stream.Recv()
  if err == io.EOF {
    break
  }
  if err != nil {
    log.Fatalf("client.ListFeatures failed: %v", err)
  }
  log.Printf("Feature: name: %q, point:(%v, %v)", feature.GetName(),
    feature.GetLocation().GetLatitude(), feature.GetLocation().GetLongitude())
}

כמו ב-RPC הפשוט, אנחנו מעבירים למתודה הקשר ובקשה. עם זאת, במקום לקבל אובייקט תגובה, אנחנו מקבלים מופע של RouteGuide_ListFeaturesClient. הלקוח יכול להשתמש בזרם RouteGuide_ListFeaturesClient כדי לקרוא את התגובות של השרת. אנחנו משתמשים בשיטה RouteGuide_ListFeaturesClient's Recv() כדי לקרוא שוב ושוב את התגובות של השרת לאובייקט של מאגר אחסון לפרוטוקולים (במקרה הזה Feature) עד שלא נשארו עוד הודעות: הלקוח צריך לבדוק את השגיאה err שמוחזרת מ-Recv() אחרי כל קריאה. אם הערך הוא nil, הסטרים עדיין תקין ואפשר להמשיך לקרוא אותו. אם הערך הוא io.EOF, הסטרים של ההודעה הסתיים. אחרת, חייבת להיות שגיאת RPC, שמועברת דרך err.

RPC של סטרימינג מצד הלקוח

השיטה RecordRoute להעברת נתונים בזמן אמת מצד הלקוח דומה לשיטה מצד השרת, אבל כאן מעבירים לשיטה רק הקשר ומקבלים בחזרה נתונים בזמן אמת מסוג RouteGuide_RecordRouteClient, שאפשר להשתמש בהם גם כדי לכתוב וגם כדי לקרוא הודעות.

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
c2sStream, err := client.RecordRoute(context.TODO())
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
// Stream each point to the server.
for _, point := range points {
  if err := c2sStream.Send(point); err != nil {
    log.Fatalf("client.RecordRoute: stream.Send(%v) failed: %v", point, err)
  }
}
// Close the stream and receive the RouteSummary from the server.
reply, err := c2sStream.CloseAndRecv()
if err != nil {
  log.Fatalf("client.RecordRoute failed: %v", err)
}
log.Printf("Route summary: %v", reply)

ל-RouteGuide_RecordRouteClient יש שיטת Send() שבה אפשר להשתמש כדי לשלוח בקשות לשרת. אחרי שסיימנו לכתוב את הבקשות של הלקוח לזרם באמצעות Send(), אנחנו צריכים להפעיל את הפונקציה CloseAndRecv() בזרם כדי להודיע ל-gRPC שסיימנו לכתוב ומצפים לקבל תשובה. אנחנו מקבלים את סטטוס ה-RPC מהשגיאה שמוחזרת מ-CloseAndRecv(). אם הסטטוס הוא nil, הערך הראשון שמוחזר מ-CloseAndRecv() יהיה תגובה תקינה מהשרת.

Bidirectional streaming RPC

לבסוף, נבחן את ה-RPC של הסטרימינג הדו-כיווני RouteChat(). בדומה למקרה של RecordRoute, אנחנו מעבירים למתודה רק אובייקט הקשר ומקבלים בחזרה זרם שאפשר להשתמש בו גם לכתיבה וגם לקריאה של הודעות. לעומת זאת, הפעם אנחנו מחזירים ערכים דרך הזרם של השיטה שלנו בזמן שהשרת עדיין כותב הודעות לזרם ההודעות שלו.

biDiStream, err := client.RouteChat(context.Background())
if err != nil {
  log.Fatalf("client.RouteChat failed: %v", err)
}
// this channel is used to wait for the receive goroutine to finish.
recvDoneCh := make(chan struct{})
// receive goroutine.
go func() {
  for {
    in, err := biDiStream.Recv()
    if err == io.EOF {
      // read done.
      close(recvDoneCh)
      return
    }
    if err != nil {
      log.Fatalf("client.RouteChat failed: %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
// send messages simultaneously.
for _, note := range notes {
  if err := biDiStream.Send(note); err != nil {
    log.Fatalf("client.RouteChat: stream.Send(%v) failed: %v", note, err)
  }
}
biDiStream.CloseSend()
// wait for the receive goroutine to finish.
<-recvDoneCh

התחביר לקריאה ולכתיבה כאן דומה מאוד לשיטת הסטרימינג בצד הלקוח, רק שבמקרה הזה אנחנו משתמשים בשיטה CloseSend() של הסטרימינג אחרי שסיימנו את השיחה. למרות שכל צד תמיד יקבל את ההודעות של הצד השני בסדר שבו הן נכתבו, גם הלקוח וגם השרת יכולים לקרוא ולכתוב בכל סדר – הזרמים פועלים באופן עצמאי לחלוטין.

7. רוצה לנסות?

כדי לוודא שהשרת והלקוח פועלים יחד בצורה תקינה, מריצים את הפקודות הבאות בספריית העבודה של האפליקציה:

  1. מריצים את השרת במסוף אחד:
cd server
go run .
  1. מריצים את הלקוח ממסוף אחר:
cd client
go run .

יוצג פלט כמו זה, בלי חותמות זמן לשם הבהרה:

Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

8. המאמרים הבאים