1. Genel Bakış - Google Dataproc
Dataproc; Apache Spark, Apache Flink, Presto ve diğer birçok açık kaynak aracı ve çerçeveyi çalıştırmak için tümüyle yönetilen ve yüksek düzeyde ölçeklenebilir bir hizmettir. Dataproc'u geniş ölçekte veri gölü modernizasyonu, ETL / ELT ve güvenli veri bilimi için kullanın. Dataproc ayrıca BigQuery, Cloud Storage, Vertex AI ve Dataplex gibi çeşitli Google Cloud hizmetleriyle tamamen entegredir.
Dataproc üç çeşidiyle sunulur:
- Dataproc Sunucusuz, altyapı ve otomatik ölçeklendirme yapılandırmaya gerek kalmadan PySpark işlerini çalıştırmanıza olanak tanır. Dataproc Serverless, PySpark toplu iş yüklerini ve oturumları / not defterlerini destekler.
- Google Compute Engine'de Dataproc, Flink ve Presto gibi açık kaynaklı araçlara ek olarak YARN tabanlı Spark iş yükleri için bir Hadoop YARN kümesini yönetmenize olanak tanır. Otomatik ölçeklendirme de dahil olmak üzere bulut tabanlı kümelerinizi istediğiniz kadar dikey veya yatay ölçeklendirme ile uyarlayabilirsiniz.
- Google Kubernetes Engine'de Dataproc; Spark, PySpark, SparkR veya Spark SQL işlerini göndermek için GKE altyapınızdaki Dataproc sanal kümelerini yapılandırmanıza olanak tanır.
Bu codelab'de Dataproc Sunucusuz'u kullanmanın birkaç farklı yolunu öğreneceksiniz.
Apache Spark başlangıçta Hadoop kümeleri üzerinde çalışacak şekilde geliştirilmişti ve kaynak yöneticisi olarak YARN'ı kullanmıştı. Hadoop kümelerini yönetmek için belirli bir uzmanlık grubu gerekir ve kümeler üzerindeki birçok farklı topuzun doğru şekilde yapılandırıldığından emin olunur. Bu, Spark'ın kullanıcının ayarlamasını gerektirdiği ayrı bir düğme grubuna ek olarak yapılır. Bu durum, geliştiricilerin Spark kodu üzerinde çalışmak yerine altyapılarını yapılandırmak için daha fazla zaman harcadıkları birçok senaryoya yol açıyor.
Dataproc Sunucusuz, Hadoop kümelerini veya Spark'ı manuel olarak yapılandırma ihtiyacını ortadan kaldırır. Dataproc Serverless, Hadoop'ta çalışmaz ve otomatik ölçeklendirme dahil olmak üzere kaynak gereksinimlerini belirlemek için kendi Dinamik Kaynak Ayırma özelliğini kullanır. Spark özelliklerinin küçük bir kısmı Dataproc Serverless ile özelleştirilebilir olmaya devam eder ancak çoğu durumda bunları değiştirmeniz gerekmez.
2. Kur
Bu codelab'de kullanılan ortamınızı ve kaynaklarınızı yapılandırarak başlayacaksınız.
Bir Google Cloud projesi oluşturun. Mevcut bir kartı kullanabilirsiniz.
Cloud Console araç çubuğunda Cloud Shell'i tıklayarak açın.
Cloud Shell, bu codelab'de kullanabileceğiniz kullanıma hazır bir Shell ortamı sağlar.
Cloud Shell, projenizin adını varsayılan olarak ayarlar. echo $GOOGLE_CLOUD_PROJECT
komutunu çalıştırarak kontrol edin. Çıkışta proje kimliğinizi görmüyorsanız kodu ayarlayın.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Kaynaklarınız için us-central1
veya europe-west2
gibi bir Compute Engine bölgesi ayarlayın.
export REGION=<your-region>
API'leri etkinleştir
Codelab, aşağıdaki API'leri kullanır:
- BigQuery
- Dataproc
Gerekli API'leri etkinleştirin. Bu işlem yaklaşık bir dakika sürer ve işlem tamamlandığında işlemin başarılı olduğunu belirten bir mesaj görüntülenir.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Ağ erişimini yapılandırma
Spark sürücülerinin ve yürütücülerinin yalnızca özel IP'leri olduğundan, Dataproc Serverless, Spark işlerinizi çalıştıracağınız bölgede Google Özel Erişim'in etkinleştirilmesini gerektirir. default
alt ağında etkinleştirmek için aşağıdaki komutu çalıştırın.
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
Google Özel Erişim'in etkinleştirildiğini şu yolla doğrulayabilirsiniz: True
veya False
çıktısı.
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
Storage paketi oluşturma
Bu codelab'de oluşturulan öğeleri depolamak için kullanılacak bir depolama paketi oluşturun.
Paketiniz için bir ad seçin. Paket adları, tüm kullanıcılar genelinde genel olarak benzersiz olmalıdır.
export BUCKET=<your-bucket-name>
Paketi, Spark işlerinizi çalıştırmayı düşündüğünüz bölgede oluşturun.
gsutil mb -l ${REGION} gs://${BUCKET}
Paketinizin Cloud Storage konsolunda kullanılabilir olduğunu görebilirsiniz. Paketinizi görmek için gsutil ls
komutunu da çalıştırabilirsiniz.
Kalıcı Geçmiş Sunucusu Oluşturma
Spark kullanıcı arayüzü, Spark işleriyle ilgili bilgiler ve çok sayıda hata ayıklama aracı sunar. Tamamlanan Dataproc Sunucusuz işlerinin Spark kullanıcı arayüzünü görüntülemek için kalıcı geçmiş sunucusu olarak kullanılacak tek bir düğümlü Dataproc kümesi oluşturmanız gerekir.
Kalıcı geçmiş sunucunuz için bir ad belirleyin.
PHS_CLUSTER_NAME=my-phs
Aşağıdaki komutu çalıştırın.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Spark kullanıcı arayüzü ve kalıcı geçmiş sunucusu, codelab'in ilerleyen bölümlerinde daha ayrıntılı olarak incelenecektir.
3. Dataproc Gruplarıyla Sunucusuz Spark işlerini çalıştırma
Bu örnekte, New York (NYC) Citi bisiklet Gezileri herkese açık veri kümesinden alınan bir veri kümesiyle çalışacaksınız. NYC Citi Buildings, New York'ta bulunan ücretli bir bisiklet paylaşım sistemidir. Birkaç basit dönüştürme yapacaksınız ve en popüler on Citi bisiklet istasyonu kimliğini yazdıracaksınız. Bu örnek, Spark ve BigQuery arasında sorunsuz bir şekilde veri okuyup yazmak için açık kaynak spark-bigquery-connector'ı da kullanmaktadır.
Aşağıdaki GitHub kod deposunu ve cd
dosyasını, citibike.py
dosyasını içeren dizine klonlayın.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
İşi, Cloud Shell'de varsayılan olarak bulunan Google Cloud SDK'yı kullanarak Serverless Spark'a gönderin. Sunucusuz Spark işlerini göndermek için Google Cloud SDK ve Dataproc Batches API'yi kullanan kabuğunuzda, aşağıdaki komutu çalıştırın.
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
Bunun dökümünü almak için:
gcloud dataproc batches submit
, Dataproc Batches API'ye referans verir.pyspark
, bir PySpark işi gönderdiğinizi belirtir.--batch
işin adıdır. Sağlanmazsa rastgele oluşturulmuş bir UUID kullanılır.--region=${REGION}
, işin işleneceği coğrafi bölgedir.--deps-bucket=${BUCKET}
, yerel Python dosyanızın sunucusuz ortamda çalıştırılmadan önce yüklendiği yerdir.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
, Spark çalışma zamanı ortamındaki spark-bigquery-connector için jar dosyasını içerir.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
, kalıcı geçmiş sunucusunun tam adıdır. Spark etkinlik verileri (konsol çıkışından ayrı olarak) burada depolanır ve Spark kullanıcı arayüzünden görüntülenebilir.- İzleyen
--
, bunun dışındaki her şeyin program için çalışma zamanı bağımsız değişkenleri olacağını belirtir. Bu durumda, paketinizin adını işin gerektirdiği şekilde gönderirsiniz.
Paket gönderildiğinde aşağıdaki çıkışı görürsünüz.
Batch [citibike-job] submitted.
Birkaç dakika sonra, işe ait meta verilerle birlikte aşağıdaki çıkışı görürsünüz.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Sonraki bölümde bu işe ait günlükleri nasıl bulacağınızı öğreneceksiniz.
Ek özellikler
Spark Serverless ile işlerinizi çalıştırmak için ek seçeneklere sahipsiniz.
- İşinizin çalışacağı özel bir Docker görüntüsü oluşturabilirsiniz. Bu, Python ve R kitaplıkları dahil olmak üzere ek bağımlılıklar eklemek için harika bir yöntemdir.
- Hive meta verilerine erişmek için işinize bir Dataproc Metastore örneği bağlayabilirsiniz.
- Dataproc Serverless, daha fazla kontrol için küçük bir Spark özellikleri grubunun yapılandırmasını destekler.
4. Dataproc Metrikleri ve Gözlemlenebilirlik
Dataproc Batches Console, tüm Dataproc Sunucusuz işlerinizi listeler. Konsolda her işin Toplu Kimliği, Konum, Durum, Oluşturulma zamanı, Geçen süre ve Tür bilgilerini görürsünüz. Hakkında daha fazla bilgi görüntülemek için işinizin Toplu Kimliği'ni tıklayın.
Bu sayfada, işinizin zaman içinde kaç Toplu Spark Yürütücüsü kullandığını (ne kadar otomatik ölçeklendirildiğini gösterir) gösteren İzleme gibi bilgileri görürsünüz.
Ayrıntılar sekmesinde, işle birlikte gönderilen bağımsız değişkenler ve parametreler de dahil olmak üzere işle ilgili daha fazla meta veri görürsünüz.
Ayrıca bu sayfadan tüm günlüklere erişebilirsiniz. Dataproc Sunucusuz işleri çalıştırıldığında üç farklı günlük grubu oluşturulur:
- Hizmet düzeyi
- Konsol çıkışı
- Spark etkinlik günlük kaydı
Hizmet düzeyi, Dataproc Sunucusuz hizmetinin oluşturduğu günlükleri içerir. Buna, Dataproc Sunucusuz'un otomatik ölçeklendirme için ek CPU istemesi gibi özellikler dahildir. Bunları, Cloud Logging'i açan Günlükleri görüntüle'yi tıklayarak görüntüleyebilirsiniz.
Konsol çıkışı, Çıkış bölümünde görüntülenebilir. Bu, Spark'ın bir işe başlarken yazdırdığı meta veriler veya işe dahil edilen yazdırma ifadeleri de dahil olmak üzere, iş tarafından oluşturulan çıkıştır.
Spark etkinlik günlük kaydına Spark kullanıcı arayüzünden erişilebilir. Spark işiniz için kalıcı geçmiş sunucusu sağladığınızdan, daha önce çalıştırdığınız Spark işlerinizle ilgili bilgileri içeren View Spark History Server'ı (Spark Geçmiş Sunucusunu Görüntüle) tıklayarak Spark kullanıcı arayüzüne erişebilirsiniz. Resmi Spark belgelerinden, Spark kullanıcı arayüzü hakkında daha fazla bilgi edinebilirsiniz.
5. Dataproc Şablonları: BQ -> GCS
Dataproc Şablonları, Cloud içi veri işleme görevlerini daha da basitleştiren açık kaynaklı araçlardır. Bunlar, Dataproc Serverless için sarmalayıcı işlevi görür ve birçok veri içe ve dışa aktarma görevi için şablonlar içerir. Örneğin:
BigQuerytoGCS
veGCStoBigQuery
GCStoBigTable
GCStoJDBC
veJDBCtoGCS
HivetoBigQuery
MongotoGCS
veGCStoMongo
Tam liste için README'yi inceleyebilirsiniz.
Bu bölümde BigQuery'den GCS'ye veri aktarmak için Dataproc Şablonları'nı kullanacaksınız.
Depoyu klonlama
Depoyu klonlayıp python
klasörüne değiştirin.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Ortamı yapılandırma
Şimdi ortam değişkenlerini ayarlayacaksınız. Dataproc Şablonları, proje kimliğiniz için GCP_PROJECT
ortam değişkenini kullanır. Bu nedenle bu değişkeni GOOGLE_CLOUD_PROJECT.
olarak ayarlayın.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Bölgeniz, ortamda öncekinden ayarlanmalıdır. Ayarlanmadıysa buradan ayarlayın.
export REGION=<region>
Dataproc Şablonları, BigQuery işlerini işlemek için spark-bigquery-conector'ı kullanır ve URI'nin JARS
ortam değişkenine dahil edilmesini gerektirir. JARS
değişkenini ayarlayın.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Şablon parametrelerini yapılandırma
Hizmetin kullanacağı hazırlık paketinin adını ayarlayın.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Daha sonra işe özel bazı değişkenler ayarlayacaksınız. Giriş tablosu için yine BigQuery NYC Citibike veri kümesine referans vereceksiniz.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
csv
, parquet
, avro
veya json
arasından seçim yapabilirsiniz. Bu codelab için CSV'yi seçin. Bir sonraki bölümde, dosya türlerini dönüştürmek için Dataproc Şablonları'nı kullanma konusunu inceleyeceğiz.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Çıkış modunu overwrite
olarak ayarlayın. overwrite
, append
, ignore
veya errorifexists.
arasından seçim yapabilirsiniz
BIGQUERY_GCS_OUTPUT_MODE=overwrite
GCS çıkış konumunu, paketinizdeki yol olarak ayarlayın.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Şablonu çalıştır
BIGQUERYTOGCS
şablonunu aşağıda belirterek ve ayarladığınız giriş parametrelerini sağlayarak çalıştırın.
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Çıkış oldukça gürültülü olur, ancak bir dakika kadar sonra aşağıdakini görürsünüz.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Dosyaların oluşturulduğunu aşağıdaki komutu çalıştırarak doğrulayabilirsiniz.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark varsayılan olarak veri miktarına bağlı olarak birden fazla dosyaya yazar. Bu durumda, oluşturulmuş yaklaşık 30 dosya görürsünüz. Spark çıkış dosyası adları, part
- ve ardından beş basamaklı bir sayı (parça numarasını belirtir) ve bir karma dizesi ile biçimlendirilir. Spark, büyük miktarlarda veri için genellikle birkaç dosyaya yazar. part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
, dosya adına örnek olarak verilebilir.
6. Dataproc Şablonları: CSV'den paketlemeye
Artık GCSTOGCS kullanarak GCS'deki verileri bir dosya türünden diğerine dönüştürmek için Dataproc Şablonları'nı kullanacaksınız. Bu şablon, SparkSQL'i kullanır ve ek işleme için dönüştürme sırasında işlenecek bir SparkSQL sorgusu gönderme seçeneği sunar.
Ortam değişkenlerini onaylama
GCP_PROJECT
, REGION
ve GCS_STAGING_BUCKET
öğelerinin önceki bölümden ayarlandığını onaylayın.
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
Şablon parametrelerini ayarla
GCStoGCS
için yapılandırma parametrelerini ayarlarsınız. Giriş dosyalarının bulunduğu yerden başlayın. Dizindeki tüm dosyalar işleneceği için bunun bir dizin olduğunu, belirli bir dosya olmadığını unutmayın. Bunu BIGQUERY_GCS_OUTPUT_LOCATION
olarak ayarla.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Giriş dosyasının biçimini ayarlayın.
GCS_TO_GCS_INPUT_FORMAT=csv
İstediğiniz çıkış biçimini ayarlayın. Parket, json, avro veya csv'yi seçebilirsiniz.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Çıkış modunu overwrite
olarak ayarlayın. overwrite
, append
, ignore
veya errorifexists.
arasından seçim yapabilirsiniz
GCS_TO_GCS_OUTPUT_MODE=overwrite
Çıkış konumunu ayarlayın.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Şablonu çalıştırma
GCStoGCS
şablonunu çalıştırın.
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Çıkış oldukça gürültülü olur, ancak yaklaşık bir dakika sonra aşağıdaki gibi bir başarı mesajı görürsünüz.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Dosyaların oluşturulduğunu aşağıdaki komutu çalıştırarak doğrulayabilirsiniz.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Bu şablon sayesinde, şablona gcs.to.gcs.temp.view.name
ve gcs.to.gcs.sql.query
ileterek SparkSQL sorguları sağlama seçeneğiniz de vardır. Böylece, GCS'ye yazmadan önce veriler üzerinde bir SparkSQL sorgusu çalıştırılabilir.
7. Kaynakları temizleyin
Bu codelab tamamlandıktan sonra GCP hesabınızdan gereksiz ücretler alınmasını önlemek için:
- Oluşturduğunuz ortam için Cloud Storage paketini silin.
gsutil rm -r gs://${BUCKET}
- Kalıcı geçmiş sunucunuz için kullanılan Dataproc kümesini silin.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- Sunucusuz Dataproc işlerini silin. Toplu İşlemler Konsolu'na gidin, silmek istediğiniz her bir işin yanındaki kutuyu ve SİL'i tıklayın.
Sadece bu codelab için proje oluşturduysanız dilerseniz projeyi silebilirsiniz:
- GCP Console'da Projeler sayfasına gidin.
- Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
- Kutuya proje kimliğini yazın ve projeyi silmek için Kapat'ı tıklayın.
8. Sırada ne var?
Aşağıdaki kaynaklar, sunucusuz Spark'tan yararlanabileceğiniz ek yöntemler sunar:
- Cloud Composer'ı kullanarak Dataproc Sunucusuz iş akışlarını nasıl düzenlemeyi öğrenin.
- Dataproc Serverless ile Kubeflow ardışık düzenlerini nasıl entegre edebileceğinizi öğrenin.