1. 概览
在本实验中,您将使用 BigQuery Studio 中的 Python 笔记本中的 BigQuery DataFrames,通过 Python 从数据中获取数据洞见。利用 Google 的生成式 AI 分析和直观呈现非结构化文本数据。
您将创建一个 Python 笔记本,用于对公开的客户投诉数据库进行分类和总结。此方法可经过调整,以处理任何非结构化文本数据。
目标
在本实验中,您将学习如何执行以下任务:
- 在 BigQuery Studio 中启用和使用 Python 笔记本
- 使用 BigQuery DataFrames 软件包连接到 BigQuery
- 使用 BigQuery ML 并连接到 Vertex AI 中的文本嵌入端点,根据非结构化文本数据创建嵌入
- 使用 BigQuery ML 对嵌入进行聚类
- 通过 BigQuery ML 使用 LLM 总结聚类
2. 要求
准备工作
如需按照此 Codelab 中的说明操作,您需要一个启用了 BigQuery Studio 且已关联结算账号的 Google Cloud 项目。
- 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
- 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
- 按照说明启用 BigQuery Studio 进行资产管理。
准备 BigQuery Studio
创建一个空笔记本并将其连接到运行时。
- 在 Google Cloud 控制台中前往 BigQuery Studio。
- 点击 + 按钮旁边的 ▼。
- 选择 Python 笔记本。
- 关闭模板选择器。
- 选择 + 代码 以创建新的代码单元。
- 从代码单元格安装最新版本的 BigQuery DataFrames 软件包。输入以下命令。
点击 🞂 按钮或按 Shift + Enter 键,运行代码单元。%pip install --upgrade bigframes --quiet
3. 读取公共数据集
通过在新的代码单元中运行以下代码来初始化 BigQuery DataFrames 软件包:
import bigframes.pandas as bpd
bpd.options.bigquery.ordering_mode = "partial"
注意:在本教程中,我们使用了实验性的“部分排序模式”,该模式与类似 Pandas 的过滤条件搭配使用时,可以实现更高效的查询。需要严格排序或索引的某些 pandas 功能可能无法正常运行。
消费者投诉数据库
消费者投诉数据库通过 Google Cloud 的公共数据集计划在 BigQuery 上提供。此数据集包含有关消费者金融产品和服务的投诉,数据由美国消费者金融保护局收集。
在 BigQuery 中,查询 bigquery-public-data.cfbp_complaints.complaint_database 表,以分析消费者投诉数据库。使用 bigframes.pandas.read_gbq() 方法可根据查询字符串或表 ID 创建 DataFrame。
在新的代码单元中运行以下代码,以创建名为“feedback”的 DataFrame:
feedback = bpd.read_gbq(
"bigquery-public-data.cfpb_complaints.complaint_database"
)
了解 DataFrame 的基本信息
使用 DataFrame.peek() 方法下载少量数据样本。
运行此单元:
feedback.peek()
预期输出:
date_received product ... timely_response consumer_disputed complaint_id
0 2014-03-05 Bank account or service ... True False 743665
1 2014-01-21 Bank account or service ... True False 678608
2 2020-12-31 Debt collection ... True <NA> 4041190
3 2014-02-12 Debt collection ... True False 714350
4 2015-02-23 Debt collection ... True False 1251358
注意:head() 需要排序,如果您想直观呈现部分数据,head() 的效率通常不如 peek()。
与 pandas 类似,您可以使用 DataFrame.dtypes 属性查看所有可用的列及其对应的数据类型。这些数据以与 Pandas 兼容的方式公开。
运行此单元:
feedback.dtypes
预期输出:
date_received date32[day][pyarrow]
product string[pyarrow]
subproduct string[pyarrow]
issue string[pyarrow]
subissue string[pyarrow]
consumer_complaint_narrative string[pyarrow]
company_public_response string[pyarrow]
company_name string[pyarrow]
state string[pyarrow]
zip_code string[pyarrow]
tags string[pyarrow]
consumer_consent_provided string[pyarrow]
submitted_via string[pyarrow]
date_sent_to_company date32[day][pyarrow]
company_response_to_consumer string[pyarrow]
timely_response boolean
consumer_disputed boolean
complaint_id string[pyarrow]
dtype: object
DataFrame.describe() 方法用于查询 DataFrame 中的一些基本统计信息。由于此 DataFrame 不包含任何数值列,因此它会显示非 null 值数量和不同值数量的摘要。
运行此单元:
# Exclude some of the larger columns to make the query more efficient.
feedback.drop(columns=[
"consumer_complaint_narrative",
"company_public_response",
"company_response_to_consumer",
]).describe()
预期输出:
product subproduct issue subissue company_name state ... timely_response consumer_disputed complaint_id
count 3458906 3223615 3458906 2759004 3458906 3417792 ... 3458906 768399 3458906
nunique 18 76 165 221 6694 63 ... 2 2 3458906
4. 探索数据
在深入了解实际投诉之前,请使用 DataFrame 中的类 Pandas 方法来直观呈现数据。
直观呈现 DataFrame
有多种内置的可视化方法,例如 DataFrame.plot.hist()。由于此 DataFrame 主要包含字符串和布尔值数据,我们可以先进行一些汇总,以详细了解各个列。
统计每个州收到的投诉数量。
complaints_by_state = (
feedback.groupby(
"state", as_index=False,
).size()
.rename(columns={"size": "total_complaints"})
.sort_values(by="total_complaints", ascending=False)
)
使用 DataFrame.to_pandas() 方法将其转换为 Pandas DataFrame。
complaints_pd = complaints_by_state.head(10).to_pandas()
对下载的 DataFrame 使用 Pandas 可视化方法。
complaints_pd.plot.bar(x="state", y="total_complaints")

与其他数据集联接
之前,您查看了各州的投诉数量,但这样会丢失重要的背景信息。有些州的居民人数比其他州多。与人口数据集(例如美国人口调查局的美国社区调查和 bigquery-public-data.geo_us_boundaries.states 表)联接。
us_states = bpd.read_gbq("bigquery-public-data.geo_us_boundaries.states")
us_survey = bpd.read_gbq("bigquery-public-data.census_bureau_acs.state_2020_5yr")
# Ensure there are leading 0s on GEOIDs for consistency across tables.
us_states = us_states.assign(
geo_id=us_states["geo_id"].str.pad(2, fillchar="0")
)
us_survey = us_survey.assign(
geo_id=us_survey["geo_id"].str.pad(2, fillchar="0")
)
美国社区调查通过地理 ID 识别州。与州/省/自治区/直辖市表联接,以按两个字母的州/省/自治区/直辖市代码获取人口。
pops = us_states.set_index("geo_id")[["state"]].join(
us_survey.set_index("geo_id")[["total_pop"]]
)
现在,将此数据与投诉数据库联接,以比较人口与投诉数量。
complaints_and_pops = complaints_by_state.set_index("state").join(
pops.set_index("state")
)
创建散点图,比较各州的人口与投诉数量。
(
complaints_and_pops
.to_pandas()
.plot.scatter(x="total_pop", y="total_complaints")
)

在比较人口与投诉数量时,有几个州似乎是离群点。读者可以自行练习绘制带有点标签的图表来识别这些点。同样,针对这种情况提出一些假设(例如,不同的人口统计特征、不同数量的金融服务公司等),并进行测试。
5. 计算嵌入
重要信息往往隐藏在文本、音频或图片等非结构化数据中。在此示例中,投诉数据库中的许多有用信息都包含在投诉的文本内容中。
AI 和传统技术(例如情感分析、“词袋”和 word2vec)可以从非结构化数据中提取一些定量信息。最近,与 LLM 密切相关的“向量嵌入”模型可以创建表示文本语义信息的一系列浮点数。
选择数据库的子集
运行向量嵌入模型比其他操作消耗更多资源。为了减少费用和配额问题,请为本教程的其余部分选择一个数据子集。
import bigframes.pandas as bpd
bpd.options.bigquery.ordering_mode = "partial"
feedback = bpd.read_gbq(
"bigquery-public-data.cfpb_complaints.complaint_database"
)
# Note: if not using ordering_mode = "partial", you must specify these in read_gbq
# for these to affect query efficiency.
# feedback = bpd.read_gbq(
# "bigquery-public-data.cfpb_complaints.complaint_database",
# columns=["consumer_complaint_narrative"],
# filters= [
# ("consumer_complaint_narrative", "!=", ""),
# ("date_received", "==", "2022-12-01")])
feedback.shape
与总数据库中近 350 万行数据相比,2022-12-01 提交的投诉约为 1,000 件(请通过 feedback.shape 进行检查)。
仅选择 2022 年 12 月 1 日的数据,并且仅选择 consumer_complaint_narrative 列。
import datetime
feedback = feedback[
# Filter rows by passing in a boolean Series.
(feedback["date_received"] == datetime.date(2022, 12, 1))
& ~(feedback["date_received"].isnull())
& ~(feedback["consumer_complaint_narrative"].isnull())
& (feedback["consumer_complaint_narrative"] != "")
& (feedback["state"] == "CA")
# Uncomment the following if using free credits for a workshop.
# Billing accounts with free credits have limited Vertex AI quota.
# & (feedback["product"] == "Mortgage")
][
# Filter columns by passing in a list of strings.
["consumer_complaint_narrative"]
]
feedback.shape
pandas 中的 drop_duplicates 方法需要对行进行全排序,因为它会尝试选择第一个或最后一个匹配的行,并保留与其关联的索引。
请改为通过调用 groupby 方法进行汇总,以去除重复的行。
feedback = (
feedback.groupby("consumer_complaint_narrative", as_index=False)
.size()
)[["consumer_complaint_narrative"]]
feedback.shape
生成嵌入
BigQuery DataFrames 通过 TextEmbeddingGenerator 类生成嵌入向量。这是基于 BigQuery ML 中的 ML.GENERATE_EMBEDDING 方法实现的,该方法会调用 Vertex AI 提供的文本嵌入模型。
from bigframes.ml.llm import TextEmbeddingGenerator
embedding_model = TextEmbeddingGenerator(
model_name="text-embedding-004"
)
feedback_embeddings = embedding_model.predict(feedback)
不妨看看嵌入是什么样的。这些向量表示文本嵌入模型所理解的文本语义含义。
feedback_embeddings.peek()
预期输出:
ml_generate_embedding_result \
0 [ 7.36380890e-02 2.11779331e-03 2.54309829e-...
1 [-1.10935252e-02 -5.53950183e-02 2.01338865e-...
2 [-7.85628427e-03 -5.39347418e-02 4.51385677e-...
3 [ 0.02013054 -0.0224789 -0.00164843 0.011354...
4 [-1.51684484e-03 -5.02693094e-03 1.72322839e-...
这些向量具有许多维度。查看单个嵌入向量:
feedback_embeddings["ml_generate_embedding_result"].peek().iloc[0]
嵌入生成功能采用“部分成功”合同。这意味着,某些行可能存在错误,无法生成嵌入。错误消息通过 'ml_generate_embedding_status' 列公开。空表示没有错误。
过滤嵌入内容,使其仅包含未发生错误的行。
mask = feedback_embeddings["ml_generate_embedding_status"] == ""
valid_embeddings = feedback_embeddings[mask]
valid_embeddings.shape
6. 使用文本嵌入进行聚类
现在,使用 k-means 对嵌入进行聚类。在此演示中,请使用任意数量的群组(也称为“形心”)。生产质量的解决方案应使用轮廓法等技术来调整质心数量。
from bigframes.ml.cluster import KMeans
num_clusters = 5
cluster_model = KMeans(n_clusters=num_clusters)
cluster_model.fit(valid_embeddings["ml_generate_embedding_result"])
clusters = cluster_model.predict(valid_embeddings)
clusters.peek()
移除所有嵌入失败。
mask = clusters["ml_generate_embedding_status"] == ""
clusters = clusters[mask]
查看每个质心的评论分布情况。
clusters.groupby("CENTROID_ID").size()
7. 总结聚类结果
提供与每个形心相关的一些评论,并让 Gemini 问问 Gemini 总结这些投诉。提示工程是一个新兴领域,但网上有很多不错的示例,例如 https://www.promptingguide.ai/。
from bigframes.ml.llm import GeminiTextGenerator
preamble = "What is the main concern in this list of user complaints:"
suffix = "Write the main issue using a formal tone."
# Now let's sample the raw comments and get the LLM to summarize them.
prompts = []
for centroid_id in range(1, num_clusters + 1):
cluster = clusters[clusters["CENTROID_ID"] == centroid_id]
comments = "\n".join(["- {0}".format(x) for x in cluster.content.peek(40)])
prompts.append("{}:\n{}\n{}".format(preamble, comments, suffix))
prompt_df = bpd.DataFrame(prompts)
gemini = GeminiTextGenerator(model_name="gemini-1.5-flash-001")
issues = gemini.predict(X=prompt_df, temperature=0.0)
issues.peek()
使用 Gemini 根据摘要撰写报告。
from IPython.display import display, Markdown
prompt = "Turn this list of issues into a short, concise report:"
for value in issues["ml_generate_text_llm_result"]:
prompt += "- {}".format(value)
prompt += "Using a formal tone, write a markdown text format report."
summary_df = bpd.DataFrame(([prompt]))
summary = gemini.predict(X=summary_df, temperature=0.0)
report = (summary["ml_generate_text_llm_result"].values[0])
display(Markdown(report))
8. 清理
如果您已为此教程创建新的 Google Cloud 云项目,可以将其删除,以免因创建的表格或其他资源而产生额外费用。
9. 恭喜!
您已使用 BigQuery DataFrames 分析了结构化和非结构化数据。在此过程中,您探索了 Google Cloud 的公共数据集、BigQuery Studio 中的 Python 笔记本、BigQuery ML、Vertex AI 以及 BigQuery Studio 的自然语言到 Python 功能。真了不起!
后续步骤
- 不妨尝试在笔记本中生成 Python 代码。BigQuery Studio 中的 Python 笔记本由 Colab Enterprise 提供支持。提示:我发现让 Gemini 帮忙生成测试数据非常有用。
- 在 GitHub 上探索 BigQuery DataFrames 的示例笔记本。
- 创建在 BigQuery Studio 中运行笔记本的调度。
- 部署使用 BigQuery DataFrames 的远程函数,以将第三方 Python 软件包与 BigQuery 集成。