Menggunakan Notebook dengan Google Cloud Dataflow

1. Pengantar

Cloud-Dataflow.png

Google Cloud Dataflow

Terakhir Diperbarui: 5-Jul-2023

Apa yang dimaksud dengan Dataflow?

Dataflow adalah layanan terkelola untuk menjalankan berbagai macam pola pemrosesan data. Dokumentasi di situs ini menunjukkan cara men-deploy pipeline pemrosesan data batch dan streaming menggunakan Dataflow, termasuk petunjuk untuk menggunakan fitur layanan.

Apache Beam SDK adalah model pemrograman open source yang memungkinkan Anda mengembangkan pipeline batch dan streaming. Anda membuat pipeline dengan program Apache Beam, lalu menjalankannya di layanan Dataflow. Dokumentasi Apache Beam memberikan informasi konseptual mendalam dan materi referensi untuk model pemrograman Apache Beam, SDK, dan runner lainnya.

Melakukan streaming analisis data dengan cepat

Dataflow memungkinkan pengembangan pipeline data streaming yang cepat dan sederhana dengan latensi data yang lebih rendah.

Menyederhanakan operasi dan pengelolaan

Biarkan tim untuk berfokus pada pemrograman, dan bukan pengelolaan cluster server karena pendekatan serverless Dataflow menghilangkan beban operasional dari workload rekayasa data.

Mengurangi total biaya kepemilikan

Dengan penskalaan otomatis resource serta kemampuan batch processing yang menggunakan pengoptimalan biaya, Dataflow menawarkan kapasitas tanpa batas untuk mengelola workload musiman atau naik turun tanpa memboroskan anggaran.

Fitur utama

Pengelolaan resource otomatis dan penyeimbangan ulang tugas dinamis

Dataflow mengotomatiskan penyediaan dan pengelolaan resource pemrosesan untuk menekan latensi dan memaksimalkan penggunaan, sehingga Anda tidak perlu lagi mengoperasikan instance atau mencadangkannya secara manual. Pembagian tugas juga diotomatiskan dan dioptimalkan untuk menyeimbangkan kembali tugas yang mengalami lag secara dinamis. Tidak perlu menghapal "hot key" atau memproses data input terlebih dahulu.

Penskalaan otomatis horizontal

Penskalaan horizontal otomatis terhadap resource pekerja demi mendapatkan hasil throughput optimal dan rasio harga terhadap performa keseluruhan yang lebih baik.

Harga penjadwalan resource yang fleksibel untuk batch processing

Untuk pemrosesan dengan waktu penjadwalan tugas yang fleksibel, seperti tugas semalaman, penjadwalan resource yang fleksibel (FlexRS) menawarkan harga yang lebih rendah untuk batch processing. Tugas fleksibel ini ditempatkan dalam antrean dengan jaminan bahwa tugas tersebut akan diambil untuk dieksekusi dalam waktu enam jam.

Yang akan Anda jalankan sebagai bagian dari ini

Dengan menggunakan runner interaktif Apache Beam dengan notebook JupyterLab, Anda dapat mengembangkan pipeline secara iteratif, memeriksa grafik pipeline, dan mengurai setiap PCollection dalam alur kerja read-eval-print-loop (REPL). Notebook Apache Beam ini tersedia melalui Vertex AI Workbench, layanan terkelola yang menghosting virtual machine notebook yang telah diinstal sebelumnya dengan framework data science dan machine learning terbaru.

Codelab ini berfokus pada fungsi yang diperkenalkan oleh notebook Apache Beam.

Yang akan Anda pelajari

  • Cara membuat instance notebook
  • Membuat pipeline dasar
  • Membaca data dari sumber yang tidak terikat
  • Memvisualisasikan data
  • Meluncurkan Tugas Dataflow dari notebook
  • Menyimpan notebook

Yang Anda butuhkan

  • Project Google Cloud Platform dengan Penagihan diaktifkan.
  • Google Cloud Dataflow dan Google Cloud PubSub diaktifkan.

2. Mempersiapkan

  1. Di Konsol Cloud, pada halaman pemilih project, pilih atau buat project Cloud.

Pastikan Anda telah mengaktifkan API berikut:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

Anda dapat memverifikasi hal ini dengan memeriksa halaman API & Layanan.

Dalam panduan ini, kita akan membaca data dari langganan Pub/Sub, jadi pastikan akun layanan default Compute Engine memiliki peran Editor, atau berikan peran Pub/Sub Editor kepadanya.

3. Mulai menggunakan notebook Apache Beam

Meluncurkan instance notebook Apache Beam

  1. Luncurkan Dataflow di Konsol:

  1. Pilih halaman Workbench menggunakan menu sebelah kiri.
  2. Pastikan Anda berada di tab User-managed notebooks.
  3. Di toolbar, klik Notebook Baru.
  4. Pilih Apache Beam > Without GPUs.
  5. Di halaman New notebook, pilih subnetwork untuk VM notebook, lalu klik Create.
  6. Klik Open JupyterLab saat link menjadi aktif. Vertex AI Workbench membuat instance notebook Apache Beam baru.

4. Membuat pipeline

Membuat instance notebook

Buka File > New > Notebook, lalu pilih kernel Apache Beam 2.47 atau yang lebih baru.

Mulai menambahkan kode ke notebook Anda

  • Salin dan tempel kode dari setiap bagian dalam sel baru di notebook Anda
  • Jalankan sel

6bd3dd86cc7cf802.png

Dengan menggunakan runner interaktif Apache Beam dengan notebook JupyterLab, Anda dapat mengembangkan pipeline secara iteratif, memeriksa grafik pipeline, dan mengurai setiap PCollection dalam alur kerja read-eval-print-loop (REPL).

Apache Beam diinstal di instance notebook Anda, jadi sertakan modul interactive_runner dan interactive_beam di notebook Anda.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Jika notebook Anda menggunakan layanan Google lainnya, tambahkan pernyataan impor berikut:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Menetapkan opsi interaktivitas

Berikut ini menetapkan durasi pengambilan data menjadi 60 detik. Jika Anda ingin melakukan iterasi lebih cepat, tetapkan durasi yang lebih rendah, misalnya '10 detik'.

ib.options.recording_duration = '60s'

Untuk opsi interaktif tambahan, lihat class interactive_beam.options.

Lakukan inisialisasi pipeline menggunakan objek InteractiveRunner.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Membaca dan memvisualisasikan data

Contoh berikut menunjukkan pipeline Apache Beam yang membuat langganan ke topik Pub/Sub tertentu dan membaca dari langganan tersebut.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Pipeline menghitung kata-kata berdasarkan jendela dari sumber. Tindakan ini akan membuat windowing tetap dengan setiap jendela berdurasi 10 detik.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Setelah data diatur dalam periode, kata-kata dihitung berdasarkan periode.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Memvisualisasikan data

Metode show() memvisualisasikan PCollection yang dihasilkan di notebook.

ib.show(windowed_word_counts, include_window_info=True)

Metode show yang memvisualisasikan PCollection dalam bentuk tabel.

Untuk menampilkan visualisasi data Anda, teruskan visualize_data=True ke dalam metode show(). Menambahkan sel baru:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Anda dapat menerapkan beberapa filter ke visualisasi. Visualisasi berikut memungkinkan Anda memfilter menurut label dan sumbu:

Metode show memvisualisasikan PCollection sebagai kumpulan elemen UI yang dapat difilter.

5. Menggunakan DataFrame Pandas

Visualisasi lain yang berguna di notebook Apache Beam adalah Pandas DataFrame. Contoh berikut pertama-tama mengonversi kata menjadi huruf kecil, lalu menghitung frekuensi setiap kata.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

Metode collect() memberikan output dalam Pandas DataFrame.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Metode pengumpulan yang merepresentasikan PCollection dalam Pandas DataFrame.

6. (Opsional) Meluncurkan tugas Dataflow dari notebook

  1. Untuk menjalankan tugas di Dataflow, Anda memerlukan izin tambahan. Pastikan akun layanan default Compute Engine memiliki peran Editor, atau berikan peran IAM berikut kepadanya:
  • Dataflow Admin
  • Dataflow Worker
  • Storage Admin, dan
  • Pengguna Akun Layanan (roles/iam.serviceAccountUser)

Lihat informasi selengkapnya tentang peran dalam dokumentasi.

  1. (Opsional) Sebelum menggunakan notebook untuk menjalankan tugas Dataflow, mulai ulang kernel, jalankan ulang semua sel, dan verifikasi output.
  2. Hapus pernyataan impor berikut:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Tambahkan pernyataan import berikut:
from apache_beam.runners import DataflowRunner
  1. Hapus opsi durasi perekaman berikut:
ib.options.recording_duration = '60s'
  1. Tambahkan kode berikut ke opsi pipeline Anda. Anda harus menyesuaikan lokasi Cloud Storage agar mengarah ke bucket yang sudah Anda miliki, atau Anda dapat membuat bucket baru untuk tujuan ini. Anda juga dapat mengubah nilai wilayah dari us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. Di konstruktor beam.Pipeline(), ganti InteractiveRunner dengan DataflowRunner. p adalah objek pipeline dari pembuatan pipeline Anda.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Hapus panggilan interaktif dari kode Anda. Misalnya, hapus show(), collect(), head(), show_graph(), dan watch() dari kode Anda.
  2. Untuk dapat melihat hasil apa pun, Anda harus menambahkan sink. Di bagian sebelumnya, kita memvisualisasikan hasil di notebook, tetapi kali ini, kita menjalankan tugas di luar notebook ini - di Dataflow. Oleh karena itu, kita memerlukan lokasi eksternal untuk hasil kita. Dalam contoh ini, kita akan menulis hasilnya ke file teks di GCS (Google Cloud Storage). Karena ini adalah pipeline streaming, dengan windowing data, kita akan membuat satu file teks per jendela. Untuk melakukannya, tambahkan langkah-langkah berikut ke pipeline Anda:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Tambahkan p.run() di akhir kode pipeline Anda.
  2. Sekarang tinjau kode notebook Anda untuk mengonfirmasi bahwa Anda telah menggabungkan semua perubahan. Konfigurasinya akan terlihat seperti ini:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Jalankan sel.
  2. Anda akan melihat output yang mirip dengan berikut ini:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Untuk memvalidasi apakah tugas sedang berjalan, buka halaman Tugas untuk Dataflow. Anda akan melihat tugas baru dalam daftar. Tugas ini akan memerlukan waktu sekitar 5-10 menit untuk mulai memproses data.
  2. Setelah data diproses, buka Cloud Storage dan buka direktori tempat Dataflow menyimpan hasilnya (output_gcs_location yang Anda tentukan). Anda akan melihat daftar file teks, dengan satu file per jendela. bfcc5ce9e46a8b14.png
  3. Download file dan periksa isinya. File ini harus berisi daftar kata yang dipasangkan dengan jumlahnya. Atau, gunakan antarmuka command line untuk memeriksa file. Anda dapat melakukannya dengan menjalankan kode berikut di sel baru di notebook Anda:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Anda akan melihat output yang serupa dengan ini:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Selesai! Jangan lupa untuk membersihkan dan menghentikan tugas yang telah Anda buat (lihat langkah terakhir codelab ini).

Untuk contoh cara melakukan konversi ini di notebook interaktif, lihat notebook Dataflow Word Count di instance notebook Anda.

Atau, Anda dapat mengekspor notebook sebagai skrip yang dapat dieksekusi, mengubah file .py yang dihasilkan menggunakan langkah-langkah sebelumnya, lalu men-deploy pipeline ke layanan Dataflow.

7. Menyimpan notebook Anda

Notebook yang Anda buat disimpan secara lokal di instance notebook yang sedang berjalan. Jika Anda mereset atau mematikan instance notebook selama pengembangan, notebook baru tersebut akan tetap ada selama dibuat di direktori /home/jupyter. Namun, jika instance notebook dihapus, notebook tersebut juga akan dihapus.

Untuk menyimpan notebook Anda untuk digunakan di masa mendatang, download notebook secara lokal ke workstation Anda, simpan ke GitHub, atau ekspor ke format file lain.

8. Pembersihan

Setelah selesai menggunakan instance notebook Apache Beam, bersihkan resource yang Anda buat di Google Cloud dengan menonaktifkan instance notebook dan menghentikan tugas streaming, jika Anda telah menjalankannya.

Atau, jika Anda telah membuat project untuk tujuan tunggal codelab ini, Anda juga dapat menonaktifkan project sepenuhnya.