Cloud Dataproc'ta Apache Spark ve Jupyter Notebooks

1. Genel Bakış

Bu laboratuvarda, Cloud Dataproc'ta Apache Spark ve Jupyter not defterlerinin nasıl ayarlanıp kullanılacağı anlatılmaktadır.

Jupyter not defterleri, kodunuzu etkileşimli olarak çalıştırmanıza ve sonuçlarınızı hemen görmenize olanak tanıdığından keşif amaçlı veri analizi ve makine öğrenimi modelleri oluşturmak için yaygın bir şekilde kullanılır.

Ancak Apache Spark ve Jupyter Notebooks'u ayarlamak ve kullanmak karmaşık olabilir.

b9ed855863c57d6.png

Cloud Dataproc; Apache Spark, Jupyter bileşeni ve Bileşen Ağ Geçidi ile yaklaşık 90 saniyede bir Dataproc Kümesi oluşturmanızı sağlayarak bu işlemi hızlı ve kolay hale getirir.

Neler öğreneceksiniz?

Bu codelab'de şunları öğreneceksiniz:

  • Kümeniz için Google Cloud Storage paketi oluşturma
  • Jupyter ve Bileşen Ağ Geçidi ile Dataproc Kümesi oluşturun.
  • Dataproc'ta JupyterLab web kullanıcı arayüzüne erişme
  • Spark BigQuery Storage bağlayıcısından yararlanarak not defteri oluşturma
  • Spark işi çalıştırma ve sonuçları çizme.

Bu laboratuvarı Google Cloud'da çalıştırmanın toplam maliyeti yaklaşık 1 ABD dolarıdır. Cloud Dataproc fiyatlandırmasıyla ilgili tüm ayrıntıları burada bulabilirsiniz.

2. Proje oluşturma

console.cloud.google.com adresinden Google Cloud Platform konsolunda oturum açın ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Sonraki adımda, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu codelab'i çalıştırmanın maliyeti birkaç dolardan fazla olmayacaktır. Ancak daha fazla kaynak kullanmaya karar verirseniz veya bunları çalışır durumda bırakırsanız daha yüksek ücret ödemeniz gerekebilir. Bu codelab'in son bölümü, projenizi temizleme konusunda size yol gösterecektir.

Yeni Google Cloud Platform kullanıcıları 300 ABD doları değerinde ücretsiz denemeden yararlanabilir.

3. Ortamınızı kurma

Öncelikle Cloud Console'un sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:

a10c47ee6ca41c54.png

Cloud Shell yüklendikten sonra önceki adımdaki proje kimliğini ayarlamak için aşağıdaki komutu çalıştırın**:**

gcloud config set project <project_id>

Proje kimliğini Cloud Console'un sol üst tarafından projenizi tıklayarak da görebilirsiniz:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

Ardından Dataproc, Compute Engine ve BigQuery Storage API'lerini etkinleştirin.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Bu işlemi Cloud Console'da da yapabilirsiniz. Ekranın sol üst kısmındaki menü simgesini tıklayın.

2bfc27ef9ba2ec7d.png

Açılır menüden API Yöneticisi'ni seçin.

408af5f32c4b7c25.png

API'leri ve Hizmetleri Etkinleştir'i tıklayın.

a9c0e84296a7ba5b.png

Aşağıdaki API'leri arayın ve etkinleştirin:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS paketi oluşturma

Verilerinize en yakın bölgede bir Google Cloud Storage paketi oluşturun ve pakete benzersiz bir ad verin.

Bu, Dataproc kümesi için kullanılır.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Aşağıdaki çıkışı göreceksiniz

Creating gs://<your-bucket-name>/...

5. Dataproc Kümenizi Jupyter ve Bileşen Ağ Geçidi

Kümenizi oluşturma

Kümeniz için env değişkenlerini ayarlama

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Ardından kümenizde Jupyter ile çalışmak için gerekli tüm bileşenlerle kümenizi oluşturmak üzere bu gcloud komutunu çalıştırın.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Kümeniz oluşturulurken aşağıdaki çıkışı görürsünüz.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Kümenizin oluşturulması yaklaşık 90 saniye sürer. Küme hazır olduğunda kümenize, Dataproc Cloud Console kullanıcı arayüzünden erişebilirsiniz.

Beklerken gcloud komutunda kullanılan flag'ler hakkında daha fazla bilgi edinmek için aşağıdaki okumaya devam edebilirsiniz.

Küme oluşturulduktan sonra aşağıdaki çıkışı almanız gerekir:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create komutunda kullanılan işaretler

gcloud dataproc create komutunda kullanılan flag'lerin dökümü

--region=${REGION}

Kümenin oluşturulacağı bölgeyi ve alt bölgeyi belirtir. Özelliğin kullanılabildiği bölgelerin listesini burada görebilirsiniz.

--image-version=1.4

Kümenizde kullanılacak görüntü sürümü. Kullanılabilir sürümlerin listesini burada bulabilirsiniz.

--bucket=${BUCKET_NAME}

Küme için kullanmak üzere daha önce oluşturduğunuz Google Cloud Storage paketini belirtin. Bir GCS paketi sağlamazsanız sizin için bu paket oluşturulur.

GCS paketi silinmediğinden, kümenizi silseniz bile not defterleriniz de burada kaydedilir.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc kümeniz için kullanılacak makine türleri. Kullanılabilir makine türlerinin listesini burada görebilirsiniz.

–num-workers işaretini ayarlamazsanız varsayılan olarak 1 ana düğüm ve 2 çalışma düğümü oluşturulur

--optional-components=ANACONDA,JUPYTER

İsteğe bağlı bileşenler için bu değerleri ayarladığınızda, Jupyter ve Anaconda için gerekli tüm kitaplıklar (Jupyter not defterleri için gereklidir) kümenize yüklenir.

--enable-component-gateway

Bileşen Ağ Geçidi etkinleştirildiğinde Apache Knox ve Ters Proxy kullanarak App Engine bağlantısı oluşturulur. Bu bağlantı, Jupyter ve JupyterLab web arayüzlerine kolay, güvenli ve kimliği doğrulanmış erişim olanağı sunar. Böylece, artık SSH tünelleri oluşturmanıza gerek kalmaz.

Ayrıca, işlerinizin ve küme kullanım kalıplarınızın performansını görmenize yardımcı olan Yarn Resource Manager ve Spark History Server gibi kümedeki diğer araçlar için de bağlantılar oluşturur.

6. Apache Spark not defteri oluşturma

JupyterLab web arayüzüne erişme

Küme hazır olduğunda, JupyterLab web arayüzünün Bileşen Ağ Geçidi bağlantısını bulmak için Dataproc Kümeleri - Cloud konsolu'na ve oluşturduğunuz kümeyi tıklayıp Web Arayüzleri sekmesine gidin.

afc40202d555de47.png

Klasik not defteri arayüzü olan Jupyter'e veya Project Jupyter için yeni nesil kullanıcı arayüzü olarak tanımlanan JupyterLab'e erişiminizin olduğunu fark edeceksiniz.

JupyterLab'de birbirinden güzel pek çok yeni kullanıcı arayüzü özelliği var. Bu yüzden, not defterlerini kullanmaya yeni başladıysanız veya en yeni iyileştirmeleri yapmak istiyorsanız resmi dokümanlara göre klasik Jupyter arayüzünün yerini alacak olan JupyterLab'i kullanmanızı öneririz.

Python 3 çekirdeğiyle not defteri oluşturma

a463623f2ebf0518.png

Python 3 çekirdeğine (PySpark çekirdeğine değil) sahip bir not defteri oluşturmak için başlatıcı sekmesinden Python 3 not defteri simgesini tıklayın. Bu çekirdeği, not defterinde SparkSession'ı yapılandırmanıza ve BigQuery Storage API'yi kullanmak için gerekli olan spark-bigquery-connector'ı dahil etmenize olanak tanır.

Not defterini yeniden adlandırma

196a3276ed07e1f3.png

Soldaki veya üst gezinme bölümündeki kenar çubuğundan not defteri adını sağ tıklayıp not defterini "BigQuery Storage &" olarak yeniden adlandırın. Spark DataFrames.ipynb"

Not defterinde Spark kodunuzu çalıştırma

fbac38062e5bb9cf.png

Bu not defterinde, BigQuery Storage API'den yararlanarak BigQuery ile Spark arasında veri okuma ve yazma aracı olan spark-bigquery-connector'ı kullanacaksınız.

BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişimde önemli iyileştirmeler sağlıyor. Paralel olarak veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Yüksek seviyede bu, özellikle büyük veri kümelerinde performansın önemli ölçüde artması anlamına gelir.

İlk hücrede kümenizin Scala sürümünü kontrol edin. Böylece spark-bigquery-connector jar dosyasının doğru sürümünü ekleyin.

Giriş [1]:

!scala -version

Çıkış [1]:f580e442576b8b1f.png Spark oturumu oluşturun ve spark-bigquery-connector paketini ekleyin.

Scala sürümünüz 2.11 ise aşağıdaki paketi kullanın.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Scala sürümünüz 2.12 ise aşağıdaki paketi kullanın.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Giriş [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval'ı etkinleştir

Bu işlem, df.show() işlevini yeniden göstermeye gerek kalmadan her adımda DataFrames sonuçlarının çıktısını verir ve çıktının biçimlendirmesini iyileştirir.

Giriş [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery tablosunu Spark DataFrame'e okuma

Herkese açık bir BigQuery veri kümesindeki verileri okuyarak Spark DataFrame oluşturun. Bu, verileri Spark kümesine yüklemek için spark-bigquery-connector ve BigQuery Storage API'den yararlanır.

Spark DataFrame oluşturun ve Wikipedia sayfa görüntülemeler için BigQuery herkese açık veri kümesinden veri yükleyin. Verileri, işlenmesinin gerçekleşeceği Spark'a yüklemek için spark-bigquery-connector özelliğini kullandığınızdan veriler üzerinde bir sorgu çalıştırmadığınızı fark edeceksiniz. Bu kod çalıştırıldığında, Spark'ta geç bir değerlendirme olduğundan tabloyu yüklemez ve yürütme bir sonraki adımda gerçekleşir.

Giriş [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Çıkış [4]:

c107a33f6fc30ca.png

Gerekli sütunları seçin ve filter() için takma ad olan where() adını kullanarak bir filtre uygulayın.

Bu kod çalıştırıldığında bir Spark işlemini tetikler ve veriler BigQuery Storage'dan okunur.

Giriş [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Çıkış [5]:

ad363cbe510d625a.png

En popüler sayfaları görmek için başlığa ve sayfa görüntüleme sayısına göre sıralama yapın

Giriş [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Çıkış [6]:f718abd05afc0f4.png

7. Not defterinde Python çizim kitaplıklarını kullanma

Spark işlerinizin çıkışını planlamak için Python'da bulunan çeşitli grafik grafiklerinden yararlanabilirsiniz.

Spark DataFrame'i Pandas DataFrame'e dönüştürme

Spark DataFrame'i Pandas DataFrame'e dönüştürün ve datehour değerini dizin olarak ayarlayın. Verilerle doğrudan Python'da çalışmak ve verileri mevcut birçok Python çizim kitaplığını kullanarak çizmek istiyorsanız bu yöntem yararlıdır.

Giriş [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Çıkış [7]:

3df2aaa2351f028d.png

Pandas Veri Çerçevesinin Grafiğini Oluşturma

Not defterinde grafikleri görüntülemek için gereken matplotlib kitaplığını içe aktarın

Giriş [8]:

import matplotlib.pyplot as plt

Pandas DataFrame'den çizgi grafik oluşturmak için Pandas grafiği işlevini kullanın.

Giriş [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Çıkış [9]:bade7042c3033594.png

Not defterinin GCS'ye kaydedilip kaydedilmediğini kontrol etme

Artık ilk Jupyter not defterinizi Dataproc kümenizde çalışır duruma getirmiş olmanız gerekir. Not defterinize bir ad verin. Bu ad, küme oluşturulurken kullanılan GCS paketine otomatik olarak kaydedilir.

Bunu, Cloud Shell'deki bu gsutil komutunu kullanarak kontrol edebilirsiniz.

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Aşağıdaki çıkışı göreceksiniz

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Optimizasyon ipucu - Verileri bellekte önbelleğe alın

Bazı senaryolarda, her seferinde BigQuery Storage'dan veri okumak yerine verilerin bellekte tutulmasını isteyebilirsiniz.

Bu iş, verileri BigQuery'den okur ve filtreyi BigQuery'ye aktarır. Ardından toplama Apache Spark'ta hesaplanacaktır.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Yukarıdaki işi, tablonun bir önbelleğini içerecek şekilde değiştirebilirsiniz. Artık wiki sütunundaki filtre, Apache Spark tarafından bellekte uygulanır.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Böylece, BigQuery depolama alanındaki verileri tekrar okumak yerine önbelleğe alınan verileri kullanarak başka bir wiki dili için filtre uygulayabilirsiniz. Bu şekilde, çalışma çok daha hızlı çalışır.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Önbelleği kaldırmak için

df_wiki_all.unpersist()

9. Daha fazla kullanım alanı için örnek not defterleri

Cloud Dataproc GitHub deposu; veri yüklemek, veri kaydetmek ve çeşitli Google Cloud Platform ürünleri ile açık kaynaklı araçlarla verilerinizi çizmek için yaygın Apache Spark kalıplarına sahip Jupyter not defterlerini içerir:

10. Temizleme

Bu hızlı başlangıç işlemi tamamlandıktan sonra GCP hesabınızdan gereksiz ücretlerle karşılaşmamak için:

  1. Ortam için oluşturduğunuz ve oluşturduğunuz Cloud Storage paketini silin.
  2. Dataproc ortamını silin.

Sadece bu codelab için proje oluşturduysanız dilerseniz projeyi silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Lisans ve Apache 2.0 lisansı altında lisanslanmıştır.