1. Giới thiệu
Luồng dữ liệu Google Cloud
Lần cập nhật gần đây nhất: 5/7/2023
Dataflow là gì?
Dataflow là một dịch vụ được quản lý để thực thi nhiều mẫu xử lý dữ liệu. Tài liệu trên trang web này cho bạn biết cách triển khai quy trình xử lý dữ liệu hàng loạt và truyền trực tuyến bằng Dataflow, bao gồm cả hướng dẫn sử dụng các tính năng dịch vụ.
Apache Beam SDK là một mô hình lập trình nguồn mở cho phép bạn phát triển cả quy trình hàng loạt và quy trình phát trực tuyến. Bạn tạo quy trình bằng chương trình Apache Beam, sau đó chạy quy trình trên dịch vụ Dataflow. Tài liệu về Apache Beam cung cấp thông tin khái niệm chuyên sâu và tài liệu tham khảo cho mô hình lập trình Apache Beam, SDK và các trình chạy khác.
Truyền trực tuyến phân tích dữ liệu một cách nhanh chóng
Dataflow giúp phát triển quy trình dữ liệu truyền trực tuyến nhanh chóng và đơn giản với độ trễ dữ liệu thấp hơn.
Đơn giản hoá việc vận hành và quản lý
Cho phép các nhóm tập trung vào việc lập trình thay vì quản lý các cụm máy chủ, vì phương pháp không máy chủ của Dataflow giúp loại bỏ chi phí vận hành khỏi các tải công việc về kỹ thuật dữ liệu.
Giảm tổng chi phí sở hữu
Phương pháp tự động cấp tài nguyên kết hợp với khả năng xử lý hàng loạt để tối ưu hoá chi phí có nghĩa là Dataflow cung cấp dung lượng gần như vô hạn để quản lý các khối lượng công việc tăng đột biến theo mùa mà không chi tiêu quá mức.
Các tính năng chính
Quản lý tài nguyên tự động và cân bằng công việc một cách linh hoạt
Dataflow tự động hoá việc cấp phép và quản lý tài nguyên xử lý để giảm thiểu độ trễ và tối đa hoá việc sử dụng. Nhờ đó, bạn không cần phải tạo bản sao hoặc đặt trước các bản sao theo cách thủ công. Việc phân chia công việc cũng được tự động hoá và tối ưu hoá để tự động cân bằng lại công việc bị trễ. Không cần phải theo đuổi "phím nóng" hoặc xử lý trước dữ liệu đầu vào.
Tự động cấp tài nguyên bổ sung theo chiều ngang
Việc tự động cấp tài nguyên bổ sung theo chiều ngang cho tài nguyên của worker theo chiều ngang để đạt công suất tối ưu, giúp cải thiện hiệu suất tổng thể.
Định giá linh hoạt khi lên lịch cho tài nguyên để xử lý hàng loạt
Để xử lý linh hoạt trong thời gian lên lịch công việc, chẳng hạn như các công việc qua đêm, tính năng lập lịch tài nguyên linh hoạt (FlexRS) đưa ra mức giá thấp hơn khi xử lý hàng loạt. Các công việc linh hoạt này được đưa vào hàng đợi với đảm bảo rằng chúng sẽ được truy xuất để thực thi trong khoảng thời gian 6 giờ.
Bạn sẽ chạy chương trình nào trong quá trình này
Việc sử dụng trình chạy tương tác Apache Beam với sổ tay JupyterLab cho phép bạn phát triển liên tục các quy trình, kiểm tra biểu đồ quy trình và phân tích cú pháp từng PBộ sưu tập riêng lẻ trong quy trình làm việc đọc eval-print-loop (RepL). Những sổ tay Apache Beam này được cung cấp thông qua Vertex AI Workbench, một dịch vụ được quản lý lưu trữ các máy ảo sổ tay được cài đặt sẵn các khung công nghệ học máy và khoa học dữ liệu mới nhất.
Lớp học lập trình này tập trung vào chức năng mà sổ tay Apache Beam giới thiệu.
Kiến thức bạn sẽ học được
- Cách tạo một thực thể sổ tay
- Tạo một quy trình cơ bản
- Đọc dữ liệu từ nguồn không bị ràng buộc
- Trực quan hoá dữ liệu
- Khởi chạy một công việc Dataflow từ sổ tay
- Lưu sổ tay
Bạn cần có
- Một dự án Google Cloud Platform đã bật tính năng Thanh toán.
- Đã bật Google Cloud Dataflow và Google Cloud PubSub.
2. Thiết lập
- Trong Cloud Console, trên trang bộ chọn dự án, hãy chọn hoặc tạo một dự án Cloud.
Đảm bảo rằng bạn đã bật các API sau:
- API Dataflow
- API Cloud Pub/Sub
- Compute Engine
- API Notebooks
Bạn có thể xác minh điều này bằng cách kiểm tra trên Dịch vụ.
Trong hướng dẫn này, chúng ta sẽ đọc dữ liệu của một gói thuê bao Pub/Sub. Do đó, hãy đảm bảo rằng tài khoản dịch vụ mặc định của Compute Engine có vai trò Người chỉnh sửa hoặc cấp cho tài khoản đó vai trò Người chỉnh sửa Pub/Sub.
3. Bắt đầu sử dụng sổ tay Apache Beam
Khởi chạy một thực thể của sổ tay Apache Beam
- Chạy Dataflow trên Console:
- Chọn trang Workbench bằng cách sử dụng trình đơn bên trái.
- Đảm bảo rằng bạn đang ở trên thẻ Sổ tay do người dùng quản lý.
- Trên thanh công cụ, hãy nhấp vào Sổ tay mới.
- Chọn Apache Beam > Không có GPU.
- Trên trang Sổ tay mới, hãy chọn một mạng con cho máy ảo sổ tay rồi nhấp vào Tạo.
- Nhấp vào Open JupyterLab khi đường liên kết hoạt động. Vertex AI Workbench tạo một thực thể mới cho sổ tay Apache Beam.
4. Tạo quy trình
Tạo một thực thể sổ tay
Chuyển đến Tệp > Mới > Sổ tay và chọn một nhân hệ điều hành là Apache Beam 2.47 trở lên.
Bắt đầu thêm mã vào sổ tay
- Sao chép và dán mã từ mỗi phần trong một ô mới trên sổ tay của bạn
- Thực thi ô
Việc sử dụng trình chạy tương tác Apache Beam với sổ tay JupyterLab cho phép bạn phát triển liên tục các quy trình, kiểm tra biểu đồ quy trình và phân tích cú pháp từng PBộ sưu tập riêng lẻ trong quy trình làm việc đọc eval-print-loop (RepL).
Apache Beam được cài đặt trên thực thể sổ tay của bạn, vì vậy, hãy bao gồm các mô-đun interactive_runner
và interactive_beam
trong sổ tay của bạn.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
Nếu sổ tay của bạn sử dụng các dịch vụ khác của Google, hãy thêm các câu lệnh nhập sau:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
Đặt các tuỳ chọn tương tác
Phần sau đây đặt thời lượng thu thập dữ liệu thành 60 giây. Nếu bạn muốn lặp lại nhanh hơn, hãy đặt thời lượng ngắn hơn, ví dụ: "10 giây".
ib.options.recording_duration = '60s'
Để biết các tuỳ chọn tương tác khác, hãy xem lớp tương tác_beam.options.
Khởi động quy trình bằng đối tượng InteractiveRunner
.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
Đọc và trực quan hoá dữ liệu
Ví dụ sau đây cho thấy một quy trình Apache Beam sẽ tạo một gói thuê bao cho chủ đề Pub/Sub nhất định và đọc từ gói thuê bao.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
Quy trình đếm số từ theo cửa sổ từ nguồn. Cách này tạo cửa sổ cố định, trong đó mỗi cửa sổ có thời lượng 10 giây.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
Sau khi dữ liệu được tạo thành cửa sổ, các từ sẽ được đếm theo cửa sổ.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
Trực quan hoá dữ liệu
Phương thức show()
sẽ trực quan hoá PCollection thu được trong sổ tay.
ib.show(windowed_word_counts, include_window_info=True)
Để hiển thị hình ảnh dữ liệu của bạn, hãy truyền visualize_data=True
vào phương thức show()
. Thêm một ô mới:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
Bạn có thể áp dụng nhiều bộ lọc cho hình ảnh trực quan. Hình ảnh sau đây cho phép bạn lọc theo nhãn và trục:
5. Sử dụng Pandas Dataframe
Một hình ảnh hữu ích khác trong sổ tay Apache Beam là Pandas DataFrame. Ví dụ sau đây trước tiên chuyển đổi các từ thành chữ thường, sau đó tính toán tần suất của mỗi từ.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
Phương thức collect()
cung cấp kết quả trong DataFrame Gấu trúc.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (Không bắt buộc) Khởi chạy công việc Dataflow từ sổ tay của bạn
- Để chạy công việc trên Dataflow, bạn cần có thêm quyền. Đảm bảo tài khoản dịch vụ mặc định của Compute Engine có vai trò Người chỉnh sửa hoặc cấp cho tài khoản đó các vai trò IAM sau đây:
- Quản trị viên Dataflow
- Worker Dataflow
- Quản trị viên bộ nhớ, và
- Người dùng tài khoản dịch vụ (roles/iam.serviceAccountUser)
Xem thêm về các vai trò trong tài liệu này.
- (Không bắt buộc) Trước khi sử dụng sổ tay để chạy công việc Dataflow, hãy khởi động lại nhân hệ điều hành, chạy lại tất cả các ô và xác minh kết quả.
- Xoá các câu lệnh nhập sau:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- Thêm câu lệnh nhập sau:
from apache_beam.runners import DataflowRunner
- Xoá tuỳ chọn thời lượng ghi sau đây:
ib.options.recording_duration = '60s'
- Thêm nội dung sau đây vào các tuỳ chọn quy trình. Bạn sẽ cần điều chỉnh vị trí Cloud Storage để trỏ đến một bộ chứa bạn đã sở hữu hoặc bạn có thể tạo một bộ chứa mới cho mục đích này. Bạn cũng có thể thay đổi giá trị khu vực từ
us-central1
.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- Trong hàm khởi tạo của
beam.Pipeline()
, hãy thay thếInteractiveRunner
bằngDataflowRunner
.p
là đối tượng quy trình trong quá trình tạo quy trình.
p = beam.Pipeline(DataflowRunner(), options=options)
- Xoá các cuộc gọi tương tác khỏi mã của bạn. Ví dụ: xoá
show()
,collect()
,head()
,show_graph()
vàwatch()
khỏi mã của bạn. - Để có thể xem bất kỳ kết quả nào, bạn sẽ cần phải thêm bồn rửa. Trong phần trước, chúng ta đã trực quan hoá kết quả trong sổ tay, nhưng lần này, chúng ta đang chạy công việc bên ngoài sổ tay này – trong Dataflow. Do đó, chúng tôi cần một địa điểm bên ngoài cho các kết quả của mình. Trong ví dụ này, chúng ta sẽ ghi kết quả vào tệp văn bản trong GCS (Google Cloud Storage). Vì đây là một quy trình truyền trực tuyến, cùng với chế độ cửa sổ dữ liệu, chúng ta sẽ cần tạo một tệp văn bản cho mỗi cửa sổ. Để làm được điều này, hãy thêm các bước sau vào quy trình của bạn:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- Thêm
p.run()
vào cuối mã quy trình. - Bây giờ, hãy xem lại mã sổ tay của bạn để xác nhận rằng bạn đã kết hợp tất cả thay đổi. Đoạn mã sẽ có dạng như sau:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- Thực thi các ô.
- Bạn sẽ thấy kết quả tương tự như sau:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- Để xác thực xem công việc có đang chạy hay không, hãy chuyển đến trang Công việc để tìm Dataflow. Bạn sẽ thấy một việc làm mới trong danh sách. Công việc này sẽ mất khoảng 5-10 phút để bắt đầu xử lý dữ liệu.
- Sau khi dữ liệu được xử lý, hãy chuyển đến Cloud Storage rồi chuyển đến thư mục nơi Dataflow lưu trữ kết quả (
output_gcs_location
mà bạn đã xác định). Bạn sẽ thấy danh sách các tệp văn bản, trong đó một tệp cho mỗi cửa sổ. - Tải tệp xuống rồi kiểm tra nội dung. Tệp này phải chứa danh sách các từ ghép với số lượng của từ đó. Ngoài ra, hãy sử dụng giao diện dòng lệnh để kiểm tra tệp. Bạn có thể thực hiện việc này bằng cách chạy dòng sau trong một ô mới trong sổ tay:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- Bạn sẽ thấy kết quả tương tự như sau:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- Vậy là xong! Đừng quên dọn dẹp và dừng công việc bạn đã tạo (xem bước cuối cùng của lớp học lập trình này).
Để xem ví dụ về cách thực hiện việc chuyển đổi này trên một sổ tay tương tác, hãy xem sổ tay Dataflow Đếm từ trong phiên bản sổ tay của bạn.
Ngoài ra, bạn có thể xuất sổ tay dưới dạng tập lệnh thực thi, sửa đổi tệp .py đã tạo bằng các bước trước đó, sau đó triển khai quy trình đến dịch vụ Dataflow.
7. Đang lưu sổ tay của bạn
Các sổ tay bạn tạo được lưu trên máy tính trong thực thể sổ tay đang chạy của bạn. Nếu bạn đặt lại hoặc tắt thực thể sổ tay trong quá trình phát triển, những sổ tay mới đó sẽ vẫn tồn tại miễn là chúng được tạo trong thư mục /home/jupyter
. Tuy nhiên, nếu một thực thể của sổ tay bị xoá, thì các sổ tay đó cũng sẽ bị xoá.
Để giữ lại sổ tay để dùng sau này, hãy tải sổ tay xuống máy trạm của bạn, lưu sổ tay vào GitHub hoặc xuất sang một định dạng tệp khác.
8. Dọn dẹp
Sau khi sử dụng xong phiên bản sổ tay Apache Beam, bạn có thể dọn dẹp các tài nguyên bạn đã tạo trên Google Cloud bằng cách tắt phiên bản sổ tay đó và dừng công việc truyền trực tuyến (nếu bạn có chạy một phiên bản).
Ngoài ra, nếu đã tạo một dự án với mục đích duy nhất của lớp học lập trình này, bạn cũng có thể ngừng hoạt động hoàn toàn dự án đó.