1. 概览/简介
虽然由 Web、应用服务器和数据库组成的多层应用是 Web 开发的基础,也是许多网站的起点,但成功往往会带来可伸缩性、集成和敏捷性方面的挑战。例如,如何实时处理数据,以及如何将数据分发到多个关键业务系统?这些问题以及互联网规模应用的需求推动了对分布式消息传递系统的需求,并催生了一种使用数据流水线来实现弹性实时系统的架构模式。因此,了解如何将实时数据发布到分布式消息传递系统,以及如何构建数据流水线,对于开发者和架构师来说都是至关重要的技能。
构建内容
在此 Codelab 中,您将构建一个天气数据流水线,该流水线从物联网 (IoT) 设备开始,利用消息队列接收和传送数据,利用无服务器函数将数据移至数据仓库,然后创建一个显示信息的信息中心。我们将使用配备天气传感器的 Raspberry Pi 作为 IoT 设备,并使用 Google Cloud Platform 的多个组件来构建数据流水线。虽然构建 Raspberry Pi 很有用,但这是此 Codelab 的可选部分,并且流式天气数据可以替换为脚本。

完成此 Codelab 中的步骤后,您将拥有一个流式数据流水线,该流水线可为显示温度、湿度、和气压的控制台提供数据。

学习内容
- 如何使用 Google Pub/Sub
- 如何部署 Google Cloud Functions 函数
- 如何利用 Google BigQuery
- 如何使用 Google 数据洞察创建信息中心
- 此外,如果您构建 IoT 传感器,还将学习如何利用 Google Cloud SDK 以及如何保护对 Google Cloud Platform 的远程访问调用
所需条件
- Google Cloud Platform 账号。Google Cloud Platform 的新用户有资格获享$300 免费试用。
如果您想构建此 Codelab 的 IoT 传感器部分,而不是利用示例数据和脚本,还需要以下组件(可在此处订购完整套件或单个零件)…
- Raspberry Pi Zero W(含电源、SD 存储卡和保护壳)
- USB 读卡器
- USB 集线器(用于将键盘和鼠标连接到 Raspberry Pi 上的唯一 USB 端口)
- 母对母面包板导线
- GPIO Hammer 标头
- BME280 传感器
- 焊铁和焊料
此外,我们还假设您可以使用带 HDMI 输入端口的电脑显示器或电视、HDMI 数据线、键盘和鼠标。
2. 准备工作
自定进度的环境设置
如果您还没有 Google 账号(Gmail 或 G Suite),则必须创建一个。无论您是否已有 Google 账号,都请务必利用$300 免费试用!
登录 Google Cloud Platform 控制台 ( console.cloud.google.com)。您可以使用默认项目(“我的第一个项目”)来完成此实验,也可以选择创建一个新项目。如果您想创建新项目,可以使用“管理资源”页面。项目 ID 在所有 Google Cloud 项目中必须是唯一的名称(以下所示的名称已被占用,您无法使用)。请记下您的项目 ID(即,您的项目 ID 将为 _____),因为稍后需要用到。


在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。请务必浏览 Codelab 末尾的“清理”部分。
3. 创建 BigQuery 表
BigQuery 是一种无服务器、扩缩能力极强且经济实惠的企业数据仓库,非常适合用于存储从 IoT 设备流式传输的数据,同时还允许分析信息中心查询相关信息。
我们来创建一个用于存储所有 IoT 天气数据的表。从 Cloud 控制台中选择 BigQuery。系统会在新窗口中打开 BigQuery(请勿关闭原始窗口,因为您需要再次访问该窗口)。

点击项目名称旁边的向下箭头图标,然后选择“创建新数据集”

为数据集输入“weatherData”,选择要存储该数据集的位置,然后点击“确定”

点击数据集旁边的“+”号即可创建新表

对于源数据,选择创建空表。在“目标表名称”中,输入 weatherDataTable。在架构下,点击添加字段按钮,直到总共有 9 个字段。填写如下所示的字段,同时确保为每个字段选择适当的类型。一切就绪后,点击创建表格按钮。

您应该会看到如下结果:

您现在已设置好数据仓库,可以接收天气数据了。
4. 创建 Pub/Sub 主题
Cloud Pub/Sub 简单、可靠、可伸缩,可以用作数据流分析和事件驱动型计算系统的基础。因此,它非常适合处理传入的 IoT 消息,然后允许下游系统处理这些消息。
如果您仍处于 BigQuery 窗口中,请切换回 Cloud 控制台。如果您关闭了 Cloud 控制台,请前往 https://console.cloud.google.com
在 Cloud 控制台中,依次选择 Pub/Sub 和“主题”。

如果您看到“启用 API”提示,请点击“启用 API”按钮。

点击“创建主题”按钮

输入“weatherdata”作为主题名称,然后点击“创建”

您应该会看到新创建的主题

现在,您有了一个 Pub/Sub 主题,既可向其发布 IoT 消息,也可允许其他进程访问这些消息。
安全地向主题发布内容
如果您计划从 Google Cloud 控制台之外的资源(例如物联网传感器)向 Pub/Sub 主题发布消息,则需要使用服务账号更严格地控制访问权限,并通过创建信任证书来确保连接的安全性。
在 Cloud 控制台中,依次选择“IAM 和管理”和“服务账号”

点击“创建服务账号”按钮

在“角色”下拉菜单中,选择“Pub/Sub Publisher”角色

输入服务账号名称 (iotWeatherPublisher),选中“提供新的私钥”复选框,确保“密钥类型”设置为 JSON,然后点击“创建”

安全密钥将自动下载。只有一个密钥,因此请务必妥善保管。点击“关闭”。

您应该会看到系统已创建服务账号,并且该账号已关联密钥 ID。

为了方便日后访问密钥,我们将密钥存储在 Google Cloud Storage 中。在 Cloud 控制台中,选择“存储”和“浏览器”。

点击“创建存储分区”按钮

为存储分区选择一个名称(该名称在整个 Google Cloud 中必须是全局唯一的),然后点击“创建”按钮

找到自动下载的安全密钥,然后将其拖放到存储分区中或上传到存储分区中

密钥上传完成后,应显示在 Cloud Storage 浏览器中

记下存储分区名称和安全密钥文件名以供稍后使用。
5. 创建 Cloud Functions 函数
云计算使得完全无服务器的计算模型成为可能,在这种模型中,逻辑可以按需运行,以响应来自任何位置的事件。在本实验中,每次有消息发布到天气主题时,Cloud Functions 函数都会启动,读取消息,然后将其存储在 BigQuery 中。
在 Cloud 控制台中,选择 Cloud Functions

如果您看到 API 消息,请点击“启用 API”按钮

点击“创建函数”按钮

在“名称”字段中,输入 function-weatherPubSubToBQ。对于触发器,选择 Cloud Pub/Sub 主题,然后在“主题”下拉菜单中选择“weatherdata”。对于源代码,请选择内嵌编辑器。在 index.js 标签页中,将以下代码粘贴到初始代码上。请务必更改 projectId、datasetId 和 tableId 的常量,以适应您的环境。
/**
* Background Cloud Function to be triggered by PubSub.
*
* @param {object} event The Cloud Functions event.
* @param {function} callback The callback function.
*/
exports.subscribe = function (event, callback) {
const BigQuery = require('@google-cloud/bigquery');
const projectId = "myProject"; //Enter your project ID here
const datasetId = "myDataset"; //Enter your BigQuery dataset name here
const tableId = "myTable"; //Enter your BigQuery table name here -- make sure it is setup correctly
const PubSubMessage = event.data;
// Incoming data is in JSON format
const incomingData = PubSubMessage.data ? Buffer.from(PubSubMessage.data, 'base64').toString() : "{'sensorID':'na','timecollected':'1/1/1970 00:00:00','zipcode':'00000','latitude':'0.0','longitude':'0.0','temperature':'-273','humidity':'-1','dewpoint':'-273','pressure':'0'}";
const jsonData = JSON.parse(incomingData);
var rows = [jsonData];
console.log(`Uploading data: ${JSON.stringify(rows)}`);
// Instantiates a client
const bigquery = BigQuery({
projectId: projectId
});
// Inserts data into a table
bigquery
.dataset(datasetId)
.table(tableId)
.insert(rows)
.then((foundErrors) => {
rows.forEach((row) => console.log('Inserted: ', row));
if (foundErrors && foundErrors.insertErrors != undefined) {
foundErrors.forEach((err) => {
console.log('Error: ', err);
})
}
})
.catch((err) => {
console.error('ERROR:', err);
});
// [END bigquery_insert_stream]
callback();
};
在 package.json 标签页中,将以下代码粘贴到其中的占位代码上
{
"name": "function-weatherPubSubToBQ",
"version": "0.0.1",
"private": true,
"license": "Apache-2.0",
"author": "Google Inc.",
"dependencies": {
"@google-cloud/bigquery": "^0.9.6"
}
}
如果“要执行的函数”设置为“HelloWorld”,请将其更改为“subscribe”。点击“创建”按钮

大约需要 2 分钟,您的函数才会显示已部署

恭喜!您刚刚通过 Cloud Functions 将 Pub/Sub 连接到了 BigQuery。
6. 设置 IoT 硬件(可选)
组装 Raspberry Pi 和传感器
如果固定内容超过 7 个,请将标题缩减为仅包含 7 个固定内容。将排针焊接到传感器板上。

将锤头针脚小心地安装到 Raspberry Pi 中。

按照此处的步骤格式化 SD 卡并安装 NOOBS(全新开箱软件)安装程序。将 SD 卡插入 Raspberry Pi,然后将 Raspberry Pi 装入其外壳。

使用面包板导线,根据下图将传感器连接到 Raspberry Pi。

Raspberry Pi 针脚 | 传感器连接 |
针脚 1 (3.3V) | 车辆识别号 |
Pin 3 (CPIO2) | SDI |
引脚 5 (GPIO3) | SCK |
引脚 9(接地) | GND |

连接显示器(使用 mini-HDMI 连接器)、键盘/鼠标(使用 USB 集线器),最后连接电源适配器。
配置 Raspberry Pi 和传感器
Raspberry Pi 完成启动后,选择 Raspbian 作为所需的操作系统,确保所需的语言正确无误,然后点击“安装”(窗口左上角的硬盘图标)。

点击 Wi-Fi 图标(屏幕右上角),然后选择一个网络。如果是安全网络,请输入密码(预共享密钥)。

点击树莓图标(屏幕左上角),选择“偏好设置”,然后选择“Raspberry Pi 配置”。在“接口”标签页中,启用 I2C。在“本地化”标签页中,设置语言区域和时区。设置时区后,允许 Raspberry Pi 重新启动。

重新启动完成后,点击“终端”图标以打开终端窗口。

输入以下命令,确保传感器已正确连接。
sudo i2cdetect -y 1
结果应如下所示 - 请确保显示的是 77。

安装 Google Cloud SDK
为了利用 Google Cloud 平台上的工具,需要在 Raspberry Pi 上安装 Google Cloud SDK。该 SDK 包含管理和利用 Google Cloud Platform 所需的工具,并支持多种编程语言。
在 Raspberry Pi 上打开一个终端窗口(如果尚未打开),并设置一个环境变量,使 SDK 版本与 Raspberry Pi 上的操作系统相匹配。
export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)"
现在,添加 Google Cloud SDK 软件包的存储位置,以便安装工具在被要求安装 SDK 时知道在哪里查找。
echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
添加 Google 软件包代码库中的公钥,以便 Raspberry Pi 在安装期间验证安全性并信任相应内容
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
确保 Raspberry Pi 上的所有软件都是最新版本,并安装核心 Google Cloud SDK
sudo apt-get update && sudo apt-get install google-cloud-sdk
当系统提示“Do you want to continue?”时,按 Enter 键。
使用 Python 软件包管理器安装 tendo 软件包。此软件包用于检查脚本是否多次运行,并正在为天气脚本的应用安装。
pip install tendo
使用 Python 软件包管理器确保已安装并更新了适用于 Python 的 Google Cloud PubSub 和 OAuth2 软件包
sudo pip install --upgrade google-cloud-pubsub
sudo pip install --upgrade oauth2client
初始化 Google Cloud SDK
借助该 SDK,您可以远程、经过身份验证地访问 Google Cloud。在此 Codelab 中,它将用于访问存储分区,以便轻松将安全密钥下载到 Raspberry Pi。
在 Raspberry Pi 的命令行中,输入
gcloud init --console-only
当系统提示“Would you like to log in (Y/n)?”(您想登录吗 [Y/n]?)时,按 Enter 键。
当您看到“在浏览器中前往以下链接:”后跟一个以 https://accounts.google.com/o/oauth 开头的长网址时,请将鼠标悬停在该网址上,然后右键点击并选择“复制网址”。然后打开网络浏览器(屏幕左上角的蓝色地球图标),右键点击地址栏,然后点击“粘贴”。
看到登录界面后,输入与您的 Google Cloud 账号关联的电子邮件地址,然后按 Enter 键。然后,输入您的密码并点击“下一步”按钮。
系统会提示您 Google Cloud SDK 想要访问您的 Google 账号。点击“允许”按钮。
系统会向您显示验证码。使用鼠标突出显示该代码,然后右键点击并选择“复制”。返回终端窗口,确保光标位于“Enter verification code:”右侧,然后使用鼠标右键点击并选择“粘贴”。按 Enter 按钮。
如果系统要求您“选择要使用的云项目”,请输入与您在本 Codelab 中使用的项目名称对应的数字,然后按 Enter 键。
如果系统提示您启用 Compute API,请按 Enter 键以启用该 API。之后,系统会要求您配置 Google Compute Engine 设置。按 Enter 键。系统会显示一个潜在区域/地区的列表,请选择一个靠近您的区域/地区,输入相应的编号,然后按 Enter 键。
稍后,您会看到一些额外的信息。Google Cloud SDK 现已配置完毕。您可以关闭网页浏览器窗口,因为您以后不再需要它。
安装传感器软件和天气脚本
在 Raspberry Pi 的命令行中,克隆用于从输入/输出引脚读取信息的所需软件包。
git clone https://github.com/adafruit/Adafruit_Python_GPIO
安装下载的软件包
cd Adafruit_Python_GPIO
sudo python setup.py install
cd ..
克隆可启用天气传感器的项目代码
git clone https://github.com/googlecodelabs/iot-data-pipeline
将传感器驱动程序复制到与下载的其余软件相同的目录中。
cd iot-data-pipeline/third_party/Adafruit_BME280
mv Adafruit_BME280.py ../..
cd ../..
通过输入内容修改脚本...
nano checkWeather.py
将项目更改为您的项目 ID,并将主题更改为您的 Pub/Sub 主题的名称(这些内容已在本 Codelab 的“准备工作”和“创建 Pub/Sub 主题”部分中记下)。
将 sensorID、sensorZipCode、sensorLat 和 sensorLong 值更改为任意值。如需查找特定位置或地址的纬度和经度值,请点击此处。
完成必要的更改后,按 Ctrl-X 组合键开始退出 nano 编辑器。按 Y 确认。
# constants - change to fit your project and location
SEND_INTERVAL = 10 #seconds
sensor = BME280(t_mode=BME280_OSAMPLE_8, p_mode=BME280_OSAMPLE_8, h_mode=BME280_OSAMPLE_8)
credentials = GoogleCredentials.get_application_default()
project="myProject" #change this to your Google Cloud project id
topic = "myTopic" #change this to your Google Cloud PubSub topic name
sensorID = "s-Googleplex"
sensorZipCode = "94043"
sensorLat = "37.421655"
sensorLong = "-122.085637"
安装安全密钥
将安全密钥(来自“安全地发布到主题”部分)复制到 Raspberry Pi。
如果您使用 SFTP 或 SCP 将安全密钥从本地机器复制到 Raspberry Pi(复制到 /home/pi 目录),则可以跳过下一步,直接跳到导出路径。
如果您将安全密钥放入存储分区中,则需要记住存储分区的名称和文件的名称。使用 gsutil 命令复制安全密钥。此命令可以访问 Google Storage(这就是它之所以命名为 gsutil 以及文件路径以 gs:// 开头的原因)。请务必将以下命令更改为包含您的存储分区名称和文件名。
gsutil cp gs://nameOfYourBucket/yourSecurityKeyFilename.json .
您应该会看到一条消息,告知您文件正在复制,然后看到一条消息,告知您操作已完成。
在 Raspberry Pi 的命令行中,导出安全密钥的路径(更改文件名以与您拥有的文件名一致)
export GOOGLE_APPLICATION_CREDENTIALS=/home/pi/iot-data-pipeline/yourSecurityKeyFilename.json
现在,您已完成 IoT 天气传感器的制作,可以开始将数据传输到 Google Cloud 了。
7. 启动数据流水线
可能需要启用 Compute API
来自 Raspberry Pi 的数据流
如果您构建了 Raspberry Pi IoT 天气传感器,请启动用于读取天气数据并将其推送到 Google Cloud Pub/Sub 的脚本。如果您不在 /home/pi/iot-data-pipeline 目录中,请先移至该目录
cd /home/pi/iot-data-pipeline
启动天气脚本
python checkWeather.py
您应该会看到终端窗口每分钟回显天气数据结果。在数据流动的情况下,您可以跳到下一部分(检查数据是否在流动)。
模拟数据流式传输
如果您没有构建物联网天气传感器,可以使用存储在 Google Cloud Storage 中的公共数据集,并将其馈送到现有的 Pub/Sub 主题中,从而模拟数据流式传输。Google Dataflow 将与 Google 提供的用于从 Cloud Storage 读取数据并发布到 Pub/Sub 的模板一起使用。
在此过程中,Dataflow 需要一个临时存储位置,因此我们为此目的创建一个存储分区。
在 Cloud 控制台中,选择“存储”和“浏览器”。

点击“创建存储分区”按钮

为存储分区选择一个名称(请注意,该名称在整个 Google Cloud 中必须是全局唯一的),然后点击“创建”按钮。请记住此存储分区的名称,因为您很快就会用到它。

在 Cloud 控制台中,选择 Dataflow。

点击“基于模板创建作业”(屏幕的上半部分)

填写作业详细信息,如下所示,并注意以下事项:
- 输入作业名称 dataflow-gcs-to-pubsub
- 系统应根据项目的托管位置自动选择区域,无需更改。
- 选择“GCS Text to Cloud Pub/Sub”Cloud Dataflow 模板
- 对于“输入 Cloud Storage 文件”,请输入 gs://codelab-iot-data-pipeline-sampleweatherdata/*.json(这是一个公开数据集)
- 对于输出 Pub/Sub 主题,确切的路径将取决于您的项目名称,看起来类似于“projects/yourProjectName/topics/weatherdata”
- 将临时位置设置为您刚刚创建的 Google Cloud Storage 存储分区的名称以及“tmp”的文件名前缀。格式应为“gs://myStorageBucketName/tmp”。
填写完所有信息(见下文)后,点击“运行作业”按钮

Dataflow 作业应会开始运行。

Dataflow 作业大约需要 1 分钟才能完成。

8. 检查数据是否在流动
Cloud Functions 日志
确保 Cloud Functions 函数由 Pub/Sub 触发
gcloud beta functions logs read function-weatherPubSubToBQ
日志应显示函数正在执行、正在接收数据以及正在将数据插入到 BigQuery 中

BigQuery 数据
检查以确保数据正在流入 BigQuery 表。在 Cloud 控制台中,前往 BigQuery (bigquery.cloud.google.com)。

在项目名称(位于窗口左侧)下方,依次点击数据集 (weatherData) 和表 (weatherDataTable),然后点击“查询表”按钮

向 SQL 语句添加星号,使其显示为 SELECT * FROM...,如下所示,然后点击“运行查询”按钮

如果系统提示,请点击“运行查询”按钮

如果您能看到结果,则表明数据正在正常流动。

在数据开始流动后,您现在可以构建分析信息中心了。
9. 创建数据洞察信息中心
Google 数据洞察可以将您的数据转变成详实的信息中心和报告,这些信息中心和报告不仅易于阅读和分享,而且在各个方面都可自定义。
在网络浏览器中,前往 https://datastudio.google.com

在“开始创建新的报告”下,点击“空白”,然后点击“开始”按钮

点击复选框以接受条款,点击“下一步”按钮,选择您感兴趣的电子邮件,然后点击“完成”按钮。再次点击“开始创建新的报告”下方的“空白”

点击“创建新数据源”按钮

点击 BigQuery,然后点击“授权”按钮,再选择您希望在数据洞察中使用的 Google 账号(应与您在 Codelab 中使用的账号相同)。

点击“允许”按钮

选择您的项目名称、数据集和表。然后点击“连接”按钮。

按如下所示更改类型字段(除 timecollected 和 sensorID 之外,所有字段都应为数字)。请注意,timecollected 设置为“日期 小时”(而不仅仅是“日期”)。更改“汇总”字段,如下所示(、温度、湿度和气压应为平均值,其他所有内容均应设置为“无”)。点击“创建报告”按钮。

点击“添加到报告”按钮进行确认

如果系统要求您选择 Google 账号,请选择您的账号,然后点击“允许”按钮,以允许数据洞察将报告存储在 Google 云端硬盘中。

系统会显示一个空白画布,供您在其中创建信息中心。从顶部的一行图标中,选择“时间序列”。

在空白工作表的左上角绘制一个矩形。它应占据约四分之一的空白纸张。

在窗口的右侧,选择“样式”标签页。将“缺失数据”从“直线到零”更改为“折线”。在“左侧 Y 轴”部分中,删除“轴最小值”中的 0,将其更改为(自动)。

点击工作表中的图表,然后复制/粘贴 (Ctrl-C/Ctrl-V) 3 次。对齐各个图表,使每个图表占据布局的四分之一。

点击每个图表,然后在“时间序列属性和数据”部分下点击现有指标(),选择要显示的其他指标,直到所有四项天气读数(、温度、湿度和气压)都有各自的图表。


您现在拥有了一个基本的信息中心!

10. 恭喜!
您已创建完整的数据流水线!在此过程中,您学习了如何使用 Google Pub/Sub、如何部署无服务器函数、如何利用 BigQuery 以及如何使用数据洞察创建分析信息中心。此外,您还了解了如何安全地使用 Google Cloud SDK 将数据导入 Google Cloud Platform。最后,您现在已经获得了一些实操经验,了解了可处理高流量并保持可用性的重要架构模式。

清理
在完成天气数据和分析流水线的实验后,您可以移除正在运行的资源。
如果您自行构建了 IoT 传感器,请将其关闭。在终端窗口中按 Ctrl-C 停止脚本,然后输入以下内容以关闭 Raspberry Pi 的电源
shutdown -h now
前往 Cloud Functions,点击 function-weatherPubSubToBQ 旁边的复选框,然后点击“删除”

前往 Pub/Sub,点击“主题”,点击 weatherdata 主题旁边的复选框,然后点击“删除”

前往“存储空间”,点击存储分区旁边的复选框,然后点击“删除”

前往 bigquery.cloud.google.com,点击项目名称旁边的向下箭头,点击 weatherData 数据集右侧的向下箭头,然后点击“删除数据集”。

当系统提示时,输入数据集 ID (weatherData),以完成数据删除。
