پیش پردازش داده های BigQuery با PySpark در Dataproc

1. بررسی اجمالی

این کد لبه به نحوه ایجاد خط لوله پردازش داده با استفاده از Apache Spark با Dataproc در Google Cloud Platform می پردازد. در علم داده و مهندسی داده، خواندن داده‌ها از یک مکان ذخیره‌سازی، انجام تبدیل‌ها بر روی آن و نوشتن آن‌ها در مکان ذخیره‌سازی دیگری، یک مورد معمول استفاده است. تغییرات رایج شامل تغییر محتوای داده ها، حذف اطلاعات غیر ضروری و تغییر انواع فایل ها است.

در این کد لبه، شما در مورد Apache Spark، اجرای یک خط لوله نمونه با استفاده از Dataproc با PySpark (Apache Spark's Python API)، BigQuery ، Google Cloud Storage و داده‌های Reddit خواهید آموخت.

2. معرفی آپاچی اسپارک (اختیاری)

طبق این وب سایت، " Apache Spark یک موتور تجزیه و تحلیل یکپارچه برای پردازش داده در مقیاس بزرگ است." این به شما امکان می دهد داده ها را به صورت موازی و درون حافظه تجزیه و تحلیل و پردازش کنید، که امکان محاسبات موازی گسترده را در چندین ماشین و گره مختلف فراهم می کند. در ابتدا در سال 2014 به عنوان ارتقاء به MapReduce سنتی منتشر شد و هنوز هم یکی از محبوب ترین فریم ورک ها برای انجام محاسبات در مقیاس بزرگ است. Apache Spark در Scala نوشته شده است و متعاقبا دارای APIهایی در Scala، Java، Python و R است. این شامل مجموعه ای از کتابخانه ها مانند Spark SQL برای انجام پرس و جوهای SQL بر روی داده، Spark Streaming برای جریان داده، MLlib برای یادگیری ماشین و GraphX ​​برای پردازش گراف که همگی بر روی موتور آپاچی اسپارک اجرا می شوند.

32add0b6a47bafbc.png

Spark می تواند به تنهایی اجرا شود یا می تواند از یک سرویس مدیریت منابع مانند Yarn ، Mesos یا Kubernetes برای مقیاس بندی استفاده کند. شما از Dataproc برای این کد لبه استفاده خواهید کرد که از Yarn استفاده می کند.

داده‌ها در Spark ابتدا در حافظه‌ای بارگذاری می‌شوند که به آن RDD یا مجموعه داده توزیع‌شده انعطاف‌پذیر می‌گویند. توسعه در Spark از آن زمان شامل اضافه شدن دو نوع داده جدید به سبک ستونی است: Dataset که تایپ شده و Dataframe که بدون تایپ است. به زبان ساده، RDD ها برای هر نوع داده ای عالی هستند، در حالی که Datasets و Dataframe ها برای داده های جدولی بهینه شده اند. از آنجایی که مجموعه داده‌ها فقط با APIهای جاوا و اسکالا در دسترس هستند، ما با استفاده از PySpark Dataframe API برای این کد لبه ادامه خواهیم داد. برای اطلاعات بیشتر، لطفاً به مستندات Apache Spark مراجعه کنید.

3. Use Case

مهندسان داده اغلب به داده ها نیاز دارند تا به راحتی برای دانشمندان داده در دسترس باشد. با این حال، داده ها اغلب در ابتدا کثیف هستند (استفاده از آن برای تجزیه و تحلیل در وضعیت فعلی دشوار است) و قبل از استفاده زیاد باید تمیز شوند. نمونه‌ای از این داده‌هایی است که از وب خراشیده شده‌اند و ممکن است حاوی رمزگذاری‌های عجیب و غریب یا برچسب‌های HTML خارجی باشند.

در این آزمایشگاه، مجموعه ای از داده ها را از BigQuery در قالب پست های Reddit در یک Spark cluster میزبانی شده در Dataproc بارگیری می کنید، اطلاعات مفیدی را استخراج می کنید و داده های پردازش شده را به صورت فایل های فشرده CSV در Google Cloud Storage ذخیره می کنید.

be2a4551ece63bfc.png

دانشمند ارشد داده در شرکت شما علاقه مند است که تیم هایشان روی مشکلات مختلف پردازش زبان طبیعی کار کنند. به طور خاص، آنها علاقه مند به تجزیه و تحلیل داده ها در subreddit "r/food" هستند. از ژانویه 2017 تا آگوست 2019، یک خط لوله برای تخلیه داده ایجاد خواهید کرد.

4. دسترسی به BigQuery از طریق BigQuery Storage API

بیرون کشیدن داده ها از BigQuery با استفاده از روش tabledata.list API می تواند به اندازه مقیاس داده ها زمان بر و کارآمد نباشد. این روش فهرستی از اشیاء JSON را برمی‌گرداند و برای خواندن کل مجموعه داده نیاز به خواندن متوالی یک صفحه در هر زمان دارد.

BigQuery Storage API با استفاده از یک پروتکل مبتنی بر RPC، پیشرفت های قابل توجهی را برای دسترسی به داده ها در BigQuery به ارمغان می آورد. از خواندن و نوشتن داده ها به صورت موازی و همچنین فرمت های سریال سازی مختلف مانند Apache Avro و Apache Arrow پشتیبانی می کند. در سطح بالا، این به بهبود عملکرد قابل توجهی، به ویژه در مجموعه داده‌های بزرگتر منجر می‌شود.

در این کد لبه شما از spark-bigquery-connector برای خواندن و نوشتن داده ها بین BigQuery و Spark استفاده می کنید.

5. ایجاد یک پروژه

به کنسول Google Cloud Platform در console.cloud.google.com وارد شوید و یک پروژه جدید ایجاد کنید:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

در مرحله بعد، برای استفاده از منابع Google Cloud، باید صورتحساب را در کنسول Cloud فعال کنید .

گذراندن این کد نباید بیش از چند دلار برای شما هزینه داشته باشد، اما اگر تصمیم به استفاده از منابع بیشتری داشته باشید یا آنها را در حال اجرا رها کنید ممکن است بیشتر باشد. آخرین بخش از این نرم افزار کد شما را در تمیز کردن پروژه راهنمایی می کند.

کاربران جدید Google Cloud Platform واجد شرایط استفاده آزمایشی رایگان 300 دلاری هستند.

6. تنظیم محیط

اکنون می توانید محیط خود را با روش های زیر تنظیم کنید:

  • فعال کردن APIهای Compute Engine، Dataproc و BigQuery Storage
  • پیکربندی تنظیمات پروژه
  • ایجاد یک خوشه Dataproc
  • ایجاد یک سطل Google Cloud Storage

فعال کردن API ها و پیکربندی محیط شما

Cloud Shell را با فشار دادن دکمه در گوشه سمت راست بالای Cloud Console خود باز کنید.

a10c47ee6ca41c54.png

پس از بارگیری Cloud Shell، دستورات زیر را برای فعال کردن Compute Engine، Dataproc و BigQuery Storage API اجرا کنید:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

شناسه پروژه پروژه خود را تنظیم کنید. با رفتن به صفحه انتخاب پروژه و جستجوی پروژه خود می توانید آن را پیدا کنید. این ممکن است با نام پروژه شما یکی نباشد.

e682e8227aa3c781.png

76d45fb295728542.png

دستور زیر را برای تنظیم شناسه پروژه خود اجرا کنید:

gcloud config set project <project_id>

منطقه پروژه خود را با انتخاب یکی از لیست اینجا تنظیم کنید. یک مثال ممکن است us-central1 باشد.

gcloud config set dataproc/region <region>

یک نام برای خوشه Dataproc خود انتخاب کنید و یک متغیر محیطی برای آن ایجاد کنید.

CLUSTER_NAME=<cluster_name>

ایجاد یک کلاستر Dataproc

با اجرای دستور زیر یک کلاستر Dataproc ایجاد کنید:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

این دستور چند دقیقه طول می کشد تا تمام شود. برای شکستن دستور:

با این کار ایجاد یک خوشه Dataproc با نامی که قبلا ارائه کرده اید آغاز می شود. استفاده از API beta ویژگی های بتا Dataproc مانند Component Gateway را فعال می کند.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

با این کار نوع ماشین مورد استفاده برای کارگران شما تنظیم می شود.

--worker-machine-type n1-standard-8

این تعداد کارگرانی را که خوشه شما خواهد داشت تعیین می کند.

--num-workers 8

با این کار نسخه تصویر Dataproc تنظیم می شود.

--image-version 1.5-debian

این اقدامات اولیه سازی را برای استفاده در خوشه پیکربندی می کند. در اینجا، شما اکشن اولیه سازی پیپ را در نظر می گیرید.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

این ابرداده ای است که باید در خوشه گنجانده شود. در اینجا، شما در حال ارائه ابرداده برای اقدام اولیه pip هستید.

--metadata 'PIP_PACKAGES=google-cloud-storage'

با این کار کامپوننت های اختیاری روی خوشه نصب می شوند.

--optional-components=ANACONDA

این دروازه مؤلفه را فعال می کند که به شما امکان می دهد از دروازه مؤلفه Dataproc برای مشاهده رابط های کاربری رایج مانند Zeppelin، Jupyter یا Spark History استفاده کنید.

--enable-component-gateway

برای آشنایی بیشتر با Dataproc، لطفاً این نرم افزار کد را بررسی کنید.

ایجاد یک Google Cloud Storage Bucket

برای خروجی کار خود به یک سطل فضای ذخیره سازی ابری Google نیاز دارید. یک نام منحصر به فرد برای سطل خود تعیین کنید و دستور زیر را برای ایجاد یک سطل جدید اجرا کنید. نام‌های سطل در تمام پروژه‌های Google Cloud برای همه کاربران منحصربه‌فرد هستند، بنابراین ممکن است لازم باشد این کار را چند بار با نام‌های مختلف انجام دهید. اگر ServiceException دریافت نکنید یک سطل با موفقیت ایجاد می شود.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. تجزیه و تحلیل داده های اکتشافی

قبل از انجام پیش پردازش، باید در مورد ماهیت داده هایی که با آنها سر و کار دارید بیشتر بدانید. برای انجام این کار، دو روش کاوش داده را بررسی خواهید کرد. ابتدا، برخی از داده های خام را با استفاده از رابط کاربری وب BigQuery مشاهده می کنید، و سپس تعداد پست ها را در هر ساب ردیت با استفاده از PySpark و Dataproc محاسبه می کنید.

با استفاده از BigQuery Web UI

با استفاده از BigQuery Web UI برای مشاهده داده های خود شروع کنید. از نماد منو در Cloud Console، به پایین بروید و "BigQuery" را فشار دهید تا BigQuery Web UI باز شود.

242a597d7045b4da.png

سپس دستور زیر را در BigQuery Web UI Query Editor اجرا کنید. این 10 ردیف کامل از داده ها را از ژانویه 2017 برمی گرداند:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

می توانید در سراسر صفحه پیمایش کنید تا تمام ستون های موجود و همچنین چند نمونه را ببینید. به طور خاص، دو ستون را مشاهده خواهید کرد که محتوای متنی هر پست را نشان می دهد: "عنوان" و "متن خود" که دومی بدنه پست است. همچنین به ستون‌های دیگری مانند "created_utc" که زمان utc ایجاد یک پست است و "subreddit" که subredditی است که پست در آن وجود دارد توجه کنید.

اجرای یک کار PySpark

دستورات زیر را در Cloud Shell خود اجرا کنید تا مخزن را با کد نمونه و cd در دایرکتوری صحیح کلون کنید:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

می‌توانید از PySpark برای تعیین تعداد پست‌ها برای هر subreddit استفاده کنید. می‌توانید Cloud Editor را باز کنید و اسکریپت cloud-dataproc/codelabs/spark-bigquery قبل از اجرای آن در مرحله بعد بخوانید:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

روی دکمه «باز کردن ترمینال» در Cloud Editor کلیک کنید تا به Cloud Shell خود برگردید و دستور زیر را برای اجرای اولین کار PySpark خود اجرا کنید:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

این دستور به شما اجازه می دهد تا از طریق Jobs API کارهای خود را به Dataproc ارسال کنید. در اینجا شما نوع کار را به عنوان pyspark نشان می دهید. می توانید نام خوشه، پارامترهای اختیاری و نام فایل حاوی کار را ارائه کنید. در اینجا، شما پارامتر --jars ارائه می‌دهید که به شما اجازه می‌دهد تا spark-bigquery-connector به کار خود اضافه کنید. همچنین می‌توانید سطوح خروجی گزارش را با استفاده از --driver-log-levels root=FATAL تنظیم کنید که تمام خروجی‌های گزارش را به جز خطاها سرکوب می‌کند. جرقه‌های جرقه نسبتاً پر سر و صدا هستند.

این باید چند دقیقه طول بکشد و خروجی نهایی شما باید چیزی شبیه به این باشد:

6c185228db47bb18.png

8. بررسی رابط کاربری Dataproc و Spark

هنگام اجرای Spark Jobs در Dataproc، به دو رابط کاربری برای بررسی وضعیت مشاغل / خوشه های خود دسترسی دارید. اولین مورد رابط کاربری Dataproc است که می توانید با کلیک بر روی نماد منو و پایین رفتن به Dataproc آن را پیدا کنید. در اینجا، می توانید حافظه فعلی موجود و همچنین حافظه معلق و تعداد کارگران را مشاهده کنید.

6f2987346d15c8e2.png

همچنین می توانید برای مشاهده کارهای تکمیل شده روی برگه jobs کلیک کنید. با کلیک بر روی Job ID برای یک کار خاص، می توانید جزئیات شغل مانند گزارش و خروجی آن مشاغل را مشاهده کنید. 114d90129b0e4c88.png

1b2160f0f484594a.png

همچنین می توانید Spark UI را مشاهده کنید. از صفحه کار، روی فلش برگشت کلیک کنید و سپس روی Web Interfaces کلیک کنید. شما باید چندین گزینه را در زیر دروازه کامپوننت ببینید. بسیاری از این موارد را می توان از طریق کامپوننت های اختیاری در هنگام راه اندازی کلاستر فعال کرد. برای این آزمایشگاه، روی «Spark History Server» کلیک کنید.

5da7944327d193dc.png

6a349200289c69c1.pnge63b36bdc90ff610.png

این باید پنجره زیر را باز کند:

8f6786760f994fe8.png

همه کارهای تکمیل شده در اینجا نشان داده می شوند، و می توانید برای کسب اطلاعات بیشتر در مورد شغل، روی هر application_id کلیک کنید. به همین ترتیب، می‌توانید روی «نمایش برنامه‌های ناقص» در پایین صفحه فرود کلیک کنید تا تمام کارهایی که در حال حاضر در حال اجرا هستند را مشاهده کنید.

9. اجرای Backfill Job

اکنون کاری را اجرا می‌کنید که داده‌ها را در حافظه بارگیری می‌کند، اطلاعات لازم را استخراج می‌کند و خروجی را در یک سطل ذخیره‌سازی Google Cloud می‌ریزد. شما "عنوان"، "بدن" (متن خام) و "مهر زمانی ایجاد شده" را برای هر نظر reddit استخراج خواهید کرد. سپس این داده‌ها را می‌گیرید، آن‌ها را به یک csv تبدیل می‌کنید، آن را فشرده می‌کنید و در یک سطل با یک URI از gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz بارگذاری می‌کنید.

می‌توانید دوباره به Cloud Editor مراجعه کنید تا کد مربوط به cloud-dataproc/codelabs/spark-bigquery/backfill.sh بخوانید که یک اسکریپت بسته‌بندی برای اجرای کد در cloud-dataproc/codelabs/spark-bigquery/backfill.py است. .

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

به زودی باید تعداد زیادی پیام تکمیل کار را مشاهده کنید. تکمیل کار ممکن است تا 15 دقیقه طول بکشد. همچنین می‌توانید با استفاده از gsutil، سطل ذخیره‌سازی خود را دوباره بررسی کنید تا خروجی داده‌های موفق را تأیید کنید. پس از انجام تمام کارها، دستور زیر را اجرا کنید:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

شما باید خروجی زیر را ببینید:

a7c3c7b2e82f9fca.png

تبریک می‌گوییم، شما با موفقیت یک پشتیبان برای داده‌های نظرات reddit خود را تکمیل کردید! اگر به نحوه ساخت مدل‌ها بر روی این داده‌ها علاقه دارید، لطفاً به آزمایشگاه کد Spark-NLP ادامه دهید.

10. پاکسازی

برای جلوگیری از تحمیل هزینه‌های غیرضروری به حساب GCP خود پس از تکمیل این شروع سریع:

  1. سطل Cloud Storage را برای محیطی که ایجاد کرده اید حذف کنید
  2. محیط Dataproc را حذف کنید .

اگر پروژه ای را فقط برای این کد لبه ایجاد کرده اید، می توانید به صورت اختیاری پروژه را نیز حذف کنید:

  1. در کنسول GCP، به صفحه پروژه ها بروید.
  2. در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
  3. در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.

مجوز

این اثر تحت مجوز Creative Commons Attribution 3.0 Generic و مجوز Apache 2.0 مجوز دارد.