הטמעת נתוני CSV (ערכים מופרדים בפסיקים) ב-BigQuery באמצעות Cloud Data Fusion - הטמעת נתונים בזמן אמת

1. מבוא

509db33558ae025.png

עדכון אחרון: 28.02.2020

ה-Codelab הזה מדגים דפוס הטמעת נתונים כדי להטמיע ב-BigQuery בזמן אמת נתונים של שירותי בריאות בפורמט CSV. בשיעור ה-Lab הזה נשתמש בצינור עיבוד הנתונים של Cloud Data Fusion בזמן אמת. נתונים ריאליים של בדיקות בריאותיות נוצרו בשבילך והם זמינים בקטגוריה של Google Cloud Storage (gs://hcls_testing_data_fhir_10_patients/csv/).

בשיעור ה-Lab הזה של הקוד, תלמדו:

  • איך מטמיעים נתוני CSV (טעינה בזמן אמת) מ-Pub/Sub ל-BigQuery באמצעות Cloud Data Fusion.
  • פיתוח ויזואלי של צינור עיבוד נתונים לשילוב נתונים ב-Cloud Data Fusion לצורך טעינה, טרנספורמציה והתאמה למסכה של נתוני בריאות בזמן אמת.

מה צריך כדי להריץ את ההדגמה הזו?

  • צריכה להיות לך גישה לפרויקט GCP.
  • צריך להיות לך תפקיד 'בעלים' בפרויקט GCP.
  • נתוני שירותי בריאות בפורמט CSV, כולל הכותרת.

אם אין לך פרויקט GCP, עליך לפעול לפי השלבים האלה כדי ליצור פרויקט חדש ב-GCP.

נתוני הבריאות בפורמט CSV נטענו מראש לקטגוריה של GCS בכתובת gs://hcls_testing_data_fhir_10_patients/csv/. לכל קובץ משאבים מסוג CSV יש מבנה סכימה ייחודי. לדוגמה, הסכימה של Patients.csv שונה מזו של Providers.csv. ניתן למצוא קובצי סכימה שנטענו מראש בכתובת gs://hcls_testing_data_fhir_10_patients/csv_schemas.

אם אתם צריכים מערך נתונים חדש, תמיד אפשר ליצור אותו באמצעות SyntheaTM. לאחר מכן, מעלים אותו ל-GCS במקום להעתיק אותו מהקטגוריה בשלב 'העתקת נתוני קלט'.

2. הגדרת פרויקט ב-GCP

אתחול משתני מעטפת לסביבה שלכם.

כדי למצוא את PROJECT_ID, ניתן לעיין במאמר זיהוי פרויקטים.

<!-- CODELAB: Initialize shell variables ->
<!-- Your current GCP Project ID ->
export PROJECT_ID=<PROJECT_ID>
<!-- A new GCS Bucket in your current Project  - INPUT ->
export BUCKET_NAME=<BUCKET_NAME>
<!-- A new BQ Dataset ID - OUTPUT ->
export DATASET_ID=<DATASET_ID>

יוצרים קטגוריה של GCS כדי לאחסן נתוני קלט ויומני שגיאות באמצעות הכלי gsutil.

gsutil mb -l us gs://$BUCKET_NAME

מקבלים גישה למערך הנתונים הסינתטי.

  1. מכתובת האימייל שמשמשת להתחברות אל Cloud Console, שולחים אימייל לכתובת hcls-solutions-external+subscribe@google.com עם בקשת הצטרפות.
  2. תקבלו אימייל עם הוראות לאישור הפעולה.
  3. משתמשים באפשרות של שליחת תשובה לאימייל והצטרפות לקבוצה. אין ללחוץ על הלחצן 525a0fa752e0acae.png.
  4. אחרי שתקבלו את הודעת האישור באימייל, תוכלו להמשיך לשלב הבא ב-Codelab.

מעתיקים את נתוני הקלט.

gsutil -m cp -r gs://hcls_testing_data_fhir_10_patients/csv gs://$BUCKET_NAME

יצירת מערך נתונים ב-BigQuery.

bq mk --location=us --dataset $PROJECT_ID:$DATASET_ID

מתקינים ומפעילים את Google Cloud SDK ויוצרים נושא ומינויים ב-Pub או Sub.

gcloud init
gcloud pubsub topics create your-topic
gcloud pubsub subscriptions create --topic your-topic your-sub

3. הגדרת סביבת Cloud Data Fusion

כדי להפעיל את Cloud Data Fusion API ולהעניק את ההרשאות הנדרשות, יש לבצע את השלבים הבאים:

הפעלת ממשקי API.

  1. עוברים אל GCP Console API Library.
  2. בוחרים את הפרויקט מרשימת הפרויקטים.
  3. בספריית ה-API, בוחרים את ה-API שרוצים להפעיל ( Cloud Data Fusion API,Cloud Pub/Sub API). אם אתם צריכים עזרה במציאת ה-API, אפשר להשתמש בשדה החיפוש ובמסננים.
  4. בדף ה-API, לוחצים על הפעלה.

יצירת מכונת Cloud Data Fusion.

  1. בוחרים את ProjectID במסוף GCP.
  2. בוחרים באפשרות Data Fusion מהתפריט השמאלי, ולאחר מכן לוחצים על הלחצן CREATE AN INSTANCE במרכז הדף (יצירה ראשונה), או לוחצים על הלחצן CREATE INSTANCE בתפריט העליון (יצירה נוספת).

a828690ff3bf3c46.png

e8ffacaba8e61be5.png

  1. מציינים את שם המכונה. בוחרים באפשרות Enterprise.

5af91e46917260ff.png

  1. לוחצים על הלחצן CREATE (יצירה).

איך מגדירים הרשאות למכונות

אחרי שיוצרים מכונה, פועלים לפי השלבים הבאים כדי להעניק לחשבון השירות שמשויך להרשאות המכונה בפרויקט:

  1. כדי לעבור לדף הפרטים של המכונה, לוחצים על השם שלה.

76ad691f795e1ab3.png

  1. מעתיקים את חשבון השירות.

6c91836afb72209d.png

  1. עוברים לדף IAM של הפרויקט.
  2. בדף ההרשאות ב-IAM, לוחצים על הלחצן Add כדי להקצות לחשבון השירות את התפקיד Cloud Data Fusion API Service Agent. מדביקים את 'חשבון השירות'. בשדה 'חברים חדשים' ובוחרים באפשרות Service Management -> תפקיד 'סוכן שרת' של Cloud Data Fusion API.

36f03d11c2a4ce0.png

  1. לוחצים על + הוספת תפקיד נוסף (או על Edit Cloud Data Fusion API Service Agent) כדי להוסיף תפקיד מנויים ב-Pub/Sub.

b4bf5500b8cbe5f9.png

  1. לוחצים על שמירה.

לאחר ביצוע השלבים האלה, אפשר להתחיל להשתמש ב-Cloud Data Fusion על ידי לחיצה על הקישור View Instance בדף המכונות של Cloud Data Fusion או בדף הפרטים של מכונה.

מגדירים את כלל חומת האש.

  1. מעבר אל קונסולת GCP -> רשת VPC -> כללי חומת אש כדי לבדוק אם כלל ברירת המחדל של allow-ssh קיים או לא.

102adef44bbe3a45.png

  1. אם לא, מוסיפים כלל של חומת אש שמאפשר לכל תעבורת הנתונים הנכנסת (ingress) מסוג SSH לרשת ברירת המחדל.

באמצעות שורת הפקודה:

gcloud beta compute --project={PROJECT_ID} firewall-rules create default-allow-ssh --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:22 --source-ranges=0.0.0.0/0 --enable-logging

שימוש בממשק המשתמש: לוחצים על 'יצירת כלל חומת אש' וממלאים את הפרטים:

d9c69ac10496b3d9.png

2dc4971594b82a1f.png

4. פיתוח צמתים לצינור עיבוד הנתונים

עכשיו, אחרי שיש לנו את סביבת Cloud Data Fusion ב-GCP, נתחיל לפתח את צינורות הנתונים ב-Cloud Data Fusion באמצעות השלבים הבאים:

  1. בחלון Cloud Data Fusion, לוחצים על הקישור View Instance בעמודה Action. המערכת תפנה אתכם לדף אחר. לוחצים על ה-url שסופק כדי לפתוח את מכונת Cloud Data Fusion. הבחירה ללחוץ על 'התחלת הסיור' או 'לא, תודה' בחלון הקופץ 'ברוכים הבאים'.
  2. הרחבה של 'המבורגר' בתפריט, בוחרים באפשרות Pipeline -> רשימה

317820def934a00a.png

  1. לוחצים על לחצן + הירוק בפינה השמאלית העליונה ובוחרים באפשרות Create Pipeline. או לוחצים על 'יצירה' קישור לצינור עיבוד נתונים.

711975bb2c2416d7.png

3ec0a71409657fb8.png

  1. אחרי שהסטודיו לצינור עיבוד הנתונים מופיע, בפינה השמאלית העליונה בוחרים באפשרות Data Pipeline – Realtime בתפריט הנפתח.

372a889a81da5e66.png

  1. בממשק המשתמש של צינורות הנתונים יש קטעים שונים בחלונית הימנית בתור 'מסנן', 'מקור', 'טרנספורמציה', 'Analytics', 'Sink', 'רכיבי Handler של שגיאות' ו'התראות'. בקטעים האלה אפשר לבחור צומת או צמתים לצינור עיבוד הנתונים.

c63de071d4580f2f.png

בוחרים מקור צומת.

  1. בקטע Source (מקור) בלוח יישומי הפלאגין שבצד ימין, לוחצים לחיצה כפולה על הצומת Google Cloud PubSub, שמופיע בממשק המשתמש של Data Pipelines.
  2. מעבירים את העכבר מעל הצומת של מקור PubSub ולוחצים על מאפיינים.

ed857a5134148d7b.png

  1. מזינים את הפרטים בשדות הדרושים. מגדירים את השדות הבאים:
  • Label = {any text}
  • שם ההפניה = {any text}
  • Project ID = זיהוי אוטומטי
  • מינוי = המינוי נוצר בקטע 'יצירת נושא Pub/Sub' (לדוגמה, your-sub)
  • נושא = הנושא שנוצר בקטע 'יצירת נושא Pub/Sub' (לדוגמה הנושא שלך)
  1. לקבלת הסבר מפורט, לוחצים על תיעוד. צריך ללחוץ על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ירוק "לא נמצאו שגיאות" שמצביע על הצלחה.

5c2774338b66bebe.png

  1. כדי לסגור את מאפייני Pub/Sub, לוחצים על הלחצן X.

בוחרים את הצומת טרנספורמציה.

  1. בקטע Transform (טרנספורמציה) בלוח יישומי הפלאגין שבצד ימין, לוחצים לחיצה כפולה על הצומת Projection, שמופיע בממשק המשתמש של Data Pipelines. חיבור צומת המקור של Pub/Sub לצומת של טרנספורמציה בפרויקט.
  2. מצביעים על הצומת Projection ולוחצים על Properties (מאפיינים).

b3a9a3878879bfd7.png

  1. מזינים את הפרטים בשדות הדרושים. מגדירים את השדות הבאים:
  • המרה = המרה של message מסוג בייט לסוג מחרוזת.
  • השדות לשחרור = {כל שדה}
  • השדות לשמירה = {message , timestamp ו-Attributes} (לדוגמה, מאפיינים: key=‘filename':value=‘patients' שנשלחו מ-Pub/Sub)
  • השדות לשינוי השם = {message, timestamp}
  1. לקבלת הסבר מפורט, לוחצים על תיעוד. צריך ללחוץ על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ירוק "לא נמצאו שגיאות" שמצביע על הצלחה.

b8c2f8efe18234ff.png

  1. בקטע Transform (טרנספורמציה) בלוח יישומי הפלאגין שבצד ימין, לוחצים לחיצה כפולה על הצומת Wrangler, שמופיע בממשק המשתמש של Data Pipelines. חיבור צומת טרנספורמציה של הקרנה לצומת טרנספורמציה של Wrangler. מצביעים על הצומת של Wrangler ולוחצים על Properties (מאפיינים).

aa44a4db5fe6623a.png

  1. לוחצים על התפריט הנפתח פעולות ובוחרים באפשרות ייבוא כדי לייבא סכימה שמורה (לדוגמה: gs://hcls_testing_data_fhir_10_patients/csv_schemas/ schema (Patients).json).
  2. מוסיפים את השדה TIMESTAMP בסכימת פלט (אם הוא לא קיים) על ידי לחיצה על הלחצן + ליד השדה האחרון ומסמנים את הערך 'Null'. .
  3. מזינים את הפרטים בשדות הדרושים. מגדירים את השדות הבאים:
  • Label = {any text}
  • שם השדה להזנת קלט = {*}
  • Precondition = {attributes.get("filename") != "patients"} כדי להבחין בין כל סוג של רשומה או הודעה (לדוגמה: מטופלים, ספקים, אלרגיות וכו') שנשלחים מצומת המקור של PubSub.
  1. לקבלת הסבר מפורט, לוחצים על תיעוד. צריך ללחוץ על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ירוק "לא נמצאו שגיאות" שמצביע על הצלחה.

3b8e552cd2e3442c.png

  1. מגדירים את שמות העמודות לפי סדר מועדף ומשחררים את השדות שלא נחוצים. מעתיקים את קטע הקוד הבא ומדביקים אותו בתיבה 'מתכון'.
drop attributes
parse-as-csv :body ',' false
drop body
set columns TIMESTAMP,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,MAIDEN,MARITAL,RACE,ETHNICITY,GENDER,BIRTHPLACE,ADDRESS,CITY,STATE,ZIP
mask-number SSN xxxxxxx####

b93cb9952ca2de73.png

  1. עיינו במאמר Batch-Codelab – CSV ל-BigQuery דרך CDF לצורך התממה של נתונים והסרת פרטי הזיהוי. או להוסיף את קטע הקוד הזה mask-number SSN xxxxxxx#### לתיבה מתכון
  2. כדי לסגור את החלון של Transform Properties, לוחצים על הלחצן X.

בוחרים את הצומת של Sink.

  1. בקטע Sink בלוח יישומי הפלאגין שבצד ימין, לוחצים לחיצה כפולה על הצומת של BigQuery, שמופיע בממשק המשתמש של צינור הנתונים. חיבור צומת הטרנספורמציה של Wrangler לצומת sink ב-BigQuery.
  2. מצביעים על צומת ה-sink ב-BigQuery ולוחצים על 'מאפיינים'.

1be711152c92c692.png

  1. ממלאים את שדות החובה:
  • Label = {any text}
  • שם ההפניה = {any text}
  • Project ID = זיהוי אוטומטי
  • מערך הנתונים = מערך הנתונים ב-BigQuery שנמצא בשימוש בפרויקט הנוכחי (לדוגמה, DATASET_ID)
  • טבלה = {table name}
  1. לקבלת הסבר מפורט, לוחצים על תיעוד. צריך ללחוץ על הלחצן 'אימות' כדי לאמת את כל פרטי הקלט. ירוק "לא נמצאו שגיאות" שמצביע על הצלחה.

bba71de9f31e842a.png

  1. כדי לסגור את מאפייני BigQuery, לוחצים על הלחצן X.

5. פיתוח צינור נתונים בזמן אמת

בקטע הקודם יצרנו צמתים שנדרשים לפיתוח צינור נתונים ב-Cloud Data Fusion. בקטע הזה אנחנו מחברים את הצמתים כדי לפתח את צינור עיבוד הנתונים בפועל.

חיבור כל הצמתים בצינור עיבוד נתונים

  1. יש לגרור חץ של חיבור > בקצה הימני של צומת המקור ולשחרר אותו בקצה השמאלי של צומת היעד.
  2. לצינור עיבוד נתונים יכולים להיות כמה הסתעפויות שמקבלים הודעות שפורסמו מאותו צומת של מקור PubSub.

b22908cc35364cdd.png

  1. נותנים שם לצינור עיבוד הנתונים.

זה הכול. יצרתם עכשיו את צינור הנתונים הראשון בזמן אמת לפריסה ולהפעלה.

שליחת הודעות דרך Cloud Pub/Sub

באמצעות ממשק המשתמש של Pub/Sub:

  1. מעבר למסוף GCP -> Pub/Sub -> נושאים, בוחרים באפשרות הנושא שלך ולוחצים על 'פרסום הודעה' בתפריט העליון.

d65b2a6af1668ecd.png

  1. ממקמים רק שורה אחת בכל פעם בשדה 'הודעה'. לוחצים על הלחצן +הוספת מאפיין. מספקים מפתח = filename, ערך = <סוג הרשומה> (לדוגמה: מטופלים, ספקים, אלרגיות וכו').
  2. לוחצים על הלחצן 'פרסום' כדי לשלוח את ההודעה.

באמצעות הפקודה gcloud:

  1. להעביר את ההודעה ידנית.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"paste one record row here"
  1. מעבירים את ההודעה באופן אוטומטי למחצה באמצעות פקודות cat ו-sed unix. אפשר להריץ את הפקודה הזו שוב ושוב עם פרמטרים שונים.
gcloud pubsub topics publish <your-topic> --attribute <key>=<value> --message \
"$(gsutil cat gs://$BUCKET_NAME/csv/<value>.csv | sed -n '#p')"

6. הגדרה, פריסה והפעלה של צינור עיבוד נתונים

עכשיו, לאחר שפיתחנו את צינור הנתונים, אנחנו יכולים לפרוס ולהריץ אותו ב-Cloud Data Fusion.

1bb5b0b8e2953ffa.png

  1. משאירים את ברירות המחדל של Configure.
  2. לוחצים על Preview (תצוגה מקדימה) כדי לראות תצוגה מקדימה של הנתונים**.** לוחצים שוב על **תצוגה מקדימה** כדי לחזור לחלון הקודם. אפשר גם להפעיל את צינור עיבוד הנתונים במצב Preview (תצוגה מקדימה) על ידי לחיצה על **RUN**.

b3c891e5e1aa20ae.png

  1. לוחצים על יומנים כדי לצפות ביומנים.
  2. לוחצים על Save כדי לשמור את כל השינויים.
  3. לוחצים על Import כדי לייבא את ההגדרות השמורות של צינור עיבוד הנתונים כשיוצרים צינור עיבוד נתונים חדש.
  4. לוחצים על Export (ייצוא) כדי לייצא תצורה של צינור עיבוד נתונים.
  5. לוחצים על Deploy כדי לפרוס את צינור עיבוד הנתונים.
  6. לאחר הפריסה, לוחצים על Run ומחכים עד שצינור עיבוד הנתונים יסתיים.

f01ba6b746ba53a.png

  1. לוחצים על Stop (עצירה) כדי להפסיק את הפעלת צינור עיבוד הנתונים בכל שלב.
  2. אפשר לשכפל את צינור עיבוד הנתונים על ידי בחירה באפשרות 'שכפול' בלחצן פעולות.
  3. כדי לייצא את ההגדרות של צינור עיבוד הנתונים, בוחרים באפשרות 'ייצוא' מתחת ללחצן פעולות.

28ea4fc79445fad2.png

  1. לוחצים על סיכום כדי להציג תרשימים של היסטוריית ההרצה, רשומות, יומני שגיאות ואזהרות.

7. אימות

בקטע הזה אנחנו מאמתים את ההפעלה של צינור הנתונים.

  1. לאמת שצינור עיבוד הנתונים הופעל בהצלחה ופועל באופן רציף.

1644dfac4a2d819d.png

  1. מוודאים שהטבלאות של BigQuery נטענות עם רשומות מעודכנות על סמך TIMESTAMP. בדוגמה הזו, ב-25 ביוני 2019 פורסמו שתי הודעות או רישומי מטופלים ורשומת או הודעה אחת בנושא אלרגיה.
bq query --nouse_legacy_sql 'select (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Patients'  where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC" ) as Patients, (select count(*) from \
'$PROJECT_ID.$DATASET_ID.Allergies' where TIMESTAMP > "2019-06-25 \
01:29:00.0000 UTC") as Allergies;'
Waiting on bqjob_r14c8b94c1c0fe06a_0000016b960df4e1_1 ... (0s) Current status: DONE  
+----------+-----------+
| Patients | Allergies |
+----------+-----------+
|        2 |         1 |
+----------+-----------+
  1. אימות שההודעות שפורסמו ב<your-topic> התקבלו על ידי <your-sub> שלך.
gcloud pubsub subscriptions pull --auto-ack <your-sub>

4cae99a9e4f2ec9f.png

הצגת התוצאות

כדי להציג את התוצאות אחרי שההודעות מתפרסמות בנושא Pub/Sub בזמן שצינור עיבוד הנתונים 'זמן אמת' פועל:

  1. הרצת שאילתות על הטבלה בממשק המשתמש של BigQuery. לממשק המשתמש של BIGQUERY
  2. מעדכנים את השאילתה שלמטה לשם הפרויקט, למערך הנתונים ולטבלה שלכם.

6a1fb85bd868abc9.png

8. מנקה

כדי להימנע מצבירת חיובים בחשבון Google Cloud Platform על המשאבים שבהם השתמשתם במדריך הזה:

בסיום המדריך, אפשר למחוק את המשאבים שיצרתם ב-GCP כדי שהם לא ייכללו במכסה ולא תחויבו עליהם בעתיד. בסעיפים הבאים מוסבר איך למחוק או להשבית את המשאבים האלו.

מחיקת מערך הנתונים ב-BigQuery

יש לפעול לפי ההוראות הבאות כדי למחוק את מערך הנתונים ב-BigQuery שנוצר במסגרת המדריך הזה.

מחיקת הקטגוריה של GCS

כדי למחוק את הקטגוריה של GCS שיצרתם במסגרת המדריך הזה, פועלים לפי ההוראות הבאות.

מחיקת מכונת Cloud Data Fusion

פועלים לפי ההוראות למחיקת מכונת Cloud Data Fusion.

מחיקת הפרויקט

הדרך הקלה ביותר לבטל את החיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך הזה.

כדי למחוק את הפרויקט:

  1. נכנסים לדף Projects במסוף GCP. לדף 'פרויקטים'
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.

9. מזל טוב

כל הכבוד! סיימתם בהצלחה את מעבדת הקוד כדי להטמיע נתונים מתחום הטיפול הרפואי ב-BigQuery באמצעות Cloud Data Fusion.

פרסמתם נתוני CSV בנושא Pub/Sub ואז טענתם נתונים ל-BigQuery.

יצרתם באופן חזותי צינור עיבוד נתונים לשילוב נתונים לטעינה, לטרנספורמציה ולאנונימיזציה של נתוני שירותי בריאות בזמן אמת.

עכשיו אתם יודעים מהם השלבים העיקריים שצריך לבצע כדי להתחיל את השימוש בניתוח נתוני בריאות עם BigQuery ב-Google Cloud Platform.