1. Genel Bakış
Bu laboratuvarda, Cloud Dataproc'ta Apache Spark ve Jupyter not defterlerinin nasıl ayarlanacağı ve kullanılacağı açıklanmaktadır.
Jupyter not defterleri, kodunuzu etkileşimli olarak çalıştırmanıza ve sonuçlarınızı anında görmenize olanak tanıdığı için keşif amaçlı veri analizi ve makine öğrenimi modelleri oluşturmak için yaygın olarak kullanılır.
Ancak Apache Spark ve Jupyter Notebook'ları kurmak ve kullanmak karmaşık olabilir.

Cloud Dataproc, Apache Spark, Jupyter bileşeni ve Bileşen Ağ Geçidi ile yaklaşık 90 saniyede Dataproc kümesi oluşturmanıza olanak tanıyarak bu işlemi hızlı ve kolay hale getirir.
Neler öğreneceksiniz?
Bu codelab'de şunları öğreneceksiniz:
- Kümeniz için Google Cloud Storage paketi oluşturma
- Jupyter ve Bileşen Ağ Geçidi ile Dataproc kümesi oluşturma,
- Dataproc'ta JupyterLab web kullanıcı arayüzüne erişme
- Spark BigQuery Storage bağlayıcısını kullanan bir not defteri oluşturun.
- Spark işi çalıştırma ve sonuçları grafiğe dönüştürme.
Bu laboratuvarı Google Cloud'da çalıştırmanın toplam maliyeti yaklaşık 1 ABD dolarıdır. Cloud Dataproc fiyatlandırmasıyla ilgili tüm ayrıntıları burada bulabilirsiniz.
2. Proje oluşturma
console.cloud.google.com adresinden Google Cloud Platform Console'da oturum açın ve yeni bir proje oluşturun:



Ardından, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.
Bu codelab'i tamamlamak birkaç dolardan fazla tutmaz. Ancak daha fazla kaynak kullanmaya karar verirseniz veya kaynakları çalışır durumda bırakırsanız maliyet artabilir. Bu codelab'in son bölümünde projenizi temizleme adımları açıklanmaktadır.
Google Cloud Platform'un yeni kullanıcıları 300 ABD doları değerinde ücretsiz deneme sürümünden yararlanabilir.
3. Ortamınızı kurma
Öncelikle, bulut konsolunun sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:

Cloud Shell yüklendikten sonra, önceki adımda proje kimliğini ayarlamak için aşağıdaki komutu çalıştırın**:**
gcloud config set project <project_id>
Proje kimliğini, bulut konsolunun sol üst kısmında projenizi tıklayarak da bulabilirsiniz:


Ardından Dataproc, Compute Engine ve BigQuery Storage API'lerini etkinleştirin.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Alternatif olarak bu işlem Cloud Console'da da yapılabilir. Ekranın sol üst kısmındaki menü simgesini tıklayın.

Açılır menüden API Yöneticisi'ni seçin.

API'leri ve hizmetleri etkinleştir'i tıklayın.

Aşağıdaki API'leri arayın ve etkinleştirin:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS paketi oluşturma
Verilerinize en yakın bölgede bir Google Cloud Storage paketi oluşturun ve bu pakete benzersiz bir ad verin.
Bu, Dataproc kümesi için kullanılır.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Aşağıdaki çıkışı görmeniz gerekir.
Creating gs://<your-bucket-name>/...
5. Jupyter ve Component Gateway ile Dataproc kümenizi oluşturma
Kümenizi oluşturma
Kümeniz için ortam değişkenlerini ayarlama
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Ardından, kümenizde Jupyter ile çalışmak için gereken tüm bileşenleri içeren kümenizi oluşturmak üzere bu gcloud komutunu çalıştırın.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Kümeniz oluşturulurken aşağıdaki çıkışı görürsünüz.
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Kümenizin oluşturulması yaklaşık 90 saniye sürer. Kümeniz hazır olduğunda Dataproc Cloud Console kullanıcı arayüzünden erişebilirsiniz.
Beklerken gcloud komutunda kullanılan işaretler hakkında daha fazla bilgi edinmek için okumaya devam edebilirsiniz.
Küme oluşturulduktan sonra aşağıdaki çıkışı alırsınız:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create komutunda kullanılan işaretler
gcloud dataproc create komutunda kullanılan işaretlerin dökümü aşağıda verilmiştir.
--region=${REGION}
Kümenin oluşturulacağı bölgeyi ve alt bölgeyi belirtir. Kullanılabilir bölgelerin listesini burada bulabilirsiniz.
--image-version=1.4
Kümenizde kullanılacak görüntü sürümü. Kullanılabilir sürümlerin listesini burada görebilirsiniz.
--bucket=${BUCKET_NAME}
Kümede kullanılmak üzere daha önce oluşturduğunuz Google Cloud Storage paketini belirtin. GCS grubu sağlamazsanız sizin için oluşturulur.
GCS paketi silinmediği için kümenizi silseniz bile not defterleriniz buraya kaydedilir.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Dataproc kümeniz için kullanılacak makine türleri. Kullanılabilir makine türlerinin listesini burada görebilirsiniz.
-num-workers işaretini ayarlamazsanız varsayılan olarak 1 ana düğüm ve 2 çalışan düğümü oluşturulur.
--optional-components=ANACONDA,JUPYTER
İsteğe bağlı bileşenler için bu değerleri ayarladığınızda, kümenize Jupyter ve Anaconda (Jupyter not defterleri için gereklidir) için gerekli tüm kitaplıklar yüklenir.
--enable-component-gateway
Component Gateway'i etkinleştirdiğinizde Apache Knox ve Inverting Proxy kullanılarak bir App Engine bağlantısı oluşturulur. Bu bağlantı, Jupyter ve JupyterLab web arayüzlerine kolay, güvenli ve kimliği doğrulanmış erişim sağlar. Böylece artık SSH tünelleri oluşturmanız gerekmez.
Ayrıca, işlerinizin performansını ve küme kullanım kalıplarını görmenize yardımcı olan Yarn Resource Manager ve Spark History Server gibi kümedeki diğer araçlar için bağlantılar da oluşturur.
6. Apache Spark not defteri oluşturma
JupyterLab web arayüzüne erişme
Küme hazır olduğunda Dataproc kümeleri - Cloud Console'a gidip oluşturduğunuz kümeyi tıklayarak ve Web Arayüzleri sekmesine giderek JupyterLab web arayüzünün Bileşen Ağ Geçidi bağlantısını bulabilirsiniz.

Klasik not defteri arayüzü olan Jupyter'a veya Project Jupyter'ın yeni nesil kullanıcı arayüzü olarak tanımlanan JupyterLab'e erişebildiğinizi fark edeceksiniz.
JupyterLab'de birçok yeni ve harika kullanıcı arayüzü özelliği var. Bu nedenle, not defterlerini kullanmaya yeni başladıysanız veya en son iyileştirmeleri arıyorsanız JupyterLab'i kullanmanız önerilir. Resmi belgelere göre, JupyterLab zaman içinde klasik Jupyter arayüzünün yerini alacak.
Python 3 çekirdeğiyle not defteri oluşturma

Başlatıcı sekmesinde Python 3 çekirdeğiyle (PySpark çekirdeği değil) bir not defteri oluşturmak için Python 3 not defteri simgesini tıklayın. Bu not defteri, SparkSession'ı not defterinde yapılandırmanıza ve BigQuery Storage API'yi kullanmak için gereken spark-bigquery-connector'ı eklemenize olanak tanır.
Not defterini yeniden adlandırma

Sol taraftaki kenar çubuğunda veya üst gezinme çubuğunda not defteri adını sağ tıklayın ve not defterini "BigQuery Storage & Spark DataFrames.ipynb" olarak yeniden adlandırın.
Spark kodunuzu not defterinde çalıştırma

Bu not defterinde, BigQuery Storage API'yi kullanarak BigQuery ile Spark arasında veri okuma ve yazma aracı olan spark-bigquery-connector'ı kullanacaksınız.
BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişimde önemli iyileştirmeler sağlar. Paralel veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Bu, genel olarak özellikle daha büyük veri kümelerinde performansın önemli ölçüde iyileştirilmesi anlamına gelir.
İlk hücrede, spark-bigquery-connector jar dosyasının doğru sürümünü ekleyebilmek için kümenizin Scala sürümünü kontrol edin.
Giriş [1]:
!scala -version
Çıkış [1]:
Spark oturumu oluşturun ve spark-bigquery-connector paketini ekleyin.
Scala sürümünüz 2.11 ise aşağıdaki paketi kullanın.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Scala sürümünüz 2.12 ise aşağıdaki paketi kullanın.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Giriş [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval'i etkinleştirme
Bu, her adımda DataFrame sonuçlarını df.show() göstermeye gerek kalmadan verir ve çıktının biçimlendirmesini de iyileştirir.
Giriş [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery tablosunu Spark DataFrame'e okuma
Herkese açık bir BigQuery veri kümesinden veri okuyarak Spark DataFrame oluşturun. Bu işlem, verileri Spark kümesine yüklemek için spark-bigquery-connector ve BigQuery Storage API'yi kullanır.
Bir Spark DataFrame oluşturun ve Wikipedia sayfa görüntülemeleri için BigQuery herkese açık veri kümesinden veri yükleyin. Verileri işleme sürecinin gerçekleşeceği Spark'a yüklemek için spark-bigquery-connector kullandığınızdan veriler üzerinde sorgu çalıştırmadığınızı fark edeceksiniz. Bu kod çalıştırıldığında, Spark'ta tembel değerlendirme yapıldığı için tablo aslında yüklenmez ve yürütme işlemi bir sonraki adımda gerçekleşir.
Giriş [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Çıkış [4]:

Gerekli sütunları seçin ve filter() için kullanılan bir diğer ad olan where() ile filtre uygulayın.
Bu kod çalıştırıldığında bir Spark işlemi tetiklenir ve bu noktada veriler BigQuery Storage'dan okunur.
Giriş [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Çıkış [5]:

En çok görüntülenen sayfaları görmek için başlığa göre gruplandırın ve sayfa görüntüleme sayısına göre sıralayın.
Giriş [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Çıkış [6]:
7. Not defterinde Python çizim kitaplıklarını kullanma
Spark işlerinizin çıkışını çizmek için Python'da bulunan çeşitli çizim kitaplıklarından yararlanabilirsiniz.
Spark DataFrame'i Pandas DataFrame'e dönüştürme
Spark DataFrame'i Pandas DataFrame'e dönüştürün ve datehour'u dizin olarak ayarlayın. Bu, verilerle doğrudan Python'da çalışmak ve mevcut birçok Python çizim kitaplığını kullanarak verileri çizmek istiyorsanız yararlı olur.
Giriş [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Çıkış [7]:

Pandas DataFrame'i çizme
Grafikleri not defterinde görüntülemek için gerekli olan matplotlib kitaplığını içe aktarın.
Giriş [8]:
import matplotlib.pyplot as plt
Pandas DataFrame'den çizgi grafik oluşturmak için Pandas plot işlevini kullanın.
Giriş [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Çıkış [9]:
Not defterinin GCS'ye kaydedildiğini kontrol edin
Artık Dataproc kümenizde ilk Jupyter not defteriniz çalışıyor olmalıdır. Not defterinize bir ad verin. Not defteriniz, küme oluşturulurken kullanılan GCS paketine otomatik olarak kaydedilir.
Bunu Cloud Shell'de aşağıdaki gsutil komutunu kullanarak kontrol edebilirsiniz.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Aşağıdaki çıkışı görmeniz gerekir.
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Optimizasyon ipucu: Verileri bellekte önbelleğe alma
Her seferinde BigQuery Storage'dan okumak yerine verilerin bellekte olmasını istediğiniz senaryolar olabilir.
Bu iş, verileri BigQuery'den okur ve filtreyi BigQuery'ye gönderir. Daha sonra toplama işlemi Apache Spark'ta hesaplanır.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Yukarıdaki işi, tablonun önbelleğini içerecek şekilde değiştirebilirsiniz. Artık wiki sütunundaki filtre, Apache Spark tarafından bellekte uygulanır.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Ardından, BigQuery depolama alanından verileri tekrar okumak yerine önbelleğe alınmış verileri kullanarak başka bir wiki dilini filtreleyebilirsiniz. Bu nedenle, işlem çok daha hızlı çalışır.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Önbelleği kaldırmak için
df_wiki_all.unpersist()
9. Daha fazla kullanım alanı için örnek not defterleri
Cloud Dataproc GitHub deposunda, verileri yükleme, verileri kaydetme ve verilerinizi çeşitli Google Cloud Platform ürünleri ve açık kaynak araçlarıyla çizme için yaygın Apache Spark kalıplarını içeren Jupyter not defterleri bulunur:
10. Temizleme
Bu hızlı başlangıç kılavuzu tamamlandıktan sonra GCP hesabınızın gereksiz yere ücretlendirilmesini önlemek için:
- Ortam için Cloud Storage paketini silin ve oluşturduğunuz
- Dataproc ortamını silin.
Bu codelab için özel olarak bir proje oluşturduysanız isteğe bağlı olarak projeyi de silebilirsiniz:
- GCP Console'da Projeler sayfasına gidin.
- Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
- Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.
Lisans
Bu çalışma, Creative Commons Attribution 3.0 Genel Amaçlı Lisans ve Apache 2.0 lisansı ile lisanslanmıştır.