使用支持 AI 的 BigQuery DataFrames 软件包从结构化和非结构化数据中获取数据洞见

1. 概览

在本实验中,您将使用 BigQuery Studio 中的 Python 笔记本中的 BigQuery DataFrames,通过 Python 从数据中获取数据洞见。利用 Google 的生成式 AI 分析和直观呈现非结构化文本数据。

您将创建一个 Python 笔记本,用于对公开的客户投诉数据库进行分类和总结。此方法可经过调整,以处理任何非结构化文本数据。

目标

在本实验中,您将学习如何执行以下任务:

  • 在 BigQuery Studio 中启用和使用 Python 笔记本
  • 使用 BigQuery DataFrames 软件包连接到 BigQuery
  • 使用 BigQuery ML 并连接到 Vertex AI 中的文本嵌入端点,根据非结构化文本数据创建嵌入
  • 使用 BigQuery ML 对嵌入进行聚类
  • 通过 BigQuery ML 使用 LLM 总结聚类

2. 要求

  • 一个浏览器,例如 ChromeFirefox
  • 启用了结算功能的 Google Cloud 项目

准备工作

如需按照此 Codelab 中的说明操作,您需要一个启用了 BigQuery Studio 且已关联结算账号的 Google Cloud 项目。

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
  3. 按照说明启用 BigQuery Studio 进行资产管理

准备 BigQuery Studio

创建一个空笔记本并将其连接到运行时。

  1. 在 Google Cloud 控制台中前往 BigQuery Studio
  2. 点击 + 按钮旁边的
  3. 选择 Python 笔记本
  4. 关闭模板选择器。
  5. 选择 + 代码 以创建新的代码单元。
  6. 从代码单元格安装最新版本的 BigQuery DataFrames 软件包。输入以下命令。
    %pip install --upgrade bigframes --quiet
    
    点击 🞂 按钮或按 Shift + Enter 键,运行代码单元。

3. 读取公共数据集

通过在新的代码单元中运行以下代码来初始化 BigQuery DataFrames 软件包:

import bigframes.pandas as bpd

bpd.options.bigquery.ordering_mode = "partial"

注意:在本教程中,我们使用了实验性的“部分排序模式”,该模式与类似 Pandas 的过滤条件搭配使用时,可以实现更高效的查询。需要严格排序或索引的某些 pandas 功能可能无法正常运行。

消费者投诉数据库

消费者投诉数据库通过 Google Cloud 的公共数据集计划在 BigQuery 上提供。此数据集包含有关消费者金融产品和服务的投诉,数据由美国消费者金融保护局收集。

在 BigQuery 中,查询 bigquery-public-data.cfbp_complaints.complaint_database 表,以分析消费者投诉数据库。使用 bigframes.pandas.read_gbq() 方法可根据查询字符串或表 ID 创建 DataFrame。

在新的代码单元中运行以下代码,以创建名为“feedback”的 DataFrame:

feedback = bpd.read_gbq(
    "bigquery-public-data.cfpb_complaints.complaint_database"
)

了解 DataFrame 的基本信息

使用 DataFrame.peek() 方法下载少量数据样本。

运行此单元

feedback.peek()

预期输出:

  date_received                  product ... timely_response  consumer_disputed complaint_id  
0    2014-03-05  Bank account or service ...            True              False       743665   
1    2014-01-21  Bank account or service ...            True              False       678608   
2    2020-12-31          Debt collection ...            True               <NA>      4041190   
3    2014-02-12          Debt collection ...            True              False       714350   
4    2015-02-23          Debt collection ...            True              False      1251358   

注意:head() 需要排序,如果您想直观呈现部分数据,head() 的效率通常不如 peek()

与 pandas 类似,您可以使用 DataFrame.dtypes 属性查看所有可用的列及其对应的数据类型。这些数据以与 Pandas 兼容的方式公开。

运行此单元

feedback.dtypes

预期输出:

date_received                   date32[day][pyarrow]
product                              string[pyarrow]
subproduct                           string[pyarrow]
issue                                string[pyarrow]
subissue                             string[pyarrow]
consumer_complaint_narrative         string[pyarrow]
company_public_response              string[pyarrow]
company_name                         string[pyarrow]
state                                string[pyarrow]
zip_code                             string[pyarrow]
tags                                 string[pyarrow]
consumer_consent_provided            string[pyarrow]
submitted_via                        string[pyarrow]
date_sent_to_company            date32[day][pyarrow]
company_response_to_consumer         string[pyarrow]
timely_response                              boolean
consumer_disputed                            boolean
complaint_id                         string[pyarrow]
dtype: object

DataFrame.describe() 方法用于查询 DataFrame 中的一些基本统计信息。由于此 DataFrame 不包含任何数值列,因此它会显示非 null 值数量和不同值数量的摘要。

运行此单元

# Exclude some of the larger columns to make the query more efficient.
feedback.drop(columns=[
  "consumer_complaint_narrative",
  "company_public_response",
  "company_response_to_consumer",
]).describe()

预期输出:

         product  subproduct    issue  subissue  company_name    state ... timely_response  consumer_disputed  complaint_id
count    3458906     3223615  3458906   2759004       3458906  3417792 ...         3458906             768399       3458906
nunique       18          76      165       221          6694       63 ...               2                  2       3458906

4. 探索数据

在深入了解实际投诉之前,请使用 DataFrame 中的类 Pandas 方法来直观呈现数据。

直观呈现 DataFrame

有多种内置的可视化方法,例如 DataFrame.plot.hist()。由于此 DataFrame 主要包含字符串和布尔值数据,我们可以先进行一些汇总,以详细了解各个列。

统计每个州收到的投诉数量。

complaints_by_state = (
  feedback.groupby(
    "state", as_index=False,
  ).size()
  .rename(columns={"size": "total_complaints"})
  .sort_values(by="total_complaints", ascending=False)
)

使用 DataFrame.to_pandas() 方法将其转换为 Pandas DataFrame。

complaints_pd = complaints_by_state.head(10).to_pandas()

对下载的 DataFrame 使用 Pandas 可视化方法。

complaints_pd.plot.bar(x="state", y="total_complaints")

条形图:显示加利福尼亚州是投诉最多的州

与其他数据集联接

之前,您查看了各州的投诉数量,但这样会丢失重要的背景信息。有些州的居民人数比其他州多。与人口数据集(例如美国人口调查局的美国社区调查bigquery-public-data.geo_us_boundaries.states)联接。

us_states = bpd.read_gbq("bigquery-public-data.geo_us_boundaries.states")
us_survey = bpd.read_gbq("bigquery-public-data.census_bureau_acs.state_2020_5yr")

# Ensure there are leading 0s on GEOIDs for consistency across tables.
us_states = us_states.assign(
    geo_id=us_states["geo_id"].str.pad(2, fillchar="0")
)

us_survey = us_survey.assign(
    geo_id=us_survey["geo_id"].str.pad(2, fillchar="0")
)

美国社区调查通过地理 ID 识别州。与州/省/自治区/直辖市表联接,以按两个字母的州/省/自治区/直辖市代码获取人口。

pops = us_states.set_index("geo_id")[["state"]].join(
  us_survey.set_index("geo_id")[["total_pop"]]
)

现在,将此数据与投诉数据库联接,以比较人口与投诉数量。

complaints_and_pops = complaints_by_state.set_index("state").join(
    pops.set_index("state")
)

创建散点图,比较各州的人口与投诉数量。

(
  complaints_and_pops
  .to_pandas()
  .plot.scatter(x="total_pop", y="total_complaints")
)

比较人口与投诉的散点图

在比较人口与投诉数量时,有几个州似乎是离群点。读者可以自行练习绘制带有点标签的图表来识别这些点。同样,针对这种情况提出一些假设(例如,不同的人口统计特征、不同数量的金融服务公司等),并进行测试。

5. 计算嵌入

重要信息往往隐藏在文本、音频或图片等非结构化数据中。在此示例中,投诉数据库中的许多有用信息都包含在投诉的文本内容中。

AI 和传统技术(例如情感分析、“词袋”和 word2vec)可以从非结构化数据中提取一些定量信息。最近,与 LLM 密切相关的“向量嵌入”模型可以创建表示文本语义信息的一系列浮点数。

选择数据库的子集

运行向量嵌入模型比其他操作消耗更多资源。为了减少费用和配额问题,请为本教程的其余部分选择一个数据子集。

import bigframes.pandas as bpd

bpd.options.bigquery.ordering_mode = "partial"

feedback = bpd.read_gbq(
    "bigquery-public-data.cfpb_complaints.complaint_database"
)

# Note: if not using ordering_mode = "partial", you must specify these in read_gbq
# for these to affect query efficiency.
# feedback = bpd.read_gbq(
#    "bigquery-public-data.cfpb_complaints.complaint_database",
#     columns=["consumer_complaint_narrative"],
#     filters= [
#         ("consumer_complaint_narrative", "!=", ""),
#         ("date_received", "==", "2022-12-01")])

feedback.shape

与总数据库中近 350 万行数据相比,2022-12-01 提交的投诉约为 1,000 件(请通过 feedback.shape 进行检查)。

仅选择 2022 年 12 月 1 日的数据,并且仅选择 consumer_complaint_narrative 列。

import datetime

feedback = feedback[
    # Filter rows by passing in a boolean Series.
    (feedback["date_received"] == datetime.date(2022, 12, 1))
    & ~(feedback["date_received"].isnull())
    & ~(feedback["consumer_complaint_narrative"].isnull())
    & (feedback["consumer_complaint_narrative"] != "")
    & (feedback["state"] == "CA")

    # Uncomment the following if using free credits for a workshop.
    # Billing accounts with free credits have limited Vertex AI quota.
    # & (feedback["product"] == "Mortgage")
][
    # Filter columns by passing in a list of strings.
    ["consumer_complaint_narrative"]
]

feedback.shape

pandas 中的 drop_duplicates 方法需要对行进行全排序,因为它会尝试选择第一个或最后一个匹配的行,并保留与其关联的索引。

请改为通过调用 groupby 方法进行汇总,以去除重复的行。

feedback = (
  feedback.groupby("consumer_complaint_narrative", as_index=False)
  .size()
)[["consumer_complaint_narrative"]]

feedback.shape

生成嵌入

BigQuery DataFrames 通过 TextEmbeddingGenerator 类生成嵌入向量。这是基于 BigQuery ML 中的 ML.GENERATE_EMBEDDING 方法实现的,该方法会调用 Vertex AI 提供的文本嵌入模型

from bigframes.ml.llm import TextEmbeddingGenerator

embedding_model = TextEmbeddingGenerator(
    model_name="text-embedding-004"
)
feedback_embeddings = embedding_model.predict(feedback)

不妨看看嵌入是什么样的。这些向量表示文本嵌入模型所理解的文本语义含义。

feedback_embeddings.peek()

预期输出:

                        ml_generate_embedding_result  \
0  [ 7.36380890e-02  2.11779331e-03  2.54309829e-...   
1  [-1.10935252e-02 -5.53950183e-02  2.01338865e-...   
2  [-7.85628427e-03 -5.39347418e-02  4.51385677e-...   
3  [ 0.02013054 -0.0224789  -0.00164843  0.011354...   
4  [-1.51684484e-03 -5.02693094e-03  1.72322839e-...   

这些向量具有许多维度。查看单个嵌入向量:

feedback_embeddings["ml_generate_embedding_result"].peek().iloc[0]

嵌入生成功能采用“部分成功”合同。这意味着,某些行可能存在错误,无法生成嵌入。错误消息通过 'ml_generate_embedding_status' 列公开。空表示没有错误。

过滤嵌入内容,使其仅包含未发生错误的行。

mask = feedback_embeddings["ml_generate_embedding_status"] == ""
valid_embeddings = feedback_embeddings[mask]
valid_embeddings.shape

6. 使用文本嵌入进行聚类

现在,使用 k-means 对嵌入进行聚类。在此演示中,请使用任意数量的群组(也称为“形心”)。生产质量的解决方案应使用轮廓法等技术来调整质心数量。

from bigframes.ml.cluster import KMeans

num_clusters = 5
cluster_model = KMeans(n_clusters=num_clusters)
cluster_model.fit(valid_embeddings["ml_generate_embedding_result"])
clusters = cluster_model.predict(valid_embeddings)
clusters.peek()

移除所有嵌入失败。

mask = clusters["ml_generate_embedding_status"] == ""
clusters = clusters[mask]

查看每个质心的评论分布情况。

clusters.groupby("CENTROID_ID").size()

7. 总结聚类结果

提供与每个形心相关的一些评论,并让 Gemini 问问 Gemini 总结这些投诉。提示工程是一个新兴领域,但网上有很多不错的示例,例如 https://www.promptingguide.ai/。

from bigframes.ml.llm import GeminiTextGenerator

preamble = "What is the main concern in this list of user complaints:"
suffix = "Write the main issue using a formal tone."

# Now let's sample the raw comments and get the LLM to summarize them.
prompts = []
for centroid_id in range(1, num_clusters + 1):
  cluster = clusters[clusters["CENTROID_ID"] == centroid_id]
  comments = "\n".join(["- {0}".format(x) for x in cluster.content.peek(40)])
  prompts.append("{}:\n{}\n{}".format(preamble, comments, suffix))

prompt_df = bpd.DataFrame(prompts)
gemini = GeminiTextGenerator(model_name="gemini-1.5-flash-001")
issues = gemini.predict(X=prompt_df, temperature=0.0)
issues.peek()

使用 Gemini 根据摘要撰写报告。

from IPython.display import display, Markdown

prompt = "Turn this list of issues into a short, concise report:"
for value in issues["ml_generate_text_llm_result"]:
  prompt += "- {}".format(value)
prompt += "Using a formal tone, write a markdown text format report."

summary_df = bpd.DataFrame(([prompt]))
summary = gemini.predict(X=summary_df, temperature=0.0)

report = (summary["ml_generate_text_llm_result"].values[0])
display(Markdown(report))

8. 清理

如果您已为此教程创建新的 Google Cloud 云项目,可以将其删除,以免因创建的表格或其他资源而产生额外费用。

9. 恭喜!

您已使用 BigQuery DataFrames 分析了结构化和非结构化数据。在此过程中,您探索了 Google Cloud 的公共数据集、BigQuery Studio 中的 Python 笔记本、BigQuery ML、Vertex AI 以及 BigQuery Studio 的自然语言到 Python 功能。真了不起!

后续步骤