1. Genel Bakış
Bu laboratuvarda, Cloud Dataproc'ta Apache Spark ve Jupyter not defterlerinin nasıl ayarlanıp kullanılacağı anlatılmaktadır.
Jupyter not defterleri, kodunuzu etkileşimli olarak çalıştırmanıza ve sonuçlarınızı hemen görmenize olanak tanıdığından keşif amaçlı veri analizi ve makine öğrenimi modelleri oluşturmak için yaygın bir şekilde kullanılır.
Ancak Apache Spark ve Jupyter Notebooks'u ayarlamak ve kullanmak karmaşık olabilir.
Cloud Dataproc; Apache Spark, Jupyter bileşeni ve Bileşen Ağ Geçidi ile yaklaşık 90 saniyede bir Dataproc Kümesi oluşturmanızı sağlayarak bu işlemi hızlı ve kolay hale getirir.
Neler öğreneceksiniz?
Bu codelab'de şunları öğreneceksiniz:
- Kümeniz için Google Cloud Storage paketi oluşturma
- Jupyter ve Bileşen Ağ Geçidi ile Dataproc Kümesi oluşturun.
- Dataproc'ta JupyterLab web kullanıcı arayüzüne erişme
- Spark BigQuery Storage bağlayıcısından yararlanarak not defteri oluşturma
- Spark işi çalıştırma ve sonuçları çizme.
Bu laboratuvarı Google Cloud'da çalıştırmanın toplam maliyeti yaklaşık 1 ABD dolarıdır. Cloud Dataproc fiyatlandırmasıyla ilgili tüm ayrıntıları burada bulabilirsiniz.
2. Proje oluşturma
console.cloud.google.com adresinden Google Cloud Platform konsolunda oturum açın ve yeni bir proje oluşturun:
Sonraki adımda, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.
Bu codelab'i çalıştırmanın maliyeti birkaç dolardan fazla olmayacaktır. Ancak daha fazla kaynak kullanmaya karar verirseniz veya bunları çalışır durumda bırakırsanız daha yüksek ücret ödemeniz gerekebilir. Bu codelab'in son bölümü, projenizi temizleme konusunda size yol gösterecektir.
Yeni Google Cloud Platform kullanıcıları 300 ABD doları değerinde ücretsiz denemeden yararlanabilir.
3. Ortamınızı kurma
Öncelikle Cloud Console'un sağ üst köşesindeki düğmeyi tıklayarak Cloud Shell'i açın:
Cloud Shell yüklendikten sonra önceki adımdaki proje kimliğini ayarlamak için aşağıdaki komutu çalıştırın**:**
gcloud config set project <project_id>
Proje kimliğini Cloud Console'un sol üst tarafından projenizi tıklayarak da görebilirsiniz:
Ardından Dataproc, Compute Engine ve BigQuery Storage API'lerini etkinleştirin.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Bu işlemi Cloud Console'da da yapabilirsiniz. Ekranın sol üst kısmındaki menü simgesini tıklayın.
Açılır menüden API Yöneticisi'ni seçin.
API'leri ve Hizmetleri Etkinleştir'i tıklayın.
Aşağıdaki API'leri arayın ve etkinleştirin:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS paketi oluşturma
Verilerinize en yakın bölgede bir Google Cloud Storage paketi oluşturun ve pakete benzersiz bir ad verin.
Bu, Dataproc kümesi için kullanılır.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Aşağıdaki çıkışı göreceksiniz
Creating gs://<your-bucket-name>/...
5. Dataproc Kümenizi Jupyter ve Bileşen Ağ Geçidi
Kümenizi oluşturma
Kümeniz için env değişkenlerini ayarlama
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Ardından kümenizde Jupyter ile çalışmak için gerekli tüm bileşenlerle kümenizi oluşturmak üzere bu gcloud komutunu çalıştırın.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Kümeniz oluşturulurken aşağıdaki çıkışı görürsünüz.
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Kümenizin oluşturulması yaklaşık 90 saniye sürer. Küme hazır olduğunda kümenize, Dataproc Cloud Console kullanıcı arayüzünden erişebilirsiniz.
Beklerken gcloud komutunda kullanılan flag'ler hakkında daha fazla bilgi edinmek için aşağıdaki okumaya devam edebilirsiniz.
Küme oluşturulduktan sonra aşağıdaki çıkışı almanız gerekir:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create komutunda kullanılan işaretler
gcloud dataproc create komutunda kullanılan flag'lerin dökümü
--region=${REGION}
Kümenin oluşturulacağı bölgeyi ve alt bölgeyi belirtir. Özelliğin kullanılabildiği bölgelerin listesini burada görebilirsiniz.
--image-version=1.4
Kümenizde kullanılacak görüntü sürümü. Kullanılabilir sürümlerin listesini burada bulabilirsiniz.
--bucket=${BUCKET_NAME}
Küme için kullanmak üzere daha önce oluşturduğunuz Google Cloud Storage paketini belirtin. Bir GCS paketi sağlamazsanız sizin için bu paket oluşturulur.
GCS paketi silinmediğinden, kümenizi silseniz bile not defterleriniz de burada kaydedilir.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Dataproc kümeniz için kullanılacak makine türleri. Kullanılabilir makine türlerinin listesini burada görebilirsiniz.
–num-workers işaretini ayarlamazsanız varsayılan olarak 1 ana düğüm ve 2 çalışma düğümü oluşturulur
--optional-components=ANACONDA,JUPYTER
İsteğe bağlı bileşenler için bu değerleri ayarladığınızda, Jupyter ve Anaconda için gerekli tüm kitaplıklar (Jupyter not defterleri için gereklidir) kümenize yüklenir.
--enable-component-gateway
Bileşen Ağ Geçidi etkinleştirildiğinde Apache Knox ve Ters Proxy kullanarak App Engine bağlantısı oluşturulur. Bu bağlantı, Jupyter ve JupyterLab web arayüzlerine kolay, güvenli ve kimliği doğrulanmış erişim olanağı sunar. Böylece, artık SSH tünelleri oluşturmanıza gerek kalmaz.
Ayrıca, işlerinizin ve küme kullanım kalıplarınızın performansını görmenize yardımcı olan Yarn Resource Manager ve Spark History Server gibi kümedeki diğer araçlar için de bağlantılar oluşturur.
6. Apache Spark not defteri oluşturma
JupyterLab web arayüzüne erişme
Küme hazır olduğunda, JupyterLab web arayüzünün Bileşen Ağ Geçidi bağlantısını bulmak için Dataproc Kümeleri - Cloud konsolu'na ve oluşturduğunuz kümeyi tıklayıp Web Arayüzleri sekmesine gidin.
Klasik not defteri arayüzü olan Jupyter'e veya Project Jupyter için yeni nesil kullanıcı arayüzü olarak tanımlanan JupyterLab'e erişiminizin olduğunu fark edeceksiniz.
JupyterLab'de birbirinden güzel pek çok yeni kullanıcı arayüzü özelliği var. Bu yüzden, not defterlerini kullanmaya yeni başladıysanız veya en yeni iyileştirmeleri yapmak istiyorsanız resmi dokümanlara göre klasik Jupyter arayüzünün yerini alacak olan JupyterLab'i kullanmanızı öneririz.
Python 3 çekirdeğiyle not defteri oluşturma
Python 3 çekirdeğine (PySpark çekirdeğine değil) sahip bir not defteri oluşturmak için başlatıcı sekmesinden Python 3 not defteri simgesini tıklayın. Bu çekirdeği, not defterinde SparkSession'ı yapılandırmanıza ve BigQuery Storage API'yi kullanmak için gerekli olan spark-bigquery-connector'ı dahil etmenize olanak tanır.
Not defterini yeniden adlandırma
Soldaki veya üst gezinme bölümündeki kenar çubuğundan not defteri adını sağ tıklayıp not defterini "BigQuery Storage &" olarak yeniden adlandırın. Spark DataFrames.ipynb"
Not defterinde Spark kodunuzu çalıştırma
Bu not defterinde, BigQuery Storage API'den yararlanarak BigQuery ile Spark arasında veri okuma ve yazma aracı olan spark-bigquery-connector'ı kullanacaksınız.
BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişimde önemli iyileştirmeler sağlıyor. Paralel olarak veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Yüksek seviyede bu, özellikle büyük veri kümelerinde performansın önemli ölçüde artması anlamına gelir.
İlk hücrede kümenizin Scala sürümünü kontrol edin. Böylece spark-bigquery-connector jar dosyasının doğru sürümünü ekleyin.
Giriş [1]:
!scala -version
Çıkış [1]: Spark oturumu oluşturun ve spark-bigquery-connector paketini ekleyin.
Scala sürümünüz 2.11 ise aşağıdaki paketi kullanın.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Scala sürümünüz 2.12 ise aşağıdaki paketi kullanın.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Giriş [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval'ı etkinleştir
Bu işlem, df.show() işlevini yeniden göstermeye gerek kalmadan her adımda DataFrames sonuçlarının çıktısını verir ve çıktının biçimlendirmesini iyileştirir.
Giriş [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery tablosunu Spark DataFrame'e okuma
Herkese açık bir BigQuery veri kümesindeki verileri okuyarak Spark DataFrame oluşturun. Bu, verileri Spark kümesine yüklemek için spark-bigquery-connector ve BigQuery Storage API'den yararlanır.
Spark DataFrame oluşturun ve Wikipedia sayfa görüntülemeler için BigQuery herkese açık veri kümesinden veri yükleyin. Verileri, işlenmesinin gerçekleşeceği Spark'a yüklemek için spark-bigquery-connector özelliğini kullandığınızdan veriler üzerinde bir sorgu çalıştırmadığınızı fark edeceksiniz. Bu kod çalıştırıldığında, Spark'ta geç bir değerlendirme olduğundan tabloyu yüklemez ve yürütme bir sonraki adımda gerçekleşir.
Giriş [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Çıkış [4]:
Gerekli sütunları seçin ve filter()
için takma ad olan where()
adını kullanarak bir filtre uygulayın.
Bu kod çalıştırıldığında bir Spark işlemini tetikler ve veriler BigQuery Storage'dan okunur.
Giriş [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Çıkış [5]:
En popüler sayfaları görmek için başlığa ve sayfa görüntüleme sayısına göre sıralama yapın
Giriş [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Çıkış [6]:
7. Not defterinde Python çizim kitaplıklarını kullanma
Spark işlerinizin çıkışını planlamak için Python'da bulunan çeşitli grafik grafiklerinden yararlanabilirsiniz.
Spark DataFrame'i Pandas DataFrame'e dönüştürme
Spark DataFrame'i Pandas DataFrame'e dönüştürün ve datehour değerini dizin olarak ayarlayın. Verilerle doğrudan Python'da çalışmak ve verileri mevcut birçok Python çizim kitaplığını kullanarak çizmek istiyorsanız bu yöntem yararlıdır.
Giriş [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Çıkış [7]:
Pandas Veri Çerçevesinin Grafiğini Oluşturma
Not defterinde grafikleri görüntülemek için gereken matplotlib kitaplığını içe aktarın
Giriş [8]:
import matplotlib.pyplot as plt
Pandas DataFrame'den çizgi grafik oluşturmak için Pandas grafiği işlevini kullanın.
Giriş [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Çıkış [9]:
Not defterinin GCS'ye kaydedilip kaydedilmediğini kontrol etme
Artık ilk Jupyter not defterinizi Dataproc kümenizde çalışır duruma getirmiş olmanız gerekir. Not defterinize bir ad verin. Bu ad, küme oluşturulurken kullanılan GCS paketine otomatik olarak kaydedilir.
Bunu, Cloud Shell'deki bu gsutil komutunu kullanarak kontrol edebilirsiniz.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Aşağıdaki çıkışı göreceksiniz
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Optimizasyon ipucu - Verileri bellekte önbelleğe alın
Bazı senaryolarda, her seferinde BigQuery Storage'dan veri okumak yerine verilerin bellekte tutulmasını isteyebilirsiniz.
Bu iş, verileri BigQuery'den okur ve filtreyi BigQuery'ye aktarır. Ardından toplama Apache Spark'ta hesaplanacaktır.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Yukarıdaki işi, tablonun bir önbelleğini içerecek şekilde değiştirebilirsiniz. Artık wiki sütunundaki filtre, Apache Spark tarafından bellekte uygulanır.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Böylece, BigQuery depolama alanındaki verileri tekrar okumak yerine önbelleğe alınan verileri kullanarak başka bir wiki dili için filtre uygulayabilirsiniz. Bu şekilde, çalışma çok daha hızlı çalışır.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Önbelleği kaldırmak için
df_wiki_all.unpersist()
9. Daha fazla kullanım alanı için örnek not defterleri
Cloud Dataproc GitHub deposu; veri yüklemek, veri kaydetmek ve çeşitli Google Cloud Platform ürünleri ile açık kaynaklı araçlarla verilerinizi çizmek için yaygın Apache Spark kalıplarına sahip Jupyter not defterlerini içerir:
10. Temizleme
Bu hızlı başlangıç işlemi tamamlandıktan sonra GCP hesabınızdan gereksiz ücretlerle karşılaşmamak için:
- Ortam için oluşturduğunuz ve oluşturduğunuz Cloud Storage paketini silin.
- Dataproc ortamını silin.
Sadece bu codelab için proje oluşturduysanız dilerseniz projeyi silebilirsiniz:
- GCP Console'da Projeler sayfasına gidin.
- Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
- Kutuya proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.
Lisans
Bu çalışma, Creative Commons Attribution 3.0 Genel Lisans ve Apache 2.0 lisansı altında lisanslanmıştır.