Dataproc Serverless

1. Omówienie Google Dataproc

Dataproc to w pełni zarządzana i wysoce skalowalna usługa do uruchamiania Apache Spark, Apache Flink, Presto oraz wielu innych narzędzi i platform open source. Używaj Dataproc do modernizacji jeziora danych, ETL / ELT i bezpiecznego badania danych w skali planety. Usługa Dataproc jest też w pełni zintegrowana z kilkoma usługami Google Cloud, takimi jak BigQuery, Cloud Storage, Vertex AI i Dataplex.

Usługa Dataproc jest dostępna w 3 wersjach:

  • Dataproc Serverless umożliwia uruchamianie zadań PySpark bez konieczności konfigurowania infrastruktury i autoskalowania. Dataproc Serverless obsługuje zadania wsadowe oraz sesje i notatniki wsadowe PySpark.
  • Dataproc w Google Compute Engine umożliwia zarządzanie klastrem Hadoop YARN dla zbiorów zadań Spark opartych na YARN, a także z narzędziami open source, takimi jak Flink i Presto. Możesz dostosować klastry w chmurze za pomocą dowolnej liczby skalowania w pionie lub poziomie, w tym także autoskalowania.
  • Dataproc w Google Kubernetes Engine umożliwia skonfigurowanie wirtualnych klastrów Dataproc w infrastrukturze GKE pod kątem przesyłania zadań Spark, PySpark, SparkR lub Spark SQL.

Podczas tego ćwiczenia w programie poznasz kilka różnych sposobów korzystania z Dataproc Serverless.

Środowisko Apache Spark zostało pierwotnie stworzone do uruchamiania w klastrach Hadoop i jako menedżera zasobów używało YARN. Obsługa klastrów Hadoop wymaga od nich specjalistycznej wiedzy i dbania o prawidłową konfigurację wielu różnych gałek w klastrach. To dodatek do osobnego zestawu pokrętła, który Spark wymaga także od użytkownika. Prowadzi to do wielu scenariuszy, w których deweloperzy spędzają więcej czasu na konfigurowaniu infrastruktury, zamiast zajmować się samym kodem Spark.

Dzięki Dataproc Serverless nie musisz ręcznie konfigurować klastrów Hadoop lub Spark. Dataproc Serverless nie działa w Hadoop i używa własnego dynamicznego przydzielania zasobów do określania wymagań dotyczących zasobów, w tym autoskalowania. W Dataproc Serverless można nadal dostosowywać niewielki podzbiór właściwości Spark, ale w większości przypadków nie trzeba modyfikować tych właściwości.

2. Skonfiguruj

Zaczniesz od skonfigurowania środowiska i zasobów używanych w tym ćwiczeniu z programowania.

Utwórz projekt Google Cloud. Możesz użyć istniejącej.

Otwórz Cloud Shell, klikając ją na pasku narzędzi konsoli Cloud.

ba0bb17945a73543.png

Cloud Shell udostępnia gotowe do użycia środowisko, którego możesz użyć do wykonania tego ćwiczenia z programowania.

68c4ebd2a8539764.png

Cloud Shell domyślnie ustawi nazwę projektu. Sprawdź je dokładnie, uruchamiając echo $GOOGLE_CLOUD_PROJECT. Jeśli nie widzisz identyfikatora projektu w danych wyjściowych, ustaw go.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Ustaw region Compute Engine dla swoich zasobów, na przykład us-central1 lub europe-west2.

export REGION=<your-region>

Włącz interfejsy API

Ćwiczenia z programowania korzystają z tych interfejsów API:

  • BigQuery
  • Dataproc

Włącz niezbędne interfejsy API. Zajmie to około minuty, a po zakończeniu pojawi się komunikat o powodzeniu.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Konfigurowanie dostępu do sieci

Dataproc Serverless wymaga włączenia prywatnego dostępu do Google w regionie, w którym będziesz uruchamiać zadania Spark, ponieważ sterowniki i wykonawcy Spark mają tylko prywatne adresy IP. Uruchom to polecenie, aby włączyć je w podsieci default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Aby sprawdzić, czy dostęp prywatny do Google jest włączony, użyj tych poleceń, które zwróci wartość True lub False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Utwórz zasobnik na dane

Utwórz zasobnik na dane, który będzie służyć do przechowywania zasobów utworzonych w ramach tego ćwiczenia z programowania.

Wybierz nazwę zasobnika. Nazwy zasobników muszą być niepowtarzalne globalnie wśród wszystkich użytkowników.

export BUCKET=<your-bucket-name>

Utwórz zasobnik w regionie, w którym chcesz uruchamiać zadania Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

Gdy zobaczysz, że Twój zasobnik jest dostępny w konsoli Cloud Storage, Aby wyświetlić zasobnik, możesz też uruchomić gsutil ls.

Tworzenie stałego serwera historii

Interfejs Spark udostępnia bogaty zestaw narzędzi do debugowania oraz statystyk dotyczących zadań Spark. Aby wyświetlić interfejs usługi Spark dla ukończonych zadań Dataproc Serverless, musisz utworzyć klaster Dataproc z jednym węzłem, który będzie używany jako serwer trwałej historii.

Ustaw nazwę stałego serwera historii.

PHS_CLUSTER_NAME=my-phs

Wykonaj to polecenie.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Interfejs usługi Spark i stały serwer historii zostaną szczegółowo omówione w dalszej części szkolenia w Codelabs.

3. Uruchamianie bezserwerowych zadań Spark przy użyciu zadań wsadowych Dataproc

W tym przykładzie opracujesz zbiór danych z publicznego zbioru danych Citi Tour Travel (w Nowym Jorku). NYC Citicycles to płatny system wypożyczania rowerów na terenie Nowego Jorku. Musisz wykonać kilka prostych transformacji i wydrukować 10 najpopularniejszych identyfikatorów stacji rowerów Citi. W tym przykładzie w dużym stopniu korzysta się też z oprogramowania typu open source spark-bigquery-connector do bezproblemowego odczytu i zapisu danych między usługami Spark i BigQuery.

Sklonuj poniższe repozytorium GitHub i plik cd do katalogu z plikiem citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Prześlij zadanie do Serverless Spark za pomocą pakietu SDK Cloud, który jest domyślnie dostępny w Cloud Shell. Aby przesłać zadania Serverless Spark, uruchom w powłoce podane niżej polecenie, które korzysta z pakietu SDK Cloud i interfejsu Dataproc Batches API.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Aby to zrobić:

  • Funkcja gcloud dataproc batches submit odwołuje się do interfejsu Dataproc Batches API.
  • pyspark oznacza, że przesyłasz zadanie PySpark.
  • --batch to nazwa zadania. Jeśli identyfikator UUID nie zostanie podany, używany będzie losowo wygenerowany identyfikator UUID.
  • --region=${REGION} to region geograficzny, w którym zadanie zostanie przetworzone.
  • --deps-bucket=${BUCKET} to miejsce, do którego przesyłany jest lokalny plik Pythona, zanim zostanie on uruchomiony w środowisku bezserwerowym.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar zawiera plik jar dla spark-bigquery-connector w środowisku środowiska wykonawczego Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} to w pełni kwalifikowana nazwa stałego serwera historii. To w niej przechowywane są dane zdarzeń Spark (niezależnie od danych wyjściowych z konsoli) i są tam przechowywane i widoczne w interfejsie Spark.
  • Końcowa wartość -- oznacza, że wszystko, co znajduje się dalej, będzie argumentami środowiska wykonawczego dla programu. W takim przypadku przesyłasz nazwę zasobnika zgodnie z wymaganiami zadania.

Po przesłaniu wsadu pojawią się następujące dane wyjściowe.

Batch [citibike-job] submitted.

Po kilku minutach zobaczysz podane niżej dane wyjściowe wraz z metadanymi zadania.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

W następnej sekcji dowiesz się, jak znaleźć logi tego zadania.

Dodatkowe funkcje

Spark Serverless to usługa zapewniająca dodatkowe opcje uruchamiania zadań.

  • Możesz utworzyć niestandardowy obraz Dockera, na którym będzie działać Twoje zadanie. Jest to świetny sposób na uwzględnienie dodatkowych zależności, takich jak biblioteki Pythona i R.
  • Aby uzyskać dostęp do metadanych Hive, możesz połączyć z zadaniem instancję Dataproc Metastore.
  • Aby zapewnić dodatkową kontrolę, Dataproc Serverless obsługuje konfigurację niewielkiego zestawu właściwości Spark.

4. Wskaźniki i dostrzegalność Dataproc

Konsola Dataproc Serverless zawiera listę wszystkich zadań Dataproc Serverless. W konsoli zobaczysz wartości Identyfikator wsadowy, Lokalizacja, Stan, Czas utworzenia, Czas trwania i Typ każdego zadania. Kliknij Identyfikator zadania wsadowego, aby wyświetlić więcej informacji na jego temat.

Na tej stronie znajdziesz informacje takie jak Monitorowanie, które pokazują, ile wykonawców usługi Batch Spark były używane przez Twoje zadanie w danym okresie (co wskazuje, w jakim stopniu jest autoskalowane).

Na karcie Szczegóły zobaczysz więcej metadanych o zadaniu, w tym wszelkie argumenty i parametry, które zostały do niego przesłane.

Na tej stronie możesz też uzyskać dostęp do wszystkich dzienników. Gdy zadania Dataproc Serverless są uruchamiane, generowane są 3 różne zestawy logów:

  • Poziom usług
  • dane wyjściowe konsoli,
  • Logowanie zdarzeń Spark

Poziom usługi – obejmuje logi wygenerowane przez usługę Dataproc Serverless. Dataproc Serverless to m.in. żądanie dodatkowych procesorów do autoskalowania. Możesz je wyświetlić, klikając Wyświetl logi – spowoduje to otwarcie Cloud Logging.

Dane wyjściowe konsoli możesz sprawdzić w sekcji Dane wyjściowe. Są to dane wyjściowe wygenerowane przez zadanie, w tym metadane, które Spark wydrukuje przy uruchamianiu zadania, lub wszelkie instrukcje drukowania włączone do zadania.

Rejestrowanie zdarzeń Spark jest dostępne w interfejsie usługi Spark. Ponieważ do Twojego zadania Spark został udostępniony trwały serwer historii, możesz uzyskać dostęp do interfejsu usługi Spark, klikając Wyświetl serwer historii usługi Spark, który zawiera informacje o wcześniej uruchomionych zadaniach Spark. Więcej informacji o interfejsie Spark znajdziesz w oficjalnej dokumentacji Spark.

5. Szablony Dataproc: BQ -> GCS

Szablony Dataproc to narzędzia typu open source, które pomagają jeszcze bardziej uprościć zadania przetwarzania danych w chmurze. Pełnią one funkcję otoki Dataproc Serverless i zawierają szablony przeznaczone do wielu zadań importu i eksportowania danych, w tym:

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

Pełna lista jest dostępna w formacie README.

W tej sekcji użyjesz szablonów Dataproc do eksportowania danych z BigQuery do GCS.

Kopiowanie repozytorium

Skopiuj repozytorium i przejdź do folderu python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Konfigurowanie środowiska

Teraz ustawisz zmienne środowiskowe. Szablony Dataproc używają zmiennej środowiskowej GCP_PROJECT jako identyfikatora projektu, więc ustaw jej wartość równą GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Region powinien być ustawiony w środowisku z wcześniejszego okresu. Jeśli nie, ustaw ją tutaj.

export REGION=<region>

Szablony Dataproc używają spark-bigquery-conector do przetwarzania zadań BigQuery i wymagają umieszczenia identyfikatora URI w zmiennej środowiskowej JARS. Ustaw zmienną JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Konfigurowanie parametrów szablonu

Ustaw nazwę zasobnika przejściowego, którego ma używać usługa.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Następnie ustaw zmienne związane z poszczególnymi zadaniami. W tabeli wejściowej ponownie odwołasz się do zbioru danych BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Masz do wyboru csv, parquet, avro lub json. W przypadku tego ćwiczenia w programie wybierz CSV – w następnej sekcji dowiesz się, jak konwertować typy plików przy użyciu szablonów Dataproc.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Ustaw tryb wyjścia na overwrite. Masz do wyboru overwrite, append, ignore lub errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Ustaw lokalizację wyjściową GCS jako ścieżkę w zasobniku.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Uruchamianie szablonu

Uruchom szablon BIGQUERYTOGCS, określając go poniżej i podając ustawione parametry wejściowe.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Dane wyjściowe będą dość głośne, ale po około minucie zobaczysz następujący komunikat.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Aby sprawdzić, czy pliki zostały wygenerowane, uruchom to polecenie.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark domyślnie zapisuje dane w wielu plikach w zależności od ilości danych. W takim przypadku zobaczysz około 30 wygenerowanych plików. Nazwy plików wyjściowych Spark mają format part-, po którym następuje 5-cyfrowy numer (oznaczający numer części) oraz ciąg znaków skrótu. W przypadku dużych ilości danych Spark zazwyczaj zapisuje informacje w kilku plikach. Przykładowa nazwa pliku to part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Szablony Dataproc: plik CSV do parquet

Za pomocą szablonów Dataproc będziesz teraz konwertować dane w GCS z jednego typu na inny plik przy użyciu GCSTOGCS. Ten szablon używa SparkSQL i umożliwia przesłanie zapytania SparkSQL do przetworzenia podczas przekształcenia w celu dodatkowego przetwarzania.

Potwierdzanie zmiennych środowiskowych

Sprawdź, czy wartości GCP_PROJECT, REGION i GCS_STAGING_BUCKET są ustawione z poprzedniej sekcji.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Ustawianie parametrów szablonu

Teraz ustawisz parametry konfiguracji dla usługi GCStoGCS. Zacznij od lokalizacji plików wejściowych. Pamiętaj, że jest to katalog, a nie konkretny plik, ponieważ wszystkie pliki w katalogu zostaną przetworzone. Ustaw na BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Ustaw format pliku wejściowego.

GCS_TO_GCS_INPUT_FORMAT=csv

Ustaw odpowiedni format wyjściowy. Możesz wybrać parquet, json, avro lub csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Ustaw tryb wyjścia na overwrite. Masz do wyboru overwrite, append, ignore lub errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Ustaw lokalizację wyjściową.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Uruchamianie szablonu

Uruchom szablon GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Dane wyjściowe będą dość głośne, ale po około minucie powinien wyświetlić się komunikat o powodzeniu, podobny do tego poniżej.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Aby sprawdzić, czy pliki zostały wygenerowane, uruchom to polecenie.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Ten szablon pozwala też dostarczać zapytania SparkSQL, przekazując do szablonu gcs.to.gcs.temp.view.name i gcs.to.gcs.sql.query, co umożliwia uruchomienie zapytania SparkSQL na danych przed zapisem w GCS.

7. Czyszczenie zasobów

Aby po ukończeniu tego ćwiczenia z programowania uniknąć niepotrzebnych opłat na koncie GCP:

  1. Usuń zasobnik Cloud Storage utworzonego środowiska.
gsutil rm -r gs://${BUCKET}
  1. Usuń klaster Dataproc używany jako stały serwer historii.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Usuń zadania Dataproc Serverless. Otwórz konsolę usług zbiorczych, zaznacz pole obok każdego zadania, które chcesz usunąć, i kliknij USUŃ.

Jeśli Twój projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz go też opcjonalnie usunąć:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

8. Co dalej?

Dodatkowe sposoby korzystania z usługi Serverless Spark znajdziesz w tych materiałach: