1. 事前準備
您可能會認為匯總統計資料不會洩漏任何相關個人的資訊。不過,攻擊者可以透過匯總統計資料,以多種方式取得個人私密資訊。
在本程式碼研究室中,您將瞭解如何透過 PipelineDP 的差異化隱私匯總功能產生私人統計資料,保護個人隱私。PipelineDP 是一個 Python 架構,可讓您透過批次處理系統 (例如 Apache Spark 和 Apache Beam),對大型資料集套用差異隱私權。如要進一步瞭解如何在 Go 中計算差異隱私統計資料,請參閱「Privacy on Beam」程式碼研究室。
私密是指輸出內容的產生方式不會洩漏資料中個人的任何私密資訊。您可以透過差異隱私達成這個目標。差異隱私是去識別化的強大隱私概念,也就是匯總多位使用者的資料,以保護使用者隱私。所有去識別化方法都會使用匯總,但並非所有匯總方法都能達到去識別化效果。另一方面,差異化隱私權可提供資訊洩漏和隱私權的可測量保證。
必要條件
- 熟悉 Python
- 熟悉基本資料匯總
- 熟悉 pandas、Spark 和 Beam
課程內容
- 差異化隱私的基本概念
- 如何使用 PipelineDP 計算差異化隱私摘要統計資料
- 如何使用額外的隱私權和實用性參數調整結果
軟硬體需求
- 如要在自己的環境中執行程式碼研究室,電腦必須安裝 Python 3.7 以上版本。
- 如要在沒有自己環境的情況下完成程式碼研究室,您需要存取 Colaboratory。
2. 瞭解差異化隱私
如要進一步瞭解差異化隱私,請參閱這個簡單的例子。
假設您在線上時裝零售商的行銷部門工作,並想瞭解哪些產品最有可能售出。
這張圖表顯示顧客造訪商店網站時,最先查看的產品:T 恤、毛衣、襪子或牛仔褲。T 恤是最熱銷的商品,襪子則最不受歡迎。

這項功能看似實用,但有幾點需要注意。如果您想納入其他資訊 (例如顧客是否完成購買,或他們第二個瀏覽的產品),資料中就可能出現個人資訊。
這張圖表顯示,只有一位顧客先查看連帽上衣,然後實際購買:

從隱私權的角度來看,這並非理想做法。匿名統計資料不應揭露個別貢獻,因此該怎麼做?在長條圖中加入隨機雜訊,讓圖表不那麼準確!
這張長條圖並非完全準確,但仍有參考價值,且不會顯示個別貢獻:

差異化隱私會在資料中加入適量的隨機雜訊,遮蓋個別貢獻。
這個範例過於簡化。正確實作差異化隱私權需要更多步驟,而且實作時可能會遇到許多意想不到的細微之處。與密碼編譯類似,自行實作差異隱私權可能不是好主意。請改用 PipelineDP。
3. 下載並安裝 PipelineDP
您不需要安裝 PipelineDP 即可完成本程式碼研究室,因為本文中會提供所有相關程式碼和圖表。
如要使用 PipelineDP,請自行執行或稍後再用:
- 下載並安裝 PipelineDP:
pip install pipeline-dp
如要使用 Apache Beam 執行範例,請按照下列步驟操作:
- 下載並安裝 Apache Beam:
pip install apache_beam
您可以在 PipelineDP/examples/codelab/ 目錄中找到本程式碼研究室的程式碼和資料集。
4. 計算每項瀏覽的第一個產品的轉換指標
假設您在線上時裝零售商工作,想瞭解不同產品類別的轉換次數和價值,以及哪類產品最常被消費者優先查看。您想與行銷代理商和其他內部團隊分享這項資訊,但要避免洩漏任何個別顧客的資訊。
如要計算網站上每個瀏覽的第一個產品的轉換指標,請按照下列步驟操作:
- 查看
PipelineDP/examples/codelab/目錄中網站造訪的模擬資料集。
這張螢幕截圖是資料集的範例。其中包含使用者 ID、使用者瀏覽的產品、訪客是否完成轉換,以及轉換價值 (如有)。
user_id | product_view_0 | product_view_1 | product_view_2 | product_view_3 | product_view_4 | has_conversion | conversion_value |
0 | 牛仔褲 | t_shirt | t_shirt | 無 | 無 | false | 0.0 |
1 | 牛仔褲 | t_shirt | 牛仔褲 | 跳線 | 無 | false | 0.0 |
2 | t_shirt | 跳線 | t_shirt | t_shirt | 無 | true | 105.19 |
3 | t_shirt | t_shirt | 牛仔褲 | 無 | 無 | false | 0.0 |
4 | t_shirt | 襪子 | 牛仔褲 | 牛仔褲 | 無 | false | 0.0 |
您對下列指標感興趣:
view_counts:網站訪客首次看到各項產品的次數。total_conversion_value:訪客轉換時的消費總金額。conversion_rate:訪客轉換率。
- 以非私密方式產生指標:
conversion_metrics = df.groupby(['product_view_0'
])[['conversion_value', 'has_conversion']].agg({
'conversion_value': [len, np.sum],
'has_conversion': np.mean
})
conversion_metrics = conversion_metrics.rename(
columns={
'len': 'view_counts',
'sum': 'total_conversion_value',
'mean': 'conversion_rate'
}).droplevel(
0, axis=1)
如先前所述,這些統計資料可能會揭露資料集中個人的資訊。舉例來說,使用者先看到連身裙,之後只有一人完成轉換。22 次觀看的轉換率約為 0.05。現在,您需要將每個長條圖轉換為私人長條圖。
- 使用
pipeline_dp.NaiveBudgetAccountant類別定義隱私權參數,然後指定要用於分析的epsilon和delta引數。
這些引數的設定方式取決於您的特定問題。如要進一步瞭解這些參數,請參閱「選用:調整差異隱私權參數」。
這個程式碼片段使用範例值:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=1e-5)
- 初始化
LocalBackend執行個體:
ops = pipeline_dp.LocalBackend()
您可以在本機執行這個程式,不必使用 Beam 或 Spark 等其他架構,因此可以使用 LocalBackend 執行個體。
- 初始化
DPEngine執行個體:
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
您可以使用 pipeline_dp.AggregateParams 類別指定其他參數,進而影響私密統計資料的產生方式。
params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1,
max_contributions_per_partition=1)
- 指定要計算
count指標,並使用LAPLACE雜訊分布。 - 將
max_partitions_contributed引數設為1值。
這個引數會限制使用者可貢獻的不同造訪次數。您預期使用者每天造訪網站一次,且不在意他們在一天內多次造訪。
- 將
max_contributions_per_partitions引數設為1值。
這項引數會指定單一訪客可對個別分割區或產品類別 (本例中) 提供的貢獻數量。
- 建立
data_extractor例項,指定要尋找privacy_id、partition和value欄位的位置。
程式碼應如以下程式碼片段所示:
def run_pipeline(data, ops):
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=1e-5)
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1, # A single user can only contribute to one partition.
max_contributions_per_partition=1, # For a single partition, only one contribution per user is used.
)
data_extractors = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda row: row.user_id,
partition_extractor=lambda row: row.product_view_0
value_extractor=lambda row: row.has_conversion)
dp_result = dp_engine.aggregate(data, params, data_extractors)
budget_accountant.compute_budgets()
return dp_result
- 新增這段程式碼,將 Pandas DataFrame 轉換為資料列清單,您可直接從中計算差異隱私權統計資料:
rows = [index_row[1] for index_row in df.iterrows()]
dp_result_local = run_pipeline(rows, ops) # Returns generator
list(dp_result_local)
恭喜!您已計算出第一個差異隱私統計資料!
這個圖表會顯示差異隱私計數結果,以及您先前計算的非隱私計數:

執行程式碼後取得的長條圖可能與這個長條圖不同,這是正常的。由於差異隱私權中的雜訊,每次執行程式碼時,您都會取得不同的長條圖,但您會發現這些長條圖與原始非私密長條圖相似。
請注意,為確保隱私權,請勿多次執行管道。詳情請參閱「計算多項統計資料」。
5. 使用公開分區
在上一節中,您可能已注意到,您捨棄了分區的所有造訪資料,也就是首次在您網站上看到襪子的訪客。
這是因為輸出資料分割區的存在與否取決於使用者資料本身,因此必須進行資料分割區選取或設定門檻,以確保差異隱私權保障。在這種情況下,輸出內容中只要有分割區,就可能洩漏資料中存在個別使用者。如要進一步瞭解這項行為為何會侵犯隱私權,請參閱這篇網誌文章。為避免這類隱私權侵害行為,PipelineDP 只會保留使用者人數充足的分區。
如果輸出分區清單不依附於使用者私人資料,就不需要這個分區選取步驟。在您的範例中,由於您知道顧客可能先看到的所有產品類別,因此確實是這種情況。
如要使用分區:
- 建立可能的分區清單:
public_partitions_products = ['jeans', 'jumper', 'socks', 't-shirt']
- 將清單傳遞至
run_pipeline()函式,將其設為pipeline_dp.AggregateParams類別的額外輸入內容:
run_pipeline(
rows, ops, total_delta=0, public_partitions=public_partitions_products)
# Returns generator
params = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT],
max_partitions_contributed=1,
max_contributions_per_partition=1,
public_partitions=public_partitions_products)
如果您使用公開分割區和 LAPLACE 雜訊,可以將 total_delta 引數設為 0 值。
現在您會看到結果中顯示所有分區或產品的資料。

公開分區不僅能讓您保留更多分區,還能減少大約一半的干擾,因為您不會將任何隱私權預算用於分區選取,因此與先前的執行作業相比,原始計數和私有計數之間的差異會稍微減少。
使用公開分區時,請留意以下兩個重要事項:
- 從原始資料衍生分區清單時,請務必謹慎。如果沒有以差異化隱私的方式執行這項操作,管道就不再提供差異化隱私保證。詳情請參閱「進階:從資料衍生分區」。
- 如果部分公開分割區沒有資料,您必須對這些分割區套用雜訊,以維護差異隱私權。舉例來說,如果你使用了資料集或網站上沒有的額外產品 (例如長褲),這仍屬於雜訊,結果可能會顯示產品有幾次造訪,但實際上沒有。
進階:從資料衍生分區
如果您在同一個管道中,使用相同的非公開輸出分區清單執行多項匯總作業,可以透過 dp_engine.select_private_partitions() 方法衍生分區清單一次,然後將分區提供給每項匯總作業做為 public_partitions 輸入內容。這不僅能保障隱私,還能減少雜訊,因為您只需在整個管道中一次使用隱私權預算進行分割區選取。
def get_private_product_views(data, ops):
"""Obtains the list of product_views in a private manner.
This does not calculate any private metrics; it merely obtains the list of
product_views but does so while making sure the result is differentially private.
"""
# Set the total privacy budget.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=1e-5)
# Create a DPEngine instance.
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
# Specify how to extract privacy_id, partition_key, and value from a
# single element.
data_extractors = pipeline_dp.DataExtractors(
partition_extractor=lambda row: row.product_view_0,
privacy_id_extractor=lambda row: row.user_id)
# Run aggregation.
dp_result = dp_engine.select_partitions(
data, pipeline_dp.SelectPrivatePartitionsParams(
max_partitions_contributed=1),
data_extractors=data_extractors)
budget_accountant.compute_budgets()
return dp_result
6. 計算多項統計資料
您已瞭解 PipelineDP 的運作方式,現在可以看看如何將這項工具用於一些更進階的用途。如一開始所述,您對三項統計資料感興趣。只要多項統計資料在 AggregateParams 執行個體中共用相同參數,PipelineDP 就能同時計算這些資料 (稍後會說明)。不僅更簡潔,也更容易一次計算多個指標,而且隱私權保障更完善。
如果您還記得提供給 NaiveBudgetAccountant 類別的 epsilon 和 delta 參數,這些參數代表所謂的隱私權預算,也就是從資料洩漏的使用者隱私權量。
請務必記住,隱私權預算是累加的。如果您使用特定 epsilon ε 和 delta δ 執行管道一次,就會花費 (ε,δ) 預算。如果第二次執行,總預算為 (2ε, 2δ)。同樣地,如果您使用 NaiveBudgetAccountant 方法計算多項統計資料,且連續使用隱私權預算 ε,δ,則總預算為 (2ε, 2δ)。這表示您降低了隱私權保障。
如要規避這項限制,您必須使用單一 NaiveBudgetAccountant 執行個體,並在需要計算相同資料的多項統計資料時,使用您想使用的總預算。接著,您需要為每個匯總指定要使用的 epsilon 和 delta 值。最終您會獲得相同的整體隱私權保障,但特定匯總的 epsilon 和 delta 值越高,準確度就越高。
如要查看實際運作情形,可以計算 count、mean 和 sum 統計資料。
您可以根據兩項不同的指標計算統計資料:conversion_value 指標 (用於根據最先瀏覽的產品推斷產生的收益金額) 和 has_conversion 指標 (用於計算網站訪客人數和平均轉換率)。
您必須分別為每項指標指定參數,引導私密統計資料的計算。您在這兩項指標之間分配隱私權預算。您從 has_conversion 指標計算出兩項統計資料,因此想將初始預算的 2/3 分配給這項指標,其餘 1/3 則分配給 conversion_value 指標。
如要計算多項統計資料,請按照下列步驟操作:
- 使用您要在三項統計資料中使用的總
epsilon和delta值,設定隱私權預算會計師:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=0)
- 初始化
DPEngine,計算指標:
dp_engine = pipeline_dp.DPEngine(budget_accountant, ops)
- 指定這項指標的參數。
params_conversion_value_metrics = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.SUM],
max_partitions_contributed=1,
max_contributions_per_partition=1,
min_value=0,
max_value=100,
public_partitions=public_partitions,
budget_weight=1/3)
最後一個引數可選擇性地指定隱私權預算的權重。您可以為所有項目指定相同權重,但如先前所述,您想將這個引數設為三分之一。
您也可以設定 min_value 和 max_value 引數,指定套用至分割區中隱私權單位所提供值的下限和上限。如要計算私有總和或平均值,就必須提供這些參數。您不希望出現負值,因此可以假設 0 和 100 是合理的界限。
- 擷取相關資料,然後傳遞至匯總函式:
data_extractors_conversion_value_metrics = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda row: row.user_id,
partition_extractor=lambda row: row.product_view_0,
value_extractor=lambda row: row.conversion_value)
dp_result_conversion_value_metrics = (
dp_engine.aggregate(data, params_conversion_value_metrics,
data_extractors_conversion_value_metrics))
- 請按照相同步驟,根據
has_conversion變數計算這兩項指標:
params_conversion_rate_metrics = pipeline_dp.AggregateParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.MEAN],
max_partitions_contributed=1,
max_contributions_per_partition=1,
min_value=0,
max_value=1,
public_partitions=public_partitions,
budget_weight=2/3)
data_extractors_conversion_rate_metrics = pipeline_dp.DataExtractors(
privacy_id_extractor=lambda row: row.user_id,
partition_extractor=lambda row: row.product_view_0,
value_extractor=lambda row: row.has_conversion)
dp_result_conversion_rate_metrics = (
dp_engine.aggregate(data, params_conversion_rate_metrics,
data_extractors_conversion_rate_metrics))
唯一變更的是 pipeline_dp.AggregateParams 執行個體,您現在可將 mean 和 count 定義為匯總,並將隱私權預算的 2/3 分配給這項計算。由於您希望這兩項統計資料的貢獻範圍相同,並根據相同的 has_conversion 變數計算,因此可以在同一個 pipeline_dp.AggregateParams 執行個體中合併這兩項統計資料,並同時計算。
- 呼叫
budget_accountant.compute_budgets()方法:
budget_accountant.compute_budgets()
您可以繪製這三項私人統計資料,與原始統計資料進行比較。視加入的雜訊而定,結果實際上可能會超出合理範圍。在本例中,由於加入的雜訊以零為中心對稱,因此跳躍者的轉換率和總轉換價值會顯示為負值。如要進一步分析和處理,最好不要手動後續處理私密統計資料,但如果想將這些圖表加入報表,之後只要將最小值設為零,就不會違反隱私權保障。

7. 使用 Beam 執行管道
現今的資料處理作業需要處理大量資料,以致於無法在本地處理。許多人會使用 Beam 或 Spark 等架構進行大規模資料處理,並在雲端執行管道。
PipelineDP 支援 Beam 和 Spark,您只需稍微修改程式碼即可。
如要使用 private_beam API 透過 Beam 執行管道,請按照下列步驟操作:
- 初始化
runner變數,然後建立管道,並在管道中將隱私權作業套用至rows的 Beam 代表:
runner = fn_api_runner.FnApiRunner() # local runner
with beam.Pipeline(runner=runner) as pipeline:
beam_data = pipeline | beam.Create(rows)
- 使用必要的隱私權參數建立
budget_accountant變數:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=0)
- 建立
pcol或私人集合變數,確保所有匯總資料都符合隱私權規定:
pcol = beam_data | pbeam.MakePrivate(
budget_accountant=budget_accountant,
privacy_id_extractor=lambda
row: row.user_id)
- 在適當的類別中指定私有匯總的參數。
這裡使用 pipeline_dp.aggregate_params.SumParams() 類別,因為您要計算產品瀏覽次數總和。
- 將匯總參數傳遞至
pbeam.Sum方法,即可計算統計資料:
dp_result = pcol | pbeam.Sum(params)
- 最後的程式碼應如下所示:
import pipeline_dp.private_beam as pbeam
runner = fn_api_runner.FnApiRunner() # local runner
with beam.Pipeline(runner=runner) as pipeline:
beam_data = pipeline | beam.Create(rows)
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=1, total_delta=0)
# Create private collection.
pcol = beam_data | pbeam.MakePrivate(
budget_accountant=budget_accountant,
privacy_id_extractor=lambda row:
row.user_id)
# Specify parameters.
params = pipeline_dp.aggregate_params.SumParams(
noise_kind=pipeline_dp.NoiseKind.LAPLACE,
max_partitions_contributed=1,
max_contributions_per_partition=1,
min_value=0,
max_value=100,
public_partitions=public_partitions_product_views,
partition_extractor=lambda row: row.product_view_0,
value_extractor=lambda row:row.conversion_value)
dp_result = pcol | pbeam.Sum(params)
budget_accountant.compute_budgets()
dp_result | beam.Map(print)
8. 選用:調整隱私權和實用性參數
您在本程式碼研究室中看到許多參數,例如 epsilon、delta 和 max_partitions_contributed 參數。大致可分為兩類:隱私權參數和實用參數。
隱私權參數
epsilon 和 delta 參數會量化您透過差異化隱私提供的隱私權。更精確地說,這是用來衡量潛在攻擊者可從匿名輸出內容中取得多少資料資訊。參數值越高,攻擊者就越能掌握資料資訊,這會造成隱私權風險。另一方面,epsilon 和 delta 參數的值越低,您就越需要在輸出內容中加入雜訊,才能確保匿名性,且每個分割區中需要的不重複使用者人數就越多,才能將這些使用者保留在匿名輸出內容中。在這種情況下,研究人員需要在實用性和隱私權之間做出取捨。
在 PipelineDP 中,您需要在 NaiveBudgetAccountant 執行個體中設定總隱私權預算,指定匿名輸出內容的隱私權保障。但請注意,如要確保隱私權,您必須謹慎地為每個匯總作業使用個別的 NaiveBudgetAccountant 執行個體,或多次執行管道,以免預算用盡。
如要進一步瞭解差異化隱私權和隱私權參數的意義,請參閱差異化隱私權閱讀清單。
公用事業參數
實用性參數不會影響隱私權保障,但會影響準確度,進而影響輸出內容的實用性。這些值會提供在 AggregateParams 執行個體中,並用於調整加入的雜訊。
AggregateParams 執行個體中提供的公用程式參數,適用於所有匯總作業,即 max_partitions_contributed 參數。分割區對應於 PipelineDP 匯總作業傳回的資料鍵,因此 max_partitions_contributed 參數會限制使用者可為輸出內容提供的相異鍵值數量。如果使用者提供的鍵數超過 max_partitions_contributed 參數的值,系統會捨棄部分鍵,確保鍵數符合 max_partitions_contributed 參數的值。
同樣地,大多數的匯總都有 max_contributions_per_partition 參數。這些值也會在 AggregateParams 執行個體中提供,且每個匯總作業可能會有不同的值。他們為每個鍵綁定使用者的貢獻。
輸出內容中加入的雜訊會依據 max_partitions_contributed 和 max_contributions_per_partition 參數進行調整,因此這兩者之間存在取捨關係:為每個參數指派較大的值,表示您保留的資料越多,但結果會越嘈雜。
部分匯總作業需要 min_value 和 max_value 參數,用於指定每位使用者貢獻內容的界限。如果使用者提供的價值低於指派給 min_value 參數的值,系統會將價值提高至參數值。同樣地,如果使用者提供的價值大於 max_value 參數的值,系統會將價值調降至參數值。如要保留更多原始值,必須指定較大的界限。系統會根據邊界大小調整雜訊,因此邊界越大,保留的資料就越多,但結果也會越雜訊。
最後,noise_kind 參數支援 PipelineDP 中的兩種不同雜訊機制:GAUSSIAN 和 LAPLACE 雜訊。LAPLACE 分布的實用性較高,且貢獻範圍較小,因此 PipelineDP 預設使用此分布。不過,如要使用 GAUSSIAN 分布雜訊,可以在 AggregateParams 執行個體中指定。
9. 恭喜
做得好!您已完成 PipelineDP 程式碼研究室,並深入瞭解差異化隱私和 PipelineDP。