Dataproc Serverless

1. Ringkasan - Google Dataproc

Dataproc adalah layanan yang sangat skalabel dan terkelola sepenuhnya untuk menjalankan Apache Spark, Apache Flink, Presto, serta berbagai framework dan alat open source lainnya. Gunakan Dataproc untuk modernisasi data lake, ETL / ELT, dan data science yang aman, dalam skala besar. Dataproc juga terintegrasi penuh dengan beberapa layanan Google Cloud, termasuk BigQuery, Cloud Storage, Vertex AI, dan Dataplex.

Dataproc tersedia dalam tiga varian:

  • Dataproc Serverless dapat digunakan untuk menjalankan tugas PySpark tanpa perlu mengonfigurasi infrastruktur dan penskalaan otomatis. Dataproc Serverless mendukung workload dan sesi / notebook batch PySpark.
  • Dataproc di Google Compute Engine memungkinkan Anda mengelola cluster Hadoop YARN untuk workload Spark berbasis YARN selain di alat open source seperti Flink dan Presto. Anda dapat menyesuaikan cluster berbasis cloud dengan penskalaan vertikal atau horizontal sebanyak yang diinginkan, termasuk penskalaan otomatis.
  • Dataproc di Google Kubernetes Engine memungkinkan Anda mengonfigurasi cluster virtual Dataproc di infrastruktur GKE untuk mengirimkan tugas Spark, PySpark, SparkR, atau Spark SQL.

Dalam codelab ini, Anda akan mempelajari beberapa cara berbeda untuk menggunakan Dataproc Serverless.

Apache Spark awalnya dibangun untuk berjalan di cluster Hadoop dan menggunakan YARN sebagai pengelola resource-nya. Mengelola cluster Hadoop memerlukan keahlian khusus dan memastikan berbagai tombol pada cluster dikonfigurasi dengan benar. Ini merupakan tambahan untuk satu set kenop terpisah yang juga perlu diatur oleh Spark oleh pengguna. Hal ini menyebabkan banyak skenario di mana developer menghabiskan lebih banyak waktu untuk mengonfigurasi infrastruktur mereka daripada mengerjakan kode Spark itu sendiri.

Dataproc Serverless menghilangkan kebutuhan untuk mengonfigurasi cluster Hadoop atau Spark secara manual. Dataproc Serverless tidak berjalan di Hadoop dan menggunakan Alokasi Resource Dinamis-nya sendiri untuk menentukan persyaratan resource-nya, termasuk penskalaan otomatis. Sebagian kecil properti Spark masih dapat disesuaikan dengan Dataproc Serverless, tetapi umumnya Anda tidak perlu menyesuaikannya.

2. Siapkan

Anda akan memulai dengan mengonfigurasi lingkungan dan resource yang digunakan dalam codelab ini.

Buat project Google Cloud. Anda dapat menggunakan project yang sudah ada.

Buka Cloud Shell dengan mengkliknya di toolbar Cloud Console.

ba0bb17945a73543.png

Cloud Shell menyediakan lingkungan Shell yang siap digunakan yang dapat Anda gunakan untuk codelab ini.

68c4ebd2a8539764.pngS

Cloud Shell akan menetapkan nama project Anda secara default. Periksa kembali dengan menjalankan echo $GOOGLE_CLOUD_PROJECT. Jika Anda tidak melihat project ID di output, setel project ID tersebut.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Tetapkan region Compute Engine untuk resource Anda, seperti us-central1 atau europe-west2.

export REGION=<your-region>

Mengaktifkan API

Codelab ini menggunakan API berikut:

  • BigQuery
  • Dataproc

Aktifkan API yang diperlukan. Proses ini memerlukan waktu sekitar satu menit, dan pesan berhasil akan muncul setelah proses selesai.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Mengonfigurasi akses jaringan

Dataproc Serverless mengharuskan Akses Pribadi Google diaktifkan di region tempat Anda akan menjalankan tugas Spark karena driver dan eksekutor Spark hanya memiliki IP pribadi. Jalankan perintah berikut untuk mengaktifkannya di subnet default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Anda dapat memverifikasi bahwa Akses Pribadi Google diaktifkan melalui hal berikut yang akan menghasilkan True atau False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Membuat bucket penyimpanan

Membuat bucket penyimpanan yang akan digunakan untuk menyimpan aset yang dibuat dalam codelab ini.

Pilih nama untuk bucket Anda. Nama bucket harus unik secara global di semua pengguna.

export BUCKET=<your-bucket-name>

Buat bucket di region tempat Anda ingin menjalankan tugas Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

Anda dapat melihat bahwa bucket Anda tersedia di konsol Cloud Storage. Anda juga dapat menjalankan gsutil ls untuk melihat bucket Anda.

Membuat Server Histori Persisten

UI Spark menyediakan insight dan alat proses debug yang lengkap tentang tugas Spark. Agar dapat melihat UI Spark untuk tugas Dataproc Serverless yang telah selesai, Anda harus membuat cluster Dataproc node tunggal untuk digunakan sebagai server histori persisten.

Tetapkan nama untuk server histori persisten Anda.

PHS_CLUSTER_NAME=my-phs

Jalankan perintah berikut.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

UI Spark dan server histori persisten akan dijelajahi secara lebih mendetail nanti di codelab.

3. Menjalankan tugas Serverless Spark dengan Dataproc Batch

Dalam contoh ini, Anda akan bekerja menggunakan set data dari set data publik Citi Bike Trip New York City (NYC). NYC Citi Bikes adalah sistem berbagi sepeda berbayar di NYC. Anda akan melakukan beberapa transformasi sederhana dan mencetak sepuluh ID stasiun sepeda Citi Bike yang paling populer. Contoh ini juga menggunakan spark-bigquery-connector open source untuk membaca dan menulis data antara Spark dan BigQuery dengan lancar.

Clone repo GitHub dan cd berikut ke direktori yang berisi file citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Kirim tugas ke Serverless Spark menggunakan Cloud SDK, yang tersedia di Cloud Shell secara default. Jalankan perintah berikut di shell Anda yang menggunakan Cloud SDK dan Dataproc Batches API untuk mengirim tugas Serverless Spark.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Untuk memerincinya:

  • gcloud dataproc batches submit merujuk ke Dataproc Batches API.
  • pyspark menunjukkan bahwa Anda mengirimkan tugas PySpark.
  • --batch adalah nama tugas. Jika tidak diberikan, UUID yang dibuat secara acak akan digunakan.
  • --region=${REGION} adalah region geografis tempat tugas akan diproses.
  • --deps-bucket=${BUCKET} adalah tempat upload file Python lokal Anda sebelum dijalankan di lingkungan Serverless.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar menyertakan jar untuk spark-bigquery-connector di lingkungan runtime Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} adalah nama yang sepenuhnya memenuhi syarat dari server histori persisten. Di sinilah data peristiwa Spark (terpisah dari output konsol) disimpan dan dapat dilihat dari UI Spark.
  • -- di akhir menunjukkan bahwa semua hal di luarnya akan menjadi argumen runtime untuk program. Dalam hal ini, Anda mengirimkan nama bucket, seperti yang diwajibkan oleh tugas.

Anda akan melihat output berikut saat batch dikirimkan.

Batch [citibike-job] submitted.

Setelah beberapa menit, Anda akan melihat output berikut beserta metadata dari tugas.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

Di bagian berikutnya, Anda akan mempelajari cara menemukan log untuk tugas ini.

Fitur tambahan

Dengan Spark Serverless, Anda memiliki opsi tambahan untuk menjalankan tugas.

  • Anda dapat membuat image docker kustom tempat tugas Anda dijalankan. Ini adalah cara yang bagus untuk menyertakan dependensi tambahan, termasuk library Python dan R.
  • Anda dapat menghubungkan instance Dataproc Metastore ke tugas Anda untuk mengakses metadata Hive.
  • Untuk kontrol tambahan, Dataproc Serverless mendukung konfigurasi sekumpulan kecil properti Spark.

4. Metrik Dataproc dan Kemampuan Observasi

Konsol Batch Dataproc mencantumkan semua tugas Dataproc Serverless Anda. Di konsol, Anda akan melihat Batch ID, Location, Status, Creation time, Elapsed time, dan Type untuk setiap tugas. Klik Batch ID tugas Anda untuk melihat informasi lebih lanjut.

Di halaman ini, Anda akan melihat informasi seperti Monitoring yang menunjukkan jumlah Batch Spark Executors yang digunakan tugas Anda dari waktu ke waktu (yang menunjukkan seberapa banyak tugas tersebut diskalakan secara otomatis).

Di tab Details, Anda akan melihat metadata selengkapnya tentang tugas, termasuk argumen dan parameter apa pun yang dikirimkan dengan tugas tersebut.

Anda juga dapat mengakses semua log dari halaman ini. Saat tugas Serverless Dataproc dijalankan, tiga kumpulan log berbeda akan dibuat:

  • Tingkat layanan
  • Output konsol
  • Logging peristiwa Spark

Tingkat layanan, mencakup log yang dibuat oleh layanan Dataproc Serverless. Hal ini termasuk hal-hal seperti Dataproc Serverless yang meminta CPU tambahan untuk penskalaan otomatis. Anda dapat melihatnya dengan mengklik View logs yang akan membuka Cloud Logging.

Output konsol dapat dilihat di bagian Output. Ini adalah output yang dihasilkan oleh tugas, termasuk metadata yang dicetak Spark saat memulai tugas atau pernyataan cetak apa pun yang dimasukkan ke dalam tugas.

Logging peristiwa Spark dapat diakses dari UI Spark. Karena Anda menyediakan server histori persisten untuk tugas Spark, Anda dapat mengakses UI Spark dengan mengklik View Spark History Server, yang berisi informasi untuk tugas Spark yang dijalankan sebelumnya. Anda dapat mempelajari UI Spark lebih lanjut dari dokumentasi Spark resmi.

5. Template Dataproc: BQ -> GCS

Template Dataproc adalah alat open source yang membantu menyederhanakan tugas pemrosesan data dalam Cloud lebih lanjut. Kode ini berfungsi sebagai wrapper untuk Dataproc Serverless dan menyertakan template untuk banyak tugas impor dan ekspor data, termasuk:

  • BigQuerytoGCS dan GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC dan JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS dan GCStoMongo

Daftar lengkapnya tersedia README.

Di bagian ini, Anda akan menggunakan Template Dataproc untuk mengekspor data dari BigQuery ke GCS.

Meng-clone repo

Clone repo dan ubah ke folder python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Mengonfigurasi lingkungan

Sekarang Anda akan menetapkan variabel lingkungan. Template Dataproc menggunakan variabel lingkungan GCP_PROJECT untuk project ID Anda, jadi tetapkan sama dengan GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Region Anda harus ditetapkan di lingkungan sebelumnya. Jika tidak, tetapkan di sini.

export REGION=<region>

Template Dataproc menggunakan spark-bigquery-conector untuk memproses tugas BigQuery dan mengharuskan URI disertakan dalam variabel lingkungan JARS. Tetapkan variabel JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Mengonfigurasi parameter template

Menetapkan nama bucket staging yang akan digunakan oleh layanan.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Selanjutnya, Anda akan menetapkan beberapa variabel khusus pekerjaan. Untuk tabel input, Anda akan kembali merujuk pada {i>dataset<i} BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Anda dapat memilih antara csv, parquet, avro, atau json. Untuk codelab ini, pilih CSV - di bagian selanjutnya tentang cara menggunakan Template Dataproc untuk mengonversi jenis file.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Tetapkan mode output ke overwrite. Anda dapat memilih antara overwrite, append, ignore, atau errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Tetapkan lokasi output GCS menjadi jalur di bucket Anda.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Menjalankan template

Jalankan template BIGQUERYTOGCS dengan menentukannya di bawah ini dan memberikan parameter input yang telah Anda tetapkan.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Output-nya akan cukup berisik, tetapi setelah sekitar satu menit Anda akan melihat berikut ini.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Anda dapat memverifikasi bahwa file telah dibuat dengan menjalankan perintah berikut.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark secara default menulis ke beberapa file, bergantung pada jumlah data. Dalam hal ini, Anda akan melihat sekitar 30 file yang dibuat. Nama file output Spark diformat dengan part- diikuti dengan angka lima digit (menunjukkan nomor suku cadang) dan string hash. Untuk data dalam jumlah besar, Spark biasanya akan ditulis ke beberapa file. Contoh nama file adalah part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Template Dataproc: CSV ke parquet

Sekarang Anda akan menggunakan Template Dataproc untuk mengonversi data di GCS dari satu jenis file ke jenis file lain menggunakan GCSTOGCS. Template ini menggunakan SparkSQL dan memberikan opsi untuk juga mengirimkan kueri SparkSQL yang akan diproses selama transformasi untuk pemrosesan tambahan.

Mengonfirmasi variabel lingkungan

Pastikan GCP_PROJECT, REGION, dan GCS_STAGING_BUCKET ditetapkan dari bagian sebelumnya.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Tetapkan parameter template

Sekarang Anda akan menetapkan parameter konfigurasi untuk GCStoGCS. Mulai dengan lokasi file input. Perlu diketahui bahwa ini adalah direktori dan bukan file spesifik karena semua file dalam direktori tersebut akan diproses. Tetapkan string ini ke BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Tetapkan format file input.

GCS_TO_GCS_INPUT_FORMAT=csv

Setel format output yang diinginkan. Anda dapat memilih parquet, json, avro, atau csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Tetapkan mode output ke overwrite. Anda dapat memilih antara overwrite, append, ignore, atau errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Tetapkan lokasi output.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Menjalankan template

Jalankan template GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Output-nya akan cukup berisik, tetapi setelah sekitar satu menit Anda akan melihat pesan berhasil seperti di bawah.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Anda dapat memverifikasi bahwa file telah dibuat dengan menjalankan perintah berikut.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Dengan template ini, Anda juga memiliki opsi menyediakan kueri SparkSQL dengan meneruskan gcs.to.gcs.temp.view.name dan gcs.to.gcs.sql.query ke template, sehingga kueri SparkSQL dapat dijalankan pada data sebelum menulis ke GCS.

7. Membersihkan resource

Agar tidak menimbulkan tagihan yang tidak perlu pada akun GCP Anda setelah menyelesaikan codelab ini:

  1. Hapus bucket Cloud Storage untuk lingkungan yang Anda buat.
gsutil rm -r gs://${BUCKET}
  1. Hapus cluster Dataproc yang digunakan untuk server histori persisten Anda.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Menghapus tugas Dataproc Serverless. Buka Batches Console, klik kotak di samping setiap tugas yang ingin dihapus, lalu klik HAPUS.

Jika membuat project hanya untuk codelab ini, Anda juga dapat menghapus project tersebut secara opsional:

  1. Di GCP Console, buka halaman Project.
  2. Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada kotak, ketik project ID, lalu klik Shut Down untuk menghapus project.

8. Langkah berikutnya

Referensi berikut memberikan cara tambahan untuk memanfaatkan Serverless Spark: