端到端迁移:从 Cloud SQL 数据库迁移到 Cloud Spanner (GoogleSQL)

1. 准备工作

此 Codelab 将指导您将 Cloud SQL 上的单个 MySQL 数据库迁移到使用 GoogleSQL 方言的 Cloud Spanner 数据库。重点是基本的端到端迁移流程,演示核心步骤。您将使用 Google Cloud 服务,包括 Spanner 迁移工具 (SMT)、Dataflow、Datastream、PubSub 和 Google Cloud Storage。

您将了解的内容:

  • 如何设置 Cloud SQL 和 Cloud Spanner 示例实例。
  • 如何使用 Spanner 迁移工具 (SMT) 将 Cloud SQL MySQL 架构转换为与 Spanner 兼容的架构。
  • 如何使用 Dataflow 将数据从 Cloud SQL 批量迁移到 Cloud Spanner。
  • 如何使用 Datastream 和 Dataflow 设置从 Cloud SQL 到 Cloud Spanner 的持续复制 (CDC)。
  • 如何设置从 Cloud Spanner 到 Cloud SQL 的反向复制。

本 Codelab 不涵盖以下内容:

  • 来自分片实例的迁移。
  • 迁移期间的复杂数据转换。
  • 高级错误处理或死信队列 (DLQ)。
  • 迁移性能调优。
  • 应用迁移:此 Codelab 侧重于数据库层(架构和数据)。它不涵盖重新部署或迁移应用服务的运营流程。

所需条件

  • 启用了结算功能的 Google Cloud 项目。
  • 具有足够的 IAM 权限,可用于启用 API 以及创建/管理 Cloud SQL、Spanner、Dataflow、Datastream 和 GCS 资源。虽然项目 Owner 角色对于 Codelab 来说最简单,但“环境设置”中将介绍更具体的角色。
  • 网络浏览器,例如 Google Chrome。
  • 基本熟悉 Google Cloud 控制台和 gcloud 等命令行工具。
  • 对 shell 环境的访问权限。建议使用 Cloud Shell,因为它包含 gcloud

有关上述设置的更多详细信息,请参阅“环境设置”部分。

2. 了解迁移过程

迁移数据库涉及将数据从源 Cloud SQL 数据库实例迁移到 Spanner 实例。本部分概述了迁移中使用的架构和关键工具。

迁移流程架构

迁移过程包括以下阶段:

1. 架构转换:

  • 用途:将源数据库架构转换为兼容的 Cloud Spanner 架构。
  • 工具:Spanner 迁移工具 (SMT)
  • 流程:SMT 会分析源数据库架构,并生成等效的 Spanner 数据定义语言 (DDL)。在目标 Spanner 实例中,系统会创建一个数据库,然后自动应用 DDL。

2. 批量数据迁移:

  • 用途:用于将现有数据从源数据库初始完全加载到已预配的 Spanner 表中。
  • 工具:Dataflow,使用 Google 提供的 Sourcedb to Spanner 模板。
  • 流程:此 Dataflow 作业会从指定的源表中读取所有数据,并将其写入相应的 Spanner 表中。此步骤在创建 Spanner 架构后完成。

3. 实时迁移 (CDC):

  • 用途:近乎实时地捕获源数据库中的持续更改并将其应用到 Cloud Spanner,从而最大限度地缩短迁移期间的停机时间。
  • 工具
  • Datastream:从源数据库捕获更改(插入、更新、删除),并将其写入 Cloud Storage (GCS)。
  • Dataflow:使用 Datastream to Spanner 模板从 GCS 读取更改事件,并将其应用到 Cloud Spanner。

4. 反向复制:

  • 用途:将数据更改从 Cloud Spanner 复制回源数据库。这对于回退策略、分阶段迁移或在源中维护副本以用于特定应用场景非常有用。
  • 工具:Dataflow,使用 Spanner to SourceDb 模板。
  • 处理:此作业利用 Spanner 变更数据流来捕获 Spanner 中的修改,并将其写回源数据库实例。

下图展示了组件和数据传输:

b9e12d4151bf3bb7.png

主要术语:

  • Spanner 迁移工具 (SMT):一种用于评估 MySQL 架构、建议 Spanner 架构等效项并生成 Spanner 数据定义语言 (DDL) 的工具。
  • 数据定义语言 (DDL):用于定义和修改数据库结构的语句,例如 CREATE TABLE 语句。SMT 会根据 Cloud SQL 架构生成 Spanner DDL。
  • Dataflow:一种全托管式无服务器数据处理服务。在此 Codelab 中,它用于运行 Google 提供的模板,以进行批量数据传输、应用 Datastream 更改和反向复制。
  • Datastream:一种无服务器的变更数据捕获 (CDC) 和复制服务。在本 Codelab 中,它用于将更改从 Cloud SQL 流式传输到 Cloud Storage。
  • Spanner 变更数据流:一种 Spanner 功能,可用于实时流式传输数据更改(插入、更新、删除),用作反向复制的来源。
  • Pub/Sub:一种消息传递服务,用于将生成事件的服务与处理事件的服务分离开。在此 Codelab 中,每当 Datastream 将新的更改文件上传到 Cloud Storage 时,它都会触发 Dataflow 来处理更新。

3. 环境设置

在开始迁移之前,您需要设置 Google Cloud 项目并启用必要的服务。

1. 选择或创建 Google Cloud 项目

您需要一个启用了结算功能的 Google Cloud 项目才能使用本 Codelab 中的服务。

  1. 在 Google Cloud 控制台中,前往项目选择器页面:前往项目选择器
  2. 选择或创建 Google Cloud 项目。
  3. 确保您的项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

2. 打开 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了 gcloud CLI 和您需要的其他工具。

  • 点击 Google Cloud 控制台右上角的激活 Cloud Shell 按钮。
  • 一个 Cloud Shell 会话随即会在控制台底部的新框架内打开,并显示命令行提示符。

22d57633bc12106d.png

3. 设置项目变量和环境变量

在 Cloud Shell 中,为您的项目 ID 和要使用的区域设置一些环境变量。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or your preferred region
export ZONE="us-central1-a" # Or a zone within your selected region

gcloud config set project $PROJECT_ID
gcloud config set compute/region $REGION
gcloud config set compute/zone $ZONE

echo "Project ID: $PROJECT_ID"
echo "Region: $REGION"
echo "Zone: $ZONE"

4. 启用必需的 Google Cloud API

启用 Cloud Spanner、Dataflow、Datastream 和其他相关服务所需的 API。

gcloud services enable \
  spanner.googleapis.com \
  dataflow.googleapis.com \
  datastream.googleapis.com \
  pubsub.googleapis.com \
  storage.googleapis.com \
  compute.googleapis.com \
  sqladmin.googleapis.com \
  servicenetworking.googleapis.com \
  cloudresourcemanager.googleapis.com

此命令可能需要几分钟时间才能完成。

5. 配置服务账号权限

Dataflow 作业和 Datastream 需要特定权限才能与其他 Google Cloud 服务进行交互。此 Codelab 中的 Dataflow 作业将使用默认的 Compute Engine 服务账号。

首先,获取您的项目编号:

export PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
export SA_EMAIL="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

现在,向 Compute Engine 默认服务账号授予所需的 IAM 角色:

# Role for Dataflow to run jobs
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/dataflow.admin" \
    --condition=None

# Roles for Dataflow workers
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/dataflow.worker" \
    --condition=None

# Role to connect to Cloud SQL instance
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/cloudsql.client" \
    --condition=None

# Role to read/write from Cloud Spanner
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/spanner.databaseUser" \
    --condition=None

# Role to access GCS buckets (Datastream output, Dataflow temp, JDBC driver)
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/storage.objectAdmin" \
    --condition=None

# Roles for Datastream and Pub/Sub (for CDC)
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/datastream.viewer"

gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SA_EMAIL}" \
    --role="roles/pubsub.subscriber"

6. 创建 Cloud Storage 存储桶

在与其他资源相同的区域中创建 GCS 存储分区。此存储分区将存储 JDBC 驱动程序、Datastream 输出,并供 Dataflow 用于临时文件。

export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION
echo "Created bucket: gs://$BUCKET_NAME"

7. 安装 Spanner 迁移工具 (SMT)

确保在 Cloud Shell 环境中安装了 Spanner 迁移工具 (SMT)。

sudo apt-get update && sudo apt-get install google-cloud-cli-spanner-migration-tool

# Verify installation 
gcloud alpha spanner migrate web --help

此命令应显示 SMT Web 界面的帮助信息,从而确认 gcloud 组件已安装。此 Codelab 将使用 SMT 的 CLI 功能,这些功能属于同一组件。

4. 设置源 Cloud SQL 数据库

在本部分中,您将创建并配置一个具有公共 IP 的 Cloud SQL for MySQL 实例,以用作源数据库。

1. 创建 Cloud SQL for MySQL 实例

在 Cloud Shell 中运行以下 gcloud 命令,以创建 MySQL 8.0 实例。已启用二进制日志记录(Datastream 所需),并且实例已配置公共 IP。

export SQL_INSTANCE_NAME="source-mysql-instance"
export DB_ROOT_PASSWORD="Welcome@1" # Replace with a strong password if you prefer

gcloud sql instances create $SQL_INSTANCE_NAME \
  --database-version=MYSQL_8_0 \
  --tier=db-n1-standard-2 \
  --region=$REGION \
  --root-password=$DB_ROOT_PASSWORD \
  --enable-bin-log \
  --assign-ip
  • --enable-bin-log:Datastream 捕获更改时需要此参数。
  • --assign-ip:确保实例获得公共 IP 地址。

创建实例需要几分钟时间。您可以在“Cloud SQL 实例”页面上查看实例是否已创建。

2. 配置已获授权的网络

如需通过公共 IP 连接到实例,您需要将 IP 地址添加到“已获授权的网络”列表中。

获取 Cloud Shell IP:

export CLOUD_SHELL_IP=$(curl -s ipinfo.io/ip)
echo "Your Cloud Shell IP: $CLOUD_SHELL_IP"

授权 Cloud Shell IP 和开放访问权限

以下命令会添加您的 Cloud Shell IP。它还添加了 0.0.0.0/0,允许从任何 IP 地址访问。这对于简化 Dataflow 工作器的连接(无需复杂的网络设置)是必要的。

gcloud sql instances patch $SQL_INSTANCE_NAME \
  --authorized-networks="${CLOUD_SHELL_IP}/32,0.0.0.0/0"

3. 从 Cloud Shell 连接到 Cloud SQL 实例

提取分配的公共 IP 地址

export SQL_INSTANCE_IP=$(gcloud sql instances list --filter="name=$SQL_INSTANCE_NAME" --format="value(PRIMARY_ADDRESS)") 
echo "Cloud SQL Public IP: $SQL_INSTANCE_IP"

此 IP 地址将用于连接。

从 CloudShell 连接到 Cloud SQL 实例

使用标准 mysql 客户端进行连接,使用获得的公共 IP 地址:

mysql -h $SQL_INSTANCE_IP -u root -p

出现提示时,输入您设置的 root 密码 (Welcome@1)。现在,您将看到 mysql> 提示。

4. 创建数据库和示例数据

mysql> 提示符下,执行以下 SQL 命令:

CREATE DATABASE music_db;
USE music_db;

CREATE TABLE Singers (
    SingerId   BIGINT NOT NULL,
    FirstName  VARCHAR(1024),
    LastName   VARCHAR(1024),
    BirthDate  DATE,
    AlbumCount BIGINT,
    PRIMARY KEY (SingerId)
);

CREATE TABLE Albums (
    SingerId     BIGINT NOT NULL,
    AlbumId      BIGINT NOT NULL,
    AlbumTitle   VARCHAR(1024),
    ReleaseDate  DATE,
    PRIMARY KEY (SingerId, AlbumId),
    CONSTRAINT FK_Albums_Singers FOREIGN KEY (SingerId) REFERENCES Singers (SingerId)
);

INSERT INTO Singers (SingerId, FirstName, LastName, BirthDate, AlbumCount) VALUES
(1, 'Marc', 'Richards', '1970-09-03', 2),
(2, 'Catalina', 'Smith', '1990-08-17', 1),
(3, 'Alice', 'Trentor', '1991-10-02', 3);

INSERT INTO Albums (SingerId, AlbumId, AlbumTitle, ReleaseDate) VALUES
(1, 1, 'Total Junk', '2014-03-15'),
(1, 2, 'Go Go Go', '2016-11-01'),
(2, 1, 'Green', '2018-02-28'),
(3, 1, 'Blue', '2019-01-10'),
(3, 2, 'Red', '2020-05-22'),
(3, 3, 'Purple', '2022-11-11');

您可以在此处找到上述架构的转储文件。

5. 验证数据

快速检查数据是否存在:

SELECT 'Singers music_db' as tbl, COUNT(*) FROM music_db.Singers
UNION ALL
SELECT 'Albums music_db', COUNT(*) FROM music_db.Albums;

EXIT;

您应该会看到每个表的数量。

+------------------+----------+
| tbl              | COUNT(*) |
+------------------+----------+
| Singers music_db |        3 |
| Albums music_db  |        6 |
+------------------+----------+

5. 设置 Cloud Spanner

现在,您将设置要将数据迁移到的目标 Cloud Spanner 实例。

1. 创建 Cloud Spanner 实例

在 Cloud SQL 实例所在的区域中创建一个 Cloud Spanner 实例。此命令使用 100 个处理单元创建一个适合此 Codelab 的小型实例。

export SPANNER_INSTANCE_NAME="target-spanner-instance"
export SPANNER_DATABASE_NAME="music-db-migrated"
export SPANNER_CONFIG="regional-${REGION}"

gcloud spanner instances create $SPANNER_INSTANCE_NAME \
  --config=$SPANNER_CONFIG \
  --description="Target Spanner Instance" \
  --processing-units=100

创建实例可能需要一到两分钟。

6. 使用 Spanner 迁移工具 (SMT) 转换架构

使用 SMT CLI 分析 MySQL 数据库 (music_db) 并生成 Spanner 架构定义语言 (DDL)。由于 Cloud SQL 实例配置了公共 IP 和适当的已获授权网络,因此 SMT 可以直接连接。

1. 为 SMT 准备环境

验证是否已在之前的步骤中设置必要的环境变量:

echo "Cloud SQL Instance Public IP: $SQL_INSTANCE_IP" 
echo "Cloud SQL Root Password: $DB_ROOT_PASSWORD" 
echo "Spanner Instance: $SPANNER_INSTANCE_NAME" 
echo "Spanner Database: $SPANNER_DATABASE_NAME" 
echo "Project ID: $PROJECT_ID"

2. 为 music_db 运行架构转换

执行 SMT schema 命令,直接连接到 Cloud SQL 公共 IP 地址:

gcloud alpha spanner migrate schema \
--source=mysql \
--source-profile="host=${SQL_INSTANCE_IP},port=3306,user=root,password=${DB_ROOT_PASSWORD},dbName=music_db" \
--target-profile="project=${PROJECT_ID},instance=${SPANNER_INSTANCE_NAME},dbName=${SPANNER_DATABASE_NAME}" \
--prefix="music-db"

此命令通过代理连接到 Cloud SQL 实例,并生成以 music-db 为前缀的架构文件。

3. 查看生成的文件

SMT 会在当前目录中创建一些文件。以下是关键的广告资源:

  • music-db.schema.ddl.txt:生成的 Spanner DDL 语句。
  • music-db-.overrides.json:包含手动映射更改的架构替换文件。
  • music-db.session.json:架构迁移的会话文件。
  • music-db.report.txt:架构转换的评估报告。

您可以使用 ls music-db-* 列出这些资源

4. 验证 Cloud Spanner 中的架构

检查 Spanner 数据库中是否已创建表。

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT table_name FROM information_schema.tables WHERE table_schema = '' ORDER BY table_name"

您应该会看到以下输出内容:

table_name: Albums
table_name: Singers

可选:如果您想检查 Spanner DDL,请运行以下命令:

gcloud spanner databases ddl describe $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME

7. 初始化变更数据捕获 (CDC)

在本部分中,您将为迁移设置“记录器”。通过在批量数据加载开始之前配置 DatastreamPub/Sub,您可以确保捕获并排队源数据库中的每一项更改,从而防止在过渡期间丢失任何数据。实时迁移需要此设置。

1. 创建 Datastream 连接配置文件

来源配置文件 (Cloud SQL)

此配置文件连接到 Cloud SQL 实例的公共 IP。Datastream 将使用 IP 许可名单进行连接。

export SQL_CP_NAME="mysql-src-cp"
gcloud datastream connection-profiles create $SQL_CP_NAME \
  --location=$REGION \
  --type=mysql \
  --mysql-hostname=$SQL_INSTANCE_IP \
  --mysql-port=3306 \
  --mysql-username=root \
  --mysql-password=$DB_ROOT_PASSWORD \
  --display-name="Cloud SQL Source - Public IP"

注意:此连接依赖于 Cloud SQL 实例的已授权网络,后者允许访问。如之前使用 0.0.0.0/0 配置的那样,Datastream 的公共 IP 可以连接。在生产环境中,您需要将 0.0.0.0/0 替换为 Datastream IP 许可名单和地区中列出的您所在地区的特定 IP 范围。

目标配置文件 (Cloud Storage)

指向存储分区的根目录。

export GCS_CP_NAME="gcs-dest-cp"
gcloud datastream connection-profiles create $GCS_CP_NAME \
  --location=$REGION \
  --type=google-cloud-storage \
  --bucket=$BUCKET_NAME \
  --root-path=/ \
  --display-name="GCS Destination" --force

2. 创建 Datastream 数据流

创建要从 music_db 复制的流。

export STREAM_NAME="mysql-to-spanner-stream"
export GCS_STREAM_PATH="data/${STREAM_NAME}"

gcloud datastream streams create $STREAM_NAME \
  --location=$REGION \
  --display-name="MySQL to Spanner CDC Stream" \
  --source=$SQL_CP_NAME \
  --destination=$GCS_CP_NAME \
  --mysql-source-config=<(echo "
includeObjects:
  mysqlDatabases:
  - database: 'music_db'
") \
  --gcs-destination-config=<(echo "
path: ${GCS_STREAM_PATH}
fileRotationMb: 5
fileRotationInterval: 15s
avroFileFormat: {}
") \
  --backfill-none
  • Datastream 会将文件写入 gs://${BUCKET_NAME}/${GCS_STREAM_PREFIX}/
  • Datastream 将以 Avro 格式写入文件。运行实时迁移命令时,我们将指定 inputFileFormat 为 avro,以便流水线正确处理文件。
  • 使用较小的文件轮换设置有助于在 Codelab 中更快地看到更改。

此命令可能需要一些时间才能完成。查看状态:gcloud datastream streams describe $STREAM_NAME --location=$REGION

3. 启动 Datastream 流

gcloud datastream streams update $STREAM_NAME \
  --location=$REGION \
  --state=RUNNING

检查状态:gcloud datastream streams describe $STREAM_NAME --location=$REGION. 状态最初为 STARTING,过一段时间后会变为 RUNNING。仅当您确认其处于 RUNNING 状态后,才继续执行下一步。

4. 为 GCS 通知设置 Pub/Sub

创建 Pub/Sub 主题:

export PUBSUB_TOPIC="datastream-gcs-updates"
gcloud pubsub topics create $PUBSUB_TOPIC

创建 GCS 通知

data/ 前缀下创建对象时发送通知。

gcloud storage buckets notifications create gs://${BUCKET_NAME} --topic=projects/$PROJECT_ID/topics/$PUBSUB_TOPIC --payload-format=json --object-prefix=data/

创建 Pub/Sub 订阅

包含建议的确认截止时间。

export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
gcloud pubsub subscriptions create $PUBSUB_SUBSCRIPTION \
  --topic=$PUBSUB_TOPIC \
  --ack-deadline=600

8. 将数据从 Cloud SQL 批量迁移到 Spanner

在 Spanner 架构就绪后,您现在需要将 Cloud SQL music_db 数据库中的现有数据复制到 Cloud Spanner。您将使用 Sourcedb to Spanner Dataflow Flex 模板,该模板旨在将数据从可通过 JDBC 访问的数据库批量复制到 Spanner。

1. 为 music_db 运行批量迁移 Dataflow 作业

在 Cloud Shell 中执行以下命令,以启动 Dataflow 作业。此命令使用 gcloud dataflow flex-template run 命令,引用 Google 提供的用于将 JDBC 数据批量迁移到 Spanner 的模板。

export JOB_NAME_MUSIC="mysql-music-db-to-spanner-bulk-$(date +%Y%m%d-%H%M%S)"
export MUSIC_DB_JDBC_URL="jdbc:mysql://${SQL_INSTANCE_IP}:3306/music_db"
export OUTPUT_DIR="gs://${BUCKET_NAME}/bulk-migration-output"

gcloud dataflow flex-template run $JOB_NAME_MUSIC \
  --project=$PROJECT_ID \
  --region=$REGION \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Sourcedb_to_Spanner_Flex" \
--max-workers=2 \
--num-workers=1 \
--worker-machine-type=n2-highmem-8 \
  --parameters \
sourceConfigURL="$MUSIC_DB_JDBC_URL",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
outputDirectory="$OUTPUT_DIR/music_db",\
username="root",\
password="$DB_ROOT_PASSWORD",\
jdbcDriverClassName="com.mysql.cj.jdbc.Driver",\
jdbcDriverJars="gs://${BUCKET_NAME}/lib/mysql-connector-j-8.0.33.jar",\
spannerHost="https://batch-spanner.googleapis.com"

关键参数说明:

  • sourceConfigURL:来源 music_db 的 JDBC 连接字符串。
  • instanceIddatabaseIdprojectId:指定目标 Cloud Spanner 实例和数据库。
  • outputDirectory:一个 Cloud Storage 路径,Dataflow 会将未能成功迁移的任何记录的相关信息写入该路径。
  • jdbcDriverClassName:指定 MySQL JDBC 驱动程序。
  • jdbcDriverJars:已暂存的 JDBC 驱动程序 JAR 的 GCS 路径。
  • spannerHost:使用针对 Spanner 写入操作进行了批量优化的端点。
  • maxWorkersnumWorkers:控制 Dataflow 作业的扩缩。对于此小型数据集,保持较低值。

网络注意事项:此作业通过 Cloud SQL 实例的公共 IP 连接到该实例。之所以能够这么做,是因为您之前已将 0.0.0.0/0 添加到实例的授权网络。这样,具有外部 IP 的 Dataflow 工作器虚拟机便可访问数据库。

2. 监控 Dataflow 作业

您可以在 Google Cloud 控制台中跟踪作业的进度:

  1. 前往 Dataflow 作业页面:前往 Dataflow 作业
  2. 找到名为 mysql-music-db-to-spanner-bulk-... 的作业,然后点击它。
  3. 查看作业图和指标。等待作业状态变为成功。此过程大约需要 5-15 分钟。

ebbb94c0db535809.png

  • 如果作业遇到问题,请查看 Dataflow 作业详情页面中的日志标签页,了解错误消息。
  • 作业指标可提供有关作业进度和资源消耗(例如吞吐量和 CPU 利用率)的更多信息。

3. 验证 Cloud Spanner 中的数据

Dataflow 作业成功完成后,请确认数据已复制到 Spanner 表中。使用 gcloud 查询 Spanner 数据库:

# Verify row counts
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME --instance=$SPANNER_INSTANCE_NAME --sql="SELECT COUNT(*) as row_count FROM Singers" 
# Expected output: 3

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME --instance=$SPANNER_INSTANCE_NAME --sql="SELECT COUNT(*) as row_count FROM Albums" 
# Expected output: 6 

# Inspect some data 
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME --instance=$SPANNER_INSTANCE_NAME --sql="SELECT SingerId, FirstName, LastName FROM Singers ORDER BY SingerId"

预期输出:

row_count: 3
row_count: 6
SingerId: 1
FirstName: Marc
LastName: Richards

SingerId: 2
FirstName: Catalina
LastName: Smith

SingerId: 3
FirstName: Alice
LastName: Trentor

从 Cloud SQL 到 Cloud Spanner 的初始批量数据加载现已完成。下一步是设置实时复制,以捕获持续发生的变化。

9. 开始实时迁移 (CDC)

现在,批量数据加载已完成,接下来您将使用 Datastream 设置一个持续复制流,以捕获来自 Cloud SQL 的变更数据捕获 (CDC) 事件,并使用 Dataflow 流式作业近乎实时地将这些变更应用到 Cloud Spanner。

1. 运行实时迁移 Dataflow 作业

启动流式 Dataflow 作业,以从 GCS 读取数据并写入 Spanner。此模板将使用 GCS Pub/Sub 通知来立即处理新文件。

export JOB_NAME_CDC="datastream-to-spanner-cdc-$(date +%Y%m%d-%H%M%S)"
export DLQ_DIR="gs://${BUCKET_NAME}/dlq"

gcloud dataflow flex-template run $JOB_NAME_CDC \
  --project=$PROJECT_ID \
  --region=$REGION \
--worker-machine-type=n2-highmem-8 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Cloud_Datastream_to_Spanner" \
  --parameters \
gcsPubSubSubscription="projects/${PROJECT_ID}/subscriptions/${PUBSUB_SUBSCRIPTION}",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
inputFileFormat="avro",\
deadLetterQueueDirectory="$DLQ_DIR",\
streamName="projects/${PROJECT_ID}/locations/${REGION}/streams/${STREAM_NAME}"

关键形参

  • gcsPubSubSubscription:用于监听来自 GCS 的新文件通知的 Pub/Sub 订阅。这样,作业就可以在 Datastream 写入更改时立即处理这些更改。
  • inputFileFormat="avro":告知 Dataflow 预期会收到来自 Datastream 的 Avro 文件。此值必须与 Datastream“目标”配置(例如,avroFileFormatjsonFileFormat)相匹配。
  • deadLetterQueueDirectory:一个 GCS 路径,作业会将未能处理的记录(例如,由于架构不匹配)存储在该路径中,以供日后手动审核。
  • streamName:Datastream 流的完整资源路径,用于让 Dataflow 作业跟踪复制状态和元数据。

Dataflow 作业控制台中监控作业启动情况。

2. 测试实时迁移

对源 Cloud SQL music_db 应用更改,以测试 CDC 流水线。

连接到 Cloud SQL:

mysql -h $SQL_INSTANCE_IP -u root -p

输入密码 (Welcome@1),然后选择数据库:

USE music_db;

-- INSERT
INSERT INTO Singers (SingerId, FirstName, LastName, BirthDate, AlbumCount) VALUES (4, 'Elena', 'Nadal', '1985-05-30', 0);
SELECT * FROM Singers WHERE SingerId = 4;

-- UPDATE
UPDATE Singers SET LastName = 'Richards-Smith' WHERE SingerId = 1;
SELECT * FROM Singers WHERE SingerId = 1;

-- DELETE
DELETE FROM Albums WHERE SingerId = 2; 
DELETE FROM Singers WHERE SingerId = 2;
SELECT * FROM Singers WHERE SingerId = 2;

EXIT;

在 Spanner 中进行验证(过一会儿)

# Verify INSERT: This should return the new row for Elena Nadal.
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT * FROM Singers WHERE SingerId = 4"

# Verify UPDATE: This should show LastName as Richards-Smith.
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = 1"

# Verify DELETE: This should now return 0 rows.
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT * FROM Albums WHERE SingerId = 2"

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT * FROM Singers WHERE SingerId = 2"

预期输出:

SingerId: 4
FirstName: Elena
LastName: Nadal
BirthDate: 1985-05-30
AlbumCount: 0

SingerId: 1
FirstName: Marc
LastName: Richards-Smith

3. 在 Spanner 中进行最终验证

检查 Spanner 中 Singers 表的总体状态:

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT SingerId, FirstName, LastName, AlbumCount FROM Singers ORDER BY SingerId"

预期输出:

SingerId: 1
FirstName: Marc
LastName: Richards-Smith
AlbumCount: 2

SingerId: 3
FirstName: Alice
LastName: Trentor
AlbumCount: 3

SingerId: 4
FirstName: Elena
LastName: Nadal
AlbumCount: 0

10. 设置反向复制(从 Spanner 到 Cloud SQL)

为了应对可能需要回滚或在一段时间内保持 Cloud SQL 数据库与 Spanner 同步的场景,您可以设置反向复制。此流水线使用 Spanner 变更数据流来捕获 Spanner 中的更改,并将其写回 Cloud SQL music_db

1. 创建 Spanner 变更数据流

首先,您需要在 Spanner 数据库中创建一个变更数据流,以跟踪 SingersAlbums 表中的变更。

export CHANGE_STREAM_NAME="MusicDBChangeStream"

gcloud spanner databases ddl update $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --ddl="CREATE CHANGE STREAM $CHANGE_STREAM_NAME FOR Singers, Albums"

此变更数据流现在将记录对指定表的所有数据修改。

2. 为 Dataflow 元数据创建 Spanner 数据库

Spanner to SourceDB Dataflow 模板需要单独的 Spanner 数据库来存储元数据,以便管理变更数据流使用情况。

export SPANNER_METADATA_DB_NAME="reverse-replication-metadata"

gcloud spanner databases create $SPANNER_METADATA_DB_NAME \
  --instance=$SPANNER_INSTANCE_NAME

3. 为 Dataflow 准备 Cloud SQL 连接配置

Dataflow 模板需要 Cloud Storage 中的一个 JSON 文件,其中包含目标 Cloud SQL 数据库的连接详细信息。

创建名为 shard_config.json 的本地文件:

cat << EOF > shard_config.json
[
  {
    "logicalShardId": "mysql_shard",
    "host": "${SQL_INSTANCE_IP}",
    "port": "3306",
    "user": "root",
    "password": "${DB_ROOT_PASSWORD}",
    "dbName": "music_db"
  }
]
EOF

将此文件上传到您的 GCS 存储分区:

export SHARD_CONFIG_FILE="gs://${BUCKET_NAME}/shard_config.json"
gcloud storage cp shard_config.json $SHARD_CONFIG_FILE

4. 运行反向复制 Dataflow 作业

使用 Spanner_to_SourceDb Flex 模板启动 Dataflow 作业。

export JOB_NAME_REVERSE="spanner-to-mysql-reverse-$(date +%Y%m%d-%H%M%S)"
export REVERSE_DLQ_DIR="gs://${BUCKET_NAME}/reverse-dlq"

gcloud dataflow flex-template run $JOB_NAME_REVERSE \
  --project=$PROJECT_ID \
  --region=$REGION \
--worker-machine-type=n2-highmem-8 \
--max-workers=2 \
--num-workers=1 \
--additional-experiments=use_runner_v2 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Spanner_to_SourceDb" \
  --parameters \
changeStreamName="$CHANGE_STREAM_NAME",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
spannerProjectId="$PROJECT_ID",\
metadataInstance="$SPANNER_INSTANCE_NAME",\
metadataDatabase="$SPANNER_METADATA_DB_NAME",\
sourceShardsFilePath="$SHARD_CONFIG_FILE",\
deadLetterQueueDirectory="$REVERSE_DLQ_DIR"

关键形参

  • changeStreamName:要从中读取数据的 Spanner 变更数据流的名称。
  • metadataInstance, metadataDatabase:用于存储元数据的 Spanner 实例/数据库,连接器使用这些元数据来控制变更数据流 API 数据的使用。
  • sourceShardsFilePathshard_config.json 的 GCS 路径。
  • filtrationMode:指定如何根据条件舍弃某些记录。默认值为 forward_migration(过滤使用正向迁移流水线写入的记录)

网络注意事项:Dataflow 工作器将使用 shard_config.json 中指定的公共 IP 连接到 Cloud SQL 实例。由于 Cloud SQL 实例的“已获授权的网络”中存在 0.0.0.0/0 条目,因此允许此连接。

Dataflow 作业控制台中监控作业启动情况。

5. 测试反向复制

现在,直接在 Cloud Spanner 中进行更改,并验证这些更改是否反映在 Cloud SQL 中。仅当 Dataflow 作业已启动并处于处理状态时,才执行此操作。

测试 INSERTUPDATEDELETE

# INSERT: Insert a new singer (SingerId 5) into Spanner
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="INSERT INTO Singers (SingerId, FirstName, LastName, BirthDate, AlbumCount) VALUES (5, 'David', 'Chen', '1995-02-18', 0)"

# UPDATE: Update SingerId 3's AlbumCount in Spanner
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="UPDATE Singers SET AlbumCount = 5 WHERE SingerId = 3"

# DELETE: Delete SingerId 1 from Spanner
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="DELETE FROM Albums WHERE SingerId = 1"

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="DELETE FROM Singers WHERE SingerId = 1"

Cloud SQL 中的验证(过一会儿)

连接到 Cloud SQL:

mysql -h $SQL_INSTANCE_IP -u root -p

在系统提示时输入密码 (Welcome@1),然后在 mysql> 提示符处运行以下 SQL 命令。

USE music_db; 
-- Verify INSERT: This should show the new row for David Chen
SELECT * FROM Singers WHERE SingerId = 5;

-- Verify UPDATE: This should show AlbumCount as 5.
SELECT SingerId, FirstName, AlbumCount FROM Singers WHERE SingerId = 3;

-- Verify DELETE: This should return an empty set.
SELECT * FROM Albums WHERE SingerId = 1; 
SELECT * FROM Singers WHERE SingerId = 1; 

-- Final Verification
SELECT SingerId, FirstName, LastName, AlbumCount FROM Singers ORDER BY SingerId;
EXIT;

Cloud SQL 中的预期输出应反映在 Spanner 中所做的更改。

+----------+-----------+----------------+------------+
| SingerId | FirstName | LastName       | AlbumCount |
+----------+-----------+----------------+------------+
|        3 | Alice     | Trentor        |          5 |
|        4 | Elena     | Nadal          |          0 |
|        5 | David     | Chen           |          0 |
+----------+-----------+----------------+------------+

这会确认反向复制流水线是否正常运行,是否能将 Spanner 中的更改同步回 Cloud SQL。

11. 清理资源

为避免您的 Google Cloud 账号产生进一步费用,请删除在此 Codelab 期间创建的资源。

设置环境变量(如果需要)

检查环境变量是否已正确设置:

echo "PROJECT_ID: $PROJECT_ID"
echo "REGION: $REGION"
echo "SQL_INSTANCE_NAME: $SQL_INSTANCE_NAME"
echo "SPANNER_INSTANCE_NAME: $SPANNER_INSTANCE_NAME"
echo "BUCKET_NAME: $BUCKET_NAME"
echo "STREAM_NAME: $STREAM_NAME"
echo "SQL_CP_NAME: $SQL_CP_NAME"
echo "GCS_CP_NAME: $GCS_CP_NAME"
echo "PUBSUB_SUBSCRIPTION: $PUBSUB_SUBSCRIPTION"
echo "PUBSUB_TOPIC: $PUBSUB_TOPIC"
echo "CHANGE_STREAM_NAME: $CHANGE_STREAM_NAME"

列出您的作业,以查找正在运行的 Dataflow 作业的作业 ID。相应地导出 JOB_ID_CDCJOB_ID_REVERSE

gcloud dataflow jobs list --region=$REGION --filter="state=Running"
export JOB_ID_CDC=<PASTE_JOB_ID_HERE>
export JOB_ID_REVERSE=<PASTE_JOB_ID_HERE>

如果您处于新的 Cloud Shell 会话中,请重新导出关键环境变量:

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or the region you used
export SQL_INSTANCE_NAME="source-mysql-instance"
export SPANNER_INSTANCE_NAME="target-spanner-instance"
export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
export STREAM_NAME="mysql-to-spanner-stream"
export SQL_CP_NAME="mysql-src-cp"
export GCS_CP_NAME="gcs-dest-cp"
export PUBSUB_TOPIC="datastream-gcs-updates"
export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
export CHANGE_STREAM_NAME="MusicDBChangeStream"

停止 Dataflow 流式处理作业

取消 Datastream to Spanner(实时迁移)作业:

gcloud dataflow jobs cancel $JOB_ID_CDC --region=$REGION --project=$PROJECT_ID

取消 Spanner to Cloud SQL(反向复制)作业:

gcloud dataflow jobs cancel $JOB_ID_REVERSE --region=$REGION --project=$PROJECT_ID

删除 Datastream 资源

停止并删除数据流:

gcloud datastream streams update $STREAM_NAME \
  --location=$REGION --state=PAUSED --project=$PROJECT_ID
# Wait a moment for the stream to pause
gcloud datastream streams delete $STREAM_NAME \
  --location=$REGION --project=$PROJECT_ID --quiet

删除连接配置文件

gcloud datastream connection-profiles delete $SQL_CP_NAME \
  --location=$REGION --project=$PROJECT_ID --quiet
gcloud datastream connection-profiles delete $GCS_CP_NAME \
  --location=$REGION --project=$PROJECT_ID --quiet

删除 Pub/Sub 资源

删除订阅:

gcloud pubsub subscriptions delete $PUBSUB_SUBSCRIPTION \
  --project=$PROJECT_ID --quiet

删除主题:

gcloud pubsub topics delete $PUBSUB_TOPIC \
  --project=$PROJECT_ID --quiet

删除 Cloud SQL 实例

系统会自动删除其中的数据库 (music_db)。

gcloud sql instances delete $SQL_INSTANCE_NAME \
  --project=$PROJECT_ID --quiet

删除 Cloud Spanner 实例

这也会删除其中的数据库(music-db-migratedreverse-replication-metadata)。

gcloud spanner instances delete $SPANNER_INSTANCE_NAME \
  --project=$PROJECT_ID --quiet

删除 GCS 存储分区和内容

gcloud storage rm --recursive gs://${BUCKET_NAME}

删除本地文件

移除 Cloud Shell 主目录中生成的所有文件:

rm -f music-db* shard_config.json

您现在已清理为此 Codelab 创建的资源。