1. Omówienie Google Dataproc
Dataproc to w pełni zarządzana i wysoce skalowalna usługa do uruchamiania Apache Spark, Apache Flink, Presto oraz wielu innych narzędzi i platform open source. Używaj Dataproc do modernizacji jeziora danych, ETL / ELT i bezpiecznego badania danych w skali planety. Usługa Dataproc jest też w pełni zintegrowana z kilkoma usługami Google Cloud, takimi jak BigQuery, Cloud Storage, Vertex AI i Dataplex.
Usługa Dataproc jest dostępna w 3 wersjach:
- Dataproc Serverless umożliwia uruchamianie zadań PySpark bez konieczności konfigurowania infrastruktury i autoskalowania. Dataproc Serverless obsługuje zadania wsadowe oraz sesje i notatniki wsadowe PySpark.
- Dataproc w Google Compute Engine umożliwia zarządzanie klastrem Hadoop YARN dla zbiorów zadań Spark opartych na YARN, a także z narzędziami open source, takimi jak Flink i Presto. Możesz dostosować klastry w chmurze za pomocą dowolnej liczby skalowania w pionie lub poziomie, w tym także autoskalowania.
- Dataproc w Google Kubernetes Engine umożliwia skonfigurowanie wirtualnych klastrów Dataproc w infrastrukturze GKE pod kątem przesyłania zadań Spark, PySpark, SparkR lub Spark SQL.
Podczas tego ćwiczenia w programie poznasz kilka różnych sposobów korzystania z Dataproc Serverless.
Środowisko Apache Spark zostało pierwotnie stworzone do uruchamiania w klastrach Hadoop i jako menedżera zasobów używało YARN. Obsługa klastrów Hadoop wymaga od nich specjalistycznej wiedzy i dbania o prawidłową konfigurację wielu różnych gałek w klastrach. To dodatek do osobnego zestawu pokrętła, który Spark wymaga także od użytkownika. Prowadzi to do wielu scenariuszy, w których deweloperzy spędzają więcej czasu na konfigurowaniu infrastruktury, zamiast zajmować się samym kodem Spark.
Dzięki Dataproc Serverless nie musisz ręcznie konfigurować klastrów Hadoop lub Spark. Dataproc Serverless nie działa w Hadoop i używa własnego dynamicznego przydzielania zasobów do określania wymagań dotyczących zasobów, w tym autoskalowania. W Dataproc Serverless można nadal dostosowywać niewielki podzbiór właściwości Spark, ale w większości przypadków nie trzeba modyfikować tych właściwości.
2. Skonfiguruj
Zaczniesz od skonfigurowania środowiska i zasobów używanych w tym ćwiczeniu z programowania.
Utwórz projekt Google Cloud. Możesz użyć istniejącej.
Otwórz Cloud Shell, klikając ją na pasku narzędzi konsoli Cloud.
Cloud Shell udostępnia gotowe do użycia środowisko, którego możesz użyć do wykonania tego ćwiczenia z programowania.
Cloud Shell domyślnie ustawi nazwę projektu. Sprawdź je dokładnie, uruchamiając echo $GOOGLE_CLOUD_PROJECT
. Jeśli nie widzisz identyfikatora projektu w danych wyjściowych, ustaw go.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Ustaw region Compute Engine dla swoich zasobów, na przykład us-central1
lub europe-west2
.
export REGION=<your-region>
Włącz interfejsy API
Ćwiczenia z programowania korzystają z tych interfejsów API:
- BigQuery
- Dataproc
Włącz niezbędne interfejsy API. Zajmie to około minuty, a po zakończeniu pojawi się komunikat o powodzeniu.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Konfigurowanie dostępu do sieci
Dataproc Serverless wymaga włączenia prywatnego dostępu do Google w regionie, w którym będziesz uruchamiać zadania Spark, ponieważ sterowniki i wykonawcy Spark mają tylko prywatne adresy IP. Uruchom to polecenie, aby włączyć je w podsieci default
.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
Aby sprawdzić, czy dostęp prywatny do Google jest włączony, użyj tych poleceń, które zwróci wartość True
lub False
.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
Utwórz zasobnik na dane
Utwórz zasobnik na dane, który będzie służyć do przechowywania zasobów utworzonych w ramach tego ćwiczenia z programowania.
Wybierz nazwę zasobnika. Nazwy zasobników muszą być niepowtarzalne globalnie wśród wszystkich użytkowników.
export BUCKET=<your-bucket-name>
Utwórz zasobnik w regionie, w którym chcesz uruchamiać zadania Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Gdy zobaczysz, że Twój zasobnik jest dostępny w konsoli Cloud Storage, Aby wyświetlić zasobnik, możesz też uruchomić gsutil ls
.
Tworzenie stałego serwera historii
Interfejs Spark udostępnia bogaty zestaw narzędzi do debugowania oraz statystyk dotyczących zadań Spark. Aby wyświetlić interfejs usługi Spark dla ukończonych zadań Dataproc Serverless, musisz utworzyć klaster Dataproc z jednym węzłem, który będzie używany jako serwer trwałej historii.
Ustaw nazwę stałego serwera historii.
PHS_CLUSTER_NAME=my-phs
Wykonaj to polecenie.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Interfejs usługi Spark i stały serwer historii zostaną szczegółowo omówione w dalszej części szkolenia w Codelabs.
3. Uruchamianie bezserwerowych zadań Spark przy użyciu zadań wsadowych Dataproc
W tym przykładzie opracujesz zbiór danych z publicznego zbioru danych Citi Tour Travel (w Nowym Jorku). NYC Citicycles to płatny system wypożyczania rowerów na terenie Nowego Jorku. Musisz wykonać kilka prostych transformacji i wydrukować 10 najpopularniejszych identyfikatorów stacji rowerów Citi. W tym przykładzie w dużym stopniu korzysta się też z oprogramowania typu open source spark-bigquery-connector do bezproblemowego odczytu i zapisu danych między usługami Spark i BigQuery.
Sklonuj poniższe repozytorium GitHub i plik cd
do katalogu z plikiem citibike.py
.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Prześlij zadanie do Serverless Spark za pomocą pakietu SDK Cloud, który jest domyślnie dostępny w Cloud Shell. Aby przesłać zadania Serverless Spark, uruchom w powłoce podane niżej polecenie, które korzysta z pakietu SDK Cloud i interfejsu Dataproc Batches API.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
Aby to zrobić:
- Funkcja
gcloud dataproc batches submit
odwołuje się do interfejsu Dataproc Batches API. pyspark
oznacza, że przesyłasz zadanie PySpark.--batch
to nazwa zadania. Jeśli identyfikator UUID nie zostanie podany, używany będzie losowo wygenerowany identyfikator UUID.--region=${REGION}
to region geograficzny, w którym zadanie zostanie przetworzone.--deps-bucket=${BUCKET}
to miejsce, do którego przesyłany jest lokalny plik Pythona, zanim zostanie on uruchomiony w środowisku bezserwerowym.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
zawiera plik jar dla spark-bigquery-connector w środowisku środowiska wykonawczego Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
to w pełni kwalifikowana nazwa stałego serwera historii. To w niej przechowywane są dane zdarzeń Spark (niezależnie od danych wyjściowych z konsoli) i są tam przechowywane i widoczne w interfejsie Spark.- Końcowa wartość
--
oznacza, że wszystko, co znajduje się dalej, będzie argumentami środowiska wykonawczego dla programu. W takim przypadku przesyłasz nazwę zasobnika zgodnie z wymaganiami zadania.
Po przesłaniu wsadu pojawią się następujące dane wyjściowe.
Batch [citibike-job] submitted.
Po kilku minutach zobaczysz podane niżej dane wyjściowe wraz z metadanymi zadania.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
W następnej sekcji dowiesz się, jak znaleźć logi tego zadania.
Dodatkowe funkcje
Spark Serverless to usługa zapewniająca dodatkowe opcje uruchamiania zadań.
- Możesz utworzyć niestandardowy obraz Dockera, na którym będzie działać Twoje zadanie. Jest to świetny sposób na uwzględnienie dodatkowych zależności, takich jak biblioteki Pythona i R.
- Aby uzyskać dostęp do metadanych Hive, możesz połączyć z zadaniem instancję Dataproc Metastore.
- Aby zapewnić dodatkową kontrolę, Dataproc Serverless obsługuje konfigurację niewielkiego zestawu właściwości Spark.
4. Wskaźniki i dostrzegalność Dataproc
Konsola Dataproc Serverless zawiera listę wszystkich zadań Dataproc Serverless. W konsoli zobaczysz wartości Identyfikator wsadowy, Lokalizacja, Stan, Czas utworzenia, Czas trwania i Typ każdego zadania. Kliknij Identyfikator zadania wsadowego, aby wyświetlić więcej informacji na jego temat.
Na tej stronie znajdziesz informacje takie jak Monitorowanie, które pokazują, ile wykonawców usługi Batch Spark były używane przez Twoje zadanie w danym okresie (co wskazuje, w jakim stopniu jest autoskalowane).
Na karcie Szczegóły zobaczysz więcej metadanych o zadaniu, w tym wszelkie argumenty i parametry, które zostały do niego przesłane.
Na tej stronie możesz też uzyskać dostęp do wszystkich dzienników. Gdy zadania Dataproc Serverless są uruchamiane, generowane są 3 różne zestawy logów:
- Poziom usług
- dane wyjściowe konsoli,
- Logowanie zdarzeń Spark
Poziom usługi – obejmuje logi wygenerowane przez usługę Dataproc Serverless. Dataproc Serverless to m.in. żądanie dodatkowych procesorów do autoskalowania. Możesz je wyświetlić, klikając Wyświetl logi – spowoduje to otwarcie Cloud Logging.
Dane wyjściowe konsoli możesz sprawdzić w sekcji Dane wyjściowe. Są to dane wyjściowe wygenerowane przez zadanie, w tym metadane, które Spark wydrukuje przy uruchamianiu zadania, lub wszelkie instrukcje drukowania włączone do zadania.
Rejestrowanie zdarzeń Spark jest dostępne w interfejsie usługi Spark. Ponieważ do Twojego zadania Spark został udostępniony trwały serwer historii, możesz uzyskać dostęp do interfejsu usługi Spark, klikając Wyświetl serwer historii usługi Spark, który zawiera informacje o wcześniej uruchomionych zadaniach Spark. Więcej informacji o interfejsie Spark znajdziesz w oficjalnej dokumentacji Spark.
5. Szablony Dataproc: BQ -> GCS
Szablony Dataproc to narzędzia typu open source, które pomagają jeszcze bardziej uprościć zadania przetwarzania danych w chmurze. Pełnią one funkcję otoki Dataproc Serverless i zawierają szablony przeznaczone do wielu zadań importu i eksportowania danych, w tym:
BigQuerytoGCS
iGCStoBigQuery
GCStoBigTable
GCStoJDBC
iJDBCtoGCS
HivetoBigQuery
MongotoGCS
iGCStoMongo
Pełna lista jest dostępna w formacie README.
W tej sekcji użyjesz szablonów Dataproc do eksportowania danych z BigQuery do GCS.
Kopiowanie repozytorium
Skopiuj repozytorium i przejdź do folderu python
.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Konfigurowanie środowiska
Teraz ustawisz zmienne środowiskowe. Szablony Dataproc używają zmiennej środowiskowej GCP_PROJECT
jako identyfikatora projektu, więc ustaw jej wartość równą GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Region powinien być ustawiony w środowisku z wcześniejszego okresu. Jeśli nie, ustaw ją tutaj.
export REGION=<region>
Szablony Dataproc używają spark-bigquery-conector do przetwarzania zadań BigQuery i wymagają umieszczenia identyfikatora URI w zmiennej środowiskowej JARS
. Ustaw zmienną JARS
.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Konfigurowanie parametrów szablonu
Ustaw nazwę zasobnika przejściowego, którego ma używać usługa.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Następnie ustaw zmienne związane z poszczególnymi zadaniami. W tabeli wejściowej ponownie odwołasz się do zbioru danych BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Masz do wyboru csv
, parquet
, avro
lub json
. W przypadku tego ćwiczenia w programie wybierz CSV – w następnej sekcji dowiesz się, jak konwertować typy plików przy użyciu szablonów Dataproc.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Ustaw tryb wyjścia na overwrite
. Masz do wyboru overwrite
, append
, ignore
lub errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Ustaw lokalizację wyjściową GCS jako ścieżkę w zasobniku.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Uruchamianie szablonu
Uruchom szablon BIGQUERYTOGCS
, określając go poniżej i podając ustawione parametry wejściowe.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Dane wyjściowe będą dość głośne, ale po około minucie zobaczysz następujący komunikat.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Aby sprawdzić, czy pliki zostały wygenerowane, uruchom to polecenie.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark domyślnie zapisuje dane w wielu plikach w zależności od ilości danych. W takim przypadku zobaczysz około 30 wygenerowanych plików. Nazwy plików wyjściowych Spark mają format part
-, po którym następuje 5-cyfrowy numer (oznaczający numer części) oraz ciąg znaków skrótu. W przypadku dużych ilości danych Spark zazwyczaj zapisuje informacje w kilku plikach. Przykładowa nazwa pliku to part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
.
6. Szablony Dataproc: plik CSV do parquet
Za pomocą szablonów Dataproc będziesz teraz konwertować dane w GCS z jednego typu na inny plik przy użyciu GCSTOGCS. Ten szablon używa SparkSQL i umożliwia przesłanie zapytania SparkSQL do przetworzenia podczas przekształcenia w celu dodatkowego przetwarzania.
Potwierdzanie zmiennych środowiskowych
Sprawdź, czy wartości GCP_PROJECT
, REGION
i GCS_STAGING_BUCKET
są ustawione z poprzedniej sekcji.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
Ustawianie parametrów szablonu
Teraz ustawisz parametry konfiguracji dla usługi GCStoGCS
. Zacznij od lokalizacji plików wejściowych. Pamiętaj, że jest to katalog, a nie konkretny plik, ponieważ wszystkie pliki w katalogu zostaną przetworzone. Ustaw na BIGQUERY_GCS_OUTPUT_LOCATION
.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Ustaw format pliku wejściowego.
GCS_TO_GCS_INPUT_FORMAT=csv
Ustaw odpowiedni format wyjściowy. Możesz wybrać parquet, json, avro lub csv.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Ustaw tryb wyjścia na overwrite
. Masz do wyboru overwrite
, append
, ignore
lub errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Ustaw lokalizację wyjściową.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Uruchamianie szablonu
Uruchom szablon GCStoGCS
.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Dane wyjściowe będą dość głośne, ale po około minucie powinien wyświetlić się komunikat o powodzeniu, podobny do tego poniżej.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Aby sprawdzić, czy pliki zostały wygenerowane, uruchom to polecenie.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Ten szablon pozwala też dostarczać zapytania SparkSQL, przekazując do szablonu gcs.to.gcs.temp.view.name
i gcs.to.gcs.sql.query
, co umożliwia uruchomienie zapytania SparkSQL na danych przed zapisem w GCS.
7. Czyszczenie zasobów
Aby po ukończeniu tego ćwiczenia z programowania uniknąć niepotrzebnych opłat na koncie GCP:
- Usuń zasobnik Cloud Storage utworzonego środowiska.
gsutil rm -r gs://${BUCKET}
- Usuń klaster Dataproc używany jako stały serwer historii.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- Usuń zadania Dataproc Serverless. Otwórz konsolę usług zbiorczych, zaznacz pole obok każdego zadania, które chcesz usunąć, i kliknij USUŃ.
Jeśli Twój projekt został utworzony tylko na potrzeby tego ćwiczenia z programowania, możesz go też opcjonalnie usunąć:
- W konsoli GCP otwórz stronę Projekty.
- Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
- W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.
8. Co dalej?
Dodatkowe sposoby korzystania z usługi Serverless Spark znajdziesz w tych materiałach:
- Dowiedz się, jak orchestować przepływy pracy Dataproc Serverless za pomocą usługi Cloud Composer.
- Dowiedz się, jak zintegrować Dataproc Serverless z potokami Kubeflow.