Odwrotne ETL z Snowflake do Spanner za pomocą CSV

1. Tworzenie potoku Reverse ETL z Snowflake do Spannera za pomocą Google Cloud Storage i Dataflow

Wprowadzenie

W tym module utworzysz potok Reverse ETL. Tradycyjnie potoki ETL (Extract, Transform, Load) przenoszą dane z działających baz danych do hurtowni danych, takiej jak Snowflake, na potrzeby analizy. Potok Reverse ETL działa odwrotnie: przenosi wyselekcjonowane, przetworzone dane z hurtowni danych z powrotem do systemów operacyjnych, w których mogą one zasilać aplikacje, obsługiwać funkcje dostępne dla użytkowników lub być wykorzystywane do podejmowania decyzji w czasie rzeczywistym.

Celem jest przeniesienie przykładowego zbioru danych z tabeli Snowflake do usługi Spanner, czyli rozmieszczonej globalnie relacyjnej bazy danych idealnej do aplikacji o wysokiej dostępności.

W tym celu jako kroki pośrednie wykorzystywane są Google Cloud Storage (GCS) i Dataflow. Oto opis przepływu i uzasadnienie tej architektury:

  1. Snowflake do Google Cloud Storage (GCS) w formacie CSV:
  • Pierwszym krokiem jest wyeksportowanie danych z Snowflake w otwartym, uniwersalnym formacie. Eksportowanie do pliku CSV to powszechna i prosta metoda tworzenia przenośnych plików danych. Będziemy przechowywać te pliki w GCS, która zapewnia skalowalne i trwałe rozwiązanie do przechowywania obiektów.
  1. GCS do Spanner (przez Dataflow):
  • Zamiast pisać niestandardowy skrypt do odczytywania danych z GCS i zapisywania ich w Spannerze, używana jest usługa Google Dataflow, czyli w pełni zarządzana usługa przetwarzania danych. Dataflow udostępnia gotowe szablony przeznaczone specjalnie do tego rodzaju zadań. Korzystanie z szablonu „GCS Text to Cloud Spanner” umożliwia importowanie danych równolegle z dużą przepustowością bez konieczności pisania kodu przetwarzania danych, co pozwala zaoszczędzić dużo czasu na programowanie.

Czego się nauczysz

  • Wczytywanie danych do Snowflake
  • Jak utworzyć zasobnik GCS
  • Eksportowanie tabeli Snowflake do GCS w formacie CSV
  • Konfigurowanie instancji usługi Spanner
  • Jak wczytywać tabele CSV do Spannera za pomocą Dataflow

2. Konfiguracja, wymagania i ograniczenia

Wymagania wstępne

  • Konto Snowflake.
  • Konto Google Cloud z włączonymi interfejsami API Spanner, Cloud Storage i Dataflow.
  • Dostęp do konsoli Google Cloud w przeglądarce.
  • Terminal z zainstalowanym Google Cloud CLI.
  • Jeśli w Twojej organizacji Google Cloud jest włączona zasada iam.allowedPolicyMemberDomains, administrator może przyznać wyjątek, aby zezwolić na konta usługi z domen zewnętrznych. W odpowiednich przypadkach omówimy to w dalszej części procesu.

Uprawnienia Google Cloud Platform IAM

Aby wykonać wszystkie czynności w tym samouczku, musisz mieć na koncie Google te uprawnienia:

Konta usługi

iam.serviceAccountKeys.create

Umożliwia tworzenie kont usługi.

Spanner

spanner.instances.create

Umożliwia utworzenie nowej instancji usługi Spanner.

spanner.databases.create

Umożliwia uruchamianie instrukcji DDL w celu tworzenia

spanner.databases.updateDdl

Umożliwia uruchamianie instrukcji DDL w celu tworzenia tabel w bazie danych.

Google Cloud Storage

storage.buckets.create

Umożliwia utworzenie nowego zasobnika GCS do przechowywania wyeksportowanych plików Parquet.

storage.objects.create

Umożliwia zapisywanie wyeksportowanych plików Parquet w zasobniku GCS.

storage.objects.get

Umożliwia BigQuery odczytywanie plików Parquet z zasobnika GCS.

storage.objects.list

Umożliwia BigQuery wyświetlanie listy plików Parquet w zasobniku GCS.

Dataflow

Dataflow.workitems.lease

Umożliwia przejmowanie elementów roboczych z Dataflow.

Dataflow.workitems.sendMessage

Umożliwia procesowi roboczemu Dataflow wysyłanie wiadomości z powrotem do usługi Dataflow.

Logging.logEntries.create

Umożliwia procesom roboczym Dataflow zapisywanie wpisów logów w Google Cloud Logging.

Dla wygody możesz użyć zdefiniowanych ról, które zawierają te uprawnienia.

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

Ograniczenia

Podczas przenoszenia danych między systemami należy pamiętać o różnicach w typach danych.

  • Snowflake do CSV: podczas eksportowania typy danych Snowflake są konwertowane na standardowe reprezentacje tekstowe.
  • CSV do Spannera: podczas importowania należy zadbać o to, aby docelowe typy danych Spannera były zgodne z reprezentacjami ciągów znaków w pliku CSV. W tym module znajdziesz typowe mapowania typów.

Konfigurowanie właściwości wielokrotnego użytku

W tym module będziesz wielokrotnie potrzebować kilku wartości. Aby to ułatwić, ustawimy te wartości jako zmienne powłoki, które będą używane później.

  • GCP_REGION – konkretny region, w którym będą się znajdować zasoby GCP. Listę regionów znajdziesz tutaj.
  • GCP_PROJECT – identyfikator projektu GCP do użycia.
  • GCP_BUCKET_NAME – nazwa zasobnika GCS, który ma zostać utworzony i w którym będą przechowywane pliki danych.
  • SPANNER_INSTANCE – nazwa, która ma zostać przypisana do instancji Spannera.
  • SPANNER_DB – nazwa, która ma być przypisana do bazy danych w instancji Spanner
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

Ten moduł wymaga projektu Google Cloud.

Projekt Google Cloud

Projekt to podstawowa jednostka organizacji w Google Cloud. Jeśli administrator udostępnił adres e-mail do użycia, ten krok można pominąć.

Projekt można utworzyć za pomocą interfejsu wiersza poleceń w ten sposób:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

Więcej informacji o tworzeniu projektów i zarządzaniu nimi znajdziesz tutaj.

3. Konfigurowanie usługi Spanner

Aby zacząć korzystać ze Spannera, musisz udostępnić instancję i bazę danych. Szczegółowe informacje o konfigurowaniu i tworzeniu instancji Spanner znajdziesz tutaj.

Tworzenie instancji

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

Tworzenie bazy danych

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. Utworzysz zasobnik Google Cloud Storage

Google Cloud Storage (GCS) będzie używany do tymczasowego przechowywania plików danych CSV wygenerowanych przez Snowflake przed ich zaimportowaniem do Spanner.

Tworzenie zasobnika

Aby utworzyć zasobnik pamięci w określonym regionie (np. us-central1), użyj tego polecenia.

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

Sprawdź, czy zasobnik został utworzony

Po pomyślnym wykonaniu tego polecenia sprawdź wynik, wyświetlając listę wszystkich zasobników. Nowy zasobnik powinien pojawić się na liście wyników. Odwołania do zasobników zwykle mają przed nazwą zasobnika prefiks gs://.

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

Testowanie uprawnień do zapisu

Ten krok zapewnia prawidłowe uwierzytelnianie środowiska lokalnego i przyznanie mu niezbędnych uprawnień do zapisywania plików w nowo utworzonym zasobniku.

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

Sprawdzanie przesłanego pliku

Wyświetl listę obiektów w zasobniku. Powinna się wyświetlić pełna ścieżka do przesłanego pliku.

gcloud storage ls gs://$GCS_BUCKET_NAME

Powinny się wyświetlić te dane wyjściowe:

gs://$GCS_BUCKET_NAME/hello.txt

Aby wyświetlić zawartość obiektu w zasobniku, można użyć gcloud storage cat.

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

Zawartość pliku powinna być widoczna:

Hello, GCS

Usuwanie pliku testowego

Zasobnik Cloud Storage jest już skonfigurowany. Teraz możesz usunąć tymczasowy plik testowy.

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

Dane wyjściowe powinny potwierdzać usunięcie:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. Eksportowanie ze Snowflake do GCS

W tym module użyjemy zbioru danych TPC-H, który jest standardowym w branży testem porównawczym dla systemów wspomagających podejmowanie decyzji. Ten zbiór danych jest domyślnie dostępny na wszystkich kontach Snowflake.

Przygotowywanie danych w Snowflake

Zaloguj się na konto Snowflake i utwórz nowy arkusz.

Przykładowych danych TPC-H udostępnianych przez Snowflake nie można wyeksportować bezpośrednio z ich lokalizacji udostępnionej ze względu na uprawnienia. Najpierw tabelę ORDERS należy skopiować do osobnej bazy danych i schematu.

Utwórz bazę danych

  1. W menu po lewej stronie w sekcji Horizon Catalog najedź kursorem na Catalog (Katalog), a następnie kliknij Database Explorer (Eksplorator bazy danych).
  2. Na stronie Bazy danych w prawym górnym rogu kliknij przycisk + Baza danych.
  3. Nadaj nazwę nowej bazie danych codelabs_retl_db

Tworzenie arkusza

Aby uruchamiać polecenia SQL w bazie danych, potrzebne będą arkusze.

Aby utworzyć arkusz:

  1. W menu po lewej stronie w sekcji Praca z danymi najedź kursorem na Projekty, a następnie kliknij Obszary robocze.
  2. Na pasku bocznym Moje obszary robocze kliknij przycisk + Dodaj nowy i wybierz Plik SQL.
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

Dane wyjściowe powinny wskazywać, że skopiowano 4375 wierszy.

Konfigurowanie Snowflake w celu uzyskania dostępu do GCS

Aby umożliwić Snowflake zapisywanie danych w zasobniku GCS, musisz utworzyć integrację usługi pamięci masowejetap.

  • Integracja z pamięcią masową: obiekt Snowflake, który przechowuje wygenerowane konto usługi i informacje o uwierzytelnianiu w zewnętrznej pamięci masowej w chmurze.
  • Etap: nazwany obiekt, który odwołuje się do konkretnego zasobnika i ścieżki, używając integracji pamięci do obsługi uwierzytelniania. Zapewnia wygodną, nazwaną lokalizację dla operacji wczytywania i zwalniania danych.

Najpierw utwórz integrację pamięci masowej.

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

Następnie opisz integrację, aby uzyskać utworzone przez Snowflake konto usługi.

DESC STORAGE INTEGRATION gcs_int; 

W wynikach skopiuj wartość STORAGE_GCP_SERVICE_ACCOUNT. Będzie wyglądać jak adres e-mail.

Zapisz to konto usługi w zmiennej środowiskowej w instancji powłoki, aby móc je później ponownie wykorzystać.

export GCP_SERVICE_ACCOUNT=<Your service account>

Przyznawanie Snowflake uprawnień GCS

Teraz musisz przyznać kontu usługi Snowflake uprawnienia do zapisu w zasobniku GCS.

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

Tworzenie etapu i eksportowanie danych

Po ustawieniu uprawnień wróć do arkusza Snowflake. Utwórz etap, który korzysta z integracji, a następnie użyj polecenia COPY INTO, aby wyeksportować dane z tabeli SAMPLE_ORDERS na ten etap.

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

W panelu Wyniki powinna być widoczna wartość rows_unloaded równa 1500000.

Weryfikowanie danych w GCS

Sprawdź zasobnik GCS, aby zobaczyć utworzone przez Snowflake pliki. Potwierdza to, że eksport zakończył się powodzeniem.

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

Powinno być widocznych co najmniej kilka ponumerowanych plików CSV.

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. Wczytywanie danych do Spanner za pomocą Dataflow

Gdy dane znajdą się już w GCS, do zaimportowania ich do Spannera zostanie użyta usługa Dataflow. Dataflow to w pełni zarządzana usługa Google Cloud do przetwarzania danych strumieniowych i wsadowych. Użyjemy gotowego szablonu Google zaprojektowanego specjalnie do importowania plików tekstowych z GCS do Spannera.

Tworzenie tabeli Spanner

Najpierw utwórz tabelę docelową w usłudze Spanner. Schemat musi być zgodny z danymi w plikach CSV.

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

Tworzenie manifestu Dataflow

Szablon Dataflow wymaga pliku „manifest”. Jest to plik JSON, który informuje szablon, gdzie znaleźć źródłowe pliki danych i do której tabeli Spanner należy je wczytać.

Zdefiniuj i prześlij nowy plik regional_sales_manifest.json do zasobnika GCS:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

Włącz Dataflow API

Zanim zaczniesz korzystać z Dataflow, musisz najpierw włączyć tę usługę. Zrób to za pomocą

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

Tworzenie i uruchamianie zadania Dataflow

Zadanie importu jest teraz gotowe do uruchomienia. To polecenie uruchamia zadanie Dataflow przy użyciu szablonu GCS_Text_to_Cloud_Spanner.

Polecenie jest długie i ma kilka parametrów. Oto podział:

–gcs-location

Ścieżka do gotowego szablonu w GCS.

–region

Region, w którym będzie uruchamiane zadanie Dataflow.

–parameters

instanceId, databaseId

Docelowa instancja i baza danych usługi Spanner.

importManifest

Ścieżka GCS do utworzonego właśnie pliku manifestu.

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

Stan zadania Dataflow można sprawdzić za pomocą tego polecenia:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

Wykonanie zadania powinno zająć około 5 minut.

Weryfikowanie danych w usłudze Spanner

Gdy zadanie Dataflow zostanie wykonane, sprawdź, czy dane zostały wczytane do Spannera.

Najpierw sprawdź liczbę wierszy. Powinna wynosić 4375.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

Następnie wykonaj zapytanie dotyczące kilku wierszy, aby sprawdzić dane.

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

Zaimportowane dane z tabeli Snowflake powinny być widoczne.

7. Porządkowanie roszczeń

Czyszczenie Spannera

Usuwanie bazy danych i instancji usługi Spanner

gcloud spanner instances delete $SPANNER_INSTANCE

Czyszczenie GCS

Usuwanie zasobnika GCS utworzonego na potrzeby hostowania danych

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

Czyszczenie Snowflake

Usuwanie bazy danych

  1. W menu po lewej stronie w sekcji Katalog Horizon najedź kursorem na Katalog, a następnie kliknij Eksplorator bazy danych.
  2. Kliknij po prawej stronie bazy danych CODELABS_RETL_DB, aby rozwinąć opcje, i wybierz Usuń.
  3. W wyświetlonym oknie potwierdzenia kliknij Drop Database (Usuń bazę danych).

Usuwanie skoroszytów

  1. W menu po lewej stronie w sekcji Praca z danymi najedź kursorem na Projekty, a następnie kliknij Obszary robocze.
  2. Na pasku bocznym Mój obszar roboczy najedź kursorem na różne pliki obszaru roboczego, których używasz w tym module, aby wyświetlić ... dodatkowe opcje, a następnie kliknij je.
  3. Kliknij Usuń, a następnie jeszcze raz Usuń w oknie potwierdzenia.
  4. Zrób to w przypadku wszystkich plików obszaru roboczego SQL utworzonych na potrzeby tego modułu.

8. Gratulacje

Gratulujemy ukończenia ćwiczenia.

Omówione zagadnienia

  • Wczytywanie danych do Snowflake
  • Jak utworzyć zasobnik GCS
  • Eksportowanie tabeli Snowflake do GCS w formacie CSV
  • Konfigurowanie instancji usługi Spanner
  • Jak wczytywać tabele CSV do Spannera za pomocą Dataflow