1. 简介
在此 Codelab 中,您将探索 Google Cloud 上统一数据湖仓一体的功能。您将与通过 BigLake 上的 Apache Iceberg REST Catalog 提供的公共数据集进行交互,然后将 Google Cloud 的 AI 功能应用于结构化数据和非结构化数据。
您将使用 Apache Iceberg 查询经典的纽约市出租车数据集,深入了解 Time Travel 以审核数据更改,然后使用 BigQuery ML 和 Gemini 对数据运行 AI 模型。
您将执行的操作
- 使用 Google Cloud Serverless for Apache Spark 查询 BigLake 上托管的 Apache Iceberg 公共数据集 。
- 查询 Apache Iceberg 格式的结构化数据。
- 演示 Apache Iceberg 中的 Time Travel 。
- 使用 BigQuery ML 根据结构化数据训练预测模型。
- 创建 BigLake 对象表 (非结构化数据),并使用 Gemini 分析图片。
所需条件
- 网络浏览器,例如 Chrome。
- 启用了结算功能的 Google Cloud 项目。
预计费用和时长
- 完成时间:约 45 分钟。
- 预计费用:< 2.00 美元。我们使用公共数据集和无服务器查询来降低费用。
2. 设置和要求
在此步骤中,您将准备环境并启用必要的 API。
启动 Cloud Shell
您将通过 Google Cloud Shell 运行大多数命令。
- 点击 Google Cloud 控制台顶部的激活 Cloud Shell 。
- 验证身份验证:
gcloud auth list - 确认您的项目:
gcloud config get project - 如果未设置项目,请使用您的项目 ID 进行设置:
gcloud config set project <YOUR_PROJECT_ID>
启用 API
运行以下命令,为 BigQuery、Cloud Resource Manager 和 Vertex AI 启用所需的 API:
gcloud services enable \
bigquery.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
配置环境并创建依赖项存储分区
- 在终端中设置环境变量:
export PROJECT_ID=$(gcloud config get project) export REGION=us-central1 export DEPS_BUCKET=$PROJECT_ID-deps-bucket - 创建依赖项 Cloud Storage 存储分区。PySpark 脚本会在提交作业时上传到此处:
gcloud storage buckets create gs://$DEPS_BUCKET --location=$REGION
3. 连接到 Apache Iceberg 公共目录
在此步骤中,您将连接到 Google Cloud 的 BigLake 上托管的实时生产级 Apache Iceberg 目录。
使用 Serverless for Apache Spark 批量 CLI 运行 Spark SQL
我们将使用 Google Cloud Serverless for Apache Spark 运行 PySpark 作业,而无需管理基础架构。我们将对其进行配置,使其指向公共 BigLake REST Catalog。
- 定义 BigLake REST Catalog 属性,以避免重复这些属性。此配置会告知 Spark:
- 使用
iceberg-spark-runttime和iceberg-gcp-bundle库。 - 使用 BigLake REST Catalog 端点 配置名为
my_catalog的目录。 - 使用 Google Cloud Storage (GCS) 读取数据文件,而不是默认的本地文件系统。
- 将此
my_catalog目录设置为会话的默认 目录。 - 使用销售凭据来增强安全性并简化数据访问权限。
export METASTORE_PROPERTIES="^|^spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0,org.apache.iceberg:iceberg-gcp-bundle:1.10.0|\ spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog|\ spark.sql.catalog.my_catalog.type=rest|\ spark.sql.catalog.my_catalog.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog|\ spark.sql.catalog.my_catalog.warehouse=gs://biglake-public-nyc-taxi-iceberg|\ spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.gcp.gcs.GCSFileIO|\ spark.sql.catalog.my_catalog.header.x-goog-user-project=$PROJECT_ID|\ spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation=vended-credentials|\ spark.sql.catalog.my_catalog.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager|\ spark.sql.defaultCatalog=my_catalog|\ spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions|\ spark.log.level=ERROR" - 使用
- 创建一个简单的测试查询文件:
cat <<EOF > test.py from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sql("SHOW TABLES IN public_data").show() EOF - 提交批量作业:
您应看到类似于以下内容的输出:gcloud dataproc batches submit pyspark \ --project=$PROJECT_ID \ --region=$REGION \ --version=2.3 \ --properties="$METASTORE_PROPERTIES" \ --deps-bucket=gs://$DEPS_BUCKET \ test.py+-----------+----------------+-----------+ | namespace| tableName|isTemporary| +-----------+----------------+-----------+ |public_data| nyc_taxicab| false| |public_data|nyc_taxicab_2021| false| +-----------+----------------+-----------+
4. 查询结构化 Iceberg 数据
连接后,您将拥有对数据集的完整 SQL 访问权限。我们将查询以 Iceberg 表形式建模的纽约市出租车数据集。
运行标准聚合查询
创建一个名为 query.py 的文件:
cat <<EOF > query.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
query = """
SELECT
passenger_count,
COUNT(1) AS num_trips,
ROUND(AVG(total_amount), 2) AS avg_fare,
ROUND(AVG(trip_distance), 2) AS avg_distance
FROM public_data.nyc_taxicab
WHERE data_file_year = 2021 AND passenger_count > 0
GROUP BY passenger_count
ORDER BY num_trips DESC
"""
spark.sql(query).show()
EOF
并使用 Serverless for Apache Spark 提交该文件:
gcloud dataproc batches submit pyspark \
--project=$PROJECT_ID \
--region=$REGION \
--version=2.3 \
--properties="$METASTORE_PROPERTIES" \
--deps-bucket=gs://$DEPS_BUCKET \
query.py
您应该会在控制台中看到类似于以下内容的输出:
+---------------+---------+--------+------------+ |passenger_count|num_trips|avg_fare|avg_distance| +---------------+---------+--------+------------+ | 1| 21508009| 18.82| 3.03| | 2| 4424746| 20.22| 3.40| | 3| 1164846| 19.84| 3.27| | 5| 718282| 18.88| 3.07| | 4| 466485| 20.61| 3.44| | 6| 452467| 18.97| 3.11| | 7| 78| 65.24| 3.71| | 8| 49| 57.39| 5.88| | 9| 35| 73.26| 6.20| | 96| 1| 17.00| 2.00| | 112| 1| 15.00| 2.00| +---------------+---------+--------+------------+
为何在此处使用 Apache Iceberg?
- 分区修剪:查询会根据
data_file_year = 2021进行过滤。Iceberg 允许引擎完全跳过扫描其他年份的数据。 - 引擎敏捷性:您可以在 Spark、Trino 或 BigQuery 中运行此操作,而无需复制数据!
5. Apache Iceberg 中的 Time Travel
Time Travel 是 Iceberg 最强大的功能之一。借助此功能,您可以查询过去版本或快照中的数据。
查看表历史记录
创建一个名为 history.py 的文件:
cat <<EOF > history.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sql("SELECT * FROM public_data.nyc_taxicab.history").show()
EOF
并提交该文件:
gcloud dataproc batches submit pyspark \
--project=$PROJECT_ID \
--region=$REGION \
--version=2.3 \
--properties="$METASTORE_PROPERTIES" \
--deps-bucket=gs://$DEPS_BUCKET \
history.py
您应该会在控制台中看到类似于以下内容的输出:
+--------------------+-------------------+-------------------+-------------------+ | made_current_at| snapshot_id| parent_id|is_current_ancestor| +--------------------+-------------------+-------------------+-------------------+ |2026-01-07 21:32:...|6333415779680505547| NULL| true| |2026-01-07 21:34:...|1840345522877675925|6333415779680505547| true| |2026-01-07 21:36:...|7203554539964460256|1840345522877675925| true| |2026-01-07 21:38:...|4573466015237516024|7203554539964460256| true| |2026-01-07 21:40:...|3353190952148867790|4573466015237516024| true| |2026-01-07 21:42:...|1335547378580631681|3353190952148867790| true| |2026-01-07 21:44:...|8203141258229894239|1335547378580631681| true| |2026-01-07 21:46:...|1597048231706307813|8203141258229894239| true| |2026-01-07 21:48:...|6247811509231462655|1597048231706307813| true| |2026-01-07 21:50:...|2527184310045633322|6247811509231462655| true| |2026-01-07 21:52:...|2512764101237223642|2527184310045633322| true| |2026-01-07 21:52:...|7045957533358062548|2512764101237223642| true| |2026-01-07 21:53:...| 531753237516076726|7045957533358062548| true| |2026-01-07 21:53:...|4184653573199718274| 531753237516076726| true| |2026-01-07 21:54:...|5125223829492177301|4184653573199718274| true| |2026-01-07 21:54:...|6844673237417600305|5125223829492177301| true| |2026-01-07 21:54:...|6634828203344518093|6844673237417600305| true| |2026-01-07 21:55:...|7637728273407236194|6634828203344518093| true| |2026-01-07 21:55:...|3424071684958740192|7637728273407236194| true| |2026-01-07 21:55:...|1743746294196424254|3424071684958740192| true| +--------------------+-------------------+-------------------+-------------------+
您将看到表示不同快照 ID 的行以及提交时间。
比较当前行数与过去行数
创建一个名为 timetravel.py 的文件:
cat <<EOF > timetravel.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
query = """
SELECT 'Current State' AS version, COUNT(*) AS count FROM public_data.nyc_taxicab
UNION ALL
SELECT 'Past State' AS version, COUNT(*) AS count FROM public_data.nyc_taxicab VERSION AS OF 4573466015237516024
"""
spark.sql(query).show()
EOF
并提交该文件:
gcloud dataproc batches submit pyspark \
--project=$PROJECT_ID \
--region=$REGION \
--version=2.3 \
--properties="$METASTORE_PROPERTIES" \
--deps-bucket=gs://$DEPS_BUCKET \
timetravel.py
您应该会在控制台中看到类似于以下内容的输出:
+-------------+----------+ | version| count| +-------------+----------+ |Current State|1293069366| | Past State| 72878594| +-------------+----------+
这可确保您可以审核随时间推移的数据更改。
6. 使用 BigQuery ML 进行结构化 AI
您已探索 Iceberg 数据,现在让我们使用 BigQuery AI 功能!由于公共 Iceberg 目录是只读的,因此我们可以使用 BigQuery 通过读取公共表在工作区中训练模型。
创建本地数据集
首先,在您的项目中创建一个数据集,以使用 bq CLI 存储 AI 模型:
bq mk --location=$REGION --project_id=$PROJECT_ID iceberg_ai
训练线性回归模型
现在,您将使用公共 BigLake Iceberg 表训练线性回归模型。
创建一个查询文件并使用 bq query 训练模型:
cat <<'EOF' > train_model.sql
CREATE OR REPLACE MODEL `iceberg_ai.predict_fare`
OPTIONS(model_type='LINEAR_REG', input_label_cols=['fare_amount']) AS
SELECT fare_amount, passenger_count, CAST(trip_distance AS FLOAT64) AS trip_distance
FROM `bigquery-public-data`.`biglake-public-nyc-taxi-iceberg`.public_data.nyc_taxicab
WHERE fare_amount > 0 AND trip_distance > 0 AND RAND() < 0.01; -- Using 1% of data to downsample
EOF
bq query --location=$REGION --use_legacy_sql=false < train_model.sql
使用模型进行预测
模型经过训练后,您可以使用 ML.PREDICT 预测新行程的票价。
创建一个查询文件并使用 bq query 运行预测:
cat <<'EOF' > predict_fare.sql
SELECT
predicted_fare_amount, passenger_count, trip_distance
FROM
ML.PREDICT(MODEL `iceberg_ai.predict_fare`,
(
SELECT 2 AS passenger_count, 5.0 AS trip_distance
)
);
EOF
bq query --location=$REGION --use_legacy_sql=false < predict_fare.sql
您应看到类似于以下内容的输出:
+-----------------------+-----------------+---------------+ | predicted_fare_amount | passenger_count | trip_distance | +-----------------------+-----------------+---------------+ | 14.12252095150709 | 2 | 5.0 | +-----------------------+-----------------+---------------+
7. 使用 BigLake 进行非结构化 AI
数据不仅仅是行和列。统一数据湖仓一体还可以处理非结构化数据(图片、PDF)。让我们使用对象表 和对象引用 查询非结构化数据。
对象表 是 BigQuery 中只读的外部表,用于列出 Cloud Storage 路径中的对象。每一行代表一个文件,其中包含 uri、size 等元数据的列,以及包含 ObjectRef 的特殊 ref 列。
对象引用 (ObjectRef) 指向单个文件的实际数据。现代 BigQuery ML 函数(如 AI.GENERATE 或 AI.AGG)会使用 ObjectRef 读取文件内容(图片、音频或文本)以进行分析,而无需将字节加载到标准表中。
为非结构化 AI 创建数据集
首先,在您的项目中创建第二个数据集,以使用 US 多区域中的 bq CLI 存储对象表:
bq mk --location=US --project_id=$PROJECT_ID iceberg_object_ai
创建外部连接
如需从 BigQuery 查询 Cloud Storage 中存储的数据(包括对象表和非结构化数据),您需要创建外部连接。
在 Cloud Shell 中运行以下命令以创建 Cloud 资源连接:
bq mk --connection --project_id=$PROJECT_ID --location=US --connection_type=CLOUD_RESOURCE my-conn
找到为您的连接创建的服务账号 ID:
CONNECTION_SA=$(bq show --format=json --project_id=$PROJECT_ID --connection $PROJECT_ID.us.my-conn | jq -r '.serviceAccountId // .cloudResource.serviceAccountId')
向服务账号授予 Vertex AI User 和 Storage Object Viewer 角色,以便其可以调用 Gemini 模型并读取 GCS 数据:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$CONNECTION_SA" \
--role="roles/aiplatform.user"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$CONNECTION_SA" \
--role="roles/storage.objectViewer"
创建对象表
我们将使用上一部分中创建的外部连接 my-conn 来访问非结构化数据。创建一个查询文件并使用 bq query 创建对象表:
cat <<'EOF' > create_object_table.sql
CREATE EXTERNAL TABLE `iceberg_object_ai.sample_images`
WITH CONNECTION `us.my-conn`
OPTIONS (
object_metadata = 'SIMPLE',
uris = ['gs://cloud-samples-data/vision/landmark/*']
);
EOF
bq query --use_legacy_sql=false < create_object_table.sql
在对象数据上使用 Gemini
现在,使用 Gemini 运行查询来评估图片,而无需下载这些图片!
使用标准 SQL 通过 bq query 查询图片:
cat <<EOF > query_images.sql
SELECT
uri,
image_analysis.description
FROM (
SELECT
uri,
AI.GENERATE(
(
'Identify what is happening in the image.',
ref
),
connection_id => 'us.my-conn',
endpoint => 'gemini-2.5-flash-lite',
output_schema => 'event STRING, severity STRING, description STRING'
) AS image_analysis
FROM
iceberg_object_ai.sample_images
);
EOF
bq query --use_legacy_sql=false < query_images.sql
示例输出:
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| uri | description |
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| gs://cloud-samples-data/vision/landmark/eiffel_tower.jpg | The Eiffel Tower stands tall against a cloudy sky, overlooking the Seine River in Paris. Boats are docked along the riverbank, and trees line the opposite shore, with bridges and buildings visible in the distance. |
| gs://cloud-samples-data/vision/landmark/pofa.jpg | A wide shot shows the Palace of Fine Arts, a monumental structure in San Francisco, California. The building features a large rotunda with a dome, surrounded by colonnades. In front of the rotunda is a lagoon. Several people are walking around the grounds. The sky is blue with a few scattered clouds. |
| gs://cloud-samples-data/vision/landmark/st_basils.jpeg | A monument stands in front of Saint Basil's Cathedral in Moscow under a bright blue sky with scattered white clouds. The cathedral features distinctive onion domes in various colors and patterns, including red, blue and white stripes, green and beige stripes, and red and blue diamonds. A large green tree partially obscures the left side of the cathedral. People are visible in the foreground near the base of the monument and the cathedral entrance. |
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
直接探索 ObjectRef:情感分析
虽然对象表会自动管理文件引用,但您可以使用 BigQuery 对象引用 直接与这些对象互动,以对单个文件运行即时分析。
例如,您可以使用存储在您自己的 GCS 存储分区中的小型文本文件(使用之前创建的 $DEPS_BUCKET 变量),并使用 OBJ.MAKE_REF 和 bq query 对其进行分析。
首先,创建一个小型文本文件并将其上传到您的存储分区:
cat <<'EOF' > review.txt
This product is fantastic! It exceeded my expectations. The quality is top-notch. I highly recommend it to everyone!
EOF
gcloud storage cp review.txt gs://${DEPS_BUCKET}/review.txt
现在,使用标准 SQL 中的 OBJ.MAKE_REF 查询该文件:
cat <<EOF > sentiment_analysis.sql
SELECT
AI.GENERATE(
(
'Analyze the sentiment of this text file. Is it positive, negative, or neutral? Explain why.',
OBJ.MAKE_REF('gs://${DEPS_BUCKET}/review.txt', 'us.my-conn')
),
connection_id => 'us.my-conn',
endpoint => 'gemini-2.5-flash-lite'
).result AS ml_generate_text_result;
EOF
bq query --use_legacy_sql=false < sentiment_analysis.sql
示例输出:
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ml_generate_text_result |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| This text file has a **strongly positive** sentiment. |
| |
| Here's why: |
| |
| * **Positive Keywords:** The text is filled with unequivocally positive words and phrases: |
| * "fantastic" |
| * "exceeded my expectations" |
| * "top-notch" |
| * "highly recommend" |
| |
| * **Enthusiastic Language:** The use of exclamation marks ("!") further amplifies the positive tone, indicating excitement and strong approval. |
| |
| * **Lack of Negative or Neutral Elements:** There are no words, phrases, or implications that suggest any dissatisfaction, criticism, or even indifference. |
| |
| In summary, the author's language is enthusiastic and uses multiple strong positive descriptors, leaving no room for doubt that their opinion of the product is overwhelmingly positive. |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
8. 清理
为避免系统持续向您的 Google Cloud 账号收取费用,请删除在此 Codelab 期间创建的资源。
删除数据集和连接
在 Cloud Shell 中运行以下命令,以删除数据集和连接:
bq rm -r -f --location=$REGION iceberg_ai
bq rm -r -f --location=US iceberg_object_ai
bq rm --connection $PROJECT_ID.US.my-conn
删除 GCS 存储分区和本地文件
清理 GCS 存储分区和本地文件:
# Delete GCS buckets
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
gcloud storage rm -r gs://dataproc-temp-${REGION}-${PROJECT_NUMBER}-*
gcloud storage rm -r gs://dataproc-staging-${REGION}-${PROJECT_NUMBER}-*
gcloud storage rm -r gs://${DEPS_BUCKET}
# Delete local files
rm -f train_model.sql predict_fare.sql create_object_table.sql query_images.sql sentiment_analysis.sql test.py query.py history.py timetravel.py review.txt
如果您仅为此实验创建了项目,也可以删除整个项目。
9. 恭喜
恭喜!您已使用 Apache Iceberg、BigLake 和 BigQuery AI 成功构建了统一数据湖仓一体!
您学到的内容
- 如何连接和查询 公共 Apache Iceberg REST Catalog。
- 在 Iceberg 中使用 Time Travel 审核数据集版本。
- 根据结构化数据训练 BigQuery ML 模型 。
- 使用对象表和 ObjectRef 连接非结构化数据(图片) 。
- 直接在 BigQuery SQL 中使用 Gemini 分析图片。