1. Genel bakış - Google Dataproc
Dataproc; Apache Spark, Apache Flink, Presto ve diğer birçok açık kaynak aracı ile çerçevesini çalıştırmak için sunulan, tümüyle yönetilen ve son derece ölçeklenebilir bir hizmettir. Gezegen ölçeğinde veri gölü modernizasyonu, ETL / ELT ve güvenli veri bilimi için Dataproc'u kullanın. Dataproc, BigQuery, Cloud Storage, Vertex AI ve Dataplex dahil olmak üzere çeşitli Google Cloud hizmetleriyle de tam olarak entegre edilmiştir.
Dataproc üç farklı seçenek olarak mevcuttur:
- Dataproc Serverless, altyapı ve otomatik ölçeklendirme yapılandırmanıza gerek kalmadan PySpark işlerini çalıştırmanıza olanak tanır. Dataproc Serverless, PySpark toplu iş yüklerini ve oturumlarını / not defterlerini destekler.
- Google Compute Engine'de Dataproc, Flink ve Presto gibi açık kaynak araçların yanı sıra YARN tabanlı Spark iş yükleri için bir Hadoop YARN kümesini yönetmenize olanak tanır. Otomatik ölçeklendirme de dahil olmak üzere, bulut tabanlı kümelerinizi istediğiniz kadar dikey veya yatay ölçeklendirme ile özelleştirebilirsiniz.
- Google Kubernetes Engine'de Dataproc, Spark, PySpark, SparkR veya Spark SQL işleri göndermek için GKE altyapınızda Dataproc sanal kümelerini yapılandırmanıza olanak tanır.
Bu codelab'de, Dataproc Sunucusuz'u kullanabileceğiniz çeşitli yöntemleri öğreneceksiniz.
Apache Spark başlangıçta Hadoop kümelerinde çalışacak şekilde oluşturulmuştu ve kaynak yöneticisi olarak YARN'ı kullanıyordu. Hadoop kümelerinin bakımı için belirli bir uzmanlık gerekir ve kümelerdeki birçok farklı ayarın düzgün şekilde yapılandırılması sağlanmalıdır. Bu, Spark'ın kullanıcının ayarlamasını gerektirdiği ayrı bir düğme grubuna ek olarak sunulur. Bu durum, geliştiricilerin Spark kodu üzerinde çalışmak yerine altyapılarını yapılandırmak için daha fazla zaman harcadığı birçok senaryoya yol açar.
Dataproc Serverless, Hadoop kümelerini veya Spark'ı manuel olarak yapılandırma ihtiyacını ortadan kaldırır. Dataproc Serverless, Hadoop üzerinde çalışmaz ve otomatik ölçeklendirme dahil olmak üzere kaynak gereksinimlerini belirlemek için kendi Dinamik Kaynak Ayırma özelliğini kullanır. Spark özelliklerinin küçük bir alt kümesi Dataproc Serverless ile özelleştirilebilir ancak çoğu durumda bunları ayarlamanız gerekmez.
2. Kur
Öncelikle ortamınızı ve bu codelab'de kullanılan kaynakları yapılandıracaksınız.
Google Cloud projesi oluşturun. Mevcut bir hesabı kullanabilirsiniz.
Cloud Console araç çubuğunda Cloud Shell'i tıklayarak açın.

Cloud Shell, bu codelab için kullanabileceğiniz, kullanıma hazır bir kabuk ortamı sağlar.

Cloud Shell, proje adınızı varsayılan olarak ayarlar. echo $GOOGLE_CLOUD_PROJECT komutunu çalıştırarak tekrar kontrol edin. Çıktıda proje kimliğinizi görmüyorsanız ayarlayın.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Kaynaklarınız için us-central1 veya europe-west2 gibi bir Compute Engine bölgesi ayarlayın.
export REGION=<your-region>
API'leri etkinleştir
Bu codelab'de aşağıdaki API'ler kullanılır:
- BigQuery
- Dataproc
Gerekli API'leri etkinleştirin. Bu işlem yaklaşık bir dakika sürer ve tamamlandığında işlemin başarılı olduğunu belirten bir mesaj görüntülenir.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Ağ erişimini yapılandırma
Dataproc Serverless, Spark sürücüleri ve yürütücüleri yalnızca özel IP'lere sahip olduğundan Spark işlerinizi çalıştıracağınız bölgede Google Özel Erişim'in etkinleştirilmesini gerektirir. default alt ağında etkinleştirmek için aşağıdaki komutu çalıştırın.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Aşağıdaki komutları kullanarak Google Özel Erişim'in etkinleştirildiğini doğrulayabilirsiniz. Bu komutlar True veya False değerini verir.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Storage paketi oluşturma
Bu codelab'de oluşturulan öğeleri depolamak için kullanılacak bir depolama paketi oluşturun.
Paketiniz için bir ad seçin. Paket adları tüm kullanıcılar arasında genel olarak benzersiz olmalıdır.
export BUCKET=<your-bucket-name>
Paketi, Spark işlerinizi çalıştırmayı planladığınız bölgede oluşturun.
gsutil mb -l ${REGION} gs://${BUCKET}
Paketinizin Cloud Storage konsolunda kullanılabildiğini görebilirsiniz. Ayrıca, paketlerinizi görmek için gsutil ls komutunu da çalıştırabilirsiniz.
Kalıcı İş Geçmişi Sunucusu oluşturma
Spark kullanıcı arayüzü, zengin bir hata ayıklama araçları seti ve Spark işleriyle ilgili analizler sunar. Tamamlanan Dataproc Serverless işleri için Spark kullanıcı arayüzünü görüntülemek üzere kalıcı geçmiş sunucusu olarak kullanılacak tek düğümlü bir Dataproc kümesi oluşturmanız gerekir.
Kalıcı geçmiş sunucunuz için bir ad belirleyin.
PHS_CLUSTER_NAME=my-phs
Aşağıdaki komutu çalıştırın.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
Spark kullanıcı arayüzü ve kalıcı geçmiş sunucusu, bu codelab'in ilerleyen bölümlerinde daha ayrıntılı olarak ele alınacaktır.
3. Dataproc Batches ile sunucusuz Spark işleri çalıştırma
Bu örnekte, New York City (NYC) Citi Bike Trips herkese açık veri kümesindeki bir veri grubuyla çalışacaksınız. NYC Citi Bikes, NYC'de ücretli bir bisiklet paylaşım sistemidir. Bazı basit dönüşümler gerçekleştirecek ve en popüler on Citi Bike istasyon kimliğini yazdıracaksınız. Bu örnekte, Spark ile BigQuery arasında verileri sorunsuz bir şekilde okuyup yazmak için açık kaynaklı spark-bigquery-connector da kullanılıyor.
Aşağıdaki GitHub deposunu klonlayın ve cd dosyasını citibike.py dosyasını içeren dizine kopyalayın.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Varsayılan olarak Cloud Shell'de bulunan Cloud SDK'yı kullanarak işi Serverless Spark'a gönderin. Sunucusuz Spark işlerini göndermek için Cloud SDK'yı ve Dataproc Batches API'yi kullanan kabuğunuzda aşağıdaki komutu çalıştırın.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Bunu ayrıntılı şekilde inceleyelim:
gcloud dataproc batches submit, Dataproc Batches API'ye referans verir.pyspark, PySpark işi gönderdiğinizi gösterir.--batch, işin adıdır. Sağlanmazsa rastgele oluşturulan bir UUID kullanılır.--region=${REGION}, işin işleneceği coğrafi bölgedir.--deps-bucket=${BUCKET}, yerel Python dosyanızın sunucusuz ortamda çalıştırılmadan önce yüklendiği yerdir.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar, Spark çalışma zamanı ortamında spark-bigquery-connector için JAR dosyasını içerir.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}, kalıcı geçmiş sunucusunun tam nitelikli adıdır. Spark etkinlik verileri (konsol çıkışından ayrı) burada depolanır ve Spark kullanıcı arayüzünden görüntülenebilir.- Sondaki
--, bundan sonraki her şeyin program için çalışma zamanı bağımsız değişkenleri olacağını gösterir. Bu durumda, görevin gerektirdiği şekilde paketinizin adını gönderiyorsunuz.
Toplu işlem gönderildiğinde aşağıdaki çıkışı görürsünüz.
Batch [citibike-job] submitted.
Birkaç dakika sonra, işin meta verileriyle birlikte aşağıdaki çıkışı görürsünüz.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
Sonraki bölümde, bu işin günlüklerini nasıl bulacağınızı öğreneceksiniz.
Ek özellikler
Spark Serverless ile işlerinizi çalıştırmak için ek seçenekleriniz olur.
- İşinizin üzerinde çalışacağı bir özel Docker görüntüsü oluşturabilirsiniz. Bu, Python ve R kitaplıkları da dahil olmak üzere ek bağımlılıkları dahil etmenin harika bir yoludur.
- Hive meta verilerine erişmek için işinize bir Dataproc Metastore örneği bağlayabilirsiniz.
- Dataproc Serverless, daha fazla kontrol için küçük bir Spark özelliği grubunun yapılandırılmasını destekler.
4. Dataproc Metrikleri ve Gözlemlenebilirlik
Dataproc Batches Console, tüm Dataproc Serverless işlerinizi listeler. Konsolda her işin Toplu İşlem Kimliği, Konum, Durum, Oluşturma Zamanı, Geçen Süre ve Tür bilgilerini görürsünüz. İşinizle ilgili daha fazla bilgi görüntülemek için Toplu İşlem Kimliği'ni tıklayın.
Bu sayfada, işinizin zaman içinde kaç Toplu Spark Yürütücüsü kullandığını (ne kadar otomatik ölçeklendiğini gösterir) gösteren İzleme gibi bilgiler yer alır.
Ayrıntılar sekmesinde, işin gönderilmesiyle birlikte gönderilen tüm bağımsız değişkenler ve parametreler de dahil olmak üzere işle ilgili daha fazla meta veri görürsünüz.
Tüm günlüklere bu sayfadan da erişebilirsiniz. Dataproc Serverless işleri çalıştırıldığında üç farklı günlük grubu oluşturulur:
- Hizmet düzeyi
- Konsol çıktısı
- Spark etkinlik günlüğü
Hizmet düzeyinde: Dataproc sunucusuz hizmetinin oluşturduğu günlükleri içerir. Örneğin, Dataproc Serverless'ın otomatik ölçeklendirme için ek CPU istemesi bu durumlardan biridir. Cloud Logging'i açan Günlükleri görüntüle'yi tıklayarak bunları görüntüleyebilirsiniz.
Konsol çıkışı, Çıkış bölümünde görüntülenebilir. Bu, Spark'ın bir işe başlarken yazdırdığı meta veriler veya işe dahil edilen yazdırma ifadeleri de dahil olmak üzere iş tarafından oluşturulan çıktıdır.
Spark etkinlik günlüğü, Spark kullanıcı arayüzünden erişilebilir. Spark işinize kalıcı bir geçmiş sunucusu sağladığınız için, daha önce çalıştırdığınız Spark işleriyle ilgili bilgileri içeren View Spark History Server'ı (Spark Geçmiş Sunucusunu Görüntüle) tıklayarak Spark kullanıcı arayüzüne erişebilirsiniz. Spark kullanıcı arayüzü hakkında daha fazla bilgiyi resmi Spark belgelerinden edinebilirsiniz.
5. Dataproc Şablonları: BQ -> GCS
Dataproc Şablonları, buluttaki veri işleme görevlerini daha da basitleştirmeye yardımcı olan açık kaynak araçlardır. Bunlar, Dataproc sunucusuz için sarmalayıcı görevi görür ve aşağıdakiler de dahil olmak üzere birçok veri taşıma aracı ve dışa aktarma görevi için şablonlar içerir:
BigQuerytoGCSveGCStoBigQueryGCStoBigTableGCStoJDBCveJDBCtoGCSHivetoBigQueryMongotoGCSveGCStoMongo
Tam listeyi README dosyasında bulabilirsiniz.
Bu bölümde, BigQuery'den GCS'ye veri aktarmak için Dataproc şablonlarını kullanacaksınız.
Depoyu klonlama
Depoyu klonlayın ve python klasörüne geçin.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Ortamı yapılandırma
Şimdi ortam değişkenlerini ayarlayacaksınız. Dataproc şablonları, proje kimliğiniz için GCP_PROJECT ortam değişkenini kullanır. Bu nedenle, bu değişkeni GOOGLE_CLOUD_PROJECT. değerine ayarlayın.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Bölgeniz, önceki ortamda ayarlanmış olmalıdır. Değilse buradan ayarlayın.
export REGION=<region>
Dataproc şablonları, BigQuery işlerini işlemek için spark-bigquery-conector'ı kullanır ve URI'nin JARS ortam değişkenine dahil edilmesi gerekir. JARS değişkenini ayarlayın.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Şablon parametrelerini yapılandırma
Hizmetin kullanacağı bir hazırlama paketi adı ayarlayın.
export GCS_STAGING_LOCATION=gs://${BUCKET}
Ardından, işe özel bazı değişkenleri ayarlayacaksınız. Giriş tablosu için yine BigQuery NYC Citibike veri kümesine başvuracaksınız.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
csv, parquet, avro veya json arasından seçim yapabilirsiniz. Bu codelab için CSV'yi seçin. Bir sonraki bölümde, dosya türlerini dönüştürmek için Dataproc şablonlarının nasıl kullanılacağı açıklanmaktadır.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Çıkış modunu overwrite olarak ayarlayın. overwrite, append, ignore veya errorifexists. arasından seçim yapabilirsiniz.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
GCS çıkış konumunu paketinize ait bir yol olarak ayarlayın.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Şablonu çalıştırma
Aşağıda belirterek ve ayarladığınız giriş parametrelerini sağlayarak BIGQUERYTOGCS şablonunu çalıştırın.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
Çıkış oldukça gürültülü olur ancak yaklaşık bir dakika sonra aşağıdakileri görürsünüz.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Aşağıdaki komutu çalıştırarak dosyaların oluşturulduğunu doğrulayabilirsiniz.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
Spark, veri miktarına bağlı olarak varsayılan olarak birden fazla dosyaya yazar. Bu durumda yaklaşık 30 oluşturulmuş dosya görürsünüz. Spark çıkış dosyası adları, part ile biçimlendirilir. Ardından beş basamaklı bir sayı (parça numarasını gösterir) ve bir karma dizesi gelir. Spark, büyük miktarlarda veri için genellikle birkaç dosyaya yazar. Örnek dosya adı: part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Dataproc Şablonları: CSV'den Parquet'ye
Şimdi, GCSTOGCS'yi kullanarak GCS'deki verileri bir dosya türünden diğerine dönüştürmek için Dataproc şablonlarını kullanacaksınız. Bu şablon SparkSQL kullanır ve ek işleme için dönüşüm sırasında işlenecek bir SparkSQL sorgusu gönderme seçeneği de sunar.
Ortam değişkenlerini onaylama
GCP_PROJECT, REGION ve GCS_STAGING_BUCKET değerlerinin önceki bölümde ayarlandığını doğrulayın.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Şablon parametrelerini ayarlama
Şimdi GCStoGCS için yapılandırma parametrelerini ayarlayacaksınız. Giriş dosyalarının konumuyla başlayın. Bu işlemin, dizindeki tüm dosyalar işleneceğinden belirli bir dosya değil, dizin için geçerli olduğunu unutmayın. Bunu BIGQUERY_GCS_OUTPUT_LOCATION olarak ayarlayın.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Giriş dosyasının biçimini ayarlayın.
GCS_TO_GCS_INPUT_FORMAT=csv
İstediğiniz çıkış biçimini ayarlayın. Parquet, JSON, Avro veya CSV'yi seçebilirsiniz.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Çıkış modunu overwrite olarak ayarlayın. overwrite, append, ignore veya errorifexists. arasından seçim yapabilirsiniz.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Çıkış konumunu ayarlayın.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Şablonu çalıştırma
GCStoGCS şablonunu çalıştırın.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
Çıkış oldukça gürültülü olur ancak yaklaşık bir dakika sonra aşağıdakine benzer bir başarı mesajı görmeniz gerekir.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Aşağıdaki komutu çalıştırarak dosyaların oluşturulduğunu doğrulayabilirsiniz.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Bu şablonla, gcs.to.gcs.temp.view.name ve gcs.to.gcs.sql.query değerlerini şablona ileterek SparkSQL sorguları da sağlayabilirsiniz. Bu sayede, GCS'ye yazmadan önce veriler üzerinde SparkSQL sorgusu çalıştırılabilir.
7. Kaynakları temizleme
Bu codelab tamamlandıktan sonra GCP hesabınızın gereksiz yere ücretlendirilmesini önlemek için:
- Oluşturduğunuz ortam için Cloud Storage paketini silin.
gsutil rm -r gs://${BUCKET}
- Kalıcı geçmiş sunucunuz için kullanılan Dataproc kümesini silin.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Dataproc sunucusuz işlerini silin. Toplu İşler Konsolu'na gidin, silmek istediğiniz her işin yanındaki kutuyu işaretleyin ve SİL'i tıklayın.
Bu codelab için özel olarak bir proje oluşturduysanız isteğe bağlı olarak projeyi de silebilirsiniz:
- GCP Console'da Projeler sayfasına gidin.
- Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
- Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.
8. Sırada ne var?
Aşağıdaki kaynaklar, Serverless Spark'tan yararlanabileceğiniz ek yöntemler sunar:
- Cloud Composer'ı kullanarak Dataproc Serverless iş akışlarını nasıl düzenleyeceğinizi öğrenin.
- Dataproc Sunucusuz'u Kubeflow ardışık düzenleriyle nasıl entegre edeceğinizi öğrenin.