1. 准备工作
您可能以为,汇总统计信息不会泄露相关个人的任何信息。然而,攻击者可以通过诸多手段从汇总统计信息获悉有关个人的敏感信息。
在此 Codelab 中,您将学习如何使用 PipelineDP 中的差分隐私汇总来生成隐私统计信息,以保护个人隐私。PipelineDP 是一个 Python 框架,可让您通过批处理系统(例如 Apache Spark 和 Apache Beam)将差分隐私应用于大型数据集。如需详细了解如何在 Go 中计算差分隐私统计信息,请参阅 Privacy on Beam Codelab。
隐私是指用于生成输出结果的方式不会泄露数据中有关个人的任何隐私信息。您可以通过差分隐私来实现此结果,这是一种作用强大的匿名化隐私概念,即通过汇总多个用户的数据来保护用户隐私。所有匿名化方法都会使用汇总方法,但并非所有汇总方法都能实现匿名化。另一方面,差分隐私还可以在信息泄露和隐私保护方面提供可衡量的保证。
前提条件
- 熟悉 Python
- 熟悉基本的数据汇总
- 有使用 pandas、Spark 和 Beam 的经验
学习内容
- 差分隐私的基础知识
- 如何使用 PipelineDP 计算差分隐私摘要统计信息
- 如何通过添加隐私权和实用性参数来调整结果
所需条件
- 如果您想在自己的环境中运行此 Codelab,则需要在计算机上安装 Python 3.7 或更高版本。
- 如果您想在没有自己的环境的情况下完成此 Codelab,则需要访问 Colaboratory。
2. 了解差分隐私
为了更好地了解差分隐私,请看以下简单示例。
假设您在一家在线时尚零售商的营销部门工作,并且想了解哪些产品最有可能畅销。
此图表显示了客户在访问商店网站时首先查看了哪些商品:T 恤、套头衫、袜子或牛仔裤。T 恤是最受欢迎的商品,而袜子是最不受欢迎的商品。

此图表看起来很实用,但它存在一个弊端。如果您想考虑其他信息(例如客户是否购买了商品或他们第二个查看的产品),则可能会在数据中泄露个人身份信息。
此图表显示,只有一位客户先查看了套头衫,然后实际进行了购买:

从隐私角度而言,这是一个弊端。匿名化统计信息不应泄露个体的计入情况,那么您该怎么做?您向条形图添加了随机噪声,略微降低了条形图的准确性!
此条形图并非完全准确,但仍然有用,而且不会泄露个体的计入情况:

差分隐私是指添加适量的随机噪声来掩盖个体的贡献。
此示例过于简单。正确实现差分隐私涉及更多问题,还存在许多出人意料的细微实现差异。与加密类似,自行创建差分隐私实现可能并不是一个好主意。您可以改用 PipelineDP。
3. 下载并安装 PipelineDP
您无需安装 PipelineDP 即可完成本 Codelab,因为您可以在本文档中找到所有相关代码和图表。
如需试用 PipelineDP、自行运行或稍后使用,请执行以下操作:
- 下载并安装 PipelineDP:
pip install pipeline-dp
如果您想使用 Apache Beam 运行示例,请执行以下操作:
- 下载并安装 Apache Beam:
pip install apache_beam
您可以在 PipelineDP/examples/codelab/ 目录中找到此 Codelab 的代码和数据集。
4. 计算每次观看的第一个商品的转化指标
假设您在一家在线时尚零售商工作,并且想了解在首次观看时,您的不同产品类别中哪一类带来的转化次数和转化价值最高。您希望与营销代理机构和其他内部团队分享这些信息,但又不想泄露任何个人客户的信息。
若要计算网站上每次首次查看商品时的转化指标,请执行以下操作:
- 查看
PipelineDP/examples/codelab/目录中网站访问的模拟数据集。
此屏幕截图显示了数据集示例。它包含用户 ID、用户查看过的商品、访问者是否转化,以及(如果转化)转化价值。
user_id | product_view_0 | product_view_1 | product_view_2 | product_view_3 | product_view_4 | has_conversion | conversion_value |
0 | 牛仔裤 | t_shirt | t_shirt | 无 | 无 | false | 0.0 |
1 | 牛仔裤 | t_shirt | 牛仔裤 | 跳线 | 无 | false | 0.0 |
2 | t_shirt | 跳线 | t_shirt | t_shirt | 无 | true | 105.19 |
3 | t_shirt | t_shirt | 牛仔裤 | 无 | 无 | false | 0.0 |
4 | t_shirt | 袜子 | 牛仔裤 | 牛仔裤 | 无 | false | 0.0 |
您对以下指标感兴趣:
view_counts:网站访问者首次看到每种商品的次数。total_conversion_value:访问者在转化时花费的总金额。conversion_rate:访问者的转化率。
- 以非私密方式生成指标:
conversion_metrics = df.groupby(['product_view_0'
])[['conversion_value', 'has_conversion']].agg({
'conversion_value': [len, np.sum],
'has_conversion': np.mean
})
conversion_metrics = conversion_metrics.rename(
columns={
'len': 'view_counts',
'sum': 'total_conversion_value',
'mean': 'conversion_rate'
}).droplevel(
0, axis=1)
正如您之前了解到的,这些统计信息可能会泄露数据集中的个人信息。例如,只有一个人在先看到毛衣后完成了转化。对于 22 次观看,您的转化率约为 0.05。现在,您需要将每个条形图转换为隐私图表。
- 使用
pipeline_dp.NaiveBudgetAccountant类定义隐私参数,然后指定要用于分析的epsilon和delta实参。
如何设置这些实参取决于您的具体问题。如需详细了解这些参数,请参阅“可选:调整差分隐私参数”。
此代码段使用了示例值:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=1e-5)
- 初始化
LocalBackend实例:
ops = pipeline_dp.LocalBackend()
由于您在本地运行此程序,而没有使用其他框架(例如 Beam 或 Spark),因此可以使用 LocalBackend 实例。
- 初始化
DPEngine实例:
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
通过 pipeline_dp.AggregateParams 类,PipelineDP 可让您指定更多参数,从而影响私密统计信息的生成。
params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1,
max_contributions_per_partition=1)
- 指定要计算
count指标并使用LAPLACE噪声分布。 - 将
max_partitions_contributed实参设置为1值。
此实参用于限制用户可计入多少个不同的访问。您预计用户每天会访问一次网站,并且不在意他们是否一天当中多次访问网站。
- 将
max_contributions_per_partitions实参设置为1值。
此参数用于指定单个访问者可对单个分区(在本例中为商品类别)做出的贡献数量。
- 创建一个
data_extractor实例,用于指定在何处查找privacy_id、partition和value字段。
您的代码应如以下代码段所示:
def run_pipeline(data, ops):
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=1e-5)
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1, # A single user can only contribute to one partition.
max_contributions_per_partition=1, # For a single partition, only one contribution per user is used.
)
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda row: row.user_id,
partition_extractor=lambda row: row.product_view_0
value_extractor=lambda row: row.has_conversion)
dp_result = dp_engine.aggregate(data, params, data_extractors)
budget_accountant.compute_budgets()
return dp_result
- 添加以下代码,将 Pandas DataFrame 转换为行列表,您可以直接根据该列表计算差分隐私统计信息:
rows = [index_row[1] for index_row in df.iterrows()]
dp_result_local = run_pipeline(rows, ops) # Returns generator
list(dp_result_local)
恭喜!您已完成第一次差分隐私统计信息的计算!
此图表显示了差分隐私计数结果以及您之前计算的非隐私计数结果:

您在运行代码时生成的条形图可能与此图表不同,这没关系。由于差分隐私中使用了噪声,每次运行代码时都会生成不同的条形图,但是您会发现,这些图表与原始的非隐私条形图相似。
请注意,为了保证隐私,不要多次运行流水线,这一点对于保证隐私非常重要。如需了解详情,请参阅“计算多项统计信息”。
5. 使用公开分区
在上一部分中,您可能已经注意到,您去除了某个分区的所有光临数据,即首次在您网站上看到袜子的访问者。
这是选择所需部分或设定阈值造成的,当各输出部分是否存在取决于用户数据本身时,这一步对于确保差分隐私得到保证非常重要。在这种情况下,仅凭输出中存在某个部分就会泄露数据中存在单个用户的事实。如需详细了解此行为为何侵犯隐私权,请参阅这篇博文。为了防止出现这种隐私权违规情况,PipelineDP 仅保留包含足够数量的用户的那些分区。
当输出分区列表不依赖于用户数据时,您不需要执行此分区选择步骤。在您的示例中,情况确实如此,因为您知道客户可能首先看到的所有商品类别。
如需使用分区,请执行以下操作:
- 创建可能的分区列表:
public_partitions_products = ['jeans', 'jumper', 'socks', 't-shirt']
- 将该列表传递给
run_pipeline()函数,该函数会将其设置为pipeline_dp.AggregateParams类的附加输入:
run_pipeline(
rows, ops, total_delta=0, public_partitions=public_partitions_products)
# Returns generator
params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1,
max_contributions_per_partition=1,
public_partitions=public_partitions_products)
如果您使用公开分区和 LAPLACE 噪声,可以将 total_delta 实参设置为 0 值。
现在,您可以在结果中看到所有分区或产品的数据。

公共分区不仅能让您保留更多分区,还能减少大约一半的噪声,因为您无需在选择分区上花费任何隐私预算,因此与上一次运行流水线相比,原始计数与隐私计数之间的差异略小。
使用公开分区时,有两个重要注意事项:
- 从原始数据派生分区列表时,请务必小心。如果不以差分隐私方式执行此操作,流水线就无法再提供差分隐私保证。如需了解详情,请参阅“高级:从数据中派生分区”。
- 如果某些公开部分没有数据,您需要对这些部分应用噪声以保护差分隐私。例如,如果您使用了数据集或网站中未出现的其他商品(例如裤子),这仍然是噪声,结果可能会显示对商品的某些访问,但实际上并没有。
高级:从数据中派生分区
如果您在同一流水线中以相同的非公开输出分区列表运行多次汇总,可以使用 dp_engine.select_private_partitions() 方法派生一次所需分区列表,然后将这些分区作为 public_partitions 输入提供给每次汇总。这样做不仅能从隐私角度保护安全,还可以减少需要添加的噪声,因为整个流水线只需在选择所需部分上花一次隐私预算。
def get_private_product_views(data, ops):
"""Obtains the list of product_views in a private manner.
This does not calculate any private metrics; it merely obtains the list of
product_views but does so while making sure the result is differentially private.
"""
# Set the total privacy budget.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=1e-5)
# Create a DPEngine instance.
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
# Specify how to extract privacy_id, partition_key, and value from a
# single element.
data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda row: row.product_view_0,
privacy_id_extractor=lambda row: row.user_id)
# Run aggregation.
dp_result = dp_engine.select_partitions(
data, pipeline_dp.SelectPrivatePartitionsParams(
max_partitions_contributed=1),
data_extractors=data_extractors)
budget_accountant.compute_budgets()
return dp_result
6. 计算多项统计信息
现在,您已经了解 PipelineDP 的运作方式,可以看看如何将其用于一些更高级的使用场景。如开头所述,您对以下三项统计数据感兴趣。PipelineDP 可让您同时计算多个统计信息,前提是这些统计信息在 AggregateParams 实例中共享相同的参数,这一点您稍后会看到。这样不仅更简洁,而且可以一次性计算多个指标,还能更好地保护隐私。
还记得您提供给 NaiveBudgetAccountant 类的 epsilon 和 delta 参数吧,它们代表所谓的 隐私预算,用于衡量您从数据中泄露的用户隐私量。
关于隐私预算的一个重要注意事项是它是累积的。如果使用特定的 epsilon ε 和 delta δ 运行一次某个流水线,所花预算为 (ε,δ)。如果第二次运行该流水线,所花总预算为 (2ε, 2δ)。同理,如果使用 NaiveBudgetAccountant 方法计算多次统计信息,并连续使用隐私预算 ε,δ,所花总预算为 (2ε, 2δ)。这意味着,隐私保证水平正在逐步降低。
为了避免发生这种情况,当您需要基于相同的数据计算多次统计信息时,应使用包含您想使用的总预算的单个 NaiveBudgetAccountant 实例。然后,您需要指定要用于每次汇总的 epsilon 和 delta 值。最后的结果将是总体的隐私保证水平保持不变,而具体某一次汇总的 epsilon 和 delta 值越高,其准确性就会越高。
若要查看其实际效果,您可以计算 count、mean 和 sum 统计信息。
您需要基于两种不同的指标计算统计信息:conversion_value 指标,用于根据用户首先查看的产品推断产生的收入金额;has_conversion 指标,用于计算网站的访问者数量和平均转化率。
对于每个指标,您都需要单独指定用于指导私密统计信息计算的参数。您在两个指标之间分配了隐私保护预算。您要根据 has_conversion 指标计算两项统计信息,因此您希望将初始预算的三分之二分配给该指标,并将剩余的三分之一分配给 conversion_value 指标。
如需计算多项统计信息,请执行以下操作:
- 使用您要在以下三种统计信息中使用的总
epsilon和delta值来设置隐私保护预算会计师:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=0)
- 初始化
DPEngine以计算指标:
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
- 为此指标指定参数。
params_conversion_value_metrics = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.SUM],
max_partitions_contributed=1,
max_contributions_per_partition=1,
min_value=0,
max_value=100,
public_partitions=public_partitions,
budget_weight=1/3)
最后一个实参可选择性地指定隐私预算的权重。您可以为所有变量赋予相同的权重,但如前所述,您希望将此实参设置为三分之一。
您还可以设置 min_value 和 max_value 实参,以指定应用于分区中隐私单元贡献的值的下限和上限。如果您想计算私有总和或平均值,则必须提供这些参数。您不希望出现负值,因此可以将 0 和 100 视为合理的界限。
- 提取相关数据,然后将其传递给聚合函数:
data_extractors_conversion_value_metrics = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda row: row.user_id,
partition_extractor=lambda row: row.product_view_0,
value_extractor=lambda row: row.conversion_value)
dp_result_conversion_value_metrics = (
dp_engine.aggregate(data, params_conversion_value_metrics,
data_extractors_conversion_value_metrics))
- 按照相同的步骤,根据
has_conversion变量计算这两个指标:
params_conversion_rate_metrics = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.MEAN],
max_partitions_contributed=1,
max_contributions_per_partition=1,
min_value=0,
max_value=1,
public_partitions=public_partitions,
budget_weight=2/3)
data_extractors_conversion_rate_metrics = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda row: row.user_id,
partition_extractor=lambda row: row.product_view_0,
value_extractor=lambda row: row.has_conversion)
dp_result_conversion_rate_metrics = (
dp_engine.aggregate(data, params_conversion_rate_metrics,
data_extractors_conversion_rate_metrics))
唯一的变化是 pipeline_dp.AggregateParams 实例,您现在将 mean 和 count 定义为汇总,并将隐私保护预算的三分之二分配给此计算。由于您希望两种统计信息的贡献界限相同,并基于同一 has_conversion 变量计算这两种统计信息,因此您可以将它们合并到同一 pipeline_dp.AggregateParams 实例中,并同时计算它们。
- 调用
budget_accountant.compute_budgets()方法:
budget_accountant.compute_budgets()
您可以绘制所有这三种私有统计信息,并将其与原始统计信息进行比较。根据添加的噪声,您会发现结果实际上可能会超出合理范围。在此示例中,您会看到跳出者的转化率和总转化价值为负值,因为添加的噪声以零为中心对称分布。为了便于进一步分析和处理,最好不要手动对私密统计信息进行后处理,但如果您想将这些图表添加到报告中,可以在之后将最小值简单地设置为零,而不会违反隐私保护保证。

7. 使用 Beam 运行流水线
现在的数据处理需要处理海量数据,以至于无法在本地处理。相反,许多人会使用 Beam 或 Spark 等大规模数据处理框架,并在云端运行流水线。
PipelineDP 支持 Beam 和 Spark,只需对代码进行细微更改。
如需使用 Beam 和 private_beam API 运行流水线,请执行以下操作:
- 初始化
runner变量,然后创建一个流水线,在该流水线中,您将隐私权操作应用于rows的 Beam 表示形式:
runner = fn_api_runner.FnApiRunner() # local runner
with beam.Pipeline(runner=runner) as pipeline:
beam_data = pipeline | beam.Create(rows)
- 使用所需的隐私参数创建
budget_accountant变量:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=0)
- 创建
pcol(即不公开集合)变量,以确保所有汇总都符合您的隐私权要求:
pcol = beam_data | pbeam.MakePrivate(
budget_accountant=budget_accountant,
privacy_id_extractor=lambda
row: row.user_id)
- 在相应类中指定私密聚合的参数。
在此示例中,您使用 pipeline_dp.aggregate_params.SumParams() 类,因为您要计算商品观看次数的总和。
- 将您的汇总参数传递给
pbeam.Sum方法,以计算统计信息:
dp_result = pcol | pbeam.Sum(params)
- 最后,您的代码应如以下代码段所示:
import pipeline_dp.private_beam as pbeam
runner = fn_api_runner.FnApiRunner() # local runner
with beam.Pipeline(runner=runner) as pipeline:
beam_data = pipeline | beam.Create(rows)
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=0)
# Create private collection.
pcol = beam_data | pbeam.MakePrivate(
budget_accountant=budget_accountant,
privacy_id_extractor=lambda row:
row.user_id)
# Specify parameters.
params = pipeline_dp.aggregate_params.SumParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
max_partitions_contributed=1,
max_contributions_per_partition=1,
min_value=0,
max_value=100,
public_partitions=public_partitions_product_views,
partition_extractor=lambda row: row.product_view_0,
value_extractor=lambda row:row.conversion_value)
dp_result = pcol | pbeam.Sum(params)
budget_accountant.compute_budgets()
dp_result | beam.Map(print)
8. 可选:调整隐私保护和实用性参数
您已经看到此 Codelab 中提及的相当多参数,例如 epsilon、delta 和 max_partitions_contributed 参数。我们可以将其大致分为两个类别:隐私参数和效用参数。
与隐私保护有关的参数
epsilon 和 delta 参数用于量化您通过差分隐私提供的隐私保护程度。更确切地说,它们是一种度量,用于衡量潜在攻击者通过查看匿名输出能获取有关数据的多少信息。参数值越高,攻击者获取的有关数据的信息就越多,这会带来隐私风险。另一方面,epsilon 和 delta 参数的值越低,您需要添加到输出中以实现匿名化的噪声就越多,每个部分为了保留在匿名输出中而需要包含的唯一身份用户数量也越多。在这种情况下,需要在效用参数与隐私参数之间进行权衡。
在 PipelineDP 中,当您在 NaiveBudgetAccountant 实例中设置总隐私预算时,您需要指定匿名输出所需的隐私保证。需要注意的是,如果您希望保持隐私保证水平,就必须谨慎地为每次汇总使用单独的 NaiveBudgetAccountant 实例,或者多次运行流水线,以免过度使用预算。
如需详细了解差分隐私以及隐私参数的含义,请参阅差分隐私阅读清单。
效用参数
效用参数不会影响隐私保证,但会影响准确性,进而影响输出的效用。这些参数在 AggregateParams 实例中提供,用于调节添加的噪声。
AggregateParams 实例中提供并适用于所有汇总的效用参数是 max_partitions_contributed 参数。一个分区对应于 PipelineDP 汇总操作返回的数据的一个键,因此 max_partitions_contributed 参数限制了一个用户可以在输出中贡献的不同键值的数量。如果用户计入的键数超过 max_partitions_contributed 参数的值,系统会去除部分计入,使其计入的键数正好等于 max_partitions_contributed 参数的值。
同样,大多数汇总都有一个 max_contributions_per_partition 参数。AggregateParams 实例中也会提供这些参数,每个汇总的这些参数可以有不同的值。它们对一个用户计入每个键的次数做出了限制。
添加到输出中的噪声根据 max_partitions_contributed 和 max_contributions_per_partition 参数进行调节,因此需要在此加以权衡:为每个参数分配的值越大,意味着可以保留更多数据,但最终得出的结果受噪声影响也更大。
有些汇总需要 min_value 和 max_value 参数,这些参数用于对每个用户的计入指定限制。如果用户计入的值小于分配给 min_value 参数的值,该值会增加到相应参数的值。同样,如果用户计入的值大于 max_value 参数的值,该值会减小到该参数的值。为了保留更多原始值,必须指定更大的限制。噪声根据限制的大小进行调节,因此限制较大意味着可以保留更多数据,但最终得出的结果受噪声影响也更大。
最后,noise_kind 参数支持 PipelineDP 中的两种不同噪声机制:GAUSSIAN 噪声和 LAPLACE 噪声。LAPLACE 分布以较低的贡献界限提供更高的效用,因此 PipelineDP 默认使用此分布。不过,如果您想使用 GAUSSIAN 分布噪声,可以在 AggregateParams 实例中指定它。
9. 恭喜
太棒了!您已完成 PipelineDP Codelab,并学到了许多有关差分隐私和 PipelineDP 的知识。