پیش پردازش داده های BigQuery با PySpark در Dataproc

۱. مرور کلی

این آزمایشگاه کد به نحوه ایجاد یک خط لوله پردازش داده با استفاده از Apache Spark به همراه Dataproc در پلتفرم Google Cloud می‌پردازد. خواندن داده‌ها از یک مکان ذخیره‌سازی، انجام تبدیل‌ها روی آن و نوشتن آن در مکان ذخیره‌سازی دیگر، یک مورد استفاده رایج در علم داده و مهندسی داده است. تبدیل‌های رایج شامل تغییر محتوای داده‌ها، حذف اطلاعات غیرضروری و تغییر نوع فایل‌ها می‌شود.

در این آزمایشگاه کد، شما با آپاچی اسپارک آشنا می‌شوید، یک پایپ‌لاین نمونه با استفاده از Dataproc به همراه PySpark (API پایتون آپاچی اسپارک)، BigQuery ، فضای ذخیره‌سازی ابری گوگل و داده‌های ردیت اجرا خواهید کرد.

۲. مقدمه‌ای بر آپاچی اسپارک (اختیاری)

طبق گفته وب‌سایت، « آپاچی اسپارک یک موتور تحلیلی یکپارچه برای پردازش داده‌های در مقیاس بزرگ است.» این موتور به شما امکان می‌دهد داده‌ها را به صورت موازی و درون حافظه‌ای تجزیه و تحلیل و پردازش کنید، که امکان محاسبات موازی گسترده را در چندین ماشین و گره مختلف فراهم می‌کند. این موتور در ابتدا در سال ۲۰۱۴ به عنوان ارتقاء MapReduce سنتی منتشر شد و هنوز هم یکی از محبوب‌ترین چارچوب‌ها برای انجام محاسبات در مقیاس بزرگ است. آپاچی اسپارک با زبان اسکالا نوشته شده است و متعاقباً دارای APIهایی در اسکالا، جاوا، پایتون و R است. این موتور شامل مجموعه‌ای از کتابخانه‌ها مانند Spark SQL برای انجام پرس‌وجوهای SQL روی داده‌ها، Spark Streaming برای داده‌های استریمینگ، MLlib برای یادگیری ماشین و GraphX ​​برای پردازش گراف است که همگی بر روی موتور آپاچی اسپارک اجرا می‌شوند.

32add0b6a47bafbc.png

اسپارک می‌تواند به تنهایی اجرا شود یا می‌تواند از یک سرویس مدیریت منابع مانند Yarn ، Mesos یا Kubernetes برای مقیاس‌پذیری استفاده کند. شما برای این آزمایشگاه کد از Dataproc استفاده خواهید کرد که از Yarn استفاده می‌کند.

داده‌ها در اسپارک در ابتدا در حافظه‌ای به نام RDD یا مجموعه داده توزیع‌شده انعطاف‌پذیر بارگذاری می‌شدند. از آن زمان، توسعه در اسپارک شامل اضافه شدن دو نوع داده جدید به سبک ستونی بوده است: مجموعه داده که نوع‌بندی شده است و قاب داده که نوع‌بندی نشده است. به طور کلی، RDDها برای هر نوع داده‌ای عالی هستند، در حالی که مجموعه داده‌ها و قاب‌های داده برای داده‌های جدولی بهینه شده‌اند. از آنجایی که مجموعه داده‌ها فقط با APIهای جاوا و اسکالا در دسترس هستند، ما برای این آزمایشگاه کد از API قاب داده PySpark استفاده خواهیم کرد. برای اطلاعات بیشتر، لطفاً به مستندات آپاچی اسپارک مراجعه کنید.

۳. مورد استفاده

مهندسان داده اغلب نیاز دارند که داده‌ها به راحتی در دسترس دانشمندان داده قرار گیرند. با این حال، داده‌ها اغلب در ابتدا کثیف هستند (استفاده از آنها برای تجزیه و تحلیل در وضعیت فعلی دشوار است) و قبل از اینکه بتوانند کاربرد زیادی داشته باشند، باید تمیز شوند. نمونه‌ای از این، داده‌هایی است که از وب استخراج شده‌اند و ممکن است حاوی کدگذاری‌های عجیب یا برچسب‌های HTML نامربوط باشند.

در این آزمایش، مجموعه‌ای از داده‌ها را از BigQuery در قالب پست‌های Reddit در یک خوشه Spark که در Dataproc میزبانی می‌شود، بارگذاری خواهید کرد، اطلاعات مفید را استخراج کرده و داده‌های پردازش شده را به صورت فایل‌های فشرده CSV در Google Cloud Storage ذخیره خواهید کرد.

be2a4551ece63bfc.png

دانشمند ارشد داده در شرکت شما علاقه‌مند است که تیم‌هایش روی مسائل مختلف پردازش زبان طبیعی کار کنند. به طور خاص، آنها علاقه‌مند به تجزیه و تحلیل داده‌ها در ساب ردیت "r/food" هستند. شما یک خط لوله برای جمع‌آوری داده‌ها ایجاد خواهید کرد که از ژانویه ۲۰۱۷ تا آگوست ۲۰۱۹ ادامه خواهد داشت.

۴. دسترسی به BigQuery از طریق BigQuery Storage API

دریافت داده‌ها از BigQuery با استفاده از روش tabledata.list API می‌تواند زمان‌بر و ناکارآمد باشد، زیرا حجم داده‌ها افزایش می‌یابد. این روش لیستی از اشیاء JSON را برمی‌گرداند و برای خواندن کل مجموعه داده‌ها، نیاز به خواندن متوالی یک صفحه در هر زمان دارد.

رابط برنامه‌نویسی کاربردی ذخیره‌سازی BigQuery با استفاده از یک پروتکل مبتنی بر RPC، پیشرفت‌های قابل توجهی در دسترسی به داده‌ها در BigQuery ایجاد می‌کند. این رابط از خواندن و نوشتن داده‌ها به صورت موازی و همچنین فرمت‌های سریال‌سازی مختلف مانند Apache Avro و Apache Arrow پشتیبانی می‌کند. در سطح بالا، این به معنای بهبود قابل توجه عملکرد، به ویژه در مجموعه داده‌های بزرگتر است.

در این آزمایشگاه کد، شما از spark-bigquery-connector برای خواندن و نوشتن داده‌ها بین BigQuery و Spark استفاده خواهید کرد.

۵. ایجاد یک پروژه

وارد کنسول پلتفرم ابری گوگل در console.cloud.google.com شوید و یک پروژه جدید ایجاد کنید:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

در مرحله بعد، برای استفاده از منابع گوگل کلود، باید صورتحساب را در کنسول کلود فعال کنید .

اجرای این آزمایشگاه کد نباید بیش از چند دلار برای شما هزینه داشته باشد، اما اگر تصمیم به استفاده از منابع بیشتر بگیرید یا اگر آنها را در حال اجرا رها کنید، می‌تواند بیشتر هم بشود. بخش آخر این آزمایشگاه کد، شما را در تمیز کردن پروژه‌تان راهنمایی خواهد کرد.

کاربران جدید پلتفرم ابری گوگل واجد شرایط دریافت یک دوره آزمایشی رایگان ۳۰۰ دلاری هستند.

۶. آماده‌سازی محیط

اکنون با انجام مراحل زیر، محیط خود را راه‌اندازی خواهید کرد:

  • فعال کردن موتور محاسباتی، رابط‌های برنامه‌نویسی کاربردی Dataproc و BigQuery Storage
  • پیکربندی تنظیمات پروژه
  • ایجاد یک کلاستر Dataproc
  • ایجاد یک مخزن ذخیره‌سازی ابری گوگل

فعال کردن APIها و پیکربندی محیط شما

با فشار دادن دکمه‌ای که در گوشه سمت راست بالای کنسول ابری شما قرار دارد، پوسته ابری را باز کنید.

a10c47ee6ca41c54.png

پس از بارگذاری Cloud Shell، دستورات زیر را برای فعال کردن Compute Engine، Dataproc و BigQuery Storage APIs اجرا کنید:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

شناسه پروژه خود را تنظیم کنید. می‌توانید با رفتن به صفحه انتخاب پروژه و جستجوی پروژه خود، آن را پیدا کنید. این ممکن است با نام پروژه شما یکسان نباشد.

e682e8227aa3c781.png

76d45fb295728542.png

برای تنظیم شناسه پروژه خود، دستور زیر را اجرا کنید:

gcloud config set project <project_id>

با انتخاب یکی از لیست اینجا، منطقه پروژه خود را تعیین کنید. یک مثال می‌تواند us-central1 باشد.

gcloud config set dataproc/region <region>

یک نام برای کلاستر Dataproc خود انتخاب کنید و یک متغیر محیطی برای آن ایجاد کنید.

CLUSTER_NAME=<cluster_name>

ایجاد یک کلاستر Dataproc

با اجرای دستور زیر، یک کلاستر Dataproc ایجاد کنید:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

اجرای این دستور چند دقیقه طول می‌کشد. برای توضیح بیشتر، دستور را به صورت زیر اجرا کنید:

این کار باعث ایجاد یک کلاستر Dataproc با نامی که قبلاً ارائه دادید، می‌شود. استفاده از API beta ، ویژگی‌های بتای Dataproc مانند Component Gateway را فعال می‌کند.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

این نوع دستگاه مورد استفاده برای کارگران شما را تعیین می‌کند.

--worker-machine-type n1-standard-8

این تعداد کارگرانی را که خوشه شما خواهد داشت، تعیین می‌کند.

--num-workers 8

این دستور نسخه تصویر Dataproc را تنظیم می‌کند.

--image-version 1.5-debian

این دستور ، اقدامات اولیه‌سازی مورد استفاده در کلاستر را پیکربندی می‌کند. در اینجا، شما اقدام اولیه‌سازی pip را لحاظ می‌کنید.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

این متادیتایی است که باید در کلاستر قرار گیرد. در اینجا، شما متادیتایی را برای عمل مقداردهی اولیه pip ارائه می‌دهید.

--metadata 'PIP_PACKAGES=google-cloud-storage'

این کار باعث می‌شود کامپوننت‌های اختیاری روی کلاستر نصب شوند.

--optional-components=ANACONDA

این کار، کامپوننت گیت‌وی (Component Gateway) را فعال می‌کند که به شما امکان می‌دهد از کامپوننت گیت‌وی Dataproc برای مشاهده رابط‌های کاربری رایج مانند Zeppelin، Jupyter یا تاریخچه Spark استفاده کنید.

--enable-component-gateway

برای آشنایی عمیق‌تر با Dataproc، لطفاً این codelab را بررسی کنید.

ایجاد یک سطل ذخیره‌سازی ابری گوگل

برای خروجی کار خود به یک سطل ذخیره‌سازی ابری گوگل (Google Cloud Storage bucket) نیاز دارید. یک نام منحصر به فرد برای سطل خود تعیین کنید و دستور زیر را برای ایجاد یک سطل جدید اجرا کنید. نام سطل‌ها در تمام پروژه‌های گوگل کلود برای همه کاربران منحصر به فرد است، بنابراین ممکن است لازم باشد این کار را چند بار با نام‌های مختلف امتحان کنید. اگر خطای ServiceException دریافت نکنید، سطل با موفقیت ایجاد شده است.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

۷. تحلیل اکتشافی داده‌ها

قبل از انجام پیش‌پردازش، باید درباره ماهیت داده‌هایی که با آنها سر و کار دارید، اطلاعات بیشتری کسب کنید. برای انجام این کار، دو روش کاوش داده‌ها را بررسی خواهید کرد. ابتدا، با استفاده از رابط کاربری وب BigQuery، داده‌های خام را مشاهده خواهید کرد و سپس با استفاده از PySpark و Dataproc تعداد پست‌های هر subreddit را محاسبه خواهید کرد.

استفاده از رابط کاربری وب BigQuery

با استفاده از رابط کاربری وب BigQuery برای مشاهده داده‌های خود شروع کنید. از آیکون منو در کنسول ابری، به پایین بروید و روی «BigQuery» کلیک کنید تا رابط کاربری وب BigQuery باز شود.

242a597d7045b4da.png

سپس، دستور زیر را در ویرایشگر کوئری BigQuery Web UI اجرا کنید. این دستور 10 ردیف کامل از داده‌ها را از ژانویه 2017 برمی‌گرداند:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

می‌توانید در صفحه اسکرول کنید تا همه ستون‌های موجود و همچنین برخی مثال‌ها را ببینید. به طور خاص، دو ستون خواهید دید که محتوای متنی هر پست را نشان می‌دهند: "عنوان" و "متن خود" که دومی بدنه پست است. همچنین به ستون‌های دیگری مانند "created_utc" که زمان utc ایجاد پست است و "subreddit" که ساب ردیتی است که پست در آن قرار دارد، توجه کنید.

اجرای یک کار PySpark

دستورات زیر را در Cloud Shell خود اجرا کنید تا مخزن را به همراه کد نمونه کلون کنید و با دستور cd به دایرکتوری صحیح بروید:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

شما می‌توانید از PySpark برای تعیین تعداد پست‌های موجود برای هر subreddit استفاده کنید. می‌توانید Cloud Editor را باز کنید و قبل از اجرای اسکریپت cloud-dataproc/codelabs/spark-bigquery در مرحله بعد، آن را بخوانید:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

برای بازگشت به Cloud Shell خود، روی دکمه‌ی «Open Terminal» در Cloud Editor کلیک کنید و دستور زیر را برای اجرای اولین کار PySpark خود اجرا کنید:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

این دستور به شما امکان می‌دهد تا کارها را از طریق API مربوط به Jobs به Dataproc ارسال کنید. در اینجا نوع کار را pyspark مشخص می‌کنید. می‌توانید نام کلاستر، پارامترهای اختیاری و نام فایل حاوی کار را وارد کنید. در اینجا، پارامتر --jars را ارائه می‌دهید که به شما امکان می‌دهد spark-bigquery-connector به کار خود اضافه کنید. همچنین می‌توانید سطوح خروجی لاگ را با استفاده از --driver-log-levels root=FATAL تنظیم کنید که تمام خروجی‌های لاگ را به جز خطاها سرکوب می‌کند. لاگ‌های اسپارک معمولاً نویز زیادی دارند.

اجرای این دستور چند دقیقه طول می‌کشد و خروجی نهایی شما چیزی شبیه به این خواهد بود:

6c185228db47bb18.png

۸. بررسی رابط‌های کاربری Dataproc و Spark

هنگام اجرای کارهای Spark روی Dataproc، به دو رابط کاربری برای بررسی وضعیت کارها/خوشه‌های خود دسترسی دارید. اولین رابط کاربری، رابط کاربری Dataproc است که می‌توانید با کلیک روی آیکون منو و پیمایش به پایین تا Dataproc، آن را پیدا کنید. در اینجا می‌توانید حافظه فعلی موجود و همچنین حافظه در انتظار اجرا و تعداد کارگران را مشاهده کنید.

6f2987346d15c8e2.png

همچنین می‌توانید روی تب «کارها» کلیک کنید تا کارهای تکمیل‌شده را ببینید. می‌توانید با کلیک روی شناسه کار برای یک کار خاص، جزئیات کار مانند گزارش‌ها و خروجی آن کارها را مشاهده کنید. ۱۱۴d90129b0e4c88.png

۱b2160f0f484594a.png

همچنین می‌توانید رابط کاربری Spark را مشاهده کنید. از صفحه کار، روی فلش برگشت کلیک کنید و سپس روی رابط‌های وب کلیک کنید. باید چندین گزینه را در زیر دروازه کامپوننت مشاهده کنید. بسیاری از این موارد را می‌توان از طریق کامپوننت‌های اختیاری هنگام تنظیم کلاستر خود فعال کرد. برای این آزمایش، روی "Spark History Server" کلیک کنید.

5da7944327d193dc.png

6a349200289c69c1.pnge63b36bdc90ff610.png

این باید پنجره زیر را باز کند:

8f6786760f994fe8.png

تمام کارهای تکمیل‌شده اینجا نمایش داده می‌شوند و می‌توانید روی هر application_id کلیک کنید تا اطلاعات بیشتری در مورد آن کار کسب کنید. به طور مشابه، می‌توانید روی «نمایش برنامه‌های ناقص» در پایین صفحه فرود کلیک کنید تا تمام کارهای در حال انجام را مشاهده کنید.

۹. اجرای عملیات خاکریزی

اکنون وظیفه‌ای را اجرا خواهید کرد که داده‌ها را در حافظه بارگذاری می‌کند، اطلاعات لازم را استخراج می‌کند و خروجی را در یک مخزن ذخیره‌سازی ابری گوگل (Google Cloud Storage Bucket) ذخیره می‌کند. شما "عنوان"، "بدنه" (متن خام) و "برچسب زمانی ایجاد شده" را برای هر نظر ردیت استخراج خواهید کرد. سپس این داده‌ها را دریافت کرده، به یک فایل csv تبدیل می‌کنید، آن را فشرده کرده و در یک مخزن با URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz بارگذاری می‌کنید.

می‌توانید دوباره به ویرایشگر ابر مراجعه کنید تا کد مربوط به cloud-dataproc/codelabs/spark-bigquery/backfill.sh را که یک اسکریپت پوششی برای اجرای کد موجود در cloud-dataproc/codelabs/spark-bigquery/backfill.py است، بخوانید.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

شما باید به زودی تعدادی پیام تکمیل کار را مشاهده کنید. تکمیل کار ممکن است تا ۱۵ دقیقه طول بکشد. همچنین می‌توانید با استفاده از gsutil، سطل ذخیره‌سازی خود را دوباره بررسی کنید تا خروجی موفقیت‌آمیز داده‌ها را تأیید کنید. پس از انجام همه کارها، دستور زیر را اجرا کنید:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

شما باید خروجی زیر را ببینید:

a7c3c7b2e82f9fca.png

تبریک می‌گویم، شما با موفقیت یک فایل پشتیبان برای داده‌های نظرات ردیت خود تکمیل کردید! اگر علاقه‌مند به دانستن نحوه ساخت مدل‌ها بر اساس این داده‌ها هستید، لطفاً به آزمایشگاه کد Spark-NLP مراجعه کنید.

۱۰. پاکسازی

برای جلوگیری از تحمیل هزینه‌های غیرضروری به حساب GCP خود پس از تکمیل این راهنمای سریع:

  1. سطل ذخیره‌سازی ابری مربوط به محیط و آنچه ایجاد کرده‌اید را حذف کنید.
  2. محیط Dataproc را حذف کنید .

اگر فقط برای این codelab پروژه‌ای ایجاد کرده‌اید، می‌توانید به صورت اختیاری پروژه را حذف کنید:

  1. در کنسول GCP، به صفحه پروژه‌ها بروید.
  2. در لیست پروژه‌ها، پروژه‌ای را که می‌خواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
  3. در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.

مجوز

این اثر تحت مجوز عمومی Creative Commons Attribution 3.0 و مجوز Apache 2.0 منتشر شده است.