使用 Apache Iceberg 和 BigLake 构建统一的数据湖仓,以用于 AI

1. 简介

在此 Codelab 中,您将探索 Google Cloud 上统一数据湖仓一体架构的功能。您将与通过 BigLake 上的 Apache Iceberg REST Catalog 提供的公开数据集进行交互,然后将 Google Cloud 的 AI 功能应用于结构化和非结构化数据。

您将使用 Apache Iceberg 查询经典 NYC 出租车数据集,深入了解时间旅行以审核数据更改,然后使用 BigQuery MLGemini 对数据运行 AI 模型。

您将执行的操作

  • 使用 Google Cloud Serverless for Apache Spark 查询托管在 BigLake 上的 Apache Iceberg 公共数据集
  • 查询 Apache Iceberg 格式的结构化数据。
  • 演示 Apache Iceberg 中的时间旅行
  • 使用 BigQuery ML 基于结构化数据训练预测模型。
  • 创建 BigLake 对象表(非结构化数据),并使用 Gemini 分析图片。

所需条件

  • 网络浏览器,例如 Chrome
  • 启用了结算功能的 Google Cloud 项目。

预期费用和时长

  • 完成所需时间:约 45 分钟。
  • 估算费用:低于 2.00 美元。我们使用公共数据集和无服务器查询来降低费用。

2. 设置和要求

在此步骤中,您将准备环境并启用必要的 API。

启动 Cloud Shell

您将通过 Google Cloud Shell 运行大部分命令。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 验证身份验证:
    gcloud auth list
    
  3. 确认您的项目:
    gcloud config get project
    
  4. 如果项目未设置,请使用您的项目 ID 进行设置:
    gcloud config set project <YOUR_PROJECT_ID>
    

启用 API

运行以下命令,为 BigQueryCloud Resource ManagerVertex AI 启用所需的 API:

gcloud services enable \
  bigquery.googleapis.com \
  aiplatform.googleapis.com \
  cloudresourcemanager.googleapis.com

配置环境并创建依赖项存储分区

  1. 在终端中设置环境变量:
    export PROJECT_ID=$(gcloud config get project)
    export REGION=us-central1
    export DEPS_BUCKET=$PROJECT_ID-deps-bucket
    
  2. 创建依赖项 Cloud Storage 存储分区。PySpark 脚本在提交作业时上传到此处:
    gcloud storage buckets create gs://$DEPS_BUCKET --location=$REGION
    

3. 连接到 Apache Iceberg 公共目录

在此步骤中,您将连接到 Google Cloud 的 BigLake 上托管的实时生产级 Apache Iceberg Catalog。

使用 Serverless for Apache Spark Batch CLI 运行 Spark SQL

我们将使用 Google Cloud Serverless for Apache Spark 运行 PySpark 作业,而无需管理基础设施。我们将配置它以指向公共 BigLake REST 目录。

  1. 定义 BigLake REST Catalog 属性,以避免重复定义。此配置会告知 Spark:
    • 使用 iceberg-spark-runtimeiceberg-gcp-bundle 库。
    • 使用 BigLake REST 目录端点配置名为 my_catalog 的目录。
    • 使用 Google Cloud Storage (GCS) 读取数据文件,而不是使用默认的本地文件系统。
    • 将此 my_catalog 目录设置为会话的默认目录。
    • 使用出售的凭据来增强安全性并简化数据访问权限。
    export METASTORE_PROPERTIES="^|^spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0,org.apache.iceberg:iceberg-gcp-bundle:1.10.0|\
    spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog|\
    spark.sql.catalog.my_catalog.type=rest|\
    spark.sql.catalog.my_catalog.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog|\
    spark.sql.catalog.my_catalog.warehouse=gs://biglake-public-nyc-taxi-iceberg|\
    spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.gcp.gcs.GCSFileIO|\
    spark.sql.catalog.my_catalog.header.x-goog-user-project=$PROJECT_ID|\
    spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation=vended-credentials|\
    spark.sql.catalog.my_catalog.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager|\
    spark.sql.defaultCatalog=my_catalog|\
    spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions|\
    spark.log.level=ERROR"
    
  2. 创建简单的测试查询文件:
    cat <<EOF > test.py
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    spark.sql("SHOW TABLES IN public_data").show()
    EOF
    
  3. 提交批量作业:
    gcloud dataproc batches submit pyspark \
      --project=$PROJECT_ID \
      --region=$REGION \
      --version=2.3 \
      --properties="$METASTORE_PROPERTIES" \
      --deps-bucket=gs://$DEPS_BUCKET \
      test.py
    
    等待批量作业完成运行几分钟。作业完成后,您应该会看到类似于以下内容的输出:
    +-----------+----------------+-----------+
    |  namespace|       tableName|isTemporary|
    +-----------+----------------+-----------+
    |public_data|     nyc_taxicab|      false|
    |public_data|nyc_taxicab_2021|      false|
    +-----------+----------------+-----------+
    

4. 查询结构化 Iceberg 数据

连接后,您将拥有对数据集的完整 SQL 访问权限。我们将查询建模为 Iceberg 表的纽约市出租车数据集。

运行标准聚合查询

创建一个名为 query.py 的文件:

cat <<EOF > query.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = """
SELECT
  passenger_count,
  COUNT(1) AS num_trips,
  ROUND(AVG(total_amount), 2) AS avg_fare,
  ROUND(AVG(trip_distance), 2) AS avg_distance
FROM public_data.nyc_taxicab
WHERE data_file_year = 2021 AND passenger_count > 0
GROUP BY passenger_count
ORDER BY num_trips DESC
"""

spark.sql(query).show()
EOF

然后使用 Serverless for Apache Spark 提交该作业:

gcloud dataproc batches submit pyspark \
  --project=$PROJECT_ID \
  --region=$REGION \
  --version=2.3 \
  --properties="$METASTORE_PROPERTIES" \
  --deps-bucket=gs://$DEPS_BUCKET \
  query.py

等待批量作业运行完成,这可能需要几分钟时间。

作业完成后,您应该会看到类似于以下内容的输出:

+---------------+---------+--------+------------+
|passenger_count|num_trips|avg_fare|avg_distance|
+---------------+---------+--------+------------+
|              1| 21508009|   18.82|        3.03|
|              2|  4424746|   20.22|        3.40|
|              3|  1164846|   19.84|        3.27|
|              5|   718282|   18.88|        3.07|
|              4|   466485|   20.61|        3.44|
|              6|   452467|   18.97|        3.11|
|              7|       78|   65.24|        3.71|
|              8|       49|   57.39|        5.88|
|              9|       35|   73.26|        6.20|
|             96|        1|   17.00|        2.00|
|            112|        1|   15.00|        2.00|
+---------------+---------+--------+------------+

为何在此处使用 Apache Iceberg?

  • 分区剪枝:查询对 data_file_year = 2021 进行过滤。Iceberg 可让引擎完全跳过扫描其他年份的数据。
  • 引擎灵活性:您可以在 Spark、Trino 或 BigQuery 中运行此功能,而无需复制数据!

5. Apache Iceberg 中的时间旅行

Iceberg 最强大的功能之一是时间旅行。借助此功能,您可以查询过去版本或快照中的数据。

查看表格历史记录

创建一个名为 history.py 的文件:

cat <<EOF > history.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("SELECT * FROM public_data.nyc_taxicab.history").show()
EOF

然后提交:

gcloud dataproc batches submit pyspark \
  --project=$PROJECT_ID \
  --region=$REGION \
  --version=2.3 \
  --properties="$METASTORE_PROPERTIES" \
  --deps-bucket=gs://$DEPS_BUCKET \
  history.py

您应该会在控制台中看到类似于以下内容的输出:

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2026-01-07 21:32:...|6333415779680505547|               NULL|               true|
|2026-01-07 21:34:...|1840345522877675925|6333415779680505547|               true|
|2026-01-07 21:36:...|7203554539964460256|1840345522877675925|               true|
|2026-01-07 21:38:...|4573466015237516024|7203554539964460256|               true|
|2026-01-07 21:40:...|3353190952148867790|4573466015237516024|               true|
|2026-01-07 21:42:...|1335547378580631681|3353190952148867790|               true|
|2026-01-07 21:44:...|8203141258229894239|1335547378580631681|               true|
|2026-01-07 21:46:...|1597048231706307813|8203141258229894239|               true|
|2026-01-07 21:48:...|6247811509231462655|1597048231706307813|               true|
|2026-01-07 21:50:...|2527184310045633322|6247811509231462655|               true|
|2026-01-07 21:52:...|2512764101237223642|2527184310045633322|               true|
|2026-01-07 21:52:...|7045957533358062548|2512764101237223642|               true|
|2026-01-07 21:53:...| 531753237516076726|7045957533358062548|               true|
|2026-01-07 21:53:...|4184653573199718274| 531753237516076726|               true|
|2026-01-07 21:54:...|5125223829492177301|4184653573199718274|               true|
|2026-01-07 21:54:...|6844673237417600305|5125223829492177301|               true|
|2026-01-07 21:54:...|6634828203344518093|6844673237417600305|               true|
|2026-01-07 21:55:...|7637728273407236194|6634828203344518093|               true|
|2026-01-07 21:55:...|3424071684958740192|7637728273407236194|               true|
|2026-01-07 21:55:...|1743746294196424254|3424071684958740192|               true|
+--------------------+-------------------+-------------------+-------------------+

您会看到表示不同快照 ID 及其提交时间的行。

比较当前与过去的行数

创建一个名为 timetravel.py 的文件:

cat <<EOF > timetravel.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = """
SELECT 'Current State' AS version, COUNT(*) AS count FROM public_data.nyc_taxicab
UNION ALL
SELECT 'Past State' AS version, COUNT(*) AS count FROM public_data.nyc_taxicab VERSION AS OF 4573466015237516024
"""

spark.sql(query).show()
EOF

然后提交:

gcloud dataproc batches submit pyspark \
  --project=$PROJECT_ID \
  --region=$REGION \
  --version=2.3 \
  --properties="$METASTORE_PROPERTIES" \
  --deps-bucket=gs://$DEPS_BUCKET \
  timetravel.py

您应该会在控制台中看到类似于以下内容的输出:

+-------------+----------+
|      version|     count|
+-------------+----------+
|Current State|1293069366|
|   Past State|  72878594|
+-------------+----------+

这样可确保您能够随时审核数据变化。

6. 利用 BigQuery ML 实现结构化 AI

现在,您已探索了 Iceberg 数据,接下来让我们使用 BigQuery AI 功能!由于公共 Iceberg 目录是只读的,因此我们可以使用 BigQuery 通过读取公共表来训练工作区中的模型。

创建本地数据集

首先,使用 bq CLI 在项目中创建一个数据集来存储 AI 模型:

bq mk --location=$REGION --project_id=$PROJECT_ID iceberg_ai

训练线性回归模型

现在,您将使用公共 BigLake Iceberg 表训练线性回归模型。

创建查询文件并使用 bq query 训练模型:

cat <<'EOF' > train_model.sql
CREATE OR REPLACE MODEL `iceberg_ai.predict_fare`
OPTIONS(model_type='LINEAR_REG', input_label_cols=['fare_amount']) AS
SELECT fare_amount, passenger_count, CAST(trip_distance AS FLOAT64) AS trip_distance
FROM `bigquery-public-data`.`biglake-public-nyc-taxi-iceberg`.public_data.nyc_taxicab
WHERE fare_amount > 0 AND trip_distance > 0 AND RAND() < 0.01; -- Using 1% of data to downsample
EOF

bq query --location=$REGION --use_legacy_sql=false < train_model.sql

使用模型进行预测

现在模型已训练完毕,您可以使用 ML.PREDICT 来预测新行程的费用金额。

创建查询文件并使用 bq query 运行预测:

cat <<'EOF' > predict_fare.sql
SELECT
  predicted_fare_amount, passenger_count, trip_distance
FROM
  ML.PREDICT(MODEL `iceberg_ai.predict_fare`,
    (
    SELECT 2 AS passenger_count, 5.0 AS trip_distance
    )
  );
EOF

bq query --location=$REGION --use_legacy_sql=false < predict_fare.sql

您应看到类似于以下内容的输出:

+-----------------------+-----------------+---------------+
| predicted_fare_amount | passenger_count | trip_distance |
+-----------------------+-----------------+---------------+
|     14.12252095150709 |               2 |           5.0 |
+-----------------------+-----------------+---------------+

7. 使用 BigLake 的非结构化 AI

数据不仅仅是行和列。统一的数据湖仓还可以处理非结构化数据(图片、PDF)。我们来使用对象表对象引用查询非结构化数据。

对象表是 BigQuery 中的只读外部表,用于列出 Cloud Storage 路径中的对象。每一行代表一个文件,其中包含 urisize 等元数据列,以及一个包含 ObjectRef 的特殊 ref 列。

对象引用 (ObjectRef) 指向单个文件的实际数据。新版 BigQuery ML 函数(例如 AI.GENERATEAI.AGG)会消耗 ObjectRef 来读取文件内容(图片、音频或文本)以进行分析,而无需将字节加载到标准表中。

创建非结构化 AI 数据集

首先,在您的项目中创建第二个数据集,以使用 US 多区域中的 bq CLI 存储对象表:

bq mk --location=US --project_id=$PROJECT_ID iceberg_object_ai

创建外部连接

如需从 BigQuery 查询存储在 Cloud Storage 中的数据(包括对象表和非结构化数据),您需要创建外部连接。

在 Cloud Shell 中运行以下命令,以创建 Cloud 资源连接:

bq mk --connection --project_id=$PROJECT_ID --location=US --connection_type=CLOUD_RESOURCE my-conn

查找为您的连接创建的服务账号 ID:

CONNECTION_SA=$(bq show --format=json --project_id=$PROJECT_ID --connection $PROJECT_ID.us.my-conn | jq -r '.serviceAccountId // .cloudResource.serviceAccountId')

向服务账号授予 Vertex AI UserStorage Object Viewer 角色,以便其可以调用 Gemini 模型并读取 GCS 数据:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$CONNECTION_SA" \
  --role="roles/aiplatform.user"

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$CONNECTION_SA" \
  --role="roles/storage.objectViewer"

创建对象表

我们将使用上一部分中创建的外部连接 my-conn 来访问非结构化数据。创建查询文件并使用 bq query 创建对象表:

cat <<'EOF' > create_object_table.sql
CREATE EXTERNAL TABLE `iceberg_object_ai.sample_images`
WITH CONNECTION `us.my-conn`
OPTIONS (
  object_metadata = 'SIMPLE',
  uris = ['gs://cloud-samples-data/vision/landmark/*']
);
EOF

bq query --use_legacy_sql=false < create_object_table.sql

在对象数据上使用 Gemini

现在,您可以使用 Gemini 运行查询,在不下载图片的情况下评估图片!

通过 bq query 使用标准 SQL 查询图片:

cat <<EOF > query_images.sql
SELECT
  uri,
  image_analysis.description
FROM (
  SELECT
    uri,
    AI.GENERATE(
      (
        'Identify what is happening in the image.',
        ref
      ),
      connection_id => 'us.my-conn',
      endpoint => 'gemini-2.5-flash-lite',
      output_schema => 'event STRING, severity STRING, description STRING'
    ) AS image_analysis
  FROM
    iceberg_object_ai.sample_images
);
EOF

bq query --use_legacy_sql=false < query_images.sql

示例输出:

+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                           uri                            |                                                                                                                                                                                                                             description                                                                                                                                                                                                                             |
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| gs://cloud-samples-data/vision/landmark/eiffel_tower.jpg | The Eiffel Tower stands tall against a cloudy sky, overlooking the Seine River in Paris. Boats are docked along the riverbank, and trees line the opposite shore, with bridges and buildings visible in the distance.                                                                                                                                                                                                                                               |
| gs://cloud-samples-data/vision/landmark/pofa.jpg         | A wide shot shows the Palace of Fine Arts, a monumental structure in San Francisco, California. The building features a large rotunda with a dome, surrounded by colonnades. In front of the rotunda is a lagoon. Several people are walking around the grounds. The sky is blue with a few scattered clouds.                                                                                                                                                       |
| gs://cloud-samples-data/vision/landmark/st_basils.jpeg   | A monument stands in front of Saint Basil's Cathedral in Moscow under a bright blue sky with scattered white clouds. The cathedral features distinctive onion domes in various colors and patterns, including red, blue and white stripes, green and beige stripes, and red and blue diamonds. A large green tree partially obscures the left side of the cathedral. People are visible in the foreground near the base of the monument and the cathedral entrance. |
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

直接探索 ObjectRef:情感分析

虽然对象表会自动管理文件引用,但您可以使用 BigQuery 对象引用直接与这些对象互动,以便对单个文件进行临时分析。

例如,您可以使用存储在您自己的 GCS 存储分区中的小型文本文件(使用之前创建的 $DEPS_BUCKET 变量),并使用 bq query 通过 OBJ.MAKE_REF 对其进行分析。

首先,创建一个小型文本文件并将其上传到您的存储分区:

cat <<'EOF' > review.txt
This product is fantastic! It exceeded my expectations. The quality is top-notch. I highly recommend it to everyone!
EOF

gcloud storage cp review.txt gs://${DEPS_BUCKET}/review.txt

现在,使用标准 SQL 中的 OBJ.MAKE_REF 查询该文件:

cat <<EOF > sentiment_analysis.sql
SELECT
  AI.GENERATE(
    (
      'Analyze the sentiment of this text file. Is it positive, negative, or neutral? Explain why.',
      OBJ.MAKE_REF('gs://${DEPS_BUCKET}/review.txt', 'us.my-conn')
    ),
    connection_id => 'us.my-conn',
    endpoint => 'gemini-2.5-flash-lite'
  ).result AS ml_generate_text_result;
EOF

bq query --use_legacy_sql=false < sentiment_analysis.sql

示例输出:

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                 ml_generate_text_result                                                                                  |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| This text file has a **strongly positive** sentiment.                                                                                                                                    |
|                                                                                                                                                                                          |
| Here's why:                                                                                                                                                                              |
|                                                                                                                                                                                          |
| *   **Positive Keywords:** The text is filled with unequivocally positive words and phrases:                                                                                             |
|     *   "fantastic"                                                                                                                                                                      |
|     *   "exceeded my expectations"                                                                                                                                                       |
|     *   "top-notch"                                                                                                                                                                      |
|     *   "highly recommend"                                                                                                                                                               |
|                                                                                                                                                                                          |
| *   **Enthusiastic Language:** The use of exclamation marks ("!") further amplifies the positive tone, indicating excitement and strong approval.                                        |
|                                                                                                                                                                                          |
| *   **Lack of Negative or Neutral Elements:** There are no words, phrases, or implications that suggest any dissatisfaction, criticism, or even indifference.                            |
|                                                                                                                                                                                          |
| In summary, the author's language is enthusiastic and uses multiple strong positive descriptors, leaving no room for doubt that their opinion of the product is overwhelmingly positive. |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

8. 清理

为避免系统向您的 Google Cloud 账号持续收取费用,请删除本 Codelab 中创建的资源。

删除数据集和连接

在 Cloud Shell 中运行以下命令,以删除数据集和连接:

bq rm -r -f --location=$REGION iceberg_ai
bq rm -r -f --location=US iceberg_object_ai
bq rm --connection $PROJECT_ID.US.my-conn

删除 GCS 存储分区和本地文件

清理 GCS 存储分区和本地文件:

# Delete GCS buckets
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
gcloud storage rm -r gs://dataproc-temp-${REGION}-${PROJECT_NUMBER}-*
gcloud storage rm -r gs://dataproc-staging-${REGION}-${PROJECT_NUMBER}-*
gcloud storage rm -r gs://${DEPS_BUCKET}

# Delete local files
rm -f train_model.sql predict_fare.sql create_object_table.sql query_images.sql sentiment_analysis.sql test.py query.py history.py timetravel.py review.txt

如果您创建的项目仅用于本实验,还可以删除整个项目。

9. 恭喜

恭喜!您已成功使用 Apache Iceberg、BigLake 和 BigQuery AI 构建了统一的数据湖仓一体!

您学到的内容

  • 如何连接和查询公共 Apache Iceberg REST Catalog。
  • 在 Iceberg 中使用时间旅行来审核数据集版本。
  • 基于结构化数据训练 BigQuery ML 模型
  • 使用对象表和 ObjectRef 连接非结构化数据(图片)
  • 使用 Gemini 直接在 BigQuery SQL 中分析图片。

参考文档