הטמעת נתוני CSV (ערכים מופרדים בפסיקים) ב-BigQuery באמצעות Cloud Data Fusion - הטמעת נתונים בזמן אמת

1. מבוא

509db33558ae025.png

העדכון האחרון: 28 בפברואר 2020

ב-codelab הזה מוצג דפוס של הטמעת נתונים להטמעת נתונים בתחום הבריאות בפורמט CSV ב-BigQuery בזמן אמת. בשיעור ה-Lab הזה נשתמש בפייפליין נתונים בזמן אמת של Cloud Data Fusion. נתוני בדיקה ריאליים של שירותי בריאות נוצרו והועמדו לרשותכם בקטגוריה של Google Cloud Storage ‏ (gs://hcls_testing_data_fhir_10_patients/csv/).

בשיעור ה-Lab הזה תלמדו:

  • איך מטמיעים נתוני CSV (טעינה בזמן אמת) מ-Pub/Sub ל-BigQuery באמצעות Cloud Data Fusion.
  • איך ליצור באופן ויזואלי צינור עיבוד נתונים לשילוב נתונים ב-Cloud Data Fusion כדי לטעון, לשנות ולהסתיר נתונים של מערכות בריאות בזמן אמת.

מה צריך כדי להפעיל את ההדגמה הזו?

  • צריכה להיות לכם גישה לפרויקט GCP.
  • צריכה להיות לכם הרשאת 'בעלים' בפרויקט GCP.
  • נתוני בריאות בפורמט CSV, כולל הכותרת.

אם אין לכם פרויקט GCP, אתם יכולים לפעול לפי השלבים האלה כדי ליצור פרויקט חדש ב-GCP.

נתוני בריאות בפורמט CSV נטענו מראש לקטגוריית GCS בכתובת gs://hcls_testing_data_fhir_10_patients/csv/. לכל קובץ משאבים בפורמט CSV יש מבנה סכימה ייחודי. לדוגמה, ל-Patients.csv יש סכימה שונה מזו של Providers.csv. אפשר למצוא קובצי סכימה שנטענו מראש בכתובת gs://hcls_testing_data_fhir_10_patients/csv_schemas.

אם אתם צריכים מערך נתונים חדש, תמיד תוכלו ליצור אותו באמצעות SyntheaTM. לאחר מכן, מעלים אותו ל-GCS במקום להעתיק אותו מהמאגר בשלב 'העתקת נתוני קלט'.

2. הגדרת פרויקט GCP

מאתחלים משתני מעטפת עבור הסביבה.

כדי למצוא את PROJECT_ID, אפשר לעיין במאמר בנושא זיהוי פרויקטים.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

יוצרים קטגוריה ב-GCS לאחסון נתוני קלט ויומני שגיאות באמצעות הכלי gsutil.

gsutil mb -l us gs://$BUCKET_NAME

קבלת גישה למערך הנתונים הסינתטי

  1. מכתובת האימייל שבה אתם משתמשים כדי להיכנס למסוף Cloud, שולחים אימייל אל hcls-solutions-external+subscribe@google.com ומבקשים להצטרף.
  2. תקבלו אימייל עם הוראות לאישור הפעולה.
  3. משתמשים באפשרות להשיב לאימייל כדי להצטרף לקבוצה. אל תלחצו על הלחצן 525a0fa752e0acae.png.
  4. אחרי שתקבלו את אישור ההרשמה באימייל, תוכלו להמשיך לשלב הבא ב-codelab.

העתקת נתוני הקלט.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

יוצרים מערך נתונים ב-BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

מתקינים ומאתחלים את Google Cloud SDK ויוצרים נושאים ומינויים ב-Pub או ב-Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. הגדרת סביבת Cloud Data Fusion

כדי להפעיל את Cloud Data Fusion API ולהעניק את ההרשאות הנדרשות:

מפעילים את ממשקי ה-API.

  1. עוברים אל GCP Console API Library.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט הרצוי.
  3. ב-API Library, בוחרים את ה-API שרוצים להפעיל ( Cloud Data Fusion API, Cloud Pub/Sub API). אם אתם צריכים עזרה באיתור ה-API, אתם יכולים להשתמש בשדה החיפוש ובמסננים.
  4. בדף ה-API, לוחצים על הפעלה.

יצירת מכונת Cloud Data Fusion

  1. במסוף GCP, בוחרים את ProjectID.
  2. בוחרים באפשרות Data Fusion מהתפריט הימני, ואז לוחצים על הלחצן CREATE AN INSTANCE (יצירת מופע) באמצע הדף (יצירה ראשונה), או לוחצים על הלחצן CREATE INSTANCE (יצירת מופע) בתפריט העליון (יצירה נוספת).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. מזינים את שם המכונה. בוחרים באפשרות Enterprise.

5af91e46917260ff.png

  1. לוחצים על הלחצן CREATE (יצירה).

הגדרת הרשאות למופע

אחרי שיוצרים מופע, מבצעים את השלבים הבאים כדי לתת לחשבון השירות שמשויך למופע הרשאות בפרויקט:

  1. כדי לעבור לדף הפרטים של המכונה, לוחצים על שם המכונה.

76ad691f795e1ab3.png

  1. מעתיקים את חשבון השירות.

6c91836afb72209d.png

  1. עוברים לדף IAM של הפרויקט.
  2. בדף ההרשאות של IAM, לוחצים על הלחצן הוספה כדי להעניק לחשבון השירות את התפקיד סוכן שירות של Cloud Data Fusion API. מדביקים את 'חשבון השירות' בשדה 'חברים חדשים' ובוחרים באפשרות 'ניהול שירותים' -> 'תפקיד סוכן שרת Cloud Data Fusion API'.

36f03d11c2a4ce0.png

  1. לוחצים על + Add another role (או על Edit Cloud Data Fusion API Service Agent) כדי להוסיף את התפקיד Pub/Sub Subscriber.

b4bf5500b8cbe5f9.png

  1. לוחצים על שמירה.

אחרי שמבצעים את השלבים האלה, אפשר להתחיל להשתמש ב-Cloud Data Fusion. לשם כך, לוחצים על הקישור View Instance (הצגת המכונה) בדף המכונות של Cloud Data Fusion או בדף הפרטים של מכונה.

מגדירים את הכלל בחומת האש.

  1. עוברים אל מסוף GCP -> VPC Network -> Firewall rules כדי לבדוק אם הכלל default-allow-ssh קיים או לא.

102adef44bbe3a45.png

  1. אם לא, מוסיפים כלל חומת אש שמאפשר את כל תעבורת הנתונים הנכנסת של SSH לרשת שמוגדרת כברירת מחדל.

באמצעות שורת הפקודה:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

באמצעות ממשק המשתמש: לוחצים על Create Firewall Rule (יצירת כלל של חומת אש) וממלאים את הפרטים:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. פיתוח צמתים לצינור עיבוד הנתונים

עכשיו, אחרי שיצרנו את סביבת Cloud Data Fusion ב-GCP, נתחיל לבנות את צינורות הנתונים ב-Cloud Data Fusion באמצעות השלבים הבאים:

  1. בחלון Cloud Data Fusion, לוחצים על הקישור 'הצגת המופע' בעמודה 'פעולה'. המערכת תפנה אתכם לדף אחר. לוחצים על כתובת ה-URL שמופיעה כדי לפתוח את המכונה של Cloud Data Fusion. הבחירה שלכם אם ללחוץ על הלחצן 'התחלת סיור' או על הלחצן 'לא, תודה' בחלון הקופץ של הודעת הפתיחה.
  2. מרחיבים את תפריט ההמבורגר, בוחרים באפשרות 'צינור' -> 'רשימה'

317820def934a00a.png

  1. לוחצים על הלחצן הירוק + בפינה השמאלית העליונה ואז בוחרים באפשרות יצירת צינור. או לוחצים על 'יצירת קישור' לצינור.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. אחרי שמופיע סטודיו צינורות הנתונים, בפינה הימנית העליונה, בוחרים באפשרות צינור נתונים – בזמן אמת מהתפריט הנפתח.

372a889a81da5e66.png

  1. בממשק המשתמש של צינורות הנתונים, בחלונית הימנית מוצגים קטעים שונים כמו Filter (מסנן), Source (מקור), Transform (שינוי), Analytics (ניתוח), Sink (יעד), Error Handlers (טיפול בשגיאות) ו-Alerts (התראות). אפשר לבחור צומת או צמתים לצינור.

c63de071d4580f2f.png

בוחרים צומת של מקור.

  1. בקטע Source (מקור) בפלטת התוספים בצד ימין, לוחצים לחיצה כפולה על הצומת Google Cloud PubSub שמופיע בממשק המשתמש של Data Pipelines.
  2. מצביעים על צומת המקור של PubSub ולוחצים על מאפיינים.

ed857a5134148d7b.png

  1. מזינים את הפרטים בשדות הדרושים. מגדירים את השדות הבאים:
  • Label = {any text}
  • שם הפניה = {כל טקסט}
  • Project ID = זיהוי אוטומטי
  • Subscription = מינוי שנוצר בקטע Create Pub/Sub Topic (לדוגמה, your-sub)
  • Topic = הנושא שנוצר בקטע 'יצירת נושא Pub/Sub' (לדוגמה, your-topic)
  1. לקבלת הסבר מפורט, לוחצים על Documentation (מסמכים). לוחצים על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ההודעה הירוקה 'לא נמצאו שגיאות' מציינת שהפעולה הצליחה.

5c2774338b66bebe.png

  1. כדי לסגור את החלון Pub/Sub Properties, לוחצים על הלחצן X.

בוחרים את צומת ההמרה.

  1. בקטע Transform (טרנספורמציה) בפלטת התוספים בצד ימין, לוחצים לחיצה כפולה על הצומת Projection שמופיע בממשק המשתמש של Data Pipelines. מחברים את צומת המקור Pub/Sub לצומת ההמרה Projection.
  2. מצביעים על הצומת Projection (הקרנה) ולוחצים על Properties (מאפיינים).

b3a9a3878879bfd7.png

  1. מזינים את הפרטים בשדות הדרושים. מגדירים את השדות הבאים:
  • Convert = המרה של ההודעה מסוג byte לסוג string.
  • Fields to drop = {any field}
  • השדות שרוצים לשמור = {message, timestamp, and attributes} (לדוגמה, attributes: key=‘filename':value=‘patients' sent from Pub/Sub)
  • Fields to rename = {message, timestamp}
  1. לקבלת הסבר מפורט, לוחצים על Documentation (מסמכים). לוחצים על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ההודעה הירוקה 'לא נמצאו שגיאות' מציינת שהפעולה הצליחה.

b8c2f8efe18234ff.png

  1. בקטע Transform (טרנספורמציה) בלוח התוספים בצד ימין, לוחצים לחיצה כפולה על הצומת Wrangler שמופיע בממשק המשתמש של Data Pipelines. מחברים את הצומת Projection transform (טרנספורמציה של הקרנה) לצומת Wrangler transform (טרנספורמציה של שינוי נתונים). מציבים את הסמן מעל צומת ה-Wrangler ולוחצים על Properties (מאפיינים).

aa44a4db5fe6623a.png

  1. לוחצים על התפריט הנפתח פעולות ובוחרים באפשרות ייבוא כדי לייבא סכימה שנשמרה (לדוגמה: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. מוסיפים את השדה TIMESTAMP בסכימת הפלט (אם הוא לא קיים) על ידי לחיצה על הלחצן + לצד השדה האחרון ומסמנים את התיבה Null.
  3. מזינים את הפרטים בשדות הדרושים. מגדירים את השדות הבאים:
  • Label = {any text}
  • שם השדה להזנת קלט = {*}
  • Precondition = {attributes.get("filename") != "patients"} כדי להבחין בין כל סוג של רשומה או הודעה (לדוגמה, patients, providers, allergies וכו') שנשלחים מצומת המקור של PubSub.
  1. לקבלת הסבר מפורט, לוחצים על Documentation (מסמכים). לוחצים על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ההודעה הירוקה 'לא נמצאו שגיאות' מציינת שהפעולה הצליחה.

3b8e552cd2e3442c.png

  1. מגדירים את שמות העמודות בסדר הרצוי ומשמיטים את השדות שלא צריך. מעתיקים את קטע הקוד הבא ומדביקים אותו בתיבת המתכון.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. מידע על מיסוך נתונים והסרת פרטי הזיהוי זמין במאמר Batch-Codelab - CSV to BigQuery via CDF. אפשר גם להוסיף את קטע הקוד הזה mask-number SSN xxxxxxx#### לתיבת המתכון
  2. כדי לסגור את החלון 'מאפייני שינוי הצורה', לוחצים על הלחצן X.

בוחרים את צומת היעד.

  1. בקטע Sink בפלטת התוספים בצד ימין, לוחצים פעמיים על הצומת BigQuery שמופיע בממשק המשתמש של Data Pipeline. מחברים את צומת ההמרה של Wrangler לצומת היעד של BigQuery.
  2. מציבים את הסמן מעל צומת היעד של BigQuery ולוחצים על Properties (מאפיינים).

1be711152c92c692.png

  1. ממלאים את שדות החובה:
  • Label = {any text}
  • שם הפניה = {כל טקסט}
  • Project ID = זיהוי אוטומטי
  • Dataset = מערך נתונים ב-BigQuery שמשמש בפרויקט הנוכחי (לדוגמה, DATASET_ID)
  • Table = {table name}
  1. לקבלת הסבר מפורט, לוחצים על Documentation (מסמכים). לוחצים על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ההודעה הירוקה 'לא נמצאו שגיאות' מציינת שהפעולה הצליחה.

bba71de9f31e842a.png

  1. כדי לסגור את BigQuery Properties, לוחצים על הלחצן X.

5. פיתוח פייפליין נתונים בזמן אמת

בקטע הקודם יצרנו צמתים שנדרשים לבניית פייפליין נתונים ב-Cloud Data Fusion. בקטע הזה מחברים את הצמתים כדי ליצור את צינור הנתונים בפועל.

חיבור כל הצמתים בצינור

  1. גוררים חץ של חיבור > מהקצה הימני של צומת המקור ומשחררים אותו בקצה השמאלי של צומת היעד.
  2. לצינור יכולים להיות כמה ענפים שמקבלים הודעות שפורסמו מאותו צומת מקור של PubSub.

b22908cc35364cdd.png

  1. נותנים שם לצינור.

זה הכול. יצרתם עכשיו את פייפליין הנתונים הראשון שלכם בזמן אמת, שמוכן לפריסה ולהפעלה.

שליחת הודעות דרך Cloud Pub/Sub

באמצעות ממשק המשתמש של Pub/Sub:

  1. עוברים אל מסוף GCP ->‏ Pub/Sub ->‏ Topics, בוחרים באפשרות your-topic ולוחצים על PUBLISH MESSAGE בתפריט העליון.

d65b2a6af1668ecd.png

  1. מציבים רק שורת רשומה אחת בכל פעם בשדה ההודעה. לוחצים על הלחצן +הוספת מאפיין. מזינים Key = filename, Value = <type of record> (for example, patients, providers, allergies, etc.).
  2. לוחצים על לחצן הפרסום כדי לשלוח את ההודעה.

באמצעות הפקודה gcloud:

  1. מזינים את ההודעה באופן ידני.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. העברת ההודעה באופן חצי אוטומטי באמצעות פקודות ה-Unix‏ cat ו-sed. אפשר להריץ את הפקודה הזו שוב ושוב עם פרמטרים שונים.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. הגדרה, פריסה והפעלה של צינור עיבוד נתונים

אחרי שפיתחנו את פייפליין הנתונים, אנחנו יכולים לפרוס אותו ולהריץ אותו ב-Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. משאירים את הגדרות ברירת המחדל של Configure (הגדרה).
  2. כדי לראות תצוגה מקדימה של הנתונים, לוחצים על תצוגה מקדימה**.** לוחצים שוב על **תצוגה מקדימה** כדי לחזור לחלון הקודם. אפשר גם להריץ את צינור העיבוד במצב תצוגה מקדימה בלחיצה על **הפעלה**.

b3c891e5e1aa20ae.png

  1. לוחצים על יומנים כדי לראות את היומנים.
  2. לוחצים על שמירה כדי לשמור את כל השינויים.
  3. כשיוצרים צינור חדש, לוחצים על ייבוא כדי לייבא את הגדרת הצינור ששמרתם.
  4. לוחצים על ייצוא כדי לייצא הגדרה של צינור עיבוד נתונים.
  5. לוחצים על Deploy (פריסה) כדי לפרוס את צינור עיבוד הנתונים.
  6. אחרי הפריסה, לוחצים על Run ומחכים עד שהצינור יפעל עד הסוף.

f01ba6b746ba53a.png

  1. אפשר ללחוץ על Stop (עצירה) כדי להפסיק את הפעלת צינור הנתונים בכל שלב.
  2. כדי לשכפל את צינור הנתונים, לוחצים על Duplicate (שכפול) מתחת ללחצן Actions (פעולות).
  3. כדי לייצא את הגדרות הצינור, לוחצים על Export (ייצוא) מתחת ללחצן Actions (פעולות).

28ea4fc79445fad2.png

  1. לוחצים על סיכום כדי להציג תרשימים של היסטוריית ההרצה, רשומות, יומני שגיאות ואזהרות.

7. אימות

בקטע הזה נבדוק את ההפעלה של פייפליין הנתונים.

  1. מוודאים שצינור עיבוד הנתונים בוצע בהצלחה ופועל באופן רציף.

1644dfac4a2d819d.png

  1. מוודאים שהטבלאות ב-BigQuery נטענות עם רשומות מעודכנות על סמך חותמת הזמן. בדוגמה הזו, שתי רשומות או הודעות של מטופלים ורשומה או הודעה אחת של אלרגיה פורסמו בנושא Pub/Sub בתאריך 25 ביוני 2019.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. מוודאים שההודעות שפורסמו ב-<your-topic> התקבלו על ידי המנוי <your-sub>.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

איך רואים את התוצאות

כדי לראות את התוצאות אחרי שההודעות מתפרסמות בנושא Pub/Sub בזמן שצינור הנתונים בזמן אמת פועל:

  1. מריצים שאילתה על הטבלה בממשק המשתמש של BigQuery. מעבר לממשק המשתמש של BigQuery
  2. מעדכנים את השאילתה שלמטה לשם הפרויקט, מערך הנתונים והטבלה שלכם.

6a1fb85bd868abc9.png

8. סידור וארגון

כדי להימנע מחיובים בחשבון Google Cloud Platform בגלל השימוש במשאבים שנעשה במסגרת המדריך הזה:

בסיום המדריך, חשוב להסיר את המשאבים שיצרתם ב-GCP כדי שלא יתפסו מכסה ולא תחויבו עליהם בעתיד. בסעיפים הבאים מוסבר איך למחוק או להשבית את המשאבים האלו.

מחיקת מערך הנתונים ב-BigQuery

כדי למחוק את מערך הנתונים ב-BigQuery שיצרתם במסגרת המדריך הזה, פועלים לפי ההוראות האלה.

מחיקת קטגוריית GCS

כדי למחוק את קטגוריית ה-GCS שיצרתם כחלק מההדרכה הזו, פועלים לפי ההוראות הבאות.

מחיקת מכונת Cloud Data Fusion

פועלים לפי ההוראות כדי למחוק את מופע Cloud Data Fusion.

מחיקת הפרויקט

הדרך הקלה ביותר לבטל את החיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך.

כדי למחוק את הפרויקט:

  1. במסוף GCP, נכנסים לדף Projects. כניסה לדף Projects
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.

9. מזל טוב

סיימתם בהצלחה את סדנת הקוד להטמעת נתונים בתחום הבריאות ב-BigQuery באמצעות Cloud Data Fusion.

פרסמתם נתוני CSV בנושא Pub/Sub ואז טענתם אותם ל-BigQuery.

יצרתם באופן ויזואלי צינור עיבוד נתונים לשילוב נתונים, לטעינה, לשינוי ולהסתרת נתונים של שירותי בריאות בזמן אמת.

עכשיו אתם יודעים מהם השלבים העיקריים שצריך לבצע כדי להתחיל את התהליך של ניתוח נתונים בתחום הבריאות באמצעות BigQuery ב-Google Cloud Platform.