1. Wprowadzenie – Google Dataproc
Dataproc to w pełni zarządzana i wysoce skalowalna usługa do uruchamiania Apache Spark, Apache Flink, Presto oraz wielu innych narzędzi i platform open source. Używaj Dataproc do modernizacji jeziora danych, ETL / ELT i bezpiecznego badania danych w skali planety. Usługa Dataproc jest też w pełni zintegrowana z kilkoma usługami Google Cloud, takimi jak BigQuery, Cloud Storage, Vertex AI i Dataplex.
Usługa Dataproc jest dostępna w 3 wersjach:
- Dataproc Serverless umożliwia uruchamianie zadań PySpark bez konieczności konfigurowania infrastruktury i autoskalowania. Dataproc Serverless obsługuje zadania wsadowe oraz sesje i notatniki wsadowe PySpark.
- Dataproc w Google Compute Engine umożliwia zarządzanie klastrem Hadoop YARN dla zbiorów zadań Spark opartych na YARN, a także z narzędziami open source, takimi jak Flink i Presto. Możesz dostosować klastry w chmurze za pomocą dowolnej liczby skalowania w pionie lub poziomie, w tym także autoskalowania.
- Dataproc w Google Kubernetes Engine umożliwia skonfigurowanie wirtualnych klastrów Dataproc w infrastrukturze GKE pod kątem przesyłania zadań Spark, PySpark, SparkR lub Spark SQL.
2. Tworzenie klastra Dataproc w sieci VPC Google Cloud
W tym kroku utworzysz klaster Dataproc w Google Cloud za pomocą konsoli Google Cloud.
Zacznij od włączenia interfejsu Dataproc Service API w konsoli. Po włączeniu wyszukaj „Dataproc” na pasku wyszukiwania i kliknij Utwórz klaster.
Wybierz Klaster w Compute Engine, aby używać maszyn wirtualnych Google Compute Engine(GCE) jako bazowej infrastruktury do uruchamiania klastrów Dataproc.
Jesteś teraz na stronie tworzenia klastra.
Na tej stronie:
- Podaj unikalną nazwę klastra.
- Wybierz konkretny region. Możesz też wybrać strefę, jednak w Dataproc możesz wybrać ją automatycznie. W tym ćwiczeniu w Codelabs wybierz „us-central1” i „us-central1-c”.
- Wybierz opcję „Standardowa” typ klastra. Dzięki temu będziesz mieć 1 węzeł nadrzędny.
- Na karcie Skonfiguruj węzły upewnij się, że liczba utworzonych instancji roboczych wynosi 2.
- W sekcji Dostosuj klaster zaznacz pole Włącz bramę komponentów. Umożliwia to dostęp do interfejsów internetowych w klastrze, w tym interfejsu Spark, menedżera węzłów Yarn i notatników Jupyter.
- W sekcji Komponenty opcjonalne wybierz Notatnik Jupyter. Spowoduje to skonfigurowanie klastra z serwerem notatników Jupyter.
- Pozostaw wszystkie pozostałe ustawienia bez zmian i kliknij Utwórz klaster.
Spowoduje to uruchomienie klastra Dataproc.
3. Uruchom klaster i połącz z nim SSH
Gdy stan klastra zmieni się na Uruchomiono, kliknij nazwę klastra w konsoli Dataproc.
Kliknij kartę Instancja maszyny wirtualnej, aby wyświetlić węzeł główny i 2 węzły robocze klastra.
Kliknij SSH obok węzła nadrzędnego, aby się w nim zalogować.
Uruchom polecenia hdfs, aby zobaczyć strukturę katalogów.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Interfejsy internetowe i bramy komponentów
W konsoli klastra Dataproc kliknij nazwę klastra, a następnie wybierz kartę INTERFEJSY INTERNETOWE.
Pokazuje dostępne interfejsy internetowe, w tym Jupyter. Kliknij Jupyter, aby otworzyć notatnik Jupyter. Za pomocą tego kodu możesz tworzyć notatniki w PySpark przechowywane w GCS. aby zapisać notatnik w Google Cloud Storage i otworzyć notatnik PySpark do wykorzystania w tym ćwiczeniu z programowania.
5. Monitorowanie i obserwacja zadań Spark
Po uruchomieniu klastra Dataproc utwórz zadanie wsadowe PySpark i prześlij je do klastra Dataproc.
Utwórz zasobnik Google Cloud Storage (GCS) do przechowywania skryptu PySpark. Pamiętaj, aby utworzyć zasobnik w tym samym regionie co klaster Dataproc.
Po utworzeniu zasobnika GCS skopiuj do niego poniższy plik.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Ten skrypt tworzy przykładową bazę danych Spark DataFrame i zapisuje ją jako tabelę Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Prześlij ten skrypt jako zadanie wsadowe Spark w Dataproc. W menu nawigacyjnym po lewej stronie kliknij Zadania, a następnie Prześlij zadanie.
Podaj identyfikator zadania i region. Wybierz klaster i podaj lokalizację w GCS skopiowanego skryptu Spark. To zadanie będzie uruchamiane jako zadanie wsadowe Spark w Dataproc.
W sekcji Właściwości dodaj klucz spark.submit.deployMode
z wartością client
, aby mieć pewność, że sterownik działa w węźle głównym Dataproc, a nie w węzłach roboczych. Kliknij Prześlij, aby przesłać zadanie wsadowe do Dataproc.
Skrypt Spark utworzy strukturę DataFrame i zapisze dane w tabeli Hive test_table_1
.
Po uruchomieniu zadania instrukcje drukowania konsoli będą widoczne na karcie Monitorowanie.
Po utworzeniu tabeli Hive prześlij kolejne zadanie zapytania Hive, aby wybrać zawartość tabeli i wyświetlić ją w konsoli.
Utwórz kolejne zadanie z tymi właściwościami:
Zauważ, że Typ zadania jest ustawiony na Hive, a typ źródła zapytania to Tekst zapytania, co oznacza, że cała instrukcja HiveQL zostanie zapisana w polu tekstowym Tekst zapytania.
Prześlij zadanie, zachowując pozostałe parametry jako domyślne.
Zwróć uwagę, jak HiveQL wybiera wszystkie rekordy i ekrany w konsoli.
6. Autoskalowanie
Autoskalowanie to zadanie polegające na szacowaniu „prawej” liczba węzłów roboczych klastra dla zadania.
Interfejs Dataproc AutoscalingPolicies API zapewnia mechanizm automatyzacji zarządzania zasobami klastra i umożliwia autoskalowanie maszyn wirtualnych klastra. Zasada autoskalowania to konfiguracja wielokrotnego użytku, która opisuje, w jaki sposób powinny skalować instancje robocze klastra korzystające z zasady autoskalowania. Definiuje granice, częstotliwość i agresywność skalowania, co zapewnia szczegółową kontrolę nad zasobami klastra od początku jego istnienia.
Zasady autoskalowania Dataproc są zapisywane w plikach YAML, a pliki YAML są przekazywane w poleceniu interfejsu wiersza poleceń podczas tworzenia klastra lub wybierane z zasobnika GCS podczas tworzenia klastra w konsoli Cloud.
Oto przykład zasady autoskalowania Dataproc :
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Konfigurowanie opcjonalnych komponentów Dataproc
Spowoduje to uruchomienie klastra Dataproc.
Gdy tworzysz klaster Dataproc, standardowe komponenty ekosystemu Apache Hadoop są w nim automatycznie instalowane (zobacz listę wersji Dataproc). Podczas tworzenia klastra możesz zainstalować w nim komponenty dodatkowe o nazwie Opcjonalne komponenty.
Podczas tworzenia klastra Dataproc w konsoli włączyliśmy komponenty opcjonalne i wybraliśmy Notatnik Jupyter jako komponent opcjonalny.
8. Czyszczenie zasobów
Aby wyczyścić klaster, po wybraniu klastra w konsoli Dataproc kliknij Zatrzymaj. Po zatrzymaniu klastra kliknij Usuń, aby go usunąć.
Po usunięciu klastra Dataproc usuń zasobniki GCS, do których został skopiowany kod.
Aby wyczyścić zasoby i zatrzymać niechciane płatności, musisz najpierw zatrzymać klaster Dataproc, a następnie go usunąć.
Przed zatrzymaniem i usunięciem klastra upewnij się, że wszystkie dane zapisywane w pamięci HDFS zostały skopiowane do GCS, aby zapewnić trwałą pamięć masową.
Aby zatrzymać klaster, kliknij Zatrzymaj.
Po zatrzymaniu klastra kliknij Usuń, aby go usunąć.
W oknie potwierdzenia kliknij Usuń, aby usunąć klaster.