Cloud Dataproc'ta Apache Spark ve Jupyter Notebooks

1. Genel Bakış

Bu laboratuvarda, Cloud Dataproc'ta Apache Spark ve Jupyter not defterlerinin nasıl ayarlanacağı ve kullanılacağı açıklanmaktadır.

Jupyter not defterleri, kodunuzu etkileşimli olarak çalıştırmanıza ve sonuçlarınızı anında görmenize olanak tanıdığı için keşif amaçlı veri analizi ve makine öğrenimi modelleri oluşturmak için yaygın olarak kullanılır.

Ancak Apache Spark ve Jupyter Notebook'ları kurmak ve kullanmak karmaşık olabilir.

b9ed855863c57d6.png

Cloud Dataproc, Apache Spark, Jupyter bileşeni ve Bileşen Ağ Geçidi ile yaklaşık 90 saniyede Dataproc kümesi oluşturmanıza olanak tanıyarak bu işlemi hızlı ve kolay hale getirir.

Neler öğreneceksiniz?

Bu codelab'de şunları öğreneceksiniz:

  • Kümeniz için Google Cloud Storage paketi oluşturma
  • Jupyter ve Bileşen Ağ Geçidi ile Dataproc kümesi oluşturma,
  • Dataproc'ta JupyterLab web kullanıcı arayüzüne erişme
  • Spark BigQuery Storage bağlayıcısını kullanan bir not defteri oluşturun.
  • Spark işi çalıştırma ve sonuçları grafiğe dönüştürme.

Bu laboratuvarı Google Cloud'da çalıştırmanın toplam maliyeti yaklaşık 1 ABD dolarıdır. Cloud Dataproc fiyatlandırmasıyla ilgili tüm ayrıntıları burada bulabilirsiniz.

2. Proje oluşturma

console.cloud.google.com adresinden Google Cloud Platform Console'da oturum açın ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Ardından, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu codelab'i tamamlamak birkaç dolardan fazla tutmaz. Ancak daha fazla kaynak kullanmaya karar verirseniz veya kaynakları çalışır durumda bırakırsanız maliyet artabilir. Bu codelab'in son bölümünde projenizi temizleme adımları açıklanmaktadır.

Google Cloud Platform'un yeni kullanıcıları 300 ABD doları değerinde ücretsiz deneme sürümünden yararlanabilir.

3. Ortamınızı kurma

Öncelikle, bulut konsolunun sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:

a10c47ee6ca41c54.png

Cloud Shell yüklendikten sonra, önceki adımda proje kimliğini ayarlamak için aşağıdaki komutu çalıştırın**:**

gcloud config set project <project_id>

Proje kimliğini, bulut konsolunun sol üst kısmında projenizi tıklayarak da bulabilirsiniz:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Ardından Dataproc, Compute Engine ve BigQuery Storage API'lerini etkinleştirin.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Alternatif olarak bu işlem Cloud Console'da da yapılabilir. Ekranın sol üst kısmındaki menü simgesini tıklayın.

2bfc27ef9ba2ec7d.png

Açılır menüden API Yöneticisi'ni seçin.

408af5f32c4b7c25.png

API'leri ve hizmetleri etkinleştir'i tıklayın.

a9c0e84296a7ba5b.png

Aşağıdaki API'leri arayın ve etkinleştirin:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS paketi oluşturma

Verilerinize en yakın bölgede bir Google Cloud Storage paketi oluşturun ve bu pakete benzersiz bir ad verin.

Bu, Dataproc kümesi için kullanılır.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Aşağıdaki çıkışı görmeniz gerekir.

Creating gs://<your-bucket-name>/...

5. Jupyter ve Component Gateway ile Dataproc kümenizi oluşturma

Kümenizi oluşturma

Kümeniz için ortam değişkenlerini ayarlama

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Ardından, kümenizde Jupyter ile çalışmak için gereken tüm bileşenleri içeren kümenizi oluşturmak üzere bu gcloud komutunu çalıştırın.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Kümeniz oluşturulurken aşağıdaki çıkışı görürsünüz.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Kümenizin oluşturulması yaklaşık 90 saniye sürer. Kümeniz hazır olduğunda Dataproc Cloud Console kullanıcı arayüzünden erişebilirsiniz.

Beklerken gcloud komutunda kullanılan işaretler hakkında daha fazla bilgi edinmek için okumaya devam edebilirsiniz.

Küme oluşturulduktan sonra aşağıdaki çıkışı alırsınız:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create komutunda kullanılan işaretler

gcloud dataproc create komutunda kullanılan işaretlerin dökümü aşağıda verilmiştir.

--region=${REGION}

Kümenin oluşturulacağı bölgeyi ve alt bölgeyi belirtir. Kullanılabilir bölgelerin listesini burada bulabilirsiniz.

--image-version=1.4

Kümenizde kullanılacak görüntü sürümü. Kullanılabilir sürümlerin listesini burada görebilirsiniz.

--bucket=${BUCKET_NAME}

Kümede kullanılmak üzere daha önce oluşturduğunuz Google Cloud Storage paketini belirtin. GCS grubu sağlamazsanız sizin için oluşturulur.

GCS paketi silinmediği için kümenizi silseniz bile not defterleriniz buraya kaydedilir.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc kümeniz için kullanılacak makine türleri. Kullanılabilir makine türlerinin listesini burada görebilirsiniz.

-num-workers işaretini ayarlamazsanız varsayılan olarak 1 ana düğüm ve 2 çalışan düğümü oluşturulur.

--optional-components=ANACONDA,JUPYTER

İsteğe bağlı bileşenler için bu değerleri ayarladığınızda, kümenize Jupyter ve Anaconda (Jupyter not defterleri için gereklidir) için gerekli tüm kitaplıklar yüklenir.

--enable-component-gateway

Component Gateway'i etkinleştirdiğinizde Apache Knox ve Inverting Proxy kullanılarak bir App Engine bağlantısı oluşturulur. Bu bağlantı, Jupyter ve JupyterLab web arayüzlerine kolay, güvenli ve kimliği doğrulanmış erişim sağlar. Böylece artık SSH tünelleri oluşturmanız gerekmez.

Ayrıca, işlerinizin performansını ve küme kullanım kalıplarını görmenize yardımcı olan Yarn Resource Manager ve Spark History Server gibi kümedeki diğer araçlar için bağlantılar da oluşturur.

6. Apache Spark not defteri oluşturma

JupyterLab web arayüzüne erişme

Küme hazır olduğunda Dataproc kümeleri - Cloud Console'a gidip oluşturduğunuz kümeyi tıklayarak ve Web Arayüzleri sekmesine giderek JupyterLab web arayüzünün Bileşen Ağ Geçidi bağlantısını bulabilirsiniz.

afc40202d555de47.png

Klasik not defteri arayüzü olan Jupyter'a veya Project Jupyter'ın yeni nesil kullanıcı arayüzü olarak tanımlanan JupyterLab'e erişebildiğinizi fark edeceksiniz.

JupyterLab'de birçok yeni ve harika kullanıcı arayüzü özelliği var. Bu nedenle, not defterlerini kullanmaya yeni başladıysanız veya en son iyileştirmeleri arıyorsanız JupyterLab'i kullanmanız önerilir. Resmi belgelere göre, JupyterLab zaman içinde klasik Jupyter arayüzünün yerini alacak.

Python 3 çekirdeğiyle not defteri oluşturma

a463623f2ebf0518.png

Başlatıcı sekmesinde Python 3 çekirdeğiyle (PySpark çekirdeği değil) bir not defteri oluşturmak için Python 3 not defteri simgesini tıklayın. Bu not defteri, SparkSession'ı not defterinde yapılandırmanıza ve BigQuery Storage API'yi kullanmak için gereken spark-bigquery-connector'ı eklemenize olanak tanır.

Not defterini yeniden adlandırma

196a3276ed07e1f3.png

Sol taraftaki kenar çubuğunda veya üst gezinme çubuğunda not defteri adını sağ tıklayın ve not defterini "BigQuery Storage & Spark DataFrames.ipynb" olarak yeniden adlandırın.

Spark kodunuzu not defterinde çalıştırma

fbac38062e5bb9cf.png

Bu not defterinde, BigQuery Storage API'yi kullanarak BigQuery ile Spark arasında veri okuma ve yazma aracı olan spark-bigquery-connector'ı kullanacaksınız.

BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişimde önemli iyileştirmeler sağlar. Paralel veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Bu, genel olarak özellikle daha büyük veri kümelerinde performansın önemli ölçüde iyileştirilmesi anlamına gelir.

İlk hücrede, spark-bigquery-connector jar dosyasının doğru sürümünü ekleyebilmek için kümenizin Scala sürümünü kontrol edin.

Giriş [1]:

!scala -version

Çıkış [1]:f580e442576b8b1f.png Spark oturumu oluşturun ve spark-bigquery-connector paketini ekleyin.

Scala sürümünüz 2.11 ise aşağıdaki paketi kullanın.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Scala sürümünüz 2.12 ise aşağıdaki paketi kullanın.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Giriş [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval'i etkinleştirme

Bu, her adımda DataFrame sonuçlarını df.show() göstermeye gerek kalmadan verir ve çıktının biçimlendirmesini de iyileştirir.

Giriş [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery tablosunu Spark DataFrame'e okuma

Herkese açık bir BigQuery veri kümesinden veri okuyarak Spark DataFrame oluşturun. Bu işlem, verileri Spark kümesine yüklemek için spark-bigquery-connector ve BigQuery Storage API'yi kullanır.

Bir Spark DataFrame oluşturun ve Wikipedia sayfa görüntülemeleri için BigQuery herkese açık veri kümesinden veri yükleyin. Verileri işleme sürecinin gerçekleşeceği Spark'a yüklemek için spark-bigquery-connector kullandığınızdan veriler üzerinde sorgu çalıştırmadığınızı fark edeceksiniz. Bu kod çalıştırıldığında, Spark'ta tembel değerlendirme yapıldığı için tablo aslında yüklenmez ve yürütme işlemi bir sonraki adımda gerçekleşir.

Giriş [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Çıkış [4]:

c107a33f6fc30ca.png

Gerekli sütunları seçin ve filter() için kullanılan bir diğer ad olan where() ile filtre uygulayın.

Bu kod çalıştırıldığında bir Spark işlemi tetiklenir ve bu noktada veriler BigQuery Storage'dan okunur.

Giriş [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Çıkış [5]:

ad363cbe510d625a.png

En çok görüntülenen sayfaları görmek için başlığa göre gruplandırın ve sayfa görüntüleme sayısına göre sıralayın.

Giriş [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Çıkış [6]:f718abd05afc0f4.png

7. Not defterinde Python çizim kitaplıklarını kullanma

Spark işlerinizin çıkışını çizmek için Python'da bulunan çeşitli çizim kitaplıklarından yararlanabilirsiniz.

Spark DataFrame'i Pandas DataFrame'e dönüştürme

Spark DataFrame'i Pandas DataFrame'e dönüştürün ve datehour'u dizin olarak ayarlayın. Bu, verilerle doğrudan Python'da çalışmak ve mevcut birçok Python çizim kitaplığını kullanarak verileri çizmek istiyorsanız yararlı olur.

Giriş [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Çıkış [7]:

3df2aaa2351f028d.png

Pandas DataFrame'i çizme

Grafikleri not defterinde görüntülemek için gerekli olan matplotlib kitaplığını içe aktarın.

Giriş [8]:

import matplotlib.pyplot as plt

Pandas DataFrame'den çizgi grafik oluşturmak için Pandas plot işlevini kullanın.

Giriş [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Çıkış [9]:bade7042c3033594.png

Not defterinin GCS'ye kaydedildiğini kontrol edin

Artık Dataproc kümenizde ilk Jupyter not defteriniz çalışıyor olmalıdır. Not defterinize bir ad verin. Not defteriniz, küme oluşturulurken kullanılan GCS paketine otomatik olarak kaydedilir.

Bunu Cloud Shell'de aşağıdaki gsutil komutunu kullanarak kontrol edebilirsiniz.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Aşağıdaki çıkışı görmeniz gerekir.

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Optimizasyon ipucu: Verileri bellekte önbelleğe alma

Her seferinde BigQuery Storage'dan okumak yerine verilerin bellekte olmasını istediğiniz senaryolar olabilir.

Bu iş, verileri BigQuery'den okur ve filtreyi BigQuery'ye gönderir. Daha sonra toplama işlemi Apache Spark'ta hesaplanır.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Yukarıdaki işi, tablonun önbelleğini içerecek şekilde değiştirebilirsiniz. Artık wiki sütunundaki filtre, Apache Spark tarafından bellekte uygulanır.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Ardından, BigQuery depolama alanından verileri tekrar okumak yerine önbelleğe alınmış verileri kullanarak başka bir wiki dilini filtreleyebilirsiniz. Bu nedenle, işlem çok daha hızlı çalışır.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Önbelleği kaldırmak için

df_wiki_all.unpersist()

9. Daha fazla kullanım alanı için örnek not defterleri

Cloud Dataproc GitHub deposunda, verileri yükleme, verileri kaydetme ve verilerinizi çeşitli Google Cloud Platform ürünleri ve açık kaynak araçlarıyla çizme için yaygın Apache Spark kalıplarını içeren Jupyter not defterleri bulunur:

10. Temizleme

Bu hızlı başlangıç kılavuzu tamamlandıktan sonra GCP hesabınızın gereksiz yere ücretlendirilmesini önlemek için:

  1. Ortam için Cloud Storage paketini silin ve oluşturduğunuz
  2. Dataproc ortamını silin.

Bu codelab için özel olarak bir proje oluşturduysanız isteğe bağlı olarak projeyi de silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Amaçlı Lisans ve Apache 2.0 lisansı ile lisanslanmıştır.