借助 AI 智能体,在几秒内将原始数据转化为预测结果

1. 简介

在此 Codelab 中,您将扮演一家虚构的 Froyo 公司的数据科学家的角色,该公司即将推出一种名为“午夜漩涡”的新产品口味。为确保全球发布成功,企业必须回答有关成分、市场需求和投资回报率 (ROI) 的关键问题。此端到端工作流演示了 Google Cloud 的 Knowledge Catalog(以前称为 Dataplex)和 Lakehouse for Apache Iceberg(以前称为 BigLake)如何弥合“暗”非结构化数据之间的差距,并通过统一的治理层在 IDE (VS Code) 中使用 Gemini 提供可据以采取行动的商业智能。

原始数据预测工作流

您将执行的操作

  • 非结构化数据发现:存储在 Cloud Storage 中的 PDF 食谱由 Knowledge Catalog DataScan 进行爬取。在 BigQuery 中为扫描的 PDF 创建对象表。借助 Vertex AI 语义推理,系统可以“读取”PDF,提取产品、过敏原、成分和相关属性的结构化信息。然后,它会智能地为 PDF 中存储的数据生成架构。
  • 统一元数据:从 PDF 文件中提取的数据直接存储到 BigQuery 中,作为原生宽表,并创建视图来帮助进行常见查询。一个包含历史销售数据的独立输入数据集存储在 Google Cloud Storage 上的 Apache Iceberg 表中。在后续步骤中,此 Iceberg 表将与 BigQuery 中的提取数据联接。
  • 跨引擎分析:使用 Managed Service for Apache Spark(以前称为 Dataproc)和 Iceberg REST 目录,您将能够将最新的 PDF 元数据和推理出的结构化语义数据(来自 BigQuery 表和视图)与存储在 Google Cloud Storage 上的 Apache Iceberg 表中的结构化销售数据联接起来。此行为受用作 Jupyter 笔记本内核的受管 Apache Spark 交互式会话模板约束,该模板可确保 Spark 作业具有一致的安全性和计算设置。
  • 语义分析:通过将推断出的商品数据与客户和销售数据(在 BigQuery 中)联接,演示能够提取分析,例如识别过敏原数据和收入预测。
  • 自主治理:从发现扫描到 Spark 执行的整个生命周期都通过支持 Gemini 的模板、指令、规则和代理驱动的自动化进行编排,证明 AI 可以管理为分析提供支持的基础设施。

所需条件

完成此 Codelab 可能会产生费用,估计在正常使用情况下不超过 5 美元。如需根据您的预计使用量或当前价格获取详细的费用估算,请使用 Google Cloud 价格计算器

请确保您满足以下前提条件,以便完成此 Codelab。

2. 准备工作

创建 Google Cloud 项目

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

启用必需的 API

运行以下命令可启用所有必需的 API:

  gcloud services enable \
    dataplex.googleapis.com \
    datacatalog.googleapis.com \
    discoveryengine.googleapis.com \
    bigqueryconnection.googleapis.com \
    bigquery.googleapis.com \
    biglake.googleapis.com \
    dataproc.googleapis.com \
    metastore.googleapis.com \
    dataform.googleapis.com \
    notebooks.googleapis.com \
    aiplatform.googleapis.com \
    cloudresourcemanager.googleapis.com \
    serviceusage.googleapis.com \
    secretmanager.googleapis.com \
    storage.googleapis.com

下载 Codelab 资源

此代码库包含 Parquet、recipes、suppliers、copilot-instructions.md、template.yaml 和 quickstart.py 文件,可用于此 Codelab。请务必下载这些文件。

如需下载文件,请执行以下操作:

  1. 在 Cloud Shell 中,运行以下命令:
    git clone --depth 1 --filter=blob:none --sparse https://github.com/GoogleCloudPlatform/next-26-keynotes.git
    
  2. 进入新创建的文件夹:
    cd next-26-keynotes
    
  3. 拉取 data-cloud-demo 文件夹
    git sparse-checkout set genkey/data-cloud-demo
    
  4. 结账完成后,前往 data-cloud-demo 文件夹,然后解压缩 ZIP 文件以访问 Codelab 资源。

3. 为 Froyo 客户数据设置 Lakehouse

在本部分中,您将在 Lakehouse 中创建一个目录,以便在工作流中使用 Lakehouse metastore。它为所有 Iceberg 数据提供单一可信来源,从而在查询引擎之间实现互操作性。它可让 Apache Spark 等查询引擎以一致的方式发现、读取元数据和管理 Iceberg 表。

所需的角色

确保您具有以下 Identity and Access Management (IAM) 角色:

  • roles/biglake.viewer
  • roles/bigquery.user
  • roles/bigquery.dataEditor
  • roles/biglake.editor
  • roles/biglake.metadataViewer
  • roles/bigquery.connectionUser
  • roles/storage.objectUser
  • roles/storage.objectViewer
  • roles/storage.objectCreator
  • roles/storage.admin

如需详细了解如何授予 IAM 角色,请参阅授予 IAM 角色

使用存储分区创建湖仓一体目录

创建 Lakehouse 目录以管理 Iceberg 表的元数据。您可以在 Spark 作业中连接到此目录,以创建和查询 Iceberg 表。

  1. 在 Google Cloud 控制台中,前往 Lakehouse
  2. 点击创建目录。系统会打开创建目录页面。
  3. 对于目录类型,选择 Iceberg REST Catalog
  4. 选择 Lakehouse 目录存储分区选项部分,选择单存储分区目录
  5. 对于默认目录 Cloud Storage 存储分区,点击浏览,然后点击创建新存储分区
  6. 创建存储桶页面上,执行以下操作:
  7. 开始使用部分中,输入符合存储分区名称要求的全局唯一名称。
  8. 选择数据存储位置部分中,为位置类型选择区域,然后输入您的区域。例如 us-west1
  9. 选择如何控制对对象的访问权限部分中,清除强制执行此存储分区的公开访问权限阻止复选框。
    这样一来,您就可以模拟现实世界中的场景,例如托管公开的 Web 内容或共享数据存储库。如果不进行此更改,存储分区将强制执行严格的“仅限私密”政策;即使您已成功向文件授予公开权限,任何访问您资产的尝试都会导致 403 禁止访问错误。
  10. 依次点击继续 > 创建 > 选择 > 继续
  11. 对于身份验证方法,请选择凭证分发模式
  12. 点击创建。系统会创建您的目录,并打开目录详情页面。
  13. 身份验证方法下,点击设置存储分区权限
  14. 在对话框中,点击确认。这会验证您目录的服务账号是否对存储分区具有 Storage Object User 角色。
  15. 目录详情页面上,复制 REST 目录 URI 路径。在“运行 Spark 作业”任务期间使用此路径。

将 Parquet 文件上传到存储分区

如需将 Parquet 文件上传到存储分区的根目录,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
  2. 在存储分区列表中,点击相应存储分区的名称。例如 acai_demo
  3. 在相应存储分区的对象标签页中,依次点击上传 > 上传文件
  4. 从您在本 Codelab 的准备工作部分中克隆的 Parquet 文件夹中选择文件。
  5. 点击打开

4. 设置 VPC 网络

创建 Virtual Private Cloud (VPC) 网络和子网,使资源能够与 Google API 通信,而无需连接到公共互联网;并创建防火墙,使内部流量能够在数据处理节点之间自由流动。

  1. 在 Google Cloud 控制台中,进入 VPC 网络页面。
  2. 点击创建 VPC 网络
  3. 输入网络的名称。例如 acai-network
  4. 如需配置网络的最大传输单元 (MTU),请选中自动设置 MTU 复选框。
  5. 子网创建模式选择自动
  6. 防火墙规则部分中,选中 IPv4 防火墙规则的所有复选框
  7. 点击创建

启用专用 Google 访问通道

Dataproc Serverless 节点没有公共 IP 地址。如需与 Lakehouse Catalog 和 Cloud Storage 通信,子网必须启用专用 Google 访问通道。

  1. 在 Google Cloud 控制台中,进入 VPC 网络页面。
  2. 点击您需要为其启用专用 Google 访问通道的子网所属网络的名称。例如 us-west1
  3. 点击子网的名称。系统会显示子网详情页面。
  4. 点击修改
  5. 专用 Google 访问通道部分,选择开启
  6. 点击保存

5. 创建并运行 Spark 作业

如需创建和查询 Iceberg 表,请上传包含必要 Spark SQL 语句的 PySpark 作业。然后,使用 Managed Service for Spark 运行作业。

将 quickstart.py 上传到您的 Cloud Storage 存储分区

克隆 Codelab 资源后,请使用您的项目详细信息更新 quickstart.py 脚本,并将其上传到 Cloud Storage 存储分区。

  1. 在文本编辑器中打开 quickstart.py 脚本。
  2. 将脚本中的占位符 BUCKET_NAME 替换为您的 Cloud Storage 存储分区名称,然后保存。
  3. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区
  4. 点击相应存储分区的名称。例如 acai_demo
  5. 对象标签页中,依次点击上传 > 上传文件
  6. 在文件浏览器中,选择更新后的 quickstart.py 文件,然后点击打开

运行 Spark 作业

上传 quickstart.py 脚本后,将其作为 Managed Service for Spark 批量作业运行。

  1. 如需配置变量,请在 Cloud Shell 中运行以下命令。
    # Configuration Variables
    export PROJECT_ID="<PROJECT_ID>"
    export REGION="<REGION>"
    export BUCKET_NAME="<BUCKET_NAME>"
    export SUBNET="<SUBNET>"
    export LAKEHOUSE_CATALOG_ID="<LAKEHOUSE_CATALOG_ID>"
    export CATALOG_URI_ID="<CATALOG_URI_ID>"
    
    替换以下内容:
    • LAKEHOUSE_CATALOG_ID:包含 PySpark 应用文件的 Lakehouse 目录资源的名称。例如 acai_demo
    • PROJECT_ID:您的 Google Cloud 项目 ID。
    • REGION:运行 Managed Service for Spark 批量工作负载的区域。例如 us-west1
    • BUCKET_NAME:您的 Cloud Storage 存储分区名称。例如 acai_demo
    • SUBNET:您的 VPC 子网名称。例如 acai-network
    • CATALOG_URI_ID:在创建包含存储分区的 Lakehouse 目录时复制的 Lakehouse 目录的 URI ID。例如 https://biglake.googleapis.com/iceberg/v1/restcatalog
  2. 在 Cloud Shell 中,使用 quickstart.py 脚本运行以下适用于 Spark 的托管式服务批量作业。
      gcloud dataproc batches submit pyspark gs://${BUCKET_NAME}/quickstart.py \
         --project=${PROJECT_ID} \
         --region=${REGION} \
         --subnet=${SUBNET} \
         --version=2.2 \
         --properties="\
      spark.sql.defaultCatalog=${LAKEHOUSE_CATALOG_ID},\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}=org.apache.iceberg.spark.SparkCatalog,\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.type=rest,\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.uri=${CATALOG_URI_ID},\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.warehouse=gs://${BUCKET_NAME},\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.io-impl=org.apache.iceberg.gcp.gcs.GCSFileIO,\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.header.x-goog-user-project=${PROJECT_ID},\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager,\
      spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.rest-metrics-reporting-enabled=false,\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.header.X-Iceberg-Access-Delegation=vended-credentials,\
      spark.sql.catalog.${LAKEHOUSE_CATALOG_ID}.gcs.oauth2.refresh-credentials-endpoint=https://oauth2.googleapis.com/token"
    
    您可以在 Dataproc 批处理中查看正在运行的 Spark 作业的状态。作业完成后,系统会显示类似于以下内容的输出:
     All tables registered successfully!
     Batch [126fa2226a904d2e944c8eecbe0b1840] finished.
     metadata:
       '@type': type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata
       batch: projects/PROJECT_ID/locations/REGION/batches/126fa2226a904d2e944c8eecbe0b1840
       batchUuid: 3bff88ca-64d6-4c16-b9ad-2a47ae93ebff
       createTime: '2026-04-09T13:17:26.222727Z'
       description: Batch
       labels:
         goog-dataproc-batch-id: 126fa2226a904d2e944c8eecbe0b1840
         goog-dataproc-batch-uuid: 3bff88ca-64d6-4c16-b9ad-2a47ae93ebff
         goog-dataproc-drz-resource-uuid: batch-3bff88ca-64d6-4c16-b9ad-2a47ae93ebff
         goog-dataproc-location: REGION
       operationType: BATCH
     name: projects/PROJECT_ID/regions/REGION/operations/47bc59e8-5082-3af9-89a0-22289aa5f4b9
    

6. 查询 BigQuery 中的表

通过成功运行 Spark 批量作业,您已使用 Managed Service for Spark Serverless 作为分布式计算引擎,在 Lakehouse Metastore 中注册多个表(每个 Parquet 文件对应一个表)。注册后,Google Cloud 便可将 Cloud Storage 中的原始文件视为结构化的高性能表。

以下步骤将引导您确认元数据是否已正确同步,确保您的数据不仅安全存储,而且还可通过 BigQuery 界面完全可发现和可查询。

  1. 在 Google Cloud 控制台中,找到 BigQuery
  2. 在查询编辑器中,输入以下语句。该查询使用 project.namespace.dataset.table 语法。
    SELECT * FROM `<PROJECT_ID>.<NAMESPACE>.<ICEBERG_DATASET>.<ICEBERG_TABLE>`
    
    例如,SELECT * FROM PROJECT_ID.acai_demo.acai_dataset.order_items
    请替换以下内容:
    • PROJECT_ID:您的 Google Cloud 项目 ID。
    • NAMESPACE:在上一步中因 Spark 作业而创建的命名空间,您可以在 BigQuery 对象浏览器页面中找到它。例如 acai_demo
    • ICEBERG_DATASET:Iceberg 目录中的数据集名称,例如 acai_dataset
    • ICEBERG_TABLE:Iceberg 数据集中的表名称,例如 order_items
  3. 点击运行。查询结果会显示您通过 Spark 作业插入的数据。来自 BigQuery 的表格数据结果

7. 设置非结构化商品数据文件

在本部分中,您将在 BigQuery 中创建一个组织结构,用于存储 Froyo 配方和供应商数据,特别是 Froyo 商品详情。它还会建立 Cloud 资源连接,该连接充当安全“桥梁”,使 BigQuery 能够从 Cloud Storage 等外部来源读取文件。

创建存储分区并上传 Froyo 详细信息文件

创建供应商文件和食谱文件,并将其上传到 Cloud Storage 存储分区

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
  2. 点击创建
  3. 创建存储桶页面上,输入您的存储桶信息。完成以下每一步后,点击继续以继续执行后续步骤:
  4. 开始使用部分中,输入存储分区名称。例如 acai_pdfs
  5. 选择数据存储位置部分中,选择区域,然后输入您的区域。例如 us-west1
  6. 选择如何控制对对象的访问权限部分中,清除强制执行此存储分区的公开访问权限阻止功能复选框。
  7. 点击创建
  8. 在存储分区列表中,点击您创建的存储分区。例如 acai_pdfs
  9. 在相应存储分区的对象标签页中,依次点击上传 > 上传文件夹
  10. 选择您在此 Codelab 的准备工作部分中解压缩的 recipes 文件夹。
  11. 点击上传
  12. suppliers 文件夹重复执行上传流程。

创建连接

创建 Cloud 资源连接。这会生成一个唯一的服务账号,该账号充当 BigQuery 访问外部文件的“身份证”。

  1. 转到 BigQuery 页面。
  2. 在左侧窗格中,点击探索器。如果您没有看到左侧窗格,请点击展开左侧窗格以打开该窗格。
  3. 探索器窗格中,展开您的项目名称,然后点击连接
  4. 连接页面上,点击创建连接
  5. 对于连接类型,请选择 Vertex AI 远程模型、远程函数、BigLake 和 Spanner(Cloud 资源)
  6. 连接 ID 字段中,输入连接 ID 名称。例如 acai_pdf_connection。请务必记下此 ID,因为您稍后在此 Codelab 中设置数据扫描时需要用到它。
  7. 位置类型设置为区域,然后选择一个区域。例如 us-west1。连接应与数据集等其他资源位于同一位置。
  8. 点击创建连接
  9. 点击转到连接
  10. 连接信息窗格中,复制服务账号 ID 以在后续步骤中使用。服务账号类似于 bqcx-175930350285-qn3a@gcp-sa-bigquery-condel.iam.gserviceaccount.com

管理对服务账号的访问权限

向服务账号授予访问权限,以便 Lakehouse 可以读取您的 PDF 文件。

  1. 前往 IAM 和管理页面。
  2. 点击授予使用权限。系统随即会打开“添加主账号”对话框。
  3. 新的主账号字段中,输入您之前复制的服务账号 ID。
  4. 选择角色字段中,添加以下角色:
    • roles/storage.objectUser
    • roles/storage.objectViewer
    • roles/bigquery.user
    • roles/bigquery.dataEditor
    • roles/aiplatform.user
    • roles/storage.admin
    • roles/dataproc.serviceAgent
  5. 点击保存

如需详细了解 BigQuery 中的 IAM 角色,请参阅预定义的角色和权限

8. 管理 DataScan 作业的权限

为 Spark 和 Dataform 创建特定的服务账号(身份),然后向这些服务账号以及 Google 的自动化服务代理授予读取存储空间、运行 BigQuery 作业和使用 Vertex AI 进行探索所需的精确权限。

Spark 和 Dataform 的 IAM 访问权限

  1. 在 Google Cloud 控制台中,转到创建服务账号页面。
  2. 如果未选择,请选择您的 Google Cloud 项目。
  3. 点击创建服务账号
  4. 输入服务账号名称。例如,sa-spark-stg1。Google Cloud 控制台会根据此名称生成服务账号 ID。如有必要,请修改 ID。此 ID 创建后便无法更改。
  5. 如需设置访问权限控制,请点击创建并继续,然后继续执行下一步。
  6. 选择以下 IAM 角色,以授予项目中的服务账号。
    • roles/dataproc.worker
    • roles/storage.objectUser
    • roles/bigquery.dataEditor
    • roles/bigquery.jobUser
    • roles/aiplatform.user
    • roles/dataplex.discoveryPublishingServiceAgent
  7. 完成添加角色后,点击继续
  8. 点击完成以完成服务账号的创建过程。

用于访问 Knowledge Catalog 的 BigQuery 连接权限

  1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
  2. 在存储分区列表中,点击您为 Froyo 创建的存储分区名称。例如 acai_pdfs
  3. 权限标签页中,点击授予访问权限。系统会显示“添加主账号”对话框。
  4. 新的主账号字段中,输入您的 BigQuery 服务账号 ID。服务账号类似于 bqcx-175930350285-qn3a@gcp-sa-bigquery-condel.iam.gserviceaccount.com
  5. 选择角色下拉菜单中选择以下角色(或多个角色)。
    • roles/storage.objectUser
    • roles/dataplex.serviceAgent
    • roles/dataplex.securityAdmin
    • roles/aiplatform.serviceAgent
    • roles/dataplex.discoveryPublishingServiceAgent
  6. 点击“保存”。

9. 设置 Knowledge Catalog

构建 Knowledge Catalog,以统一与 Froyo 相关的数据,并自动发现非结构化文件(例如 PDF 食谱和 PDF 供应商)。

通过 curl 创建 DataScan

在本部分中,您将通过添加 datascan_ID 并将其指向您的 BigQuery 数据集,为 Cloud Storage 存储分区(例如 acai_pdfs)创建扫描。之后,Knowledge Catalog 将自动在 BigQuery 中为您的 PDF 创建条目。

  1. 如需扫描 PDF(供应商和食谱),请运行以下命令:
    # 1. Set your variables
    PROJECT_ID="<PROJECT_ID>"
    REGION="<REGION>"
    ENV_SUFFIX="stg1"
    DATASCAN_ID="froyo-profile-${ENV_SUFFIX}"
    BUCKET_NAME="<BUCKET_NAME>"
    
    # 2. Set this to the Name of the connection you created in Step 7
    CONNECTION_ID="<CONNECTION_ID_NAME>"
    
    # 3. Define the API Endpoint
    DATAPLEX_API="dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}"
    
    # 4. Create the DataScan via CURL
    echo "Creating Dataplex DataScan: ${DATASCAN_ID}..."
    
    curl -X POST "https://$DATAPLEX_API/dataScans?dataScanId=${DATASCAN_ID}" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
    "data": {
       "resource": "//storage.googleapis.com/projects/'"${PROJECT_ID}"'/buckets/'"${BUCKET_NAME}"'"
       },
    "executionSpec": {
       "trigger": {
          "on_demand": {}
       }
    },
    "dataDiscoverySpec": {
       "bigqueryPublishingConfig": {
          "tableType": "BIGLAKE",
          "connection": "projects/'"${PROJECT_ID}"'/locations/'"${REGION}"'/connections/'"${CONNECTION_ID}"'"
       },
       "storageConfig": {
          "unstructuredDataOptions": {
          "entity_inference_enabled": true
          }
       }
       }
    }'
    
  2. curl 命令会显示 Knowledge Catalog DataScan 结果,类似于下图。数据扫描结果

运行作业

运行以下命令:

  gcloud dataplex datascans run $DATASCAN_ID --location=$REGION

描述作业

如需描述作业,请运行以下命令:

  gcloud dataplex datascans describe $DATASCAN_ID --location=$REGION

删除数据扫描作业

如果扫描运行时间超过 10 分钟,或者作业状态长时间保持为 Pending 而未转换为 Running,则可能是由于相应区域中的资源暂时不可用。如果发生这种情况,您可以运行以下命令来删除作业,然后尝试重新创建并运行该作业。有时,初始运行可能会快速失败,并显示类似 unable to acquire necessary resources 的错误。

   gcloud dataplex datascans delete $DATASCAN_ID --location=$REGION

查看作业状态

如需检查作业状态,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往元数据整理页面。
  2. Cloud Storage 发现标签页中,点击发现扫描的名称。作业的状态
  3. 扫描详情页面上,您可以查看作业状态。
  4. 作业完成后,检查使用 curl 命令创建的已发布的数据集(例如 acai_pdfs_discovered_003)是否存在。扫描后发布的数据集

查看对象表

如需查看发现作业完成后创建的对象表,请执行以下操作:

  1. 在 Google Cloud 控制台中,找到 BigQuery
  2. 点击数据集,然后选择在上一步中创建的已发布数据集。例如 acai_pdfs_discovered_003
  3. 如需查看对象表,请点击表 ID。例如 acai_pdfs
  4. 生成的对象表如下所示:已发布的数据集对象表

10. 语义提取

您将针对在上一步中创建的非结构化对象表,推断并提取结构化表、其他数据库对象和关系。为此,您将使用 Knowledge Catalog Insights 功能生成 SQL 语句,以从非结构化表中提取结构化数据

  1. 在 Google Cloud 控制台中,前往 Knowledge Catalog 搜索页面。
  2. 搜索要查看分析洞见的数据集表。例如,acai_pdfs_discovered_003 在 Knowledge Catalog 中查看表
  3. 在搜索结果中,点击相应表格以打开其条目页面。
  4. 点击数据分析标签页。如果该标签页为空,则表示此表的分析洞见尚未生成。生成数据分析可能需要 15 到 25 分钟。
  5. 看到数据洞见后,依次点击提取 > 使用 SQL 提取使用 SQL 提取分析洞见
  6. Extract with SQL 页面中,针对 Destination,输入您的数据集。例如 acai_pdfs_discovered_003
  7. 点击提取。这会打开 BigQuery 编辑器并加载查询。
  8. 点击运行。此步骤会生成一组语句,可能需要几分钟才能完成运行。
  9. 查询完成后,您会看到以下结果:提取的分析洞见查询结果
  10. 前往 BigQuery,然后点击数据集(例如 acai_pdfs_discovered_003)。系统会在您在第 6 步中选择的数据集中创建一组新的结构化数据库对象。结构化数据库结果

为 BigQuery 中的对象生成数据分析

如需为 BigQuery 数据集生成数据洞察,您必须使用 BigQuery Studio 在 BigQuery 中访问该数据集。

  1. 在 Google Cloud 控制台中,前往 BigQuery Studio
  2. 探索器窗格中,选择相应项目,然后前往要为其生成数据分析的数据集。
  3. 点击数据分析标签页。
  4. 如果您看到启用 API 按钮,请点击该按钮以启用 Gemini for Google Cloud。系统随即会打开启用核心功能窗口。
    1. 核心功能 API 部分,点击 Gemini for Google Cloud APIBigQuery Unified API 对应的启用,然后点击下一步
    2. 权限(可选)部分中,根据需要向正文授予 IAM 角色,然后点击下一步
  5. 如需生成分析洞见并将其发布到 Knowledge Catalog,请点击生成并发布
  6. 发布后,您将能够在相应标签页中查看分析数据。BigQuery 中对象的分析洞见

11. 设置 IDE 以进行智能体数据分析

适用于 Visual Studio Code 的 Google Cloud Data Agent Kit 扩展程序是一款面向数据科学家和数据工程师的 IDE 扩展程序。借助此插件,您可以直接从 IDE 连接到 Google Data Cloud 资源和数据并进行操作。如需了解详情,请参阅 VS Code 的 Data Agent Kit 扩展程序概览

如果您想执行以下操作,VS Code 的 Data Agent Kit 扩展程序会非常有用:

  • 直接在 VS Code 中构建、测试、审核和部署可用于生产用途的数据流水线,例如 Spark ETL 或 BigQuery ETL。
  • 在 AI 助手的帮助下,探索数据、构建训练流水线、确定最佳机器学习模型,并将其部署到生产端点。
  • 连接到可信的数据源,构建高性能数据模型,并为业务利益相关方发布互动式信息中心。

安装适用于 VS Code 的 Data Agent Kit 扩展程序

  1. 打开 VS Code。
  2. 安装 Google Cloud CLI。如需了解详情,请参阅安装 Google Cloud CLI
  3. 安装适用于 VS Code 的 Data Agent Kit 扩展程序
  4. 完成扩展服务初始配置流程,该流程要求您执行以下操作:
    • 登录扩展程序
    • 安装技能、MCP 服务器
  5. 完成初始配置后,重新加载或重启窗口。如需了解详情,请参阅设置并配置适用于 VS Code 的 Data Agent Kit 扩展程序
  6. IDE 重新加载后,点击导航窗格中的 Google Data Cloud 图标,前往设置,并确保在常规设置中正确设置项目 ID 和区域 (us-west1)。

在 VS Code 中设置工作区

  1. 打开 VS Code,然后依次选择文件 > 打开文件夹 > 新建文件夹
  2. 创建一个名为 acai_test 的新文件夹,然后点击 Open。现在,VS Code 会将您打开的文件夹视为工作区。
  3. 工作区信任对话框中,选择是,我信任此作者以启用工作区中的所有功能。
  4. acai_test 工作区中创建文件夹 .github
  5. .github 文件夹中创建一个新文件 copilot-instructions.md,并在其中输入以下规则。
    ## 1. Project Context
    - **Project ID**: <PROJECT_ID>
    - **Domain**: This project is centralized around "Froyo", a brand of frozen yogurt offering multiple flavors.
    - **Documentation**: Raw PDF documents detailing flavors and ingredients are stored in Google Cloud Storage at `gs://<BUCKET_NAME>`.
    
    ## 2. Execution & Data Processing Rules
    - **CRITICAL RULE - Structured Specs**: The semantic and structured information extracted from the PDFs is available in a BigQuery dataset named `<BQ_DATASET_NAME>` (referred to as the Knowledge Catalog).
    - **CRITICAL RULE - Customer Data**: Existing Froyo customer data resides in BigQuery in the dataset `<DATASET_ID>`. When you are referencing a dataset, ensure you are using it with the project ID (`<PROJECT_ID>`) and namespace prefix `<NAMESPACE_NAME>`. For example, to query order table in this dataset you should use `<PROJECT_ID>.<NAMESPACE>.<DATASET_ID>.orders`.
    - **CRITICAL RULE - Data Joins between BigQuery dataset and Iceberg dataset**: ANY task requiring a join or integration between the BigQuery datasets `<DATASET_ID>` of the PDF data and the `<DATASET_ID>` of the customer data MUST be executed using **Spark Notebooks**.
    - **CRITICAL RULE - Notebook Kernel**: Every Spark notebook utilized MUST exclusively be configured to run on the Serverless Session template `iceberg-federation-template` as its kernel.
    - **CRITICAL RULE - Data Science**: ANY data science, machine learning, or advanced analytical task MUST be performed strictly within **Spark Notebooks** using the aforementioned setup.
    
  6. acai_test 工作区中创建另一个新文件 template.yaml,并在该文件中输入以下信息。
    labels:
     client: "vscode"
    jupyterSession:
     displayName: "iceberg-federation-template"
     kernel: "PYTHON"
    environmentConfig:
     executionConfig:
       serviceAccount: "sa-spark-dev1@<PROJECT_ID>.iam.gserviceaccount.com"
       subnetworkUri: "projects/<PROJECT_ID>/regions/<REGION>/subnetworks/<SUBNET_NAME>"
    runtimeConfig:
     version: "2.3"
     properties:
     # This enables the secure proxy URL you were looking for
       dataproc.tier: "premium"
       spark.dataproc.engine: "lightningEngine"
       spark.dataproc.lightningEngine.runtime: "native"
       spark.memory.offHeap.enabled: "true"
       spark.memory.offHeap.size: "1g"
       spark.executor.memory: "4g"
       "dataproc.component.gateway.enabled": "true"
       "dataproc.jupyter.listen.all.interfaces": "true"
       "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
       "spark.sql.defaultCatalog": "<CATALOG_NAME>"
       "spark.sql.catalog.<CATALOG_NAME>": "org.apache.iceberg.spark.SparkCatalog"
       "spark.sql.catalog.<CATALOG_NAME>.type": "rest"
       "spark.sql.catalog.<CATALOG_NAME>.uri": "<CATALOG_URI_ID>"
       "spark.sql.catalog.<CATALOG_NAME>.warehouse": "bl://projects/<PROJECT_ID>/catalogs/<CATALOG_NAME>"
       "spark.sql.catalog.<CATALOG_NAME>.rest.auth.type": "org.apache.iceberg.gcp.auth.GoogleAuthManager"
    
  7. 在 VS Code 中,点击终端,然后运行以下命令以将 template.yaml 文件导入为会话模板。代理稍后会使用此模板来创建 Spark 会话。
    gcloud beta dataproc session-templates import iceberg-federation-template \
        --source=template.yaml \
        --location=<REGION>
    
    REGION 替换为您的区域。

12. 执行智能数据分析

  1. 在 VS Code 代码编辑器中,点击切换聊天
  2. 对于配置自定义代理,选择代理VS Code 中的选择代理
  3. 搜索模型窗格中,点击管理语言模型VS Code 中的“管理模型”图标
  4. 语言模型页面上,点击添加模型
  5. 从列表中选择 Google,然后按 Enter 键确认输入。在 VS Code 中添加 Google Gemini 模型
  6. 如需输入 Google Gemini 的 API 密钥,请执行以下操作:
    1. 前往 Google AI Studio 网站。
    2. 使用您的 Google 账号登录。
    3. 在边栏中,点击获取 API 密钥
    4. 点击创建 API 密钥。系统会打开“创建新密钥”页面。
    5. 选择云项目列表中,选择导入项目
    6. 输入现有项目的名称。
    7. 点击创建密钥,然后复制 API 密钥。该密钥可用于访问您账号的 Gemini API 资源。如需了解详情,请参阅使用 Gemini API 密钥
  7. 将您生成的 API 密钥粘贴到搜索栏中,然后点击 EnterGoogle Gemini API 密钥
  8. 如果 Gemini 模型未显示,请取消隐藏它们,如下图所示:取消隐藏 Google Gemini 模型
  9. 从 Google Gemini 模型列表中选择 Gemini 3.1 Pro(预览版),然后关闭语言模型窗口。
  10. 在对话窗口中,输入以下问题:
       Search ingredients for Midnight papaya
    
  11. 经过一些互动后,您应该会看到以下结果:Midnight papaya ingredients inquiry results
  12. 在对话窗口中,输入另一个问题:
       Search allergen information for Midnight papaya
    
  13. 经过一些互动和步骤后,您会看到智能体回复过敏原名称 Soy,如下图所示:午夜木瓜过敏原检测结果
  14. 在对话窗口中,输入另一个问题:
       Build a pipeline to join products with our 'Global Loyalty' Iceberg tables in acai customer, sales data to identify popular products
    
  15. 如需选择内核,请打开 .ipynb 文件,然后依次点击选择内核 > 远程 Spark 内核 > 无服务器 Spark 上的 Iceberg-federation-template在 VS Code 中选择内核
  16. 经过一些互动和步骤后,您会看到智能体回复,其中包含笔记本中成功执行的所有步骤,以及在笔记本末尾生成的最终结果,如下图所示:Iceberg 表查询结果

13. 清理

为避免产生费用,请删除您在本实验中创建的资源。

  1. 如需删除 Knowledge Catalog DataScan,请运行以下命令:
      DATASCAN_ID="<DATASCAN_ID>"
      echo "Deleting Dataplex DataScan: ${DATASCAN_ID}"
      gcloud dataplex datascans delete "${DATASCAN_ID}" --location="${REGION}" --quiet
    
  2. 如需删除 Cloud Storage 存储分区及其所有内容,请运行以下命令:
      echo "Deleting Cloud Storage buckets: <BUCKET_NAME1> and <BUCKET_NAME2>"
      gsutil -m rm -r gs://<BUCKET_NAME1>
      gsutil -m rm -r gs://<BUCKET_NAME2>
    
  3. 如需删除 BigQuery 连接,请运行以下命令:
      CONNECTION_ID="<CONNECTION_NAME>"
      echo "Deleting BigQuery Connection: ${CONNECTION_ID}"
      bq rm --connection "${PROJECT_ID}.${REGION}.${CONNECTION_ID}"
    
  4. 如需删除 Lakehouse Catalog,请运行以下命令:
      CATALOG_ID="<CATALOG_NAME>"
      echo "Deleting Lakehouse Catalog: ${CATALOG_ID}"
      gcloud biglake catalogs delete "${CATALOG_ID}" --project="${PROJECT_ID}" --location="${REGION}" --quiet
    
  5. 如需删除包含发现的 PDF 表格的数据集,请运行以下命令:
      DATASET_NAME="<DATASET_NAME>"
      echo "Deleting BigQuery Dataset: ${DATASET_NAME}"
      bq rm -r -f "${PROJECT_ID}:${DATASET_NAME}"
    
  6. 如需删除自定义服务账号,请运行以下命令:
      SERVICE_ACCOUNT="<SERVICE_ACCOUNT>"
      echo "Deleting Service Account: ${SERVICE_ACCOUNT}"
      gcloud iam service-accounts delete "${SERVICE_ACCOUNT}"@"${PROJECT_ID}".iam.gserviceaccount.com --quiet
    
  7. 如需删除 VPC 网络,请运行以下命令:
      VPC_NETWORK="<VPC_NETWORK>"
      echo "Deleting VPC Network: ${VPC_NETWORK}"
      gcloud compute networks delete "${VPC_NETWORK}" --quiet
    
  8. 如需删除整个 Google Cloud 项目,请运行以下命令:
      gcloud projects delete "${PROJECT_ID}"
    

14. 恭喜

恭喜!您已成功将 BigQuery 表中孤立的 PDF 和 Parquet 文件的数据格局整理成一个可搜索、可联接的生态系统。您实际上构建了一个现代数据湖仓一体,可以像处理数据库中的行一样智能地处理 PDF 和大数据格式。而您只需通过 Gemini 的对话式体验,在代理中即可完成所有这些操作。

参考文档

如需深入了解本 Codelab 中使用的核心技术,请访问 Google Cloud 官方文档: