Dataproc Serverless

1. Omówienie – Google Dataproc

Dataproc to usługa w pełni zarządzana i wysoce skalowalna do uruchamiania Apache Spark, Apache Flink, Presto oraz wielu innych narzędzi i platform open source. Możesz wykorzystać Dataproc do modernizacji jezior danych, realizacji procesów ETL / ELT i bezpiecznego badania danych na globalną skalę. Rozwiązanie jest też w pełni zintegrowane z kilkoma usługami Google Cloud, takimi jak BigQuery, Cloud Storage, Vertex AI i Dataplex.

Usługa Dataproc jest dostępna w 3 wersjach:

  • Dataproc Serverless umożliwia uruchamianie zadań PySpark bez konieczności konfigurowania infrastruktury i autoskalowania. Dataproc Serverless obsługuje zadania wsadowe i sesje (notatniki) PySpark.
  • Dataproc w Google Compute Engine umożliwia zarządzanie klastrem Hadoop YARN na potrzeby zadań Spark opartych na YARN, a także narzędzi open source, takich jak Flink i Presto. Możesz dostosować klastry działające w chmurze za pomocą skalowania w pionie lub w poziomie, w tym autoskalowania.
  • Dataproc w Google Kubernetes Engine umożliwia konfigurowanie wirtualnych klastrów Dataproc w infrastrukturze GKE na potrzeby przesyłania zadań Spark, PySpark, SparkR i Spark SQL.

W tym ćwiczeniu z programowania dowiesz się, jak korzystać z Dataproc Serverless na kilka sposobów.

Platforma Apache Spark została pierwotnie opracowana z myślą o jej uruchamianiu w klastrach Hadoop, a jako menedżera zasobów używała rozwiązania YARN. Obsługa klastrów Hadoop wymaga specjalistycznej wiedzy i umiejętności prawidłowego konfigurowania wielu różnych ustawień klastrów. Oprócz tego użytkownik musi też skonfigurować osobny zestaw ustawień wymaganych przez Spark. W wielu przypadkach programiści poświęcają więcej czasu na konfigurowanie infrastruktury niż na pracę nad samym kodem Spark.

Dataproc Serverless eliminuje konieczność ręcznego konfigurowania klastrów Hadoop lub Spark. Dataproc Serverless nie działa na platformie Hadoop, a do określania wymagań dotyczących zasobów, w tym do autoskalowania, wykorzystuje własne dynamiczne przydzielanie zasobów. Za pomocą Dataproc Serverless można dostosować niewielki podzbiór usług Spark, ale w większości przypadków nie trzeba ich modyfikować.

2. Skonfiguruj

Zacznij od skonfigurowania środowiska i zasobów używanych w tym ćwiczeniu z programowania.

Utwórz projekt Google Cloud. Możesz też użyć istniejącego projektu.

Aby otworzyć Cloud Shell, kliknij go na pasku narzędzi Cloud Console.

ba0bb17945a73543.png

Cloud Shell udostępnia gotowe do użycia środowisko powłoki, z którego możesz korzystać podczas tego ćwiczenia z programowania.

68c4ebd2a8539764.png

Cloud Shell domyślnie ustawi nazwę projektu. Sprawdź to, uruchamiając polecenie echo $GOOGLE_CLOUD_PROJECT. Jeśli w danych wyjściowych nie widzisz identyfikatora projektu, ustaw go.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Ustaw dla swoich zasobów region Compute Engine region, na przykład us-central1 lub europe-west2.

export REGION=<your-region>

Włącz interfejsy API

To ćwiczenie z programowania korzysta z tych interfejsów API:

  • BigQuery
  • Dataproc

Włącz niezbędne interfejsy API. Zajmie to około minuty, a po zakończeniu pojawi się komunikat o powodzeniu.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Konfigurowanie dostępu do sieci

Sterowniki i wykonawcy Spark mają tylko prywatne adresy IP, dlatego usługa Dataproc Serverless musi mieć włączony prywatny dostęp do Google w regionie, w którym uruchamiasz zadania Spark. Aby włączyć go w podsieci default , uruchom to polecenie.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Możesz sprawdzić, czy prywatny dostęp do Google jest włączony, za pomocą tego polecenia, które zwróci wartość True lub False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Utworzenie zasobnika na dane

Utwórz zasobnik na dane, w którym będą przechowywane zasoby utworzone w tym ćwiczeniu.

Wybierz nazwę zasobnika. Nazwy zasobników muszą być globalnie niepowtarzalne dla wszystkich użytkowników.

export BUCKET=<your-bucket-name>

Utwórz zasobnik w regionie, w którym chcesz uruchamiać zadania Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

W konsoli Cloud Storage możesz sprawdzić, czy zasobnik jest dostępny. Możesz też uruchomić polecenie gsutil ls , aby zobaczyć zasobnik.

Tworzenie stałego serwera historii

Interfejs Spark udostępnia bogaty zestaw narzędzi do debugowania i informacji o zadaniach Spark. Aby wyświetlić interfejs Spark dla ukończonych zadań Dataproc Serverless, musisz utworzyć klaster Dataproc z jednym węzłem, który będzie używany jako stały serwer historii.

Ustaw nazwę stałego serwera historii.

PHS_CLUSTER_NAME=my-phs

Uruchom to polecenie.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Interfejs Spark i stały serwer historii zostaną omówione bardziej szczegółowo w dalszej części tego ćwiczenia z programowania.

3. Uruchamianie zadań Serverless Spark za pomocą Dataproc Batches

W tym przykładzie będziesz pracować na danych z publicznego zbioru danych New York City (NYC) Citi Bike Trips. NYC Citi Bikes to płatny nowojorski system rowerów publicznych. Wykonasz proste przekształcenia i wyświetlisz 10 najpopularniejszych identyfikatorów stacji rowerów publicznych Citi Bike. W tym przykładzie używamy też narzędzia open source spark-bigquery-connector, aby bezproblemowo odczytywać i zapisywać dane między Spark i BigQuery.

Skopiuj to repozytorium GitHub i przejdź (cd) do katalogu zawierającego plik citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Prześlij zadanie do Serverless Spark za pomocą pakietu Cloud SDK, który jest domyślnie dostępny w Cloud Shell. Uruchom w powłoce to polecenie, które korzysta z pakietu Cloud SDK i z interfejsu Dataproc Batches API do przesyłania zadań Serverless Spark.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Omówienie:

  • gcloud dataproc batches submit odwołuje się do interfejsu Dataproc Batches API.
  • pyspark oznacza, że przesyłasz zadanie PySpark.
  • --batch to nazwa zadania. Jeśli nie podasz identyfikatora, użyty zostanie losowy identyfikator UUID.
  • --region=${REGION} to region geograficzny, w którym będzie przetwarzane zadanie.
  • --deps-bucket=${BUCKET} to miejsce, do którego przesyłany jest lokalny plik Pythona przed uruchomieniem w środowisku Serverless.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar zawiera plik JAR dla narzędzia spark-bigquery-connector w środowisku wykonawczym Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} to pełna i jednoznaczna nazwa stałego serwera historii. W tym miejscu są przechowywane dane zdarzeń Spark (osobne od danych wyjściowych konsoli), które można wyświetlać w interfejsie Spark.
  • Końcowy -- oznacza, że wszystko, co znajduje się za nim, będzie argumentami środowiska wykonawczego programu. W tym przypadku przesyłasz nazwę zasobnika, która jest wymagana przez zadanie.

Po przesłaniu wsadu pojawią się te dane wyjściowe.

Batch [citibike-job] submitted.

Po kilku minutach zobaczysz te dane wyjściowe wraz z metadanymi zadania.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

W następnej sekcji dowiesz się, jak znaleźć logi tego zadania.

Dodatkowe funkcje

Spark Serverless oferuje dodatkowe opcje uruchamiania zadań.

  • Możesz utworzyć niestandardowy obraz Dockera, na którym będzie uruchamiane zadanie. To świetny sposób na dodanie kolejnych zależności, takich jak biblioteki R i Pythona.
  • Aby uzyskać dostęp do metadanych Hive, możesz połączyć zadanie z instancją Dataproc Metastore.
  • W celu zapewnienia dodatkowej kontroli Dataproc Serverless obsługuje konfigurację niewielkiego zbioru właściwości Spark.

4. Wskaźniki i dostrzegalność w Dataproc

Konsola Dataproc Batches zawiera listę wszystkich zadań Dataproc Serverless. W konsoli zobaczysz identyfikator wsadu, lokalizację, stan, czas utworzenia, czas trwania i typ każdego zadania. Aby wyświetlić więcej informacji o zadaniu, kliknij jego identyfikator wsadu.

Na tej stronie znajdziesz takie informacje jak Monitorowanie , które pokazuje liczbę wykonawców wsadowych Spark używanych przez zadanie na przestrzeni czasu (wskazując, jak bardzo zostało ono automatycznie skalowane).

Na karcie Szczegóły znajdziesz więcej metadanych zadania, w tym argumenty i parametry przesłane wraz z zadaniem.

Z tej strony możesz też uzyskać dostęp do wszystkich logów. Podczas uruchamiania zadań Dataproc Serverless generowane są 3 różne zestawy logów:

  • na poziomie usługi,
  • dane wyjściowe konsoli,
  • logowanie zdarzeń Spark.

Na poziomie usługi znajdują się logi wygenerowane przez usługę Dataproc Serverless. Obejmują one m.in. żądania Dataproc Serverless dotyczące dodatkowych procesorów na potrzeby autoskalowania. Aby je wyświetlić, kliknij Wyświetl logi , co spowoduje otwarcie Cloud Logging.

Dane wyjściowe konsoli można wyświetlić w sekcji Wyniki. Są to dane wyjściowe wygenerowane przez zadanie, w tym metadane, które Spark drukuje na początku zadania, oraz instrukcje print włączone do zadania.

Logowanie zdarzeń Spark jest dostępne w interfejsie Spark. Ponieważ zadanie Spark zostało skonfigurowane ze stałym serwerem historii, możesz uzyskać dostęp do interfejsu Spark, klikając Wyświetl serwer historii usługi Spark, który zawiera informacje o uruchomionych wcześniej zadaniach Spark. Więcej informacji o interfejsie Spark znajdziesz w oficjalnej dokumentacji Spark.

5. Szablony Dataproc: BQ -> GCS

Szablony Dataproc to narzędzia open source, które pomagają jeszcze bardziej uprościć zadania związane z przetwarzaniem danych w chmurze. Służą one jako otoka dla Dataproc Serverless i zawierają szablony do wielu zadań importu i eksportu danych, w tym:

  • BigQuerytoGCS i GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC i JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS i GCStoMongo

Pełna lista jest dostępna w pliku README.

W tej sekcji użyjesz szablonów Dataproc do eksportowania danych z BigQuery do GCS.

Kopiowanie repozytorium

Skopiuj repozytorium i przejdź do folderu python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Konfigurowanie środowiska

Teraz ustawisz zmienne środowiskowe. Szablony Dataproc używają zmiennej środowiskowej GCP_PROJECT na potrzeby identyfikatora projektu, więc ustaw ją na GOOGLE_CLOUD_PROJECT..

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Twój region powinien być ustawiony w środowisku z wcześniejszego etapu. Jeśli nie, ustaw go tutaj.

export REGION=<region>

Szablony Dataproc przetwarzają zadania BigQuery za pomocą narzędzia spark-bigquery-connector i wymagają, aby identyfikator URI był zawarty w zmiennej środowiskowej JARS. Ustaw zmienną JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Ustawianie parametrów szablonu

Ustaw nazwę zasobnika tymczasowego, którego ma używać usługa.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Następnie ustawisz kilka zmiennych specyficznych dla zadania. W przypadku tabeli wejściowej ponownie odwołasz się do zbioru danych BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Możesz wybrać csv, parquet, avro lub json. W tym ćwiczeniu z programowania wybierz CSV. W następnej sekcji dowiesz się, jak używać szablonów Dataproc do konwertowania typów plików.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Ustaw tryb wyjściowy na overwrite. Możesz wybrać overwrite, append, ignore lub errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Ustaw lokalizację wyjściową GCS na ścieżkę w zasobniku.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Uruchamianie szablonu

Uruchom szablon BIGQUERYTOGCS , określając go poniżej i podając ustawione parametry wejściowe.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Dane wyjściowe będą dość obszerne, ale po około minucie zobaczysz te informacje.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Możesz sprawdzić, czy pliki zostały wygenerowane, uruchamiając to polecenie.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark domyślnie zapisuje dane w wielu plikach, w zależności od ich ilości. W tym przypadku zobaczysz około 30 wygenerowanych plików. Nazwy plików wyjściowych Spark mają format part- po którym następuje 5-cyfrowy numer (reprezentujący numer części) oraz ciąg znaków skrótu. Gdy danych jest dużo, Spark zazwyczaj zapisuje je w kilku plikach. Przykładowa nazwa pliku to part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Szablony Dataproc: CSV na Parquet

Teraz użyjesz szablonów Dataproc do konwertowania danych w GCS z jednego typu pliku na inny za pomocą szablonu GCSTOGCS. Ten szablon używa SparkSQL i umożliwia też przesłanie zapytania SparkSQL, które ma być przetworzone podczas przekształcenia na potrzeby dodatkowego przetwarzania.

Potwierdzanie zmiennych środowiskowych

Sprawdź, czy zmienne środowiskowe GCP_PROJECT, REGION i GCS_STAGING_BUCKET są ustawione w poprzedniej sekcji.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Ustawianie parametrów szablonu

Teraz ustawisz parametry konfiguracji dla szablonu GCStoGCS. Zacznij od lokalizacji plików wejściowych. Pamiętaj, że jest to katalog, a nie konkretny plik, ponieważ zostaną przetworzone wszystkie pliki w katalogu. Ustaw wartość BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Ustaw format pliku wejściowego.

GCS_TO_GCS_INPUT_FORMAT=csv

Ustaw żądany format wyjściowy. Możesz wybrać parquet, json, avro lub csv.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Ustaw tryb wyjściowy na overwrite. Możesz wybrać overwrite, append, ignore lub errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Ustaw lokalizację wyjściową.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Uruchamianie szablonu

Uruchom szablon GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Dane wyjściowe będą dość obszerne, ale po około minucie powinien pojawić się komunikat o powodzeniu podobny do tego poniżej.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Możesz sprawdzić, czy pliki zostały wygenerowane, uruchamiając to polecenie.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

W przypadku tego szablonu możesz też podać zapytania SparkSQL, przekazując do szablonu gcs.to.gcs.temp.view.name i gcs.to.gcs.sql.query, co umożliwi uruchomienie zapytania SparkSQL na danych przed zapisaniem ich w GCS.

7. Zwalnianie miejsca

Aby uniknąć niepotrzebnych opłat na koncie Google Cloud Platform po ukończeniu tego ćwiczenia z programowania:

  1. Usuń zasobnik Cloud Storage dla utworzonego środowiska.
gsutil rm -r gs://${BUCKET}
  1. Usuń klaster Dataproc używany na potrzeby stałego serwera historii.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Usuń zadania Dataproc Serverless. Otwórz konsolę Batches, kliknij pole obok każdego zadania, które chcesz usunąć, i kliknij USUŃ.

Jeśli projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz też opcjonalnie go usunąć:

  1. W konsoli Google Cloud Platform otwórz stronę Projekty.
  2. Z listy projektów wybierz projekt do usunięcia i kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.

8. Co dalej?

Poniższe materiały zawierają dodatkowe informacje o tym, jak korzystać z Serverless Spark: