1. 概览/简介
虽然由 Web、应用服务器和数据库组成的多层应用是 Web 开发的基础,也是许多网站的起点,但成功通常会给可伸缩性、集成和敏捷性带来挑战。例如,如何实时处理数据,以及如何将数据分发到多个关键业务系统?这些问题再加上互联网规模应用的需求,催生了对分布式消息传递系统的需求,并催生了使用数据流水线实现弹性实时系统的架构模式。因此,对于开发者和架构师来说,了解如何将实时数据发布到分布式消息传递系统,以及如何构建数据流水线,都是至关重要的技能。
构建内容
在此 Codelab 中,您将构建一个天气数据流水线,首先构建一台物联网 (IoT) 设备,利用消息队列接收和传递数据,利用无服务器功能将数据移至数据仓库,然后创建一个显示信息的信息中心。我们将使用带有天气传感器的 Raspberry Pi 作为 IoT 设备,并使用 Google Cloud Platform 的几个组件构成数据流水线。虽然构建 Raspberry Pi 对您有益,但这只是此 Codelab 的一个可选部分,可以将流式天气数据替换为脚本。
完成此 Codelab 中的步骤后,您将得到一个流式数据流水线,该数据流水线馈送到一个显示温度、湿度、和气压的信息中心。
学习内容
- 如何使用 Google Pub/Sub
- 如何部署 Google Cloud Functions 函数
- 如何利用 Google BigQuery
- 如何使用 Google 数据洞察创建信息中心
- 此外,如果您构建了 IoT 传感器,还可以学习如何利用 Google Cloud SDK 以及如何保护对 Google Cloud Platform 的远程访问调用
所需条件
- Google Cloud Platform 账号。Google Cloud Platform 的新用户有资格获享 $300 免费试用。
如果您想构建此 Codelab 的物联网传感器部分,而不是利用示例数据和脚本,则还需要以下内容(此处可将其作为完整套件订购,也可单独订购)...
- Raspberry Pi Zero W,附带电源、SD 存储卡和保护壳
- USB 读卡器
- USB 集线器(用于将键盘和鼠标连接到 Raspberry Pi 上的唯一 USB 端口)
- 母对母型面包板电线
- GPIO 锤头标头
- BME280 传感器
- 带焊料的烙铁
此外,还假定能够使用配有 HDMI 输入接口的计算机显示器或电视、HDMI 线缆、键盘和鼠标。
2. 准备工作
自定进度的环境设置
如果您还没有 Google 账号(Gmail 或 G Suite),则必须创建一个。无论您是否已有 Google 账号,请务必获享 $300 免费试用!
登录 Google Cloud Platform 控制台 (console.cloud.google.com)。您可以使用默认项目(“我的第一个项目”)来完成本实验,也可以选择创建新项目。如果您想创建新项目,可以使用“管理资源”页面。项目 ID 必须是所有 Google Cloud 项目中的唯一名称(下方显示的名称已被占用,您无法使用)。记下您的项目 ID(即您的项目 ID 将为 _____),因为稍后需要用到。
运行此 Codelab 应该不会超过几美元,但如果您决定使用更多资源或让它们继续运行,费用可能会更高。请务必完成此 Codelab 末尾的“清理”部分。
3. 创建 BigQuery 表
BigQuery 是一个无服务器、可扩展性强、费用低廉的企业数据仓库,是存储从 IoT 设备流式传输的数据的理想之选,同时还支持分析信息中心查询相关信息。
我们来创建一个表,用于存储所有 IoT 天气数据。从 Cloud 控制台中选择 BigQuery。系统随即会在新窗口中打开 BigQuery(请勿关闭原始窗口,因为您需要再次访问该窗口)。
点击项目名称旁边的向下箭头图标,然后选择“创建新数据集”
为数据集输入“weatherData”,选择要存储该数据集的位置,然后点击“确定”
点击数据集旁边的“+”号以创建新表
对于源数据,选择创建空表。对于“目标表名称”,输入 weatherDataTable。在架构下,点击添加字段按钮,直到共有 9 个字段为止。按照下方所示填写字段,同时确保为每个字段选择适当的类型。完成所有操作后,点击创建表按钮。
您应该会看到如下结果...
您现在已设置好可以接收天气数据的数据仓库。
4. 创建 Pub/Sub 主题
Cloud Pub/Sub 简单、可靠、可伸缩,可以用作数据流分析和事件驱动型计算系统的基础。因此,它非常适合处理传入的 IoT 消息,然后允许下游系统进行处理。
如果您仍然位于 BigQuery 窗口中,请切换回 Cloud 控制台。如果您关闭了 Cloud 控制台,请前往 https://console.cloud.google.com
在 Cloud 控制台中,依次选择“Pub/Sub”和“主题”。
如果您看到“启用 API”提示,请点击“启用 API”按钮。
点击“创建主题”按钮
输入“weatherdata”作为主题名称,然后点击“创建”
您应该会看到新创建的主题
您现在已经拥有一个 Pub/Sub 主题,可向该主题发布 IoT 消息并允许其他进程访问这些消息。
安全发布到主题
如果您计划从 Google Cloud 控制台以外的资源(例如 IoT 传感器)向 Pub/Sub 主题发布消息,则必须使用服务账号更严格地控制访问权限,并通过创建信任证书来确保连接的安全性。
在 Cloud 控制台中,依次选择“IAM 和管理”和“服务账号”
点击“创建服务账号”按钮
在“角色”下拉列表中,选择“Pub/Sub Publisher”角色
输入服务账号名称 (iotWeatherPublisher),选中“提供新的私钥”复选框,确保“密钥类型”设置为 JSON,然后点击“创建”
系统会自动下载安全密钥。只有一个密钥,因此务必不要丢失。点击“关闭”。
您应该会看到系统已创建一个服务账号,并且有一个与其关联的密钥 ID。
为便于日后轻松访问密钥,我们将将其存储在 Google Cloud Storage 中。在 Cloud 控制台中,依次选择“Storage”和“Browser”。
点击“创建存储桶”按钮
为存储桶选择一个名称(该名称必须在整个 Google Cloud 中保持全局唯一),然后点击“创建”按钮
找到自动下载的安全密钥,然后将其拖放或上传到存储分区中
密钥上传完成后,它应该会显示在 Cloud Storage 浏览器中
记下存储分区名称和安全密钥文件名,以备后续使用。
5. 创建 Cloud Functions 函数
云计算使完全无服务器的计算模型成为可能。借助云计算,您可以按需执行逻辑,响应来自任何位置的事件。在本实验中,每当有消息发布到天气主题时,Cloud Functions 函数都会启动,读取该消息,然后将其存储在 BigQuery 中。
在 Cloud 控制台中,选择 Cloud Functions
如果您看到 API 消息,请点击“启用 API”按钮
点击“创建函数”按钮
在“名称”字段中,输入“function-weatherPubSubToBQ”。在“触发器”中,选择“Cloud Pub/Sub 主题”,然后在“主题”下拉菜单中选择“weatherdata”。对于源代码,请选择内嵌编辑器。在 index.js 标签页中,将以下代码粘贴到起始位置。请务必更改 projectId、datasetId 和 tableId 的常量以适应您的环境。
/**
* Background Cloud Function to be triggered by PubSub.
*
* @param {object} event The Cloud Functions event.
* @param {function} callback The callback function.
*/
exports.subscribe = function (event, callback) {
const BigQuery = require('@google-cloud/bigquery');
const projectId = "myProject"; //Enter your project ID here
const datasetId = "myDataset"; //Enter your BigQuery dataset name here
const tableId = "myTable"; //Enter your BigQuery table name here -- make sure it is setup correctly
const PubSubMessage = event.data;
// Incoming data is in JSON format
const incomingData = PubSubMessage.data ? Buffer.from(PubSubMessage.data, 'base64').toString() : "{'sensorID':'na','timecollected':'1/1/1970 00:00:00','zipcode':'00000','latitude':'0.0','longitude':'0.0','temperature':'-273','humidity':'-1','dewpoint':'-273','pressure':'0'}";
const jsonData = JSON.parse(incomingData);
var rows = [jsonData];
console.log(`Uploading data: ${JSON.stringify(rows)}`);
// Instantiates a client
const bigquery = BigQuery({
projectId: projectId
});
// Inserts data into a table
bigquery
.dataset(datasetId)
.table(tableId)
.insert(rows)
.then((foundErrors) => {
rows.forEach((row) => console.log('Inserted: ', row));
if (foundErrors && foundErrors.insertErrors != undefined) {
foundErrors.forEach((err) => {
console.log('Error: ', err);
})
}
})
.catch((err) => {
console.error('ERROR:', err);
});
// [END bigquery_insert_stream]
callback();
};
在 package.json 标签页中,将以下代码粘贴到
{
"name": "function-weatherPubSubToBQ",
"version": "0.0.1",
"private": true,
"license": "Apache-2.0",
"author": "Google Inc.",
"dependencies": {
"@google-cloud/bigquery": "^0.9.6"
}
}
如果“要执行的函数”设置为“HelloWorld”,请将其更改为“subscribe”。点击“创建”按钮
大约需要 2 分钟,函数才会显示已部署
恭喜!您刚刚通过 Functions 将 Pub/Sub 连接到了 BigQuery。
6. 设置 IoT 硬件(可选)
组装 Raspberry Pi 和传感器
如果引脚超过 7 个,请将针座缩减为仅 7 个引脚。将接头引脚焊接到传感器板。
小心地将锤子头文件头引脚安装到 Raspberry Pi 中。
按照此处的步骤,格式化 SD 卡并安装 NOOBS(新开箱即用软件)安装程序。将 SD 卡插入 Raspberry Pi,然后将 Raspberry Pi 放入其外壳。
根据下图,使用面包板线将传感器连接到 Raspberry Pi。
Raspberry Pi 引脚 | 传感器连接 |
引脚 1 (3.3V) | 车辆识别号 |
引脚 3 (CPIO2) | SDI |
引脚 5 (GPIO3) | SCK |
引脚 9(地面) | 地盘 |
依次连接显示器(使用迷你 HDMI 连接器)、键盘/鼠标(使用 USB 集线器)和电源适配器。
配置 Raspberry Pi 和传感器
Raspberry Pi 完成启动后,为所需的操作系统选择 Raspbian,确保所需的语言正确无误,然后点击“安装”(窗口左上方的硬盘图标)。
点击 Wi-Fi 图标(屏幕右上角),然后选择一个网络。如果是安全网络,请输入密码(预共享密钥)。
点击 Raspberry 图标(屏幕左上方),依次选择 Preferences(偏好设置)和 Raspberry Pi 配置 (Raspberry Pi Configuration)。从“Interfaces”标签页中,启用 I2C。在“本地化”标签页中,设置语言区域和时区。设置时区后,允许 Raspberry Pi 重新启动。
重新启动完成后,点击“终端”图标以打开终端窗口。
输入以下命令,确保传感器已正确连接。
sudo i2cdetect -y 1
结果应如下所示 - 确保结果为 77。
安装 Google Cloud SDK
如需在 Google Cloud 平台上使用工具,您需要在 Raspberry Pi 上安装 Google Cloud SDK。该 SDK 包含管理和利用 Google Cloud Platform 所需的工具,且支持多种编程语言。
在 Raspberry Pi 上打开一个终端窗口(如果尚未打开),然后设置一个环境变量,使其将 SDK 版本与 Raspberry Pi 上的操作系统相匹配。
export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)"
现在,添加 Google Cloud SDK 软件包的存储位置,以便在系统要求安装 SDK 时,安装工具知道在哪里查找。
echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
添加 Google 软件包代码库中的公钥,以便 Raspberry Pi 在安装过程中验证安全性并信任内容
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
确保 Raspberry Pi 上的所有软件都是最新版本并安装核心 Google Cloud SDK
sudo apt-get update && sudo apt-get install google-cloud-sdk
当系统提示“是否要继续?”时,按 Enter 键。
使用 Python 软件包管理器安装 Teno 软件包。此软件包用于检查脚本是否多次运行,以及是否正在安装以应用于天气脚本。
pip install tendo
使用 Python 软件包管理器确保已安装适用于 Python 的 Google Cloud Pub/Sub 和 OAuth2 软件包,并且这些软件包是最新版本
sudo pip install --upgrade google-cloud-pubsub
sudo pip install --upgrade oauth2client
初始化 Google Cloud SDK
此 SDK 允许对 Google Cloud 进行经过身份验证的远程访问。在此 Codelab 中,它将用于访问存储桶,以便将安全密钥轻松下载到 Raspberry Pi。
在 Raspberry Pi 的命令行中,输入
gcloud init --console-only
当系统提示“Would you like to log in (Y/n)?”时,请按 Enter 键。
如果您看到“Go to the following link in your browser:”(在浏览器中转到以下链接:),后跟以 https://accounts.google.com/o/oauth?... 开头的长网址,请将鼠标悬停在该网址上,点击右键并选择“复制网址”。然后打开网络浏览器(屏幕左上角的蓝色地球图标),右键点击地址栏,然后点击“粘贴”。
看到“登录”界面后,输入与您的 Google Cloud 账号关联的电子邮件地址,然后按 Enter 键。然后输入您的密码并点击“下一步”按钮。
系统会提示您 Google Cloud SDK 想要访问您的 Google 账号。点击“允许”按钮。
系统会向您显示验证码。使用鼠标将其突出显示,然后右键点击并选择“复制”。返回终端窗口,确保光标位于“输入验证码:”右侧,然后用鼠标右键点击,选择“粘贴”。按 Enter 键。
如果系统要求您“选择要使用的云项目:”,请输入您在此 Codelab 中一直使用的项目名称所对应的数字,然后按 Enter 键。
如果系统提示您启用 Compute API,请按 Enter 按钮将其启用。然后,系统会要求您配置 Google Compute Engine 设置。按 Enter 键。您会看到潜在区域/可用区列表。请选择您附近的区域/可用区,输入相应的数字,然后按 Enter 键。
稍后,您会看到系统显示一些其他信息。Google Cloud SDK 现已配置完毕。您可以关闭网络浏览器窗口,因为您日后不需要它了。
安装传感器软件和天气脚本
在 Raspberry Pi 的命令行中,克隆所需的软件包,以便从输入/输出引脚读取信息。
git clone https://github.com/adafruit/Adafruit_Python_GPIO
安装下载的软件包
cd Adafruit_Python_GPIO
sudo python setup.py install
cd ..
克隆用于启用天气传感器的项目代码
git clone https://github.com/googlecodelabs/iot-data-pipeline
将传感器驱动程序复制到与下载的其余软件相同的目录中。
cd iot-data-pipeline/third_party/Adafruit_BME280
mv Adafruit_BME280.py ../..
cd ../..
输入以下内容即可修改脚本:
nano checkWeather.py
将项目更改为您的项目 ID,将主题更改为 Pub/Sub 主题的名称(此 Codelab 的“设置”和“创建 Pub/Sub 主题”部分对此进行了说明)。
将 sensorID、sensorZipCode、sensorLat 和 sensorLong 值更改为您想要的任何值。您可以在此处查看具体位置或地址的经纬度值。
完成必要的更改后,按 Ctrl-X 开始退出 nano 编辑器。按 Y 键即可确认。
# constants - change to fit your project and location
SEND_INTERVAL = 10 #seconds
sensor = BME280(t_mode=BME280_OSAMPLE_8, p_mode=BME280_OSAMPLE_8, h_mode=BME280_OSAMPLE_8)
credentials = GoogleCredentials.get_application_default()
project="myProject" #change this to your Google Cloud project id
topic = "myTopic" #change this to your Google Cloud PubSub topic name
sensorID = "s-Googleplex"
sensorZipCode = "94043"
sensorLat = "37.421655"
sensorLong = "-122.085637"
安装安全密钥
将安全密钥(从“安全发布到主题”部分)复制到 Raspberry Pi。
如果您使用 SFTP 或 SCP 将安全密钥从本地机器复制到 Raspberry Pi(复制到 /home/pi 目录),则可以跳过下一步,直接跳转到导出路径。
如果您将安全密钥放在存储分区中,则需要记住存储分区的名称和文件的名称。使用 gsutil 命令复制安全密钥。此命令可以访问 Google Storage(这就是它命名为 gsutil 的原因,以及文件路径以 gs:// 开头的原因)。请务必将以下命令更改为存储分区名称和文件名。
gsutil cp gs://nameOfYourBucket/yourSecurityKeyFilename.json .
您应该会看到一条消息,说明文件正在复制,然后操作已完成。
在 Raspberry Pi 的命令行中,导出指向安全密钥的路径(请将文件名更改为与您拥有的文件名一致)
export GOOGLE_APPLICATION_CREDENTIALS=/home/pi/iot-data-pipeline/yourSecurityKeyFilename.json
现在,您已经拥有了一个已完成的 IoT 天气传感器,可以开始向 Google Cloud 传输数据了。
7. 启动数据流水线
可能需要启用 Compute API
从 Raspberry Pi 流式传输数据
如果您构建了 Raspberry Pi IoT 天气传感器,请启动脚本,以读取天气数据并将其推送到 Google Cloud Pub/Sub。如果您不在 /home/pi/iot-data-pipeline 目录中,请先移至该目录
cd /home/pi/iot-data-pipeline
启动天气脚本
python checkWeather.py
您应该会看到终端窗口每分钟回显天气数据结果。数据流动后,您可以跳转到下一部分(检查数据是否在流动)。
模拟数据流式传输
如果您未构建 IoT 天气传感器,则可以使用已存储在 Google Cloud Storage 中的公共数据集并将其馈送到现有 Pub/Sub 主题,以模拟数据流。我们将使用 Google Dataflow 以及 Google 提供的模板,从 Cloud Storage 读取数据并将其发布到 Pub/Sub。
在此过程中,Dataflow 需要一个临时存储位置,因此我们创建一个存储桶来实现此目的。
在 Cloud 控制台中,选择“存储”,然后选择“浏览器”。
点击“创建存储桶”按钮
为存储桶选择一个名称(请注意,该名称必须在整个 Google Cloud 中保持全局唯一),然后点击“创建”按钮。请记住此存储桶的名称,因为您很快就会用到它。
在 Cloud Console 中,选择 Dataflow。
点击“基于模板创建作业”(屏幕的上半部分)
请按照下方所示填写作业详细信息,并注意以下事项:
- 输入 dataflow-gcs-to-pubsub 作业名称
- 您的区域应根据项目的托管位置自动选择,无需更改。
- 选择 GCS Text to Cloud Pub/Sub 的 Cloud Dataflow 模板
- 对于输入 Cloud Storage 文件,请输入 gs://codelab-iot-data-pipeline-sampleweatherdata/*.json(这是公共数据集)
- 对于输出 Pub/Sub 主题,确切路径取决于您的项目名称,并且将类似于“projects/yourProjectName/topics/weatherdata”
- 将“临时位置”设置为您刚刚创建的 Google Cloud Storage 存储桶的名称,并将文件名前缀设置为“tmp”。格式应如下所示:“gs://myStorageBucketName/tmp”。
填写完所有信息(见下文)后,点击“运行作业”按钮
Dataflow 作业应该开始运行。
Dataflow 作业大约需要一分钟才能完成。
8. 检查数据是否在流通
Cloud Functions 函数日志
确保 Pub/Sub 会触发 Cloud Functions 函数
gcloud beta functions logs read function-weatherPubSubToBQ
日志应显示函数正在执行、正在接收数据以及正在将数据插入到 BigQuery 中
BigQuery 数据
检查以确保数据流入 BigQuery 表。在 Cloud 控制台中,前往 BigQuery (bigquery.cloud.google.com)。
在项目名称(位于窗口左侧)下,点击数据集 (weatherData),接着点击表 (weatherDataTable),然后点击“查询表”按钮
向 SQL 语句添加一个星号,使其显示 SELECT * FROM...(如下所示),然后点击“RUN QUERY”按钮
如果出现提示,请点击“运行查询”按钮
如果您看到了结果,则表明数据正在正常流动。
数据流动后,您现在可以构建分析信息中心了。
9. 创建数据洞察信息中心
Google 数据洞察可以将您的数据转变成详实的信息中心和报告,这些信息中心和报告不仅易于阅读和分享,而且在各个方面都可自定义。
在网络浏览器中,转到 https://datastudio.google.com
在“开始创建新的报告”下方,点击“空白”,然后点击“开始”按钮
点击复选框接受条款,点击“下一步”按钮,选择您希望接收的电子邮件,然后点击“完成”按钮。同样,在“创建新报告”下,点击“空白”
点击“创建新数据源”按钮
依次点击“BigQuery”和“授权”按钮,然后选择您要与 Data Studio 搭配使用的 Google 账号(应与您在本 Codelab 中一直使用的账号相同)。
点击“允许”按钮
选择项目名称、数据集和表。然后点击“连接”按钮。
更改 type 字段,如下所示(除了 timecollected 和 sensorID 之外,所有内容都应为数字)。请注意,收集的时间设置为日期小时(而不仅仅是日期)。更改“汇总”字段,如下所示(露点、温度、湿度和压力应为平均值,所有其他值应设为“无”)。点击“创建报告”按钮。
点击“添加到报告”按钮进行确认
如果系统要求您选择您的 Google 账号,请选择您的 Google 账号,然后点击“允许”按钮,以允许数据洞察将其报告存储在 Google 云端硬盘中。
系统会显示一个空白画布,供您在其中创建信息中心。在最上面一行的图标中,选择“时序”。
在空白工作表的左上角绘制一个矩形。它应占据空白工作表的约 ¼。
选择窗口右侧的“样式”标签页。将“缺失的数据”从“Line To Zero”更改为“Line Breaks”。在“左侧 Y 轴”部分中,删除“轴最小值”中的 0,将其更改为“(自动)”。
点击工作表中的图表,然后复制/粘贴 (Ctrl-C/Ctrl-V) 3 次。对齐图表,使每个图表占总布局的 1⁄4。
点击每个图表,然后在“时序属性和数据”部分下点击现有指标(),选择要显示的不同指标,直到所有四个天气读数(、温度、湿度和压力)都有自己的图表。
您现在已经有了一个基本的信息中心!
10. 恭喜!
您已创建完整的数据流水线!在此过程中,您学习了如何使用 Google Pub/Sub、如何部署无服务器函数、如何利用 BigQuery,以及如何使用数据洞察创建分析信息中心。此外,您还学习了如何安全地使用 Google Cloud SDK 将数据引入 Google Cloud Platform。最后,您现在对一个重要的架构模式有一些实操经验,该模式可以在保持可用性的同时处理大量作业。
清理
完成对天气数据和分析流水线的实验后,您可以移除正在运行的资源。
如果是您组装的 IoT 传感器,请将其关闭。在终端窗口中按 Ctrl-C 停止脚本,然后输入以下命令关闭 Raspberry Pi
shutdown -h now
转到 Cloud Functions,点击“function-weatherPubSubToBQ”旁边的复选框,然后点击“删除”
前往 Pub/Sub,点击“主题”,点击天气数据主题旁边的复选框,然后点击“删除”
转到“存储”,点击存储分区旁边的复选框,然后点击“删除”
前往 bigquery.cloud.google.com,点击项目名称旁边的向下箭头,点击 weatherData 数据集右侧的向下箭头,然后点击“Delete dataset”(删除数据集)。
出现提示时,输入数据集 ID (weatherData) 以完成数据删除。