1. Ringkasan - Google Dataproc
Dataproc adalah layanan yang terkelola sepenuhnya dan sangat skalabel untuk menjalankan Apache Spark, Apache Flink, Presto, serta banyak alat dan framework open source lainnya. Gunakan Dataproc untuk modernisasi data lake, ETL / ELT, dan data science yang aman, dalam skala global. Dataproc juga terintegrasi sepenuhnya dengan beberapa layanan Google Cloud, termasuk BigQuery, Cloud Storage, Vertex AI, dan Dataplex.
Dataproc tersedia dalam tiga jenis:
- Dataproc Serverless memungkinkan Anda menjalankan tugas PySpark tanpa perlu mengonfigurasi infrastruktur dan penskalaan otomatis. Dataproc Serverless mendukung workload dan sesi / notebook batch PySpark.
- Dataproc di Google Compute Engine memungkinkan Anda mengelola cluster Hadoop YARN untuk beban kerja Spark berbasis YARN selain alat open source seperti Flink dan Presto. Anda dapat menyesuaikan cluster berbasis cloud dengan penskalaan vertikal atau horizontal sebanyak yang Anda inginkan, termasuk penskalaan otomatis.
- Dataproc di Google Kubernetes Engine memungkinkan Anda mengonfigurasi cluster virtual Dataproc di infrastruktur GKE untuk mengirimkan tugas Spark, PySpark, SparkR, atau Spark SQL.
Dalam codelab ini, Anda akan mempelajari beberapa cara berbeda yang dapat Anda gunakan untuk mengakses Dataproc Serverless.
Apache Spark awalnya dibuat untuk berjalan di cluster Hadoop dan menggunakan YARN sebagai pengelola resource-nya. Memelihara cluster Hadoop memerlukan keahlian khusus dan memastikan banyak tombol yang berbeda di cluster dikonfigurasi dengan benar. Selain itu, ada serangkaian tombol terpisah yang juga mengharuskan pengguna untuk menyetelnya di Spark. Hal ini menyebabkan banyak skenario di mana developer menghabiskan lebih banyak waktu untuk mengonfigurasi infrastruktur mereka, bukan mengerjakan kode Spark itu sendiri.
Dataproc Serverless menghilangkan kebutuhan untuk mengonfigurasi cluster Hadoop atau Spark secara manual. Dataproc Serverless tidak berjalan di Hadoop dan menggunakan Alokasi Resource Dinamis sendiri untuk menentukan persyaratan resource-nya, termasuk penskalaan otomatis. Sebagian kecil properti Spark masih dapat disesuaikan dengan Dataproc Serverless, tetapi dalam sebagian besar kasus, Anda tidak perlu mengubahnya.
2. Siapkan
Anda akan memulai dengan mengonfigurasi lingkungan dan resource yang digunakan dalam codelab ini.
Buat project Google Cloud. Anda dapat menggunakan yang sudah ada.
Buka Cloud Shell dengan mengkliknya di toolbar Cloud Console.

Cloud Shell menyediakan lingkungan Shell siap pakai yang dapat Anda gunakan untuk codelab ini.

Cloud Shell akan menetapkan nama project Anda secara default. Periksa kembali dengan menjalankan echo $GOOGLE_CLOUD_PROJECT. Jika Anda tidak melihat project ID di output, tetapkan project ID.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Tetapkan region Compute Engine untuk resource Anda, seperti us-central1 atau europe-west2.
export REGION=<your-region>
Mengaktifkan API
Codelab ini menggunakan API berikut:
- BigQuery
- Dataproc
Aktifkan API yang diperlukan. Proses ini akan memakan waktu sekitar satu menit, dan pesan keberhasilan akan muncul setelah selesai.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Mengonfigurasi akses jaringan
Dataproc Serverless memerlukan Akses Pribadi Google diaktifkan di region tempat Anda akan menjalankan tugas Spark karena driver dan eksekutor Spark hanya memiliki IP pribadi. Jalankan perintah berikut untuk mengaktifkannya di subnet default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Anda dapat memverifikasi bahwa Akses Pribadi Google diaktifkan melalui perintah berikut yang akan menghasilkan output True atau False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Membuat bucket penyimpanan
Buat bucket penyimpanan yang akan digunakan untuk menyimpan aset yang dibuat dalam codelab ini.
Pilih nama untuk bucket Anda. Nama bucket harus unik secara global di semua pengguna.
export BUCKET=<your-bucket-name>
Buat bucket di region tempat Anda ingin menjalankan tugas Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Anda dapat melihat bahwa bucket Anda tersedia di konsol Cloud Storage. Anda juga dapat menjalankan gsutil ls untuk melihat bucket Anda.
Membuat Persistent History Server
UI Spark menyediakan serangkaian alat dan insight yang kaya untuk men-debug tugas Spark. Untuk melihat UI Spark untuk tugas Dataproc Serverless yang telah selesai, Anda harus membuat cluster Dataproc satu node untuk digunakan sebagai persistent history server.
Tetapkan nama untuk server histori persisten Anda.
PHS_CLUSTER_NAME=my-phs
Jalankan perintah berikut.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
UI Spark dan server histori persisten akan dijelaskan lebih mendetail nanti dalam codelab ini.
3. Menjalankan tugas Serverless Spark dengan Batch Dataproc
Dalam contoh ini, Anda akan menggunakan sekumpulan data dari set data publik Perjalanan Citi Bike (NYC) New York City. NYC Citi Bikes adalah sistem berbagi sepeda berbayar di NYC. Anda akan melakukan beberapa transformasi sederhana dan mencetak sepuluh ID stasiun Citi Bike paling populer. Contoh ini juga menggunakan spark-bigquery-connector open source untuk membaca dan menulis data secara lancar antara Spark dan BigQuery.
Clone repositori GitHub berikut dan cd ke direktori yang berisi file citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Kirimkan tugas ke Serverless Spark menggunakan Cloud SDK, yang tersedia di Cloud Shell secara default. Jalankan perintah berikut di shell Anda yang menggunakan Cloud SDK dan Dataproc Batches API untuk mengirimkan tugas Serverless Spark.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Untuk menguraikannya:
gcloud dataproc batches submitmereferensikan Dataproc Batches API.pysparkmenunjukkan bahwa Anda mengirimkan tugas PySpark.--batchadalah nama tugas. Jika tidak diberikan, UUID yang dibuat secara acak akan digunakan.--region=${REGION}adalah wilayah geografis tempat tugas akan diproses.--deps-bucket=${BUCKET}adalah tempat file Python lokal Anda diupload sebelum dijalankan di lingkungan Serverless.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarmenyertakan JAR untuk spark-bigquery-connector di lingkungan runtime Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}adalah nama yang sepenuhnya memenuhi syarat dari server histori persisten. Di sinilah data peristiwa Spark (terpisah dari output konsol) disimpan dan dapat dilihat dari UI Spark.--di akhir menunjukkan bahwa apa pun di luar ini akan menjadi argumen runtime untuk program. Dalam hal ini, Anda mengirimkan nama bucket, sebagaimana diwajibkan oleh tugas.
Anda akan melihat output berikut saat batch dikirimkan.
Batch [citibike-job] submitted.
Setelah beberapa menit, Anda akan melihat output berikut beserta metadata dari tugas.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Di bagian berikutnya, Anda akan mempelajari cara menemukan log untuk tugas ini.
Fitur tambahan
Dengan Spark Serverless, Anda memiliki opsi tambahan untuk menjalankan tugas.
- Anda dapat membuat image Docker kustom yang dijalankan tugas Anda. Cara ini adalah cara yang bagus untuk menyertakan dependensi tambahan, termasuk library Python dan R.
- Anda dapat menghubungkan instance Dataproc Metastore ke tugas Anda untuk mengakses metadata Hive.
- Untuk kontrol tambahan, Dataproc Serverless mendukung konfigurasi sejumlah kecil properti Spark.
4. Metrik dan Kemampuan Observasi Dataproc
Konsol Batch Dataproc mencantumkan semua tugas Dataproc Serverless Anda. Di konsol, Anda akan melihat ID Batch, Lokasi, Status, Waktu pembuatan, Waktu berlalu, dan Jenis setiap tugas. Klik ID Batch tugas Anda untuk melihat informasi selengkapnya tentang tugas tersebut.
Di halaman ini, Anda akan melihat informasi seperti Monitoring yang menunjukkan jumlah Batch Spark Executor yang digunakan tugas Anda dari waktu ke waktu (menunjukkan seberapa besar penskalaan otomatisnya).
Di tab Detail, Anda akan melihat metadata selengkapnya tentang tugas, termasuk argumen dan parameter yang dikirimkan bersama tugas.
Anda juga dapat mengakses semua log dari halaman ini. Saat tugas Dataproc Serverless dijalankan, tiga set log yang berbeda akan dibuat:
- Tingkat layanan
- Output konsol
- Pencatatan peristiwa Spark
Tingkat layanan, mencakup log yang dihasilkan oleh layanan Dataproc Serverless. Hal ini mencakup hal-hal seperti Dataproc Serverless yang meminta CPU tambahan untuk penskalaan otomatis. Anda dapat melihatnya dengan mengklik Lihat log yang akan membuka Cloud Logging.
Output konsol dapat dilihat di bagian Output. Ini adalah output yang dihasilkan oleh tugas, termasuk metadata yang dicetak Spark saat memulai tugas atau pernyataan cetak apa pun yang disertakan dalam tugas.
Logging peristiwa Spark dapat diakses dari UI Spark. Karena Anda menyediakan server histori persisten untuk tugas Spark, Anda dapat mengakses UI Spark dengan mengklik View Spark History Server, yang berisi informasi untuk tugas Spark yang sebelumnya dijalankan. Anda dapat mempelajari lebih lanjut UI Spark dari dokumentasi Spark resmi.
5. Template Dataproc: BQ -> GCS
Templat Dataproc adalah alat open source yang membantu menyederhanakan lebih lanjut tugas pemrosesan data dalam Cloud. Alat ini berfungsi sebagai wrapper untuk Dataproc Serverless dan menyertakan template untuk banyak tugas impor dan ekspor data, termasuk:
BigQuerytoGCSdanGCStoBigQueryGCStoBigTableGCStoJDBCdanJDBCtoGCSHivetoBigQueryMongotoGCSdanGCStoMongo
Daftar lengkapnya tersedia di README.
Di bagian ini, Anda akan menggunakan Template Dataproc untuk mengekspor data dari BigQuery ke GCS.
Meng-clone repo
Clone repo dan ubah ke folder python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Mengonfigurasi lingkungan
Sekarang Anda akan menetapkan variabel lingkungan. Template Dataproc menggunakan variabel lingkungan GCP_PROJECT untuk project ID Anda, jadi tetapkan ini sama dengan GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Wilayah Anda harus ditetapkan di lingkungan dari sebelumnya. Jika belum, tetapkan di sini.
export REGION=<region>
Template Dataproc menggunakan spark-bigquery-connector untuk memproses tugas BigQuery dan memerlukan URI untuk disertakan dalam variabel lingkungan JARS. Tetapkan variabel JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Mengonfigurasi parameter template
Tetapkan nama bucket penyiapan yang akan digunakan layanan.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Selanjutnya, Anda akan menetapkan beberapa variabel khusus tugas. Untuk tabel input, Anda akan kembali mereferensikan set data BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Anda dapat memilih csv, parquet, avro, atau json. Untuk codelab ini, pilih CSV - di bagian berikutnya, Anda akan mempelajari cara menggunakan Template Dataproc untuk mengonversi jenis file.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Tetapkan mode output ke overwrite. Anda dapat memilih antara overwrite, append, ignore, atau errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Tetapkan lokasi output GCS sebagai jalur di bucket Anda.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Menjalankan template
Jalankan template BIGQUERYTOGCS dengan menentukannya di bawah dan memberikan parameter input yang Anda tetapkan.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Outputnya akan cukup berisik, tetapi setelah sekitar satu menit, Anda akan melihat hal berikut.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Anda dapat memverifikasi bahwa file dibuat dengan menjalankan perintah berikut.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Secara default, Spark menulis ke beberapa file, bergantung pada jumlah data. Dalam hal ini, Anda akan melihat sekitar 30 file yang dihasilkan. Nama file output Spark diformat dengan part-, diikuti dengan angka lima digit (yang menunjukkan nomor bagian) dan string hash. Untuk data dalam jumlah besar, Spark biasanya akan menulis ke beberapa file. Contoh nama file adalah part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Template Dataproc: CSV ke Parquet
Sekarang Anda akan menggunakan Template Dataproc untuk mengonversi data di GCS dari satu jenis file ke jenis file lainnya menggunakan GCSTOGCS. Template ini menggunakan SparkSQL dan memberikan opsi untuk mengirimkan kueri SparkSQL agar diproses selama transformasi untuk pemrosesan tambahan.
Mengonfirmasi variabel lingkungan
Konfirmasi bahwa GCP_PROJECT, REGION, dan GCS_STAGING_BUCKET ditetapkan dari bagian sebelumnya.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Menetapkan parameter template
Sekarang Anda akan menetapkan parameter konfigurasi untuk GCStoGCS. Mulai dengan lokasi file input. Perhatikan bahwa ini adalah direktori, bukan file tertentu karena semua file dalam direktori akan diproses. Tetapkan ini ke BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Tetapkan format file input.
GCS_TO_GCS_INPUT_FORMAT=csv
Tetapkan format output yang diinginkan. Anda dapat memilih parquet, json, avro, atau csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Tetapkan mode output ke overwrite. Anda dapat memilih antara overwrite, append, ignore, atau errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Tetapkan lokasi output.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Menjalankan template
Jalankan template GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Output akan cukup berisik, tetapi setelah sekitar satu menit, Anda akan melihat pesan keberhasilan seperti di bawah.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Anda dapat memverifikasi bahwa file dibuat dengan menjalankan perintah berikut.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Dengan template ini, Anda juga memiliki opsi untuk menyediakan kueri SparkSQL dengan meneruskan gcs.to.gcs.temp.view.name dan gcs.to.gcs.sql.query ke template, sehingga kueri SparkSQL dapat dijalankan pada data sebelum ditulis ke GCS.
7. Membersihkan resource
Agar tidak menimbulkan biaya yang tidak perlu pada akun GCP Anda setelah menyelesaikan codelab ini:
- Hapus bucket Cloud Storage untuk lingkungan yang Anda buat.
gsutil rm -r gs://${BUCKET}
- Hapus cluster Dataproc yang digunakan untuk persistent history server Anda.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Hapus tugas Dataproc Serverless. Buka Konsol Batch, klik kotak di samping setiap tugas yang ingin Anda hapus, lalu klik HAPUS.
Jika membuat project hanya untuk codelab ini, Anda juga dapat menghapus project tersebut jika mau:
- Di Konsol GCP, buka halaman Project.
- Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Hapus.
- Di kotak, ketik project ID, lalu klik Shut down untuk menghapus project.
8. Langkah berikutnya
Referensi berikut memberikan cara tambahan untuk memanfaatkan Serverless Spark:
- Pelajari cara mengorkestrasi alur kerja Dataproc Serverless menggunakan Cloud Composer.
- Pelajari cara mengintegrasikan Dataproc Serverless dengan pipeline Kubeflow.