1. Ringkasan

Apa yang dimaksud dengan Dataflow?
Dataflow adalah layanan terkelola untuk menjalankan berbagai macam pola pemrosesan data. Dokumentasi di situs ini menunjukkan cara men-deploy pipeline pemrosesan data batch dan streaming menggunakan Dataflow, termasuk petunjuk untuk menggunakan fitur layanan.
Apache Beam SDK adalah model pemrograman open source yang memungkinkan Anda mengembangkan pipeline batch dan streaming. Anda membuat pipeline dengan program Apache Beam, lalu menjalankannya di layanan Dataflow. Dokumentasi Apache Beam memberikan informasi konseptual mendalam dan materi referensi untuk model pemrograman Apache Beam, SDK, dan runner lainnya.
Melakukan streaming analisis data dengan cepat
Dataflow memungkinkan pengembangan pipeline data streaming yang cepat dan sederhana dengan latensi data yang lebih rendah.
Menyederhanakan operasi dan pengelolaan
Biarkan tim untuk berfokus pada pemrograman, dan bukan pengelolaan cluster server karena pendekatan serverless Dataflow menghilangkan beban operasional dari workload rekayasa data.
Mengurangi total biaya kepemilikan
Dengan penskalaan otomatis resource serta kemampuan batch processing yang menggunakan pengoptimalan biaya, Dataflow menawarkan kapasitas tanpa batas untuk mengelola workload musiman atau naik turun tanpa memboroskan anggaran.
Fitur utama
Pengelolaan resource otomatis dan penyeimbangan ulang tugas dinamis
Dataflow mengotomatiskan penyediaan dan pengelolaan resource pemrosesan untuk menekan latensi dan memaksimalkan penggunaan, sehingga Anda tidak perlu lagi mengoperasikan instance atau mencadangkannya secara manual. Pembagian tugas juga diotomatiskan dan dioptimalkan untuk menyeimbangkan kembali tugas yang mengalami lag secara dinamis. Tidak perlu menghapal "hot key" atau memproses data input terlebih dahulu.
Penskalaan otomatis horizontal
Penskalaan horizontal otomatis terhadap resource pekerja demi mendapatkan hasil throughput optimal dan rasio harga terhadap performa keseluruhan yang lebih baik.
Harga penjadwalan resource yang fleksibel untuk batch processing
Untuk pemrosesan dengan waktu penjadwalan tugas yang fleksibel, seperti tugas semalaman, penjadwalan resource yang fleksibel (FlexRS) menawarkan harga yang lebih rendah untuk batch processing. Tugas fleksibel ini ditempatkan dalam antrean dengan jaminan bahwa tugas tersebut akan diambil untuk dieksekusi dalam waktu enam jam.
Tutorial ini diadaptasi dari https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
Yang akan Anda pelajari
- Cara membuat project Maven dengan Apache Beam, menggunakan Java SDK
- Menjalankan contoh pipeline menggunakan Konsol Google Cloud Platform
- Cara menghapus bucket Cloud Storage terkait dan isinya
Yang Anda butuhkan
Bagaimana Anda akan menggunakan tutorial ini?
Bagaimana penilaian Anda terhadap pengalaman menggunakan layanan Google Cloud Platform?
2. Penyiapan dan Persyaratan
Penyiapan lingkungan mandiri
- Login ke Cloud Console dan buat project baru atau gunakan kembali project yang sudah ada. (Jika belum memiliki akun Gmail atau G Suite, Anda harus membuatnya.)
Ingat project ID, nama unik di semua project Google Cloud (maaf, nama di atas telah digunakan dan tidak akan berfungsi untuk Anda!) Project ID tersebut selanjutnya akan dirujuk di codelab ini sebagai PROJECT_ID.
- Selanjutnya, Anda harus mengaktifkan penagihan di Cloud Console untuk menggunakan resource Google Cloud.
Menjalankan operasi dalam codelab ini seharusnya tidak memerlukan banyak biaya, bahkan mungkin tidak sama sekali. Pastikan untuk mengikuti petunjuk yang ada di bagian "Membersihkan" yang memberi tahu Anda cara menonaktifkan resource sehingga tidak menimbulkan penagihan di luar tutorial ini. Pengguna baru Google Cloud memenuhi syarat untuk mengikuti program Uji Coba Gratis senilai $300 USD.
Aktifkan API
Klik ikon menu di kiri atas layar.

Pilih APIs & Services > Dashboard dari drop-down.

Pilih + Enable APIs and Services.

Telusuri "Compute Engine" di kotak penelusuran. Klik "Compute Engine API" dalam daftar hasil yang muncul.

Di halaman Google Compute Engine, klik Enable

Setelah diaktifkan, klik panah untuk kembali.
Sekarang telusuri API berikut dan aktifkan juga:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- JSON Cloud Storage
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. Membuat bucket Cloud Storage baru
Di Google Cloud Platform Console, klik ikon Menu di kiri atas layar:

Scroll ke bawah dan pilih Cloud Storage > Browser di subbagian Storage:

Sekarang Anda akan melihat Browser Cloud Storage, dan jika Anda menggunakan project yang saat ini tidak memiliki bucket Cloud Storage, Anda akan melihat undangan untuk membuat bucket baru. Tekan tombol Buat bucket untuk membuatnya:

Masukkan nama untuk bucket Anda. Seperti yang tercantum dalam kotak dialog, nama bucket harus unik di seluruh Cloud Storage. Jadi, jika Anda memilih nama yang jelas, seperti "test", Anda mungkin akan mendapati bahwa orang lain telah membuat bucket dengan nama tersebut, dan akan menerima error.
Ada juga beberapa aturan terkait karakter yang diizinkan dalam nama bucket. Jika Anda memulai dan mengakhiri nama bucket dengan huruf atau angka, dan hanya menggunakan tanda hubung di tengah, Anda tidak akan mengalami masalah. Jika Anda mencoba menggunakan karakter khusus, atau mencoba memulai atau mengakhiri nama bucket dengan sesuatu selain huruf atau angka, kotak dialog akan mengingatkan Anda tentang aturan tersebut.

Masukkan nama unik untuk bucket Anda, lalu tekan Buat. Jika Anda memilih sesuatu yang sudah digunakan, Anda akan melihat pesan error yang ditampilkan di atas. Setelah berhasil membuat bucket, Anda akan diarahkan ke bucket baru yang kosong di browser:

Tentu saja, nama bucket yang Anda lihat akan berbeda karena harus unik di semua project.
4. Mulai Cloud Shell
Mengaktifkan Cloud Shell
- Dari Cloud Console, klik Aktifkan Cloud Shell
.
Jika belum pernah memulai Cloud Shell, Anda akan melihat layar perantara (di paruh bawah) yang menjelaskan apa itu Cloud Shell. Jika demikian, klik Lanjutkan (dan Anda tidak akan pernah melihatnya lagi). Berikut tampilan layar sekali-tampil tersebut:
Perlu waktu beberapa saat untuk penyediaan dan terhubung ke Cloud Shell.
Mesin virtual ini berisi semua alat pengembangan yang Anda perlukan. Layanan ini menawarkan direktori beranda tetap sebesar 5 GB dan beroperasi di Google Cloud, sehingga sangat meningkatkan performa dan autentikasi jaringan. Sebagian besar pekerjaan Anda dalam codelab ini dapat dilakukan hanya dengan browser atau Chromebook.
Setelah terhubung ke Cloud Shell, Anda akan melihat bahwa Anda sudah diautentikasi dan project sudah ditetapkan ke project ID Anda.
- Jalankan perintah berikut di Cloud Shell untuk mengonfirmasi bahwa Anda telah diautentikasi:
gcloud auth list
Output perintah
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
Output perintah
[core] project = <PROJECT_ID>
Jika tidak, Anda dapat menyetelnya dengan perintah ini:
gcloud config set project <PROJECT_ID>
Output perintah
Updated property [core/project].
5. Membuat project Maven
Setelah Cloud Shell diluncurkan, mari kita mulai dengan membuat project Maven menggunakan Java SDK untuk Apache Beam.
Apache Beam adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline ini dengan program Apache Beam dan dapat memilih runner, seperti Dataflow, untuk menjalankan pipeline Anda.
Jalankan perintah mvn archetype:generate di shell Anda sebagai berikut:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Setelah menjalankan perintah, Anda akan melihat direktori baru bernama first-dataflow di direktori Anda saat ini. first-dataflow berisi project Maven yang mencakup Cloud Dataflow SDK untuk Java dan contoh pipeline.
6. Menjalankan pipeline pemrosesan teks di Cloud Dataflow
Mari mulai dengan menyimpan project ID dan nama bucket Cloud Storage sebagai variabel lingkungan. Anda dapat melakukannya di Cloud Shell. Pastikan untuk mengganti <your_project_id> dengan project ID Anda.
export PROJECT_ID=<your_project_id>
Sekarang kita akan melakukan hal yang sama untuk bucket Cloud Storage. Ingat, ganti <your_bucket_name> dengan nama unik yang Anda gunakan untuk membuat bucket di langkah sebelumnya.
export BUCKET_NAME=<your_bucket_name>
Ubah ke direktori first-dataflow/.
cd first-dataflow
Kita akan menjalankan pipeline bernama WordCount, yang membaca teks, membuat token baris teks menjadi kata individual, dan menjalankan penghitungan frekuensi pada setiap kata tersebut. Pertama, kita akan menjalankan pipeline, dan saat pipeline berjalan, kita akan melihat apa yang terjadi di setiap langkah.
Mulai pipeline dengan menjalankan perintah mvn compile exec:java di shell atau jendela terminal Anda. Untuk argumen --project, --stagingLocation, dan --output, perintah di bawah merujuk ke variabel lingkungan yang Anda siapkan sebelumnya pada langkah ini.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Saat tugas berjalan, mari kita temukan tugas tersebut dalam daftar tugas.
Buka UI Web Cloud Dataflow di Google Cloud Platform Console. Anda akan melihat tugas wordcount Anda dengan status Running:

Sekarang, mari kita lihat parameter pipeline. Mulai dengan mengklik nama tugas Anda:

Saat memilih tugas, Anda dapat melihat grafik eksekusi. Grafik eksekusi pipeline merepresentasikan setiap transformasi dalam pipeline sebagai kotak yang berisi nama transformasi dan beberapa informasi status. Anda dapat mengklik tanda sisipan di sudut kanan atas setiap langkah untuk melihat detail selengkapnya:

Mari kita lihat bagaimana pipeline mengubah data di setiap langkah:
- Baca: Pada langkah ini, pipeline membaca dari sumber input. Dalam hal ini, file tersebut adalah file teks dari Cloud Storage yang berisi seluruh teks drama Shakespeare King Lear. Pipeline kami membaca file baris demi baris dan menghasilkan setiap
PCollection, di mana setiap baris dalam file teks kami adalah elemen dalam koleksi. - CountWords: Langkah
CountWordsmemiliki dua bagian. Pertama, fungsi ini menggunakan fungsi parallel do (ParDo) bernamaExtractWordsuntuk melakukan tokenisasi setiap baris menjadi kata-kata individual. Output ExtractWords adalah PCollection baru yang setiap elemennya adalah kata. Langkah berikutnya,Count, menggunakan transformasi yang disediakan oleh Java SDK yang menampilkan pasangan nilai kunci, dengan kunci adalah kata unik dan nilai adalah jumlah kemunculannya. Berikut adalah metode yang menerapkanCountWords, dan Anda dapat melihat file WordCount.java lengkap di GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: Ini memanggil
FormatAsTextFn, yang disalin di bawah, yang memformat setiap pasangan nilai kunci menjadi string yang dapat dicetak.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: Pada langkah ini, kita menulis string yang dapat dicetak ke dalam beberapa file teks yang di-shard.
Kita akan melihat hasil output dari pipeline dalam beberapa menit.
Sekarang lihat halaman Info tugas di sebelah kanan grafik, yang mencakup parameter pipeline yang kami sertakan dalam perintah mvn compile exec:java.


Anda juga dapat melihat Penghitung kustom untuk pipeline, yang dalam hal ini menunjukkan jumlah baris kosong yang telah ditemukan sejauh ini selama eksekusi. Anda dapat menambahkan penghitung baru ke pipeline untuk melacak metrik spesifik per aplikasi.

Anda dapat mengklik ikon Log di bagian bawah konsol untuk melihat pesan error tertentu.

Panel ini secara default menampilkan pesan Log Tugas yang melaporkan status tugas secara keseluruhan. Anda dapat menggunakan pemilih Tingkat Keparahan Minimum untuk memfilter pesan progres dan status tugas.

Memilih langkah pipeline dalam grafik akan mengubah tampilan ke log yang dihasilkan oleh kode Anda dan kode yang dihasilkan yang berjalan dalam langkah pipeline.
Untuk kembali ke Log Tugas, batalkan pilihan langkah dengan mengklik di luar grafik atau menggunakan tombol Tutup di panel samping kanan.
Anda dapat menggunakan tombol Worker Logs di tab log untuk melihat log pekerja untuk instance Compute Engine yang menjalankan pipeline Anda. Log Pekerja terdiri dari baris log yang dihasilkan oleh kode Anda dan kode yang dihasilkan Dataflow yang menjalankannya.
Jika Anda mencoba men-debug kegagalan dalam pipeline, sering kali ada logging tambahan di Log Pekerja yang membantu menyelesaikan masalah. Perlu diingat bahwa log ini digabungkan di semua pekerja, dan dapat difilter serta ditelusuri.

Antarmuka Monitoring Dataflow hanya menampilkan pesan log terbaru. Anda dapat melihat semua log dengan mengklik link Google Cloud Observability di sisi kanan panel log.

Berikut ringkasan berbagai jenis log yang tersedia untuk dilihat dari halaman Monitoring→Logs:
- Log job-message berisi pesan tingkat tugas yang dihasilkan oleh berbagai komponen Dataflow. Contohnya mencakup konfigurasi penskalaan otomatis, saat pekerja memulai atau mematikan, progres pada langkah tugas, dan error tugas. Error tingkat pekerja yang berasal dari kode pengguna yang error dan yang ada di log worker juga diteruskan ke log job-message.
- Log pekerja dihasilkan oleh pekerja Dataflow. Worker melakukan sebagian besar pekerjaan pipeline (misalnya, menerapkan ParDo ke data). Log Worker berisi pesan yang dicatat oleh kode dan Dataflow Anda.
- Log worker-startup ada di sebagian besar tugas Dataflow dan dapat merekam pesan yang terkait dengan proses startup. Proses startup mencakup mendownload JAR tugas dari Cloud Storage, lalu memulai pekerja. Jika ada masalah saat memulai pekerja, log ini adalah tempat yang tepat untuk melihatnya.
- Log shuffler berisi pesan dari pekerja yang menggabungkan hasil operasi pipeline paralel.
- Log docker dan kubelet berisi pesan yang terkait dengan teknologi publik ini, yang digunakan pada pekerja Dataflow.
Pada langkah berikutnya, kita akan memeriksa apakah pekerjaan Anda berhasil.
7. Memeriksa apakah pekerjaan telah berhasil
Buka UI Web Cloud Dataflow di Google Cloud Platform Console.
Di awal, Anda akan melihat tugas wordcount Anda dengan status Running, lalu Succeeded:

Tugas ini akan memerlukan waktu sekitar 3-4 menit untuk dijalankan.
Ingatkah saat Anda menjalankan pipeline dan menentukan bucket output? Mari kita lihat hasilnya (karena Anda pasti ingin tahu berapa kali setiap kata dalam King Lear muncul, bukan?). Kembali ke Browser Cloud Storage di Konsol Google Cloud Platform. Di bucket, Anda akan melihat file output dan file penyiapan yang dibuat oleh tugas Anda:

8. Mematikan resource
Anda dapat mematikan resource dari Konsol Google Cloud Platform.
Buka browser Cloud Storage di Konsol Google Cloud Platform.

Centang kotak di samping bucket yang Anda buat, lalu klik HAPUS untuk menghapus bucket dan isinya secara permanen.


9. Selamat!
Anda telah mempelajari cara membuat project Maven dengan Cloud Dataflow SDK, menjalankan contoh pipeline menggunakan Konsol Google Cloud Platform, dan menghapus bucket Cloud Storage terkait beserta isinya.
Pelajari Lebih Lanjut
- Dokumentasi Dataflow: https://cloud.google.com/dataflow/docs/
Lisensi
Karya ini dilisensikan berdasarkan Lisensi Umum Creative Commons Attribution 3.0, dan lisensi Apache 2.0.