1. Giới thiệu

Google Cloud Dataflow
Lần cập nhật gần đây nhất: 5/7/2023
Dataflow là gì?
Dataflow là một dịch vụ được quản lý để thực thi nhiều mẫu xử lý dữ liệu. Tài liệu trên trang web này hướng dẫn bạn cách triển khai các quy trình xử lý dữ liệu hàng loạt và truyền trực tuyến bằng Dataflow, bao gồm cả hướng dẫn sử dụng các tính năng của dịch vụ.
Apache Beam SDK là một mô hình lập trình nguồn mở cho phép bạn phát triển cả quy trình xử lý hàng loạt và quy trình truyền trực tuyến. Bạn tạo các quy trình bằng chương trình Apache Beam rồi chạy chúng trên dịch vụ Dataflow. Tài liệu về Apache Beam cung cấp thông tin khái niệm chuyên sâu và tài liệu tham khảo cho mô hình lập trình, SDK và các trình chạy khác của Apache Beam.
Phân tích dữ liệu truyền trực tuyến với tốc độ cao
Dataflow cho phép phát triển quy trình truyền dữ liệu trực tuyến nhanh chóng, đơn giản với độ trễ dữ liệu thấp hơn.
Đơn giản hoá hoạt động và quản lý
Cho phép các nhóm tập trung vào việc lập trình thay vì quản lý các cụm máy chủ vì phương pháp không máy chủ của Dataflow giúp loại bỏ chi phí vận hành cho các khối lượng công việc kỹ thuật dữ liệu.
Giảm tổng chi phí sở hữu
Tính năng tự động cấp tài nguyên bổ sung kết hợp với khả năng xử lý hàng loạt được tối ưu hoá chi phí có nghĩa là Dataflow cung cấp dung lượng gần như không giới hạn để quản lý các khối lượng công việc theo mùa và đột biến mà không tốn quá nhiều chi phí.
Các tính năng chính
Quản lý tài nguyên tự động và cân bằng lại công việc một cách linh động
Dataflow tự động cung cấp và quản lý các tài nguyên xử lý để giảm thiểu độ trễ và tối đa hoá mức sử dụng, nhờ đó bạn không cần khởi động các phiên bản hoặc đặt trước các phiên bản theo cách thủ công. Việc phân vùng công việc cũng được tự động hoá và tối ưu hoá để cân bằng lại động các công việc bị trễ. Không cần tìm kiếm "phím nóng" hoặc xử lý trước dữ liệu đầu vào.
Tự động mở rộng quy mô theo chiều ngang
Tính năng tự động mở rộng quy mô theo chiều ngang của tài nguyên worker để có được kết quả thông lượng tối ưu, giúp cải thiện hiệu suất tổng thể so với giá.
Giá theo lịch tài nguyên linh hoạt để xử lý hàng loạt
Để xử lý với thời gian lập lịch linh hoạt cho công việc, chẳng hạn như công việc qua đêm, tính năng lập lịch tài nguyên linh hoạt (FlexRS) cung cấp mức giá thấp hơn cho việc xử lý hàng loạt. Những công việc linh hoạt này được đưa vào một hàng đợi và đảm bảo sẽ được truy xuất để thực thi trong vòng 6 giờ.
Những gì bạn sẽ chạy trong quá trình này
Việc sử dụng trình chạy tương tác Apache Beam với sổ tay JupyterLab cho phép bạn phát triển các quy trình theo cách lặp đi lặp lại, kiểm tra biểu đồ quy trình và phân tích cú pháp từng PCollection trong quy trình làm việc vòng lặp read-eval-print (REPL). Các sổ tay Apache Beam này được cung cấp thông qua Vertex AI Workbench, một dịch vụ được quản lý, lưu trữ các máy ảo sổ tay đã cài đặt sẵn các khung học máy và khoa học dữ liệu mới nhất.
Lớp học lập trình này tập trung vào chức năng do sổ tay Apache Beam giới thiệu.
Kiến thức bạn sẽ học được
- Cách tạo một phiên bản sổ tay
- Tạo một quy trình cơ bản
- Đọc dữ liệu từ nguồn không giới hạn
- Trực quan hoá dữ liệu
- Chạy một Công việc Dataflow từ sổ tay
- Lưu sổ tay
Bạn cần có
- Một dự án trên Google Cloud Platform đã bật tính năng Thanh toán.
- Đã bật Google Cloud Dataflow và Google Cloud Pub/Sub.
2. Thiết lập
- Trong Cloud Console, trên trang bộ chọn dự án, hãy chọn hoặc tạo một dự án trên đám mây.
Đảm bảo rằng bạn đã bật các API sau:
- Dataflow API
- Cloud Pub/Sub API
- Compute Engine
- Notebooks API
Bạn có thể xác minh điều này bằng cách kiểm tra trang API và dịch vụ.
Trong hướng dẫn này, chúng ta sẽ đọc dữ liệu từ một gói thuê bao Pub/Sub. Do đó, hãy đảm bảo rằng tài khoản dịch vụ mặc định của Compute Engine có vai trò Người chỉnh sửa hoặc cấp cho tài khoản đó vai trò Người chỉnh sửa Pub/Sub.
3. Bắt đầu sử dụng sổ tay Apache Beam
Chạy một phiên bản sổ tay Apache Beam
- Chạy Dataflow trên Bảng điều khiển:
- Chọn trang Workbench bằng trình đơn bên trái.
- Đảm bảo rằng bạn đang ở thẻ Sổ tay do người dùng quản lý.
- Trên thanh công cụ, hãy nhấp vào Sổ tay mới.
- Chọn Apache Beam > Without GPUs (Không có GPU).
- Trên trang Sổ tay mới, hãy chọn một mạng con cho VM sổ tay rồi nhấp vào Tạo.
- Nhấp vào Open JupyterLab (Mở JupyterLab) khi đường liên kết chuyển sang trạng thái hoạt động. Vertex AI Workbench tạo một phiên bản sổ tay Apache Beam mới.
4. Tạo đường ống
Tạo một phiên bản sổ tay
Chuyển đến File > New > Notebook (Tệp > Mới > Sổ tay) rồi chọn một nhân là Apache Beam 2.47 trở lên.
Bắt đầu thêm mã vào sổ tay
- Sao chép và dán mã từ mỗi phần vào một ô mới trong sổ tay
- Chạy ô

Việc sử dụng trình chạy tương tác Apache Beam với sổ tay JupyterLab cho phép bạn phát triển các quy trình theo cách lặp đi lặp lại, kiểm tra biểu đồ quy trình và phân tích cú pháp từng PCollection trong quy trình làm việc vòng lặp read-eval-print (REPL).
Apache Beam được cài đặt trên phiên bản sổ tay của bạn, vì vậy, hãy thêm các mô-đun interactive_runner và interactive_beam vào sổ tay.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
Nếu sổ tay của bạn sử dụng các dịch vụ khác của Google, hãy thêm các câu lệnh nhập sau:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
Đặt các lựa chọn tương tác
Sau đây là cách đặt thời lượng thu thập dữ liệu thành 60 giây. Nếu bạn muốn lặp lại nhanh hơn, hãy đặt thời lượng ngắn hơn, ví dụ: "10 giây".
ib.options.recording_duration = '60s'
Để biết thêm các lựa chọn tương tác, hãy xem lớp interactive_beam.options.
Khởi động quy trình bằng đối tượng InteractiveRunner.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
Đọc và trực quan hoá dữ liệu
Ví dụ sau đây cho thấy một quy trình Apache Beam tạo một gói thuê bao cho chủ đề Pub/Sub đã cho và đọc từ gói thuê bao đó.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
Quy trình này đếm số từ theo cửa sổ từ nguồn. Thao tác này sẽ tạo ra cửa sổ cố định, mỗi cửa sổ có thời lượng 10 giây.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
Sau khi dữ liệu được phân chia theo khoảng thời gian, các từ sẽ được đếm theo khoảng thời gian.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
Trực quan hoá dữ liệu
Phương thức show() trực quan hoá PCollection thu được trong sổ tay.
ib.show(windowed_word_counts, include_window_info=True)

Để hiển thị hình ảnh hoá dữ liệu, hãy truyền visualize_data=True vào phương thức show(). Thêm ô mới:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
Bạn có thể áp dụng nhiều bộ lọc cho các hình ảnh trực quan. Hình ảnh trực quan sau đây cho phép bạn lọc theo nhãn và trục:

5. Sử dụng Pandas Dataframe
Một hình ảnh trực quan hữu ích khác trong sổ tay Apache Beam là Pandas DataFrame. Ví dụ sau đây trước tiên chuyển đổi các từ thành chữ thường, sau đó tính tần suất của từng từ.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
Phương thức collect() cung cấp đầu ra trong Pandas DataFrame.
ib.collect(windowed_lower_word_counts, include_window_info=True)

6. (Không bắt buộc) Chạy các công việc Dataflow từ sổ tay
- Để chạy các công việc trên Dataflow, bạn cần có thêm quyền. Đảm bảo tài khoản dịch vụ mặc định của Compute Engine có vai trò Người chỉnh sửa hoặc cấp cho tài khoản này các vai trò IAM sau:
- Quản trị viên Dataflow
- Worker Dataflow
- Quản trị viên bộ nhớ và
- Người dùng tài khoản dịch vụ (roles/iam.serviceAccountUser)
Xem thêm thông tin về các vai trò trong tài liệu.
- (Không bắt buộc) Trước khi sử dụng sổ tay để chạy các công việc Dataflow, hãy khởi động lại nhân, chạy lại tất cả các ô và xác minh đầu ra.
- Xoá các câu lệnh nhập sau:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- Thêm câu lệnh nhập sau:
from apache_beam.runners import DataflowRunner
- Xoá lựa chọn thời lượng ghi hình sau:
ib.options.recording_duration = '60s'
- Thêm nội dung sau vào các lựa chọn của quy trình. Bạn cần điều chỉnh vị trí Cloud Storage để trỏ đến một bộ chứa mà bạn đã sở hữu hoặc bạn có thể tạo một bộ chứa mới cho mục đích này. Bạn cũng có thể thay đổi giá trị khu vực từ
us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- Trong hàm khởi tạo của
beam.Pipeline(), hãy thay thếInteractiveRunnerbằngDataflowRunner.plà đối tượng quy trình từ việc tạo quy trình.
p = beam.Pipeline(DataflowRunner(), options=options)
- Xoá các lệnh gọi tương tác khỏi mã của bạn. Ví dụ: xoá
show(),collect(),head(),show_graph()vàwatch()khỏi mã của bạn. - Để có thể xem kết quả, bạn cần thêm một bồn lưu trữ dữ liệu. Trong phần trước, chúng ta đã trực quan hoá kết quả trong sổ tay, nhưng lần này, chúng ta sẽ chạy công việc bên ngoài sổ tay này – trong Dataflow. Do đó, chúng tôi cần một vị trí bên ngoài cho kết quả của mình. Trong ví dụ này, chúng ta sẽ ghi kết quả vào các tệp văn bản trong GCS (Google Cloud Storage). Vì đây là một quy trình truyền trực tuyến, có tính năng phân chia dữ liệu theo khoảng thời gian, nên chúng ta sẽ muốn tạo một tệp văn bản cho mỗi khoảng thời gian. Để đạt được điều này, hãy thêm các bước sau vào quy trình của bạn:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- Thêm
p.run()vào cuối mã quy trình. - Bây giờ, hãy xem lại mã trong sổ tay để xác nhận rằng bạn đã kết hợp tất cả các thay đổi. Nội dung này sẽ có dạng như sau:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- Chạy các ô.
- Bạn sẽ thấy kết quả tương tự như sau:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- Để xác thực xem công việc có đang chạy hay không, hãy chuyển đến trang Công việc cho Dataflow. Bạn sẽ thấy một công việc mới trong danh sách. Công việc này sẽ mất khoảng 5 đến 10 phút để bắt đầu xử lý dữ liệu.
- Sau khi dữ liệu đang được xử lý, hãy chuyển đến Cloud Storage rồi chuyển đến thư mục nơi Dataflow đang lưu trữ kết quả (
output_gcs_locationdo bạn xác định). Bạn sẽ thấy một danh sách các tệp văn bản, với một tệp cho mỗi cửa sổ.
- Tải tệp xuống và kiểm tra nội dung. Tệp này phải chứa danh sách các từ được ghép với số lượng của chúng. Ngoài ra, bạn có thể dùng giao diện dòng lệnh để kiểm tra các tệp. Bạn có thể thực hiện việc này bằng cách chạy lệnh sau trong một ô mới trong sổ tay:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- Bạn sẽ thấy kết quả tương tự như sau:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- Vậy là xong! Đừng quên dọn dẹp và dừng công việc mà bạn đã tạo (xem bước cuối cùng của lớp học lập trình này).
Để xem ví dụ về cách thực hiện quá trình chuyển đổi này trên một sổ tay tương tác, hãy xem sổ tay Đếm từ Dataflow trong phiên bản sổ tay của bạn.
Ngoài ra, bạn có thể xuất sổ tay dưới dạng một tập lệnh có thể thực thi, sửa đổi tệp .py đã tạo bằng các bước trước đó, rồi triển khai quy trình vào dịch vụ Dataflow.
7. Đang lưu sổ tay
Sổ tay bạn tạo sẽ được lưu cục bộ trong phiên bản sổ tay đang chạy. Nếu bạn đặt lại hoặc tắt phiên bản sổ tay trong quá trình phát triển, thì những sổ tay mới đó sẽ được duy trì miễn là chúng được tạo trong thư mục /home/jupyter. Tuy nhiên, nếu một phiên bản sổ tay bị xoá, thì những sổ tay đó cũng sẽ bị xoá.
Để lưu sổ tay cho lần sử dụng sau, hãy tải sổ tay xuống máy trạm của bạn, lưu vào GitHub hoặc xuất sang một định dạng tệp khác.
8. Dọn dẹp
Sau khi bạn dùng xong phiên bản sổ tay Apache Beam, hãy dọn dẹp các tài nguyên mà bạn đã tạo trên Google Cloud bằng cách tắt phiên bản sổ tay và dừng công việc truyền phát trực tuyến (nếu bạn đã chạy một công việc).
Ngoài ra, nếu đã tạo một dự án chỉ dành riêng cho lớp học lập trình này, bạn cũng có thể tắt hoàn toàn dự án.