1. Pengantar
Google Cloud Dataflow
Terakhir Diperbarui: 5-Jul-2023
Apa yang dimaksud dengan Dataflow?
Dataflow adalah layanan terkelola untuk menjalankan berbagai macam pola pemrosesan data. Dokumentasi di situs ini menunjukkan cara men-deploy pipeline pemrosesan data batch dan streaming menggunakan Dataflow, termasuk petunjuk cara menggunakan fitur layanan.
Apache Beam SDK adalah model pemrograman open source yang memungkinkan Anda mengembangkan pipeline batch dan streaming. Anda membuat pipeline dengan program Apache Beam, lalu menjalankannya di layanan Dataflow. Dokumentasi Apache Beam memberikan informasi konseptual yang mendalam dan materi referensi untuk model pemrograman Apache Beam, SDK, dan runner lainnya.
Analisis data streaming dengan cepat
Dataflow memungkinkan pengembangan pipeline data streaming yang cepat dan sederhana dengan latensi data yang lebih rendah.
Menyederhanakan pengoperasian dan pengelolaan
Biarkan tim untuk berfokus pada pemrograman, dan bukan pengelolaan cluster server karena pendekatan serverless Dataflow menghilangkan beban operasional dari workload data engineering.
Mengurangi total biaya kepemilikan
Dengan penskalaan otomatis resource serta kemampuan batch processing yang menggunakan pengoptimalan biaya, Dataflow menawarkan kapasitas tanpa batas untuk mengelola workload musiman atau naik turun tanpa memboroskan anggaran.
Fitur utama
Pengelolaan resource otomatis dan penyeimbangan ulang tugas dinamis
Dataflow mengotomatiskan penyediaan dan pengelolaan resource pemrosesan untuk meminimalkan latensi dan memaksimalkan penggunaan, sehingga Anda tidak perlu menjalankan instance atau mencadangkannya secara manual. Pembagian tugas juga diotomatiskan dan dioptimalkan untuk menyeimbangkan kembali tugas yang mengalami lag secara dinamis. Tidak perlu memburu "hot key" atau melakukan pra-pemrosesan data.
Penskalaan horizontal otomatis
Penskalaan horizontal otomatis terhadap resource worker untuk menghasilkan throughput yang optimal dan rasio harga terhadap performa yang lebih baik secara keseluruhan.
Harga penjadwalan resource yang fleksibel untuk batch processing
Untuk pemrosesan dengan waktu penjadwalan tugas yang fleksibel, seperti tugas semalaman, penjadwalan resource yang fleksibel (FlexRS) menawarkan harga yang lebih rendah untuk batch processing. Tugas fleksibel ini ditempatkan dalam antrean dengan jaminan bahwa tugas tersebut akan diambil untuk dieksekusi dalam waktu enam jam.
Yang akan Anda jalankan sebagai bagian dari program ini
Menggunakan runner interaktif Apache Beam dengan notebook JupyterLab memungkinkan Anda secara iteratif mengembangkan pipeline, memeriksa grafik pipeline, dan mengurai setiap PCollection dalam alur kerja read-eval-print-loop (REPL). Notebook Apache Beam ini tersedia melalui Vertex AI Workbench, sebuah layanan terkelola yang menghosting virtual machine notebook yang telah diinstal sebelumnya dengan framework machine learning dan data science terbaru.
Codelab ini berfokus pada fungsi yang diperkenalkan oleh notebook Apache Beam.
Yang akan Anda pelajari
- Cara membuat instance notebook
- Membuat pipeline dasar
- Membaca data dari sumber yang tidak terbatas
- Memvisualisasikan data
- Meluncurkan Tugas Dataflow dari notebook
- Menyimpan notebook
Yang Anda butuhkan
- Project Google Cloud Platform dengan Penagihan diaktifkan.
- Google Cloud Dataflow dan Google Cloud PubSub diaktifkan.
2. Mempersiapkan
- Di Cloud Console, pada halaman pemilih project, pilih atau buat project Cloud.
Pastikan Anda telah mengaktifkan API berikut:
- Dataflow API
- Cloud Pub/Sub API
- Compute Engine
- Notebooks API
Anda dapat memverifikasinya dengan memeriksa perintah & Halaman layanan.
Dalam panduan ini, kita akan membaca data dari langganan Pub/Sub, jadi pastikan akun layanan default Compute Engine memiliki peran Editor, atau berikan peran Pub/Sub Editor.
3. Memulai dengan notebook Apache Beam
Meluncurkan instance notebook Apache Beam
- Luncurkan Dataflow di Konsol:
- Pilih halaman Workbench menggunakan menu sebelah kiri.
- Pastikan Anda berada di tab User-managed notebooks.
- Di toolbar, klik New Notebook.
- Pilih Apache Beam > Tanpa GPU.
- Di halaman New notebook, pilih subnetwork untuk VM notebook lalu klik Create.
- Klik Open JupyterLab saat link sudah aktif. Vertex AI Workbench membuat instance notebook Apache Beam baru.
4. Membuat pipeline
Membuat instance notebook
Buka File > Baru > Notebook lalu pilih kernel dengan Apache Beam 2.47 atau yang lebih baru.
Mulai tambahkan kode ke notebook Anda
- Salin dan tempel kode dari setiap bagian dalam sel baru di {i>notebook<i} Anda
- Jalankan sel
Menggunakan runner interaktif Apache Beam dengan notebook JupyterLab memungkinkan Anda secara iteratif mengembangkan pipeline, memeriksa grafik pipeline, dan mengurai setiap PCollection dalam alur kerja read-eval-print-loop (REPL).
Apache Beam sudah diinstal di instance notebook Anda, jadi sertakan modul interactive_runner
dan interactive_beam
di notebook Anda.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
Jika notebook Anda menggunakan layanan Google lainnya, tambahkan pernyataan impor berikut:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
Menetapkan opsi interaktivitas
Tabel berikut menetapkan durasi pengambilan data ke 60 detik. Jika Anda ingin melakukan iterasi dengan lebih cepat, setel ke durasi yang lebih rendah, misalnya ‘10 dtk'.
ib.options.recording_duration = '60s'
Untuk opsi interaktif tambahan, lihat interactive_beam.options class.
Inisialisasi pipeline menggunakan objek InteractiveRunner
.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
Membaca dan memvisualisasikan data
Contoh berikut menunjukkan pipeline Apache Beam yang membuat langganan ke topik Pub/Sub tertentu dan membaca dari langganan.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
Pipeline menghitung kata berdasarkan jendela dari sumber. Ini membuat windowing tetap dengan setiap jendela berdurasi 10 detik.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
Setelah data dijendelakan, kata-kata dihitung berdasarkan periode.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
Memvisualisasikan data
Metode show()
memvisualisasikan PCollection yang dihasilkan di notebook.
ib.show(windowed_word_counts, include_window_info=True)
Untuk menampilkan visualisasi data Anda, teruskan visualize_data=True
ke metode show()
. Menambahkan sel baru:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
Anda dapat menerapkan beberapa filter ke visualisasi. Visualisasi berikut memungkinkan Anda memfilter menurut label dan sumbu:
5. Menggunakan Dataframe Pandas
Visualisasi lain yang berguna di notebook Apache Beam adalah Pandas DataFrame. Contoh berikut pertama-tama mengonversi kata menjadi huruf kecil, lalu menghitung frekuensi setiap kata.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
Metode collect()
menyediakan output dalam DataFrame Pandas.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (Opsional) Meluncurkan tugas Dataflow dari notebook Anda
- Untuk menjalankan tugas di Dataflow, Anda memerlukan izin tambahan. Pastikan akun layanan default Compute Engine memiliki peran Editor, atau berikan peran IAM berikut:
- Dataflow Admin
- Dataflow Worker
- Admin Penyimpanan, dan
- Service Account User (roles/iam.serviceAccountUser)
Lihat peran selengkapnya dalam dokumentasi.
- (Opsional) Sebelum menggunakan notebook Anda untuk menjalankan tugas Dataflow, mulai ulang kernel, jalankan ulang semua sel, dan verifikasi output.
- Hapus pernyataan impor berikut:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- Tambahkan pernyataan import berikut:
from apache_beam.runners import DataflowRunner
- Hapus opsi durasi perekaman berikut:
ib.options.recording_duration = '60s'
- Tambahkan kode berikut ke opsi pipeline Anda. Anda harus menyesuaikan lokasi Cloud Storage agar mengarah ke bucket yang sudah Anda miliki, atau Anda dapat membuat bucket baru untuk tujuan ini. Anda juga dapat mengubah nilai wilayah dari
us-central1
.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- Dalam konstruktor
beam.Pipeline()
, gantiInteractiveRunner
denganDataflowRunner
.p
adalah objek pipeline dari pembuatan pipeline Anda.
p = beam.Pipeline(DataflowRunner(), options=options)
- Hapus panggilan interaktif dari kode Anda. Misalnya, hapus
show()
,collect()
,head()
,show_graph()
, danwatch()
dari kode Anda. - Untuk dapat melihat hasil, Anda perlu menambahkan sink. Di bagian sebelumnya, kita memvisualisasikan hasil di notebook, tetapi kali ini, kita menjalankan tugas di luar notebook ini - di Dataflow. Oleh karena itu, kita memerlukan lokasi eksternal untuk hasil kita. Dalam contoh ini, kita akan menulis hasilnya ke file teks di GCS (Google Cloud Storage). Karena ini adalah pipeline streaming, dengan windowing data, kita sebaiknya membuat satu file teks per jendela. Untuk melakukannya, tambahkan langkah-langkah berikut ke pipeline Anda:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- Tambahkan
p.run()
di akhir kode pipeline Anda. - Sekarang tinjau kode notebook Anda untuk memastikan bahwa Anda telah memasukkan semua perubahan. Konfigurasinya akan terlihat seperti ini:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- Jalankan sel.
- Anda akan melihat output yang mirip dengan berikut ini:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- Untuk memvalidasi apakah tugas sedang berjalan, buka halaman Tugas untuk Dataflow. Anda akan melihat tugas baru di dalam daftar. Tugas akan memakan waktu sekitar 5-10 menit untuk mulai memproses data.
- Setelah data diproses, buka Cloud Storage dan buka direktori tempat Dataflow menyimpan hasil (
output_gcs_location
yang Anda tentukan). Anda akan melihat daftar file teks, dengan satu file per jendela. - Download file dan periksa kontennya. Ini harus berisi daftar kata yang dipasangkan dengan jumlahnya. Atau, gunakan antarmuka command line untuk memeriksa file. Anda dapat melakukannya dengan menjalankan perintah berikut di sel baru di {i>notebook<i} Anda:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- Anda akan melihat output yang serupa dengan ini:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- Selesai. Jangan lupa untuk membersihkan dan menghentikan tugas yang telah Anda buat (lihat langkah terakhir dari codelab ini).
Untuk melihat contoh cara melakukan konversi ini di notebook interaktif, lihat notebook Jumlah Kata Dataflow di instance notebook Anda.
Atau, Anda dapat mengekspor notebook sebagai skrip yang dapat dieksekusi, mengubah file .py yang dihasilkan menggunakan langkah-langkah sebelumnya, lalu men-deploy pipeline Anda ke layanan Dataflow.
7. Menyimpan notebook Anda
Notebook yang Anda buat akan disimpan secara lokal di instance notebook yang sedang berjalan. Jika Anda mereset atau menonaktifkan instance notebook selama pengembangan, notebook baru tersebut akan dipertahankan selama dibuat di direktori /home/jupyter
. Namun, jika instance notebook dihapus, notebook tersebut juga akan dihapus.
Agar dapat menyimpan notebook Anda untuk penggunaan mendatang, download notebook secara lokal ke workstation, simpan ke GitHub, atau ekspor ke format file yang berbeda.
8. Pembersihan
Setelah selesai menggunakan instance notebook Apache Beam, bersihkan resource yang Anda buat di Google Cloud dengan menonaktifkan instance notebook dan menghentikan tugas streaming, jika Anda telah menjalankannya.
Atau, jika membuat project hanya untuk tujuan codelab ini, Anda juga dapat menonaktifkan project sepenuhnya.