Dataproc Sunucusuz

1. Genel Bakış - Google Dataproc

Dataproc; Apache Spark, Apache Flink, Presto ve diğer birçok açık kaynak aracı ve çerçeveyi çalıştırmak için tümüyle yönetilen ve yüksek düzeyde ölçeklenebilir bir hizmettir. Dataproc'u geniş ölçekte veri gölü modernizasyonu, ETL / ELT ve güvenli veri bilimi için kullanın. Dataproc ayrıca BigQuery, Cloud Storage, Vertex AI ve Dataplex gibi çeşitli Google Cloud hizmetleriyle tamamen entegredir.

Dataproc üç çeşidiyle sunulur:

  • Dataproc Sunucusuz, altyapı ve otomatik ölçeklendirme yapılandırmaya gerek kalmadan PySpark işlerini çalıştırmanıza olanak tanır. Dataproc Serverless, PySpark toplu iş yüklerini ve oturumları / not defterlerini destekler.
  • Google Compute Engine'de Dataproc, Flink ve Presto gibi açık kaynaklı araçlara ek olarak YARN tabanlı Spark iş yükleri için bir Hadoop YARN kümesini yönetmenize olanak tanır. Otomatik ölçeklendirme de dahil olmak üzere bulut tabanlı kümelerinizi istediğiniz kadar dikey veya yatay ölçeklendirme ile uyarlayabilirsiniz.
  • Google Kubernetes Engine'de Dataproc; Spark, PySpark, SparkR veya Spark SQL işlerini göndermek için GKE altyapınızdaki Dataproc sanal kümelerini yapılandırmanıza olanak tanır.

Bu codelab'de Dataproc Sunucusuz'u kullanmanın birkaç farklı yolunu öğreneceksiniz.

Apache Spark başlangıçta Hadoop kümeleri üzerinde çalışacak şekilde geliştirilmişti ve kaynak yöneticisi olarak YARN'ı kullanmıştı. Hadoop kümelerini yönetmek için belirli bir uzmanlık grubu gerekir ve kümeler üzerindeki birçok farklı topuzun doğru şekilde yapılandırıldığından emin olunur. Bu, Spark'ın kullanıcının ayarlamasını gerektirdiği ayrı bir düğme grubuna ek olarak yapılır. Bu durum, geliştiricilerin Spark kodu üzerinde çalışmak yerine altyapılarını yapılandırmak için daha fazla zaman harcadıkları birçok senaryoya yol açıyor.

Dataproc Sunucusuz, Hadoop kümelerini veya Spark'ı manuel olarak yapılandırma ihtiyacını ortadan kaldırır. Dataproc Serverless, Hadoop'ta çalışmaz ve otomatik ölçeklendirme dahil olmak üzere kaynak gereksinimlerini belirlemek için kendi Dinamik Kaynak Ayırma özelliğini kullanır. Spark özelliklerinin küçük bir kısmı Dataproc Serverless ile özelleştirilebilir olmaya devam eder, ancak çoğu durumda bunları değiştirmeniz gerekmez.

2. Kur

Bu codelab'de kullanılan ortamınızı ve kaynaklarınızı yapılandırarak başlayacaksınız.

Bir Google Cloud projesi oluşturun. Mevcut bir kartı kullanabilirsiniz.

Cloud Console araç çubuğunda Cloud Shell'i tıklayarak açın.

ba0bb17945a73543.png

Cloud Shell, bu codelab'de kullanabileceğiniz kullanıma hazır bir Shell ortamı sağlar.

68c4ebd2a8539764.png

Cloud Shell, projenizin adını varsayılan olarak ayarlar. echo $GOOGLE_CLOUD_PROJECT komutunu çalıştırarak kontrol edin. Çıkışta proje kimliğinizi görmüyorsanız kodu ayarlayın.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Kaynaklarınız için us-central1 veya europe-west2 gibi bir Compute Engine bölgesi ayarlayın.

export REGION=<your-region>

API'leri etkinleştir

Codelab, aşağıdaki API'leri kullanır:

  • BigQuery
  • Dataproc

Gerekli API'leri etkinleştirin. Bu işlem yaklaşık bir dakika sürer ve işlem tamamlandığında işlemin başarılı olduğunu belirten bir mesaj görüntülenir.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Ağ erişimini yapılandırma

Spark sürücülerinin ve yürütücülerinin yalnızca özel IP'leri olduğundan, Dataproc Serverless, Spark işlerinizi çalıştıracağınız bölgede Google Özel Erişim'in etkinleştirilmesini gerektirir. default alt ağında etkinleştirmek için aşağıdaki komutu çalıştırın.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Google Özel Erişim'in etkinleştirildiğini şu yolla doğrulayabilirsiniz: True veya False çıktısı.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Storage paketi oluşturma

Bu codelab'de oluşturulan öğeleri depolamak için kullanılacak bir depolama paketi oluşturun.

Paketiniz için bir ad seçin. Paket adları, tüm kullanıcılar genelinde genel olarak benzersiz olmalıdır.

export BUCKET=<your-bucket-name>

Paketi, Spark işlerinizi çalıştırmayı düşündüğünüz bölgede oluşturun.

gsutil mb -l ${REGION} gs://${BUCKET}

Paketinizin Cloud Storage konsolunda kullanılabilir olduğunu görebilirsiniz. Paketinizi görmek için gsutil ls komutunu da çalıştırabilirsiniz.

Kalıcı Geçmiş Sunucusu Oluşturma

Spark kullanıcı arayüzü, Spark işleriyle ilgili bilgiler ve çok sayıda hata ayıklama aracı sunar. Tamamlanan Dataproc Sunucusuz işlerinin Spark kullanıcı arayüzünü görüntülemek için kalıcı geçmiş sunucusu olarak kullanılacak tek bir düğümlü Dataproc kümesi oluşturmanız gerekir.

Kalıcı geçmiş sunucunuz için bir ad belirleyin.

PHS_CLUSTER_NAME=my-phs

Aşağıdaki komutu çalıştırın.

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

Spark kullanıcı arayüzü ve kalıcı geçmiş sunucusu, codelab'in ilerleyen bölümlerinde daha ayrıntılı olarak incelenecektir.

3. Dataproc Gruplarıyla Sunucusuz Spark işlerini çalıştırma

Bu örnekte, New York (NYC) Citi bisiklet Gezileri herkese açık veri kümesinden alınan bir veri kümesiyle çalışacaksınız. NYC Citi Buildings, New York'ta bulunan ücretli bir bisiklet paylaşım sistemidir. Birkaç basit dönüştürme yapacaksınız ve en popüler on Citi bisiklet istasyonu kimliğini yazdıracaksınız. Bu örnek, Spark ve BigQuery arasında sorunsuz bir şekilde veri okuyup yazmak için açık kaynak spark-bigquery-connector'ı da kullanmaktadır.

Aşağıdaki GitHub kod deposunu ve cd dosyasını, citibike.py dosyasını içeren dizine klonlayın.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

İşi, Cloud Shell'de varsayılan olarak bulunan Google Cloud SDK'yı kullanarak Serverless Spark'a gönderin. Sunucusuz Spark işlerini göndermek için Google Cloud SDK ve Dataproc Batches API'yi kullanan kabuğunuzda, aşağıdaki komutu çalıştırın.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Bunun dökümünü almak için:

  • gcloud dataproc batches submit, Dataproc Batches API'ye referans verir.
  • pyspark, bir PySpark işi gönderdiğinizi belirtir.
  • --batch işin adıdır. Sağlanmazsa rastgele oluşturulmuş bir UUID kullanılır.
  • --region=${REGION}, işin işleneceği coğrafi bölgedir.
  • --deps-bucket=${BUCKET}, yerel Python dosyanızın sunucusuz ortamda çalıştırılmadan önce yüklendiği yerdir.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar, Spark çalışma zamanı ortamındaki spark-bigquery-connector için jar dosyasını içerir.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}, kalıcı geçmiş sunucusunun tam adıdır. Spark etkinlik verileri (konsol çıkışından ayrı olarak) burada depolanır ve Spark kullanıcı arayüzünden görüntülenebilir.
  • İzleyen --, bunun dışındaki her şeyin program için çalışma zamanı bağımsız değişkenleri olacağını belirtir. Bu durumda, paketinizin adını işin gerektirdiği şekilde gönderirsiniz.

Paket gönderildiğinde aşağıdaki çıkışı görürsünüz.

Batch [citibike-job] submitted.

Birkaç dakika sonra, işin meta verileriyle birlikte aşağıdaki çıkışı görürsünüz.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

Sonraki bölümde bu işe ait günlükleri nasıl bulacağınızı öğreneceksiniz.

Ek özellikler

Spark Serverless ile işlerinizi çalıştırmak için ek seçeneklere sahipsiniz.

  • İşinizin çalışacağı özel bir Docker görüntüsü oluşturabilirsiniz. Bu, Python ve R kitaplıkları dahil olmak üzere ek bağımlılıklar eklemek için harika bir yöntemdir.
  • Hive meta verilerine erişmek için işinize bir Dataproc Metastore örneği bağlayabilirsiniz.
  • Dataproc Serverless, daha fazla kontrol için küçük bir Spark özellikleri grubunun yapılandırmasını destekler.

4. Dataproc Metrikleri ve Gözlemlenebilirlik

Dataproc Batches Console, tüm Dataproc Sunucusuz işlerinizi listeler. Konsolda her işin Toplu Kimliği, Konum, Durum, Oluşturulma zamanı, Geçen süre ve Tür bilgilerini görürsünüz. Hakkında daha fazla bilgi görüntülemek için işinizin Toplu Kimliği'ni tıklayın.

Bu sayfada, işinizin zaman içinde kaç Toplu Spark Yürütücüsü kullandığını (ne kadar otomatik ölçeklendirildiğini gösterir) gösteren İzleme gibi bilgileri görürsünüz.

Ayrıntılar sekmesinde, işle birlikte gönderilen bağımsız değişkenler ve parametreler de dahil olmak üzere işle ilgili daha fazla meta veri görürsünüz.

Ayrıca bu sayfadan tüm günlüklere erişebilirsiniz. Dataproc Sunucusuz işleri çalıştırıldığında üç farklı günlük grubu oluşturulur:

  • Hizmet düzeyi
  • Konsol çıkışı
  • Spark etkinlik günlük kaydı

Hizmet düzeyi, Dataproc Sunucusuz hizmetinin oluşturduğu günlükleri içerir. Buna, Dataproc Sunucusuz'un otomatik ölçeklendirme için ek CPU istemesi gibi özellikler dahildir. Bunları, Cloud Logging'i açan Günlükleri görüntüle'yi tıklayarak görüntüleyebilirsiniz.

Konsol çıkışı, Çıkış bölümünde görüntülenebilir. Bu, Spark'ın bir işe başlarken yazdırdığı meta veriler veya işe dahil edilen yazdırma ifadeleri de dahil olmak üzere, iş tarafından oluşturulan çıkıştır.

Spark etkinlik günlük kaydına Spark kullanıcı arayüzünden erişilebilir. Spark işiniz için kalıcı geçmiş sunucusu sağladığınızdan, daha önce çalıştırdığınız Spark işlerinizle ilgili bilgileri içeren View Spark History Server'ı (Spark Geçmiş Sunucusunu Görüntüle) tıklayarak Spark kullanıcı arayüzüne erişebilirsiniz. Resmi Spark belgelerinden, Spark kullanıcı arayüzü hakkında daha fazla bilgi edinebilirsiniz.

5. Dataproc Şablonları: BQ -> GCS

Dataproc Şablonları, Cloud içi veri işleme görevlerini daha da basitleştiren açık kaynaklı araçlardır. Bunlar, Dataproc Serverless için sarmalayıcı işlevi görür ve birçok veri içe ve dışa aktarma görevi için şablonlar içerir. Örneğin:

  • BigQuerytoGCS ve GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC ve JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS ve GCStoMongo

Tam liste için README'yi inceleyebilirsiniz.

Bu bölümde BigQuery'den GCS'ye veri aktarmak için Dataproc Şablonları'nı kullanacaksınız.

Depoyu klonlama

Depoyu klonlayıp python klasörüne değiştirin.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Ortamı yapılandırma

Şimdi ortam değişkenlerini ayarlayacaksınız. Dataproc Şablonları, proje kimliğiniz için GCP_PROJECT ortam değişkenini kullanır. Bu nedenle bu değişkeni GOOGLE_CLOUD_PROJECT. olarak ayarlayın.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Bölgeniz, ortamda öncekinden ayarlanmalıdır. Ayarlanmadıysa buradan ayarlayın.

export REGION=<region>

Dataproc Şablonları, BigQuery işlerini işlemek için spark-bigquery-conector'ı kullanır ve URI'nin JARS ortam değişkenine dahil edilmesini gerektirir. JARS değişkenini ayarlayın.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Şablon parametrelerini yapılandırma

Hizmetin kullanacağı hazırlık paketinin adını ayarlayın.

export GCS_STAGING_LOCATION=gs://${BUCKET}

Daha sonra işe özel bazı değişkenler ayarlayacaksınız. Giriş tablosu için yine BigQuery NYC Citibike veri kümesine referans vereceksiniz.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

csv, parquet, avro veya json arasından seçim yapabilirsiniz. Bu codelab için CSV'yi seçin. Bir sonraki bölümde, dosya türlerini dönüştürmek için Dataproc Şablonları'nı kullanma konusunu inceleyeceğiz.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Çıkış modunu overwrite olarak ayarlayın. overwrite, append, ignore veya errorifexists. arasından seçim yapabilirsiniz

BIGQUERY_GCS_OUTPUT_MODE=overwrite

GCS çıkış konumunu, paketinizdeki yol olarak ayarlayın.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Şablonu çalıştır

BIGQUERYTOGCS şablonunu aşağıda belirterek ve ayarladığınız giriş parametrelerini sağlayarak çalıştırın.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

Çıkış oldukça gürültülü olur, ancak bir dakika kadar sonra aşağıdakini görürsünüz.

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Dosyaların oluşturulduğunu aşağıdaki komutu çalıştırarak doğrulayabilirsiniz.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark varsayılan olarak veri miktarına bağlı olarak birden fazla dosyaya yazar. Bu durumda, oluşturulmuş yaklaşık 30 dosya görürsünüz. Spark çıkış dosyası adları, part- ve ardından beş basamaklı bir sayı (parça numarasını belirtir) ve bir karma dizesi ile biçimlendirilir. Spark, büyük miktarlarda veri için genellikle birkaç dosyaya yazar. part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv, dosya adına örnek olarak verilebilir.

6. Dataproc Şablonları: CSV'den paketlemeye

Artık GCSTOGCS kullanarak GCS'deki verileri bir dosya türünden diğerine dönüştürmek için Dataproc Şablonları'nı kullanacaksınız. Bu şablon, SparkSQL'i kullanır ve ek işleme için dönüştürme sırasında işlenecek bir SparkSQL sorgusu gönderme seçeneği sunar.

Ortam değişkenlerini onaylama

GCP_PROJECT, REGION ve GCS_STAGING_BUCKET öğelerinin önceki bölümden ayarlandığını onaylayın.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Şablon parametrelerini ayarla

GCStoGCS için yapılandırma parametrelerini ayarlarsınız. Giriş dosyalarının bulunduğu yerden başlayın. Dizindeki tüm dosyalar işleneceği için bunun bir dizin olduğunu, belirli bir dosya olmadığını unutmayın. Bunu BIGQUERY_GCS_OUTPUT_LOCATION olarak ayarla.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Giriş dosyasının biçimini ayarlayın.

GCS_TO_GCS_INPUT_FORMAT=csv

İstediğiniz çıkış biçimini ayarlayın. Parket, json, avro veya csv'yi seçebilirsiniz.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Çıkış modunu overwrite olarak ayarlayın. overwrite, append, ignore veya errorifexists. arasından seçim yapabilirsiniz

GCS_TO_GCS_OUTPUT_MODE=overwrite

Çıkış konumunu ayarlayın.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Şablonu çalıştır

GCStoGCS şablonunu çalıştırın.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

Çıkış oldukça gürültülü olur, ancak bir dakika kadar sonra aşağıdaki gibi bir başarı mesajı görürsünüz.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Dosyaların oluşturulduğunu aşağıdaki komutu çalıştırarak doğrulayabilirsiniz.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Bu şablon sayesinde, şablona gcs.to.gcs.temp.view.name ve gcs.to.gcs.sql.query ileterek SparkSQL sorguları sağlama seçeneğiniz de vardır. Böylece, GCS'ye yazmadan önce veriler üzerinde bir SparkSQL sorgusu çalıştırılabilir.

7. Kaynakları temizleyin

Bu codelab tamamlandıktan sonra GCP hesabınızdan gereksiz ücretler alınmasını önlemek için:

  1. Oluşturduğunuz ortam için Cloud Storage paketini silin.
gsutil rm -r gs://${BUCKET}
  1. Kalıcı geçmiş sunucunuz için kullanılan Dataproc kümesini silin.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Sunucusuz Dataproc işlerini silin. Toplu İşlemler Konsolu'na gidin, silmek istediğiniz her bir işin yanındaki kutuyu ve SİL'i tıklayın.

Sadece bu codelab için proje oluşturduysanız dilerseniz projeyi silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve projeyi silmek için Kapat'ı tıklayın.

8. Sırada ne var?

Aşağıdaki kaynaklar, sunucusuz Spark'tan yararlanabileceğiniz ek yöntemler sunar: