1. はじめに
この Codelab では、Google Cloud の統合データ レイクハウスの機能について説明します。BigLake の Apache Iceberg REST カタログ を介して提供される一般公開データセットを操作し、構造化データと非構造化データの両方に Google Cloud の AI 機能適用します。
Apache Iceberg を使用して従来の NYC Taxi データセットをクエリし、タイムトラベル でデータ変更を監査します。次に、BigQuery ML と Gemini を使用してデータに対して AI モデルを実行します。
演習内容
- Google Cloud Serverless for Apache Spark を使用して、BigLake でホストされている Apache Iceberg 一般公開データセット をクエリします。
- Apache Iceberg 形式の構造化データをクエリします。
- Apache Iceberg で タイムトラベル をデモンストレーションします。
- BigQuery ML を使用して、構造化データで予測モデルをトレーニングします。
- BigLake オブジェクト テーブル (非構造化データ)を作成し、Gemini を使用して画像を分析します。
必要なもの
- Chrome などのウェブブラウザ。
- 課金を有効にした Google Cloud プロジェクト
費用と所要時間
- 所要時間: 約 45 分。
- 推定費用: 2.00 ドル未満。一般公開データセットとサーバーレス クエリを使用して、費用を抑えます。
2. 設定と要件
このステップでは、環境を準備し、必要な API を有効にします。
Cloud Shell の起動
ほとんどのコマンドは Google Cloud Shell から実行します。
- Google Cloud コンソールの上部にある [Cloud Shell をアクティブにする] アイコンをクリックします。
- 認証を確認します。
gcloud auth list - プロジェクトを確認します。
gcloud config get project - プロジェクトが設定されていない場合は、プロジェクト ID を使用して設定します。
gcloud config set project <YOUR_PROJECT_ID>
API を有効にする
次のコマンドを実行して、BigQuery、Cloud Resource Manager、Vertex AI に必要な API を有効にします。
gcloud services enable \
bigquery.googleapis.com \
aiplatform.googleapis.com \
cloudresourcemanager.googleapis.com
環境を構成して依存関係バケットを作成する
- ターミナルで環境変数を設定します。
export PROJECT_ID=$(gcloud config get project) export REGION=us-central1 export DEPS_BUCKET=$PROJECT_ID-deps-bucket - 依存関係の Cloud Storage バケットを作成します。PySpark スクリプトは、ジョブの送信時にここにアップロードされます。
gcloud storage buckets create gs://$DEPS_BUCKET --location=$REGION
3. Apache Iceberg 一般公開カタログに接続する
このステップでは、Google Cloud の BigLake でホストされている本番環境グレードの Apache Iceberg カタログに接続します。
Apache Spark 向け Serverless バッチ CLI で Spark SQL を実行する
Google Cloud Serverless for Apache Spark を使用して、インフラストラクチャを管理せずに PySpark ジョブを実行します。一般公開の BigLake REST カタログを指すように構成します。
- BigLake REST カタログのプロパティを定義して、繰り返しを回避します。この構成は Spark に次のことを指示します。
iceberg-spark-runttimeライブラリとiceberg-gcp-bundleライブラリを使用する。- BigLake REST カタログ エンドポイント を使用して、
my_catalogという名前のカタログを構成する。 - デフォルトのローカル ファイル システムではなく、Google Cloud Storage(GCS) を使用してデータファイルを読み取る。
- この
my_catalogカタログをセッションのデフォルト として設定する。 - セキュリティを強化し、データアクセスを簡素化するために、販売された認証情報を使用する。
export METASTORE_PROPERTIES="^|^spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0,org.apache.iceberg:iceberg-gcp-bundle:1.10.0|\ spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog|\ spark.sql.catalog.my_catalog.type=rest|\ spark.sql.catalog.my_catalog.uri=https://biglake.googleapis.com/iceberg/v1/restcatalog|\ spark.sql.catalog.my_catalog.warehouse=gs://biglake-public-nyc-taxi-iceberg|\ spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.gcp.gcs.GCSFileIO|\ spark.sql.catalog.my_catalog.header.x-goog-user-project=$PROJECT_ID|\ spark.sql.catalog.my_catalog.header.X-Iceberg-Access-Delegation=vended-credentials|\ spark.sql.catalog.my_catalog.rest.auth.type=org.apache.iceberg.gcp.auth.GoogleAuthManager|\ spark.sql.defaultCatalog=my_catalog|\ spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions|\ spark.log.level=ERROR" - 簡単なテストクエリ ファイルを作成します。
cat <<EOF > test.py from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sql("SHOW TABLES IN public_data").show() EOF - バッチジョブを送信します。
出力は次のようになります。gcloud dataproc batches submit pyspark \ --project=$PROJECT_ID \ --region=$REGION \ --version=2.3 \ --properties="$METASTORE_PROPERTIES" \ --deps-bucket=gs://$DEPS_BUCKET \ test.py+-----------+----------------+-----------+ | namespace| tableName|isTemporary| +-----------+----------------+-----------+ |public_data| nyc_taxicab| false| |public_data|nyc_taxicab_2021| false| +-----------+----------------+-----------+
4. 構造化された Iceberg データをクエリする
接続すると、データセットに完全な SQL アクセス権が付与されます。Iceberg テーブルとしてモデル化された NYC Taxi データセットをクエリします。
標準集計クエリを実行する
query.py という名前のファイルを作成します。
cat <<EOF > query.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
query = """
SELECT
passenger_count,
COUNT(1) AS num_trips,
ROUND(AVG(total_amount), 2) AS avg_fare,
ROUND(AVG(trip_distance), 2) AS avg_distance
FROM public_data.nyc_taxicab
WHERE data_file_year = 2021 AND passenger_count > 0
GROUP BY passenger_count
ORDER BY num_trips DESC
"""
spark.sql(query).show()
EOF
Apache Spark 向け Serverless を使用して送信します。
gcloud dataproc batches submit pyspark \
--project=$PROJECT_ID \
--region=$REGION \
--version=2.3 \
--properties="$METASTORE_PROPERTIES" \
--deps-bucket=gs://$DEPS_BUCKET \
query.py
コンソールに次のような出力が表示されます。
+---------------+---------+--------+------------+ |passenger_count|num_trips|avg_fare|avg_distance| +---------------+---------+--------+------------+ | 1| 21508009| 18.82| 3.03| | 2| 4424746| 20.22| 3.40| | 3| 1164846| 19.84| 3.27| | 5| 718282| 18.88| 3.07| | 4| 466485| 20.61| 3.44| | 6| 452467| 18.97| 3.11| | 7| 78| 65.24| 3.71| | 8| 49| 57.39| 5.88| | 9| 35| 73.26| 6.20| | 96| 1| 17.00| 2.00| | 112| 1| 15.00| 2.00| +---------------+---------+--------+------------+
ここで Apache Iceberg を使用する理由
- パーティション プルーニング: クエリは
data_file_year = 2021でフィルタされます。Iceberg を使用すると、エンジンは他の年のデータのスキャンを完全にスキップできます。 - エンジンの俊敏性: データをコピーせずに、Spark、Trino、BigQuery で実行できます。
5. Apache Iceberg のタイムトラベル
Iceberg の最も強力な機能の一つに タイムトラベル があります。過去のバージョンまたはスナップショットに存在していたデータをクエリできます。
テーブルの履歴を表示する
history.py という名前のファイルを作成します。
cat <<EOF > history.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sql("SELECT * FROM public_data.nyc_taxicab.history").show()
EOF
送信します。
gcloud dataproc batches submit pyspark \
--project=$PROJECT_ID \
--region=$REGION \
--version=2.3 \
--properties="$METASTORE_PROPERTIES" \
--deps-bucket=gs://$DEPS_BUCKET \
history.py
コンソールに次のような出力が表示されます。
+--------------------+-------------------+-------------------+-------------------+ | made_current_at| snapshot_id| parent_id|is_current_ancestor| +--------------------+-------------------+-------------------+-------------------+ |2026-01-07 21:32:...|6333415779680505547| NULL| true| |2026-01-07 21:34:...|1840345522877675925|6333415779680505547| true| |2026-01-07 21:36:...|7203554539964460256|1840345522877675925| true| |2026-01-07 21:38:...|4573466015237516024|7203554539964460256| true| |2026-01-07 21:40:...|3353190952148867790|4573466015237516024| true| |2026-01-07 21:42:...|1335547378580631681|3353190952148867790| true| |2026-01-07 21:44:...|8203141258229894239|1335547378580631681| true| |2026-01-07 21:46:...|1597048231706307813|8203141258229894239| true| |2026-01-07 21:48:...|6247811509231462655|1597048231706307813| true| |2026-01-07 21:50:...|2527184310045633322|6247811509231462655| true| |2026-01-07 21:52:...|2512764101237223642|2527184310045633322| true| |2026-01-07 21:52:...|7045957533358062548|2512764101237223642| true| |2026-01-07 21:53:...| 531753237516076726|7045957533358062548| true| |2026-01-07 21:53:...|4184653573199718274| 531753237516076726| true| |2026-01-07 21:54:...|5125223829492177301|4184653573199718274| true| |2026-01-07 21:54:...|6844673237417600305|5125223829492177301| true| |2026-01-07 21:54:...|6634828203344518093|6844673237417600305| true| |2026-01-07 21:55:...|7637728273407236194|6634828203344518093| true| |2026-01-07 21:55:...|3424071684958740192|7637728273407236194| true| |2026-01-07 21:55:...|1743746294196424254|3424071684958740192| true| +--------------------+-------------------+-------------------+-------------------+
さまざまなスナップショット ID とコミットされた日時を表す行が表示されます。
現在の行数と過去の行数を比較する
timetravel.py という名前のファイルを作成します。
cat <<EOF > timetravel.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
query = """
SELECT 'Current State' AS version, COUNT(*) AS count FROM public_data.nyc_taxicab
UNION ALL
SELECT 'Past State' AS version, COUNT(*) AS count FROM public_data.nyc_taxicab VERSION AS OF 4573466015237516024
"""
spark.sql(query).show()
EOF
送信します。
gcloud dataproc batches submit pyspark \
--project=$PROJECT_ID \
--region=$REGION \
--version=2.3 \
--properties="$METASTORE_PROPERTIES" \
--deps-bucket=gs://$DEPS_BUCKET \
timetravel.py
コンソールに次のような出力が表示されます。
+-------------+----------+ | version| count| +-------------+----------+ |Current State|1293069366| | Past State| 72878594| +-------------+----------+
これにより、時間の経過に伴うデータ変更を監査できます。
6. BigQuery ML を使用した構造化 AI
Iceberg データを探索したので、BigQuery の AI 機能を使用してみましょう。一般公開の Iceberg カタログは読み取り専用であるため、BigQuery を使用して、一般公開テーブルから読み取ってワークスペースでモデルをトレーニングできます。
ローカル データセットを作成する
まず、bq CLI を使用して、AI モデルを保存するデータセットをプロジェクトに作成します。
bq mk --location=$REGION --project_id=$PROJECT_ID iceberg_ai
線形回帰モデルをトレーニングする
次に、一般公開の BigLake Iceberg テーブルを使用して線形回帰モデルをトレーニングします。
クエリファイルを作成し、bq query を使用してモデルをトレーニングします。
cat <<'EOF' > train_model.sql
CREATE OR REPLACE MODEL `iceberg_ai.predict_fare`
OPTIONS(model_type='LINEAR_REG', input_label_cols=['fare_amount']) AS
SELECT fare_amount, passenger_count, CAST(trip_distance AS FLOAT64) AS trip_distance
FROM `bigquery-public-data`.`biglake-public-nyc-taxi-iceberg`.public_data.nyc_taxicab
WHERE fare_amount > 0 AND trip_distance > 0 AND RAND() < 0.01; -- Using 1% of data to downsample
EOF
bq query --location=$REGION --use_legacy_sql=false < train_model.sql
モデルを使用して予測する
モデルのトレーニングが完了したら、ML.PREDICT を使用して新しい旅行の料金を予測できます。
クエリファイルを作成し、bq query を使用して予測を実行します。
cat <<'EOF' > predict_fare.sql
SELECT
predicted_fare_amount, passenger_count, trip_distance
FROM
ML.PREDICT(MODEL `iceberg_ai.predict_fare`,
(
SELECT 2 AS passenger_count, 5.0 AS trip_distance
)
);
EOF
bq query --location=$REGION --use_legacy_sql=false < predict_fare.sql
出力は次のようになります。
+-----------------------+-----------------+---------------+ | predicted_fare_amount | passenger_count | trip_distance | +-----------------------+-----------------+---------------+ | 14.12252095150709 | 2 | 5.0 | +-----------------------+-----------------+---------------+
7. BigLake を使用した非構造化 AI
データは行と列だけではありません。統合データ レイクハウスは、非構造化データ(画像、PDF)も処理します。オブジェクト テーブル と オブジェクト参照 を使用して非構造化データをクエリしましょう。
オブジェクト テーブル は、Cloud Storage パス内のオブジェクトを一覧表示する BigQuery の読み取り専用の外部テーブルです。各行はファイルを表し、uri、size などのメタデータの列と、ObjectRef を含む特別な ref 列があります。
オブジェクト参照 (ObjectRef)は、単一のファイルの実際のデータを指します。最新の BigQuery ML 関数(AI.GENERATE や AI.AGG など)は、ObjectRef を使用してファイル コンテンツ(画像、音声、テキスト)を読み取り、標準テーブルにバイトを読み込まずに分析します。
非構造化 AI のデータセットを作成する
まず、US マルチリージョンで bq CLI を使用して、オブジェクト テーブルを保存する 2 つ目のデータセットをプロジェクトに作成します。
bq mk --location=US --project_id=$PROJECT_ID iceberg_object_ai
外部接続を作成する
BigQuery から Cloud Storage に保存されているデータ(オブジェクト テーブルと非構造化データ)をクエリするには、外部接続を作成する必要があります。
Cloud Shell で次のコマンドを実行して、Cloud リソース接続を作成します。
bq mk --connection --project_id=$PROJECT_ID --location=US --connection_type=CLOUD_RESOURCE my-conn
接続用に作成されたサービス アカウント ID を確認します。
CONNECTION_SA=$(bq show --format=json --project_id=$PROJECT_ID --connection $PROJECT_ID.us.my-conn | jq -r '.serviceAccountId // .cloudResource.serviceAccountId')
Gemini モデルを呼び出して GCS データを読み取れるように、サービス アカウントに Vertex AI ユーザー ロールと Storage オブジェクト閲覧者 ロールを付与します。
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$CONNECTION_SA" \
--role="roles/aiplatform.user"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$CONNECTION_SA" \
--role="roles/storage.objectViewer"
オブジェクト テーブルの作成
前のセクションで作成した外部接続 my-conn を使用して、非構造化データにアクセスします。クエリファイルを作成し、bq query を使用してオブジェクト テーブルを作成します。
cat <<'EOF' > create_object_table.sql
CREATE EXTERNAL TABLE `iceberg_object_ai.sample_images`
WITH CONNECTION `us.my-conn`
OPTIONS (
object_metadata = 'SIMPLE',
uris = ['gs://cloud-samples-data/vision/landmark/*']
);
EOF
bq query --use_legacy_sql=false < create_object_table.sql
オブジェクト データで Gemini を使用する
Gemini を使用してクエリを実行し、画像をダウンロードせずに評価します。
bq query を使用して標準 SQL で画像をクエリします。
cat <<EOF > query_images.sql
SELECT
uri,
image_analysis.description
FROM (
SELECT
uri,
AI.GENERATE(
(
'Identify what is happening in the image.',
ref
),
connection_id => 'us.my-conn',
endpoint => 'gemini-2.5-flash-lite',
output_schema => 'event STRING, severity STRING, description STRING'
) AS image_analysis
FROM
iceberg_object_ai.sample_images
);
EOF
bq query --use_legacy_sql=false < query_images.sql
出力例:
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| uri | description |
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| gs://cloud-samples-data/vision/landmark/eiffel_tower.jpg | The Eiffel Tower stands tall against a cloudy sky, overlooking the Seine River in Paris. Boats are docked along the riverbank, and trees line the opposite shore, with bridges and buildings visible in the distance. |
| gs://cloud-samples-data/vision/landmark/pofa.jpg | A wide shot shows the Palace of Fine Arts, a monumental structure in San Francisco, California. The building features a large rotunda with a dome, surrounded by colonnades. In front of the rotunda is a lagoon. Several people are walking around the grounds. The sky is blue with a few scattered clouds. |
| gs://cloud-samples-data/vision/landmark/st_basils.jpeg | A monument stands in front of Saint Basil's Cathedral in Moscow under a bright blue sky with scattered white clouds. The cathedral features distinctive onion domes in various colors and patterns, including red, blue and white stripes, green and beige stripes, and red and blue diamonds. A large green tree partially obscures the left side of the cathedral. People are visible in the foreground near the base of the monument and the cathedral entrance. |
+----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
ObjectRef を直接探索する: 感情分析
オブジェクト テーブルはファイル参照を自動的に管理しますが、BigQuery のオブジェクト参照 を使用してこれらのオブジェクトを直接操作し、単一のファイルに対してオンザフライ分析を実行できます。
たとえば、独自の GCS バケットに保存されている小さなテキスト ファイル(先ほど作成した $DEPS_BUCKET 変数を使用)を使用して、OBJ.MAKE_REF と bq query を使用して分析できます。
まず、小さなテキスト ファイルを作成してバケットにアップロードします。
cat <<'EOF' > review.txt
This product is fantastic! It exceeded my expectations. The quality is top-notch. I highly recommend it to everyone!
EOF
gcloud storage cp review.txt gs://${DEPS_BUCKET}/review.txt
標準 SQL 内の OBJ.MAKE_REF を使用してファイルをクエリします。
cat <<EOF > sentiment_analysis.sql
SELECT
AI.GENERATE(
(
'Analyze the sentiment of this text file. Is it positive, negative, or neutral? Explain why.',
OBJ.MAKE_REF('gs://${DEPS_BUCKET}/review.txt', 'us.my-conn')
),
connection_id => 'us.my-conn',
endpoint => 'gemini-2.5-flash-lite'
).result AS ml_generate_text_result;
EOF
bq query --use_legacy_sql=false < sentiment_analysis.sql
出力例:
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ml_generate_text_result |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| This text file has a **strongly positive** sentiment. |
| |
| Here's why: |
| |
| * **Positive Keywords:** The text is filled with unequivocally positive words and phrases: |
| * "fantastic" |
| * "exceeded my expectations" |
| * "top-notch" |
| * "highly recommend" |
| |
| * **Enthusiastic Language:** The use of exclamation marks ("!") further amplifies the positive tone, indicating excitement and strong approval. |
| |
| * **Lack of Negative or Neutral Elements:** There are no words, phrases, or implications that suggest any dissatisfaction, criticism, or even indifference. |
| |
| In summary, the author's language is enthusiastic and uses multiple strong positive descriptors, leaving no room for doubt that their opinion of the product is overwhelmingly positive. |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
8. クリーンアップ
Google Cloud アカウントに継続的に課金されないようにするには、この Codelab で作成したリソースを削除します。
データセットと接続を削除する
Cloud Shell で次のコマンドを実行して、データセットと接続を削除します。
bq rm -r -f --location=$REGION iceberg_ai
bq rm -r -f --location=US iceberg_object_ai
bq rm --connection $PROJECT_ID.US.my-conn
GCS バケットとローカル ファイルを削除する
GCS バケットとローカル ファイルをクリーンアップします。
# Delete GCS buckets
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
gcloud storage rm -r gs://dataproc-temp-${REGION}-${PROJECT_NUMBER}-*
gcloud storage rm -r gs://dataproc-staging-${REGION}-${PROJECT_NUMBER}-*
gcloud storage rm -r gs://${DEPS_BUCKET}
# Delete local files
rm -f train_model.sql predict_fare.sql create_object_table.sql query_images.sql sentiment_analysis.sql test.py query.py history.py timetravel.py review.txt
このラボ専用にプロジェクトを作成した場合は、プロジェクト全体を削除することもできます。
9. 完了
おめでとうございます!Apache Iceberg、BigLake、BigQuery AI を使用して、統合データ レイクハウスを構築できました。
学習した内容
- 一般公開の Apache Iceberg REST カタログに接続してクエリする方法。
- Iceberg で タイムトラベル を使用してデータセットのバージョンを監査する。
- 構造化データで BigQuery ML モデル をトレーニングする。
- オブジェクト テーブルと ObjectRef を使用して非構造化データ(画像) を接続する。
- BigQuery SQL で Gemini を直接使用 して画像を分析する。