Melakukan pra-pemrosesan Data BigQuery dengan PySpark di Dataproc

1. Ringkasan

Codelab ini akan membahas cara membuat pipeline pemrosesan data menggunakan Apache Spark dengan Dataproc di Google Cloud Platform. Biasanya dalam ilmu data dan rekayasa data, membaca data dari satu lokasi penyimpanan, menjalankan transformasi di dalamnya, dan menulisnya ke lokasi penyimpanan lain adalah kasus-kasus penggunaan yang umum terjadi. Transformasi umum termasuk mengubah isi data, menghapus informasi yang tidak perlu, dan mengubah jenis file.

Dalam codelab ini, Anda akan mempelajari Apache Spark, menjalankan contoh pipeline menggunakan Dataproc dengan PySpark (Apache Spark API Python), BigQuery, Google Cloud Storage, dan data dari Reddit.

2. Pengantar Apache Spark (Opsional)

Menurut situs tersebut, " Apache Spark adalah mesin analisis terpadu untuk pemrosesan data berskala besar." Dengan API ini, Anda dapat menganalisis dan memproses data secara paralel dan dalam memori sehingga memungkinkan komputasi paralel masif di berbagai mesin dan node. Alat ini awalnya dirilis pada tahun 2014 sebagai upgrade ke MapReduce tradisional dan masih merupakan salah satu framework paling populer untuk melakukan komputasi berskala besar. Apache Spark ditulis dalam Scala, lalu memiliki API di Scala, Java, Python, dan R. Layanan ini berisi banyak library seperti Spark SQL untuk menjalankan kueri SQL pada data, Spark Streaming untuk data streaming, MLlib untuk machine learning, dan GraphX untuk pemrosesan grafik, yang semuanya berjalan di mesin Apache Spark.

32add0b6a47bafbc.pngS

Spark dapat berjalan sendiri atau dapat memanfaatkan layanan pengelolaan resource seperti Yarn, Mesos, atau Kubernetes untuk penskalaan. Anda akan menggunakan Dataproc untuk codelab ini, yang menggunakan Yarn.

Data di Spark awalnya dimuat ke dalam memori ke dalam RDD, atau set data terdistribusi yang tangguh. Sejak saat itu, pengembangan Spark mencakup penambahan dua tipe data baru bergaya kolom: {i>Dataset<i} yang diketik, dan {i>Dataframe<i}, yang tidak diketik. Sebenarnya, RDD cocok untuk semua jenis data, sedangkan Set Data dan Bingkai Data dioptimalkan untuk data tabulasi. Karena Set Data hanya tersedia dengan API Java dan Scala, kita akan melanjutkan penggunaan PySpark Dataframe API untuk codelab ini. Untuk informasi selengkapnya, lihat dokumentasi Apache Spark.

3. Kasus Penggunaan

Data engineer sering kali membutuhkan data agar mudah diakses oleh data scientist. Namun, data pada awalnya sering kali kotor (sulit digunakan untuk analisis dalam kondisi saat ini) dan perlu dibersihkan sebelum dapat banyak digunakan. Contohnya adalah data yang telah disalin dari web yang mungkin berisi encoding yang aneh atau tag HTML yang tidak relevan.

Di lab ini, Anda akan memuat sekumpulan data dari BigQuery dalam bentuk postingan Reddit ke dalam cluster Spark yang dihosting di Dataproc, mengekstrak informasi yang berguna, dan menyimpan data yang diproses sebagai file CSV dalam bentuk zip di Google Cloud Storage.

be2a4551ece63bfc.png

Kepala data scientist di perusahaan Anda tertarik agar tim mereka menangani berbagai masalah natural language processing. Secara khusus, mereka tertarik untuk menganalisis data dalam subreddit "r/food". Anda akan membuat pipeline untuk dump data yang dimulai dengan pengisian ulang dari Januari 2017 hingga Agustus 2019.

4. Mengakses BigQuery melalui BigQuery Storage API

Menarik data dari BigQuery menggunakan metode tabledata.list API terbukti menghabiskan waktu dan tidak efisien seperti jumlah skala data. Metode ini menampilkan daftar objek JSON dan mengharuskan Anda membaca halaman satu per satu secara berurutan untuk membaca seluruh set data.

BigQuery Storage API menghadirkan peningkatan signifikan dalam mengakses data di BigQuery dengan menggunakan protokol berbasis RPC. Platform ini mendukung pembacaan dan penulisan data secara paralel serta berbagai format serialisasi seperti Apache Avro dan Apache Arrow. Pada tingkat tinggi, hal ini berarti peningkatan performa secara signifikan, terutama pada set data yang lebih besar.

Dalam codelab ini, Anda akan menggunakan spark-bigquery-connector untuk membaca dan menulis data antara BigQuery dan Spark.

5. Membuat Project

Login ke konsol Google Cloud Platform di console.cloud.google.com dan buat project baru:

7e541d932b20c074.pngS

2deefc9295d114ea.pngS

a92a49afe05008a.png

Selanjutnya, Anda harus mengaktifkan penagihan di Cloud Console untuk menggunakan resource Google Cloud.

Menjalankan operasi dalam codelab ini seharusnya tidak menghabiskan biaya lebih dari beberapa dolar, tetapi bisa lebih mahal jika Anda memutuskan untuk menggunakan lebih banyak resource atau jika Anda membiarkannya berjalan. Bagian terakhir dari codelab ini akan memandu Anda menyelesaikan project.

Pengguna baru Google Cloud Platform memenuhi syarat untuk mendapatkan uji coba gratis senilai$300.

6. Menyiapkan Lingkungan Anda

Sekarang Anda akan melakukan penyiapan lingkungan dengan:

  • Mengaktifkan Compute Engine, Dataproc, dan BigQuery Storage API
  • Mengonfigurasi setelan project
  • Membuat cluster Dataproc
  • Membuat bucket Google Cloud Storage

Mengaktifkan API dan Mengonfigurasi Lingkungan Anda

Buka Cloud Shell dengan menekan tombol di pojok kanan atas Cloud Console.

a10c47ee6ca41c54.png

Setelah Cloud Shell dimuat, jalankan perintah berikut untuk mengaktifkan Compute Engine, Dataproc, dan BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Tetapkan project id project Anda. Anda dapat menemukannya dengan membuka halaman pemilihan project dan mencari project Anda. Nama ini mungkin tidak sama dengan nama project Anda.

e682e8227aa3c781.png

76d45fb295728542.png

Jalankan perintah berikut untuk menetapkan project id:

gcloud config set project <project_id>

Tetapkan region project Anda dengan memilih salah satu dari daftar di sini. Contohnya adalah us-central1.

gcloud config set dataproc/region <region>

Pilih nama untuk cluster Dataproc Anda dan buat variabel lingkungan untuk cluster tersebut.

CLUSTER_NAME=<cluster_name>

Membuat Cluster Dataproc

Buat cluster Dataproc dengan menjalankan perintah berikut:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Perintah ini akan memerlukan waktu beberapa menit untuk diselesaikan. Untuk menguraikan perintah:

Tindakan ini akan memulai pembuatan cluster Dataproc dengan nama yang Anda berikan sebelumnya. Penggunaan beta API akan mengaktifkan fitur beta Dataproc, seperti Gateway Komponen.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Ini akan menetapkan jenis mesin yang digunakan untuk pekerja Anda.

--worker-machine-type n1-standard-8

Cara ini akan menetapkan jumlah worker yang akan dimiliki cluster Anda.

--num-workers 8

Tindakan ini akan menetapkan versi gambar Dataproc.

--image-version 1.5-debian

Tindakan ini akan mengonfigurasi tindakan inisialisasi yang akan digunakan di cluster. Di sini, Anda menyertakan tindakan inisialisasi pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Ini adalah metadata yang akan disertakan pada cluster. Di sini, Anda memberikan metadata untuk tindakan inisialisasi pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Tindakan ini akan menetapkan Komponen Opsional untuk diinstal di cluster.

--optional-components=ANACONDA

Tindakan ini akan mengaktifkan gateway komponen yang memungkinkan Anda menggunakan Gateway Komponen Dataproc untuk melihat UI umum seperti Zeppelin, Jupyter, atau Histori Spark

--enable-component-gateway

Untuk pengantar Dataproc yang lebih mendalam, lihat codelab ini.

Membuat Bucket Google Cloud Storage

Anda memerlukan bucket Google Cloud Storage untuk output tugas Anda. Tentukan nama unik untuk bucket Anda dan jalankan perintah berikut untuk membuat bucket baru. Nama bucket bersifat unik di semua project Google Cloud untuk semua pengguna, jadi Anda mungkin perlu mencobanya beberapa kali dengan nama yang berbeda. Bucket berhasil dibuat jika Anda tidak menerima ServiceException.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Analisis Data Eksploratif

Sebelum melakukan pra-pemrosesan, Anda harus mempelajari lebih lanjut tentang sifat data yang sedang Anda tangani. Untuk melakukannya, Anda akan mempelajari dua metode eksplorasi data. Pertama, Anda akan melihat beberapa data mentah menggunakan UI Web BigQuery, lalu menghitung jumlah postingan per subreddit menggunakan PySpark dan Dataproc.

Menggunakan UI Web BigQuery

Mulailah dengan menggunakan UI Web BigQuery untuk melihat data Anda. Dari ikon menu di Konsol Cloud, scroll ke bawah dan tekan "BigQuery" untuk membuka UI Web BigQuery.

242a597d7045b4da.pngS

Selanjutnya, jalankan perintah berikut di Editor Kueri UI Web BigQuery. Ini akan mengembalikan 10 baris data lengkap dari Januari 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Anda dapat men-scroll halaman untuk melihat semua kolom yang tersedia beserta beberapa contohnya. Secara khusus, Anda akan melihat dua kolom yang mewakili konten tekstual dari setiap postingan: "title" dan "selftext", yang kedua adalah isi postingan. Perhatikan juga kolom lain seperti "created_utc" yang merupakan waktu pembuatan postingan dan "subreddit" yang merupakan sub reddit tempat postingan tersebut berada.

Mengeksekusi Tugas PySpark

Jalankan perintah berikut di Cloud Shell Anda untuk meng-clone repo dengan kode sampel dan menjalankan cd ke direktori yang benar:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Anda dapat menggunakan PySpark untuk menentukan jumlah postingan yang ada untuk setiap subreddit. Anda dapat membuka Cloud Editor dan membaca skrip cloud-dataproc/codelabs/spark-bigquery sebelum mengeksekusinya di langkah berikutnya:

5d965c6fb66dbd81.pngS

797cf71de3449bdb.pngS

Klik "Open Terminal" di Cloud Editor untuk beralih kembali ke Cloud Shell dan menjalankan perintah berikut untuk menjalankan tugas PySpark pertama Anda:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Perintah ini memungkinkan Anda mengirimkan tugas ke Dataproc melalui Jobs API. Di sini, Anda menunjukkan jenis pekerjaan sebagai pyspark. Anda dapat memberikan nama cluster, parameter opsional, dan nama file yang berisi tugas. Di sini, Anda memberikan parameter --jars yang memungkinkan Anda menyertakan spark-bigquery-connector dengan tugas Anda. Anda juga dapat menetapkan tingkat output log menggunakan --driver-log-levels root=FATAL yang akan menyembunyikan semua output log kecuali error. Log Spark cenderung agak berisik.

Proses ini memerlukan waktu beberapa menit untuk dijalankan dan output akhir Anda akan terlihat seperti ini:

6c185228db47bb18.pngS

8. Menjelajahi UI Dataproc dan Spark

Saat menjalankan tugas Spark di Dataproc, Anda memiliki akses ke dua UI untuk memeriksa status tugas / cluster Anda. Yang pertama adalah UI Dataproc, yang dapat Anda temukan dengan mengklik ikon menu dan men-scroll ke bawah ke Dataproc. Di sini, Anda dapat melihat memori saat ini yang tersedia serta memori tertunda dan jumlah worker.

6f2987346d15c8e2.pngS

Anda juga dapat mengklik tab {i>Jobs<i} untuk melihat pekerjaan yang telah selesai. Anda dapat melihat detail tugas seperti log dan output tugas tersebut dengan mengklik ID Tugas untuk tugas tertentu. 114d90129b0e4c88.pngS

1b2160f0f484594a.pngS

Anda juga dapat melihat UI Spark. Dari halaman lowongan, klik panah kembali lalu klik Web Interfaces. Anda akan melihat beberapa opsi di bagian gateway komponen. Banyak komponen dapat diaktifkan melalui Komponen Opsional saat menyiapkan cluster. Untuk lab ini, klik "Spark History Server.

5da7944327d193dc.pngS

6a349200289c69c1.pngS e63b36bdc90ff610.png

Tindakan ini akan membuka jendela berikut:

8f6786760f994fe8.pngS

Semua tugas yang selesai akan muncul di sini, dan Anda dapat mengklik application_id untuk mempelajari informasi selengkapnya. Anda juga dapat mengklik "Tampilkan Aplikasi yang Tidak Lengkap" di bagian paling bawah halaman landing untuk melihat semua tugas yang sedang berjalan.

9. Menjalankan Tugas Pengisian Ulang

Sekarang Anda akan menjalankan tugas yang memuat data ke memori, mengekstrak informasi yang diperlukan, dan membuang output ke bucket Google Cloud Storage. Anda akan mengekstrak "judul", "isi" (raw text) dan "stempel waktu dibuat" untuk setiap komentar reddit. Selanjutnya, Anda akan mengambil data ini, mengubahnya menjadi csv, menjadikannya zip, dan memuatnya ke dalam bucket dengan URI gs://${BUCKET_NAME}/reddit_posting/YYYY/MM/food.csv.gz.

Anda dapat merujuk ke Cloud Editor lagi guna membaca kode untuk cloud-dataproc/codelabs/spark-bigquery/backfill.sh yang merupakan skrip wrapper untuk mengeksekusi kode di cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Anda akan segera melihat banyak pesan penyelesaian tugas. Tugas mungkin memerlukan waktu hingga 15 menit untuk diselesaikan. Anda juga dapat memeriksa kembali bucket penyimpanan untuk memverifikasi output data yang berhasil menggunakan gsutil. Setelah semua tugas selesai, jalankan perintah berikut:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Anda akan melihat output berikut:

a7c3c7b2e82f9fca.png

Selamat, Anda berhasil menyelesaikan pengisian ulang untuk data komentar reddit Anda. Jika Anda ingin mengetahui cara membangun model berdasarkan data ini, lanjutkan ke codelab Spark-NLP.

10. Pembersihan

Agar tidak menimbulkan tagihan yang tidak perlu pada akun GCP Anda setelah menyelesaikan panduan memulai ini:

  1. Hapus bucket Cloud Storage untuk lingkungan dan yang Anda buat
  2. Menghapus lingkungan Dataproc.

Jika membuat project hanya untuk codelab ini, Anda juga dapat menghapus project tersebut secara opsional:

  1. Di GCP Console, buka halaman Project.
  2. Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada kotak, ketik project ID, lalu klik Shut down untuk menghapus project.

Lisensi

Karya ini dilisensikan berdasarkan Lisensi Generik Creative Commons Attribution 3.0 dan Apache 2.0.