BigQuery Verilerini Dataproc'ta PySpark ile önceden işleme

1. Genel Bakış

Bu codelab'de, Google Cloud Platform'da Dataproc ile Apache Spark'ı kullanarak veri işleme ardışık düzeni oluşturma işlemi açıklanmaktadır. Bir depolama konumundan veri okumak, üzerinde dönüşüm gerçekleştirmek ve verileri başka bir depolama konumuna yazmak, veri bilimi ve veri mühendisliği alanında yaygın bir kullanım alanıdır. Yaygın dönüştürme işlemleri arasında veri içeriğinin değiştirilmesi, gereksiz bilgilerin çıkarılması ve dosya türlerinin değiştirilmesi bulunur.

Bu codelab'de Apache Spark hakkında bilgi edinecek, Dataproc'u PySpark ile birlikte kullanarak örnek bir ardışık düzen çalıştırmayı (Apache Spark'ın Python API'si), BigQuery'yi, Google Cloud Storage'ı ve Reddit verilerini öğreneceksiniz.

2. Apache Spark'a Giriş (İsteğe bağlı)

Web sitesine göre, " Apache Spark, büyük ölçekli veri işleme için birleştirilmiş bir analiz motorudur." Verileri paralel ve bellekte analiz edip işleyebilmenizi sağlar. Böylece birden fazla farklı makine ve düğümde çok kapsamlı paralel hesaplamalar yapabilirsiniz. İlk olarak 2014'te geleneksel MapReduce'a yükseltme olarak yayınlandı ve hâlâ büyük ölçekli hesaplamalar yapmak için kullanılan en popüler çerçevelerden biri. Apache Spark, Scala'da yazılmıştır ve sonrasında Scala, Java, Python ve R'de API'lere sahiptir. Verilerde SQL sorguları gerçekleştirmek için Spark SQL, veri akışı için Spark Streaming, makine öğrenimi için MLlib ve grafik işleme için GraphX gibi çok sayıda kitaplık içerir. Bunların tümü Apache Spark motorunda çalışır.

32add0b6a47bafbc.png

Spark tek başına çalışabilir veya ölçeklendirme için Yarn, Mesos ya da Kubernetes gibi bir kaynak yönetim hizmetinden yararlanabilir. Bu codelab için Yarn'den yararlanan Dataproc'u kullanacaksınız.

Spark'taki veriler başlangıçta RDD veya esnek dağıtılan veri kümesi olarak adlandırılan belleğe yüklendi. Spark'taki geliştirmeler o zamandan beri sütunsal stilde iki yeni veri türünün eklenmesini içeriyordu: yazılan Veri Kümesi ve türü yazılmamış Veri Çerçevesi. Kısacası RDD'ler her tür veri için idealdir. Veri kümeleri ve Veri Çerçeveleri ise tablo halindeki veriler için optimize edilmiştir. Veri kümeleri yalnızca Java ve Scala API'leriyle kullanılabildiğinden bu codelab için PySpark Dataframe API'sini kullanacağız. Daha fazla bilgi için lütfen Apache Spark belgelerine bakın.

3. Kullanım Örneği

Veri mühendisleri genellikle veri bilimcilerin verilere kolayca erişebilmesine ihtiyaç duyar. Ancak veriler genellikle başlangıçta kirli olur (mevcut haliyle analiz için kullanılması zordur) ve kullanılmadan önce temizlenmesi gerekir. Buna örnek olarak tuhaf kodlamalar veya gereksiz HTML etiketleri içerebilen web'den kazınmış veriler gösterilebilir.

Bu laboratuvarda, BigQuery'den bir veri kümesi Reddit yayını biçiminde Dataproc'ta barındırılan bir Spark kümesine yükleyecek, yararlı bilgileri çıkaracak ve işlenen verileri sıkıştırılmış CSV dosyaları olarak Google Cloud Storage'da depolayacaksınız.

be2a4551ece63bfc.png

Şirketinizin baş veri bilimci, ekiplerinin farklı doğal dil işleme sorunları üzerinde çalışmasını sağlamak istiyor. Özellikle de "r/food" alt redditindeki verileri analiz etmek istiyorlar. Ocak 2017 ile Ağustos 2019 arasında bir dolguyla başlayarak veri dökümü için ardışık düzen oluşturacaksınız.

4. BigQuery Storage API aracılığıyla BigQuery'ye erişme

tabledata.list API yöntemini kullanarak BigQuery'den veri çekmek zaman alabilir ve veri miktarı arttıkça verimsiz olabilir. Bu yöntem, JSON nesnelerinin bir listesini döndürür ve bir veri kümesinin tamamını okumak için her seferinde bir sayfa sırayla okunmasını gerektirir.

BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişimle ilgili önemli iyileştirmeler sunar. Paralel olarak veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Yüksek seviyede bu, özellikle büyük veri kümelerinde performansın önemli ölçüde artması anlamına gelir.

Bu codelab'de, BigQuery ve Spark arasında veri okumak ve yazmak için spark-bigquery-connector'ı kullanacaksınız.

5. Proje Oluşturma

console.cloud.google.com adresinden Google Cloud Platform konsolunda oturum açın ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Sonraki adımda, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu codelab'i çalıştırmanın maliyeti birkaç dolardan fazla olmayacaktır. Ancak daha fazla kaynak kullanmaya karar verirseniz veya bunları çalışır durumda bırakırsanız daha yüksek ücret ödemeniz gerekebilir. Bu codelab'in son bölümü, projenizi temizleme konusunda size yol gösterecektir.

Yeni Google Cloud Platform kullanıcıları 300 ABD doları değerinde ücretsiz denemeden yararlanabilir.

6. Ortamınızı Ayarlama

Şimdi, ortamınızı ayarlamak için aşağıdaki adımları uygulayın:

  • Compute Engine, Dataproc ve BigQuery Storage API'lerini etkinleştirme
  • Proje ayarlarını yapılandırma
  • Dataproc kümesi oluşturma
  • Google Cloud Storage paketi oluşturma

API'leri Etkinleştirme ve Ortamınızı Yapılandırma

Cloud Console'un sağ üst köşesindeki düğmeye basarak Cloud Shell'i açın.

a10c47ee6ca41c54.png

Cloud Shell yüklendikten sonra Compute Engine, Dataproc ve BigQuery Storage API'lerini etkinleştirmek için aşağıdaki komutları çalıştırın:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Projenizin proje kimliğini ayarlayın. Proje seçimi sayfasına gidip projenizi arayarak bulabilirsiniz. Bu ad, proje adınızla aynı olmayabilir.

e682e8227aa3c781.png

76d45fb295728542.png

Proje kimliğinizi ayarlamak için aşağıdaki komutu çalıştırın:

gcloud config set project <project_id>

Buradaki listeden bir bölge seçerek projenizin bölgesini ayarlayın. us-central1 buna örnek olarak gösterilebilir.

gcloud config set dataproc/region <region>

Dataproc kümeniz için bir ad seçin ve bu küme için bir ortam değişkeni oluşturun.

CLUSTER_NAME=<cluster_name>

Dataproc Kümesi oluşturma

Aşağıdaki komutu çalıştırarak Dataproc kümesi oluşturun:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Bu komutun tamamlanması birkaç dakika sürer. Komutun dökümünü almak için:

Bu işlem, daha önce verdiğiniz adla Dataproc kümesi oluşturma işlemini başlatır. beta API kullanıldığında, Dataproc'un Bileşen Ağ Geçidi gibi beta özellikleri etkinleştirilir.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Bu, çalışanlarınız için kullanılacak makinenin türünü belirler.

--worker-machine-type n1-standard-8

Bu, kümenizde bulunacak çalışan sayısını belirleyecektir.

--num-workers 8

Bu işlem, Dataproc'un görüntü sürümünü belirler.

--image-version 1.5-debian

Bu, kümede kullanılacak ilk kullanıma hazırlama işlemlerini yapılandırır. Burada, pip başlatma işlemini eklersiniz.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Bu, kümeye eklenecek meta veridir. Burada, pip ilk kullanıma hazırlama işlemi için meta veri sağlıyorsunuz.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Bu, kümeye yüklenecek İsteğe Bağlı Bileşenler'i ayarlar.

--optional-components=ANACONDA

Bu işlem; Zeppelin, Jupyter veya Spark History gibi ortak kullanıcı arayüzlerini görüntülemek için Dataproc'un Bileşen Ağ Geçidi'ni kullanmanıza olanak tanıyan bileşen ağ geçidini etkinleştirir.

--enable-component-gateway

Dataproc'a daha ayrıntılı giriş için lütfen bu codelab'e göz atın.

Google Cloud Storage Paketi Oluşturma

İş çıkışınız için bir Google Cloud Storage paketine ihtiyacınız olacak. Paketiniz için benzersiz bir ad belirleyin ve yeni paket oluşturmak için aşağıdaki komutu çalıştırın. Paket adları, tüm Google Cloud projelerinde tüm kullanıcılar için benzersizdir. Bu nedenle, bunu farklı adlarla birkaç kez denemeniz gerekebilir. ServiceException almazsanız başarıyla bir paket oluşturulur.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Keşif Amaçlı Veri Analizi

Ön işlemenizi gerçekleştirmeden önce incelediğiniz verilerin yapısı hakkında daha fazla bilgi edinmeniz gerekir. Bunun için, veri keşfinin iki yöntemini inceleyeceksiniz. İlk olarak BigQuery web kullanıcı arayüzünü kullanarak bazı ham verileri görüntüleyecek, ardından PySpark ve Dataproc'u kullanarak subreddit başına yayın sayısını hesaplayacaksınız.

BigQuery Web Kullanıcı Arayüzünü kullanma

Verilerinizi görüntülemek için BigQuery web kullanıcı arayüzünü kullanarak başlayın. Cloud Console'daki menü simgesinden aşağı kaydırıp "BigQuery"ye basın. BigQuery Web Kullanıcı Arayüzünü açın.

242a597d7045b4da.png

Ardından, BigQuery Web Kullanıcı Arayüzü Sorgu Düzenleyici'de aşağıdaki komutu çalıştırın. Bu, Ocak 2017'ye ait 10 tam veri satırını döndürür:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Kullanılabilir tüm sütunları ve bazı örnekleri görmek için sayfayı kaydırabilirsiniz. Özellikle, her yayının metin içeriğini temsil eden iki sütun göreceksiniz: "title" ve "selftext", ikincisi gönderinin gövdesidir. "generate_utc" gibi diğer sütunlara da dikkat edin Bu, bir yayının oluşturulduğu son tarih ve "subreddit"tir. Bu, yayının bulunduğu alt reddit'tir.

PySpark İşi Yürütme

Depoyu örnek kod ve cd ile doğru dizine klonlamak için Cloud Shell'inizde aşağıdaki komutları çalıştırın:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Her subreddit'te kaç yayın olduğunu belirlemek için PySpark'ı kullanabilirsiniz. Sonraki adımda Cloud Editor'ı açıp çalıştırmadan önce komut dosyasını cloud-dataproc/codelabs/spark-bigquery okuyabilirsiniz:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

"Terminali aç"ı tıklayın düğmesini tıklayarak Cloud Shell'inize geri dönün ve ilk PySpark işinizi yürütmek için aşağıdaki komutu çalıştırın:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Bu komut, Jobs API aracılığıyla Dataproc'a iş göndermenizi sağlar. Burada, iş türünü pyspark olarak gösteriyorsunuz. Küme adını, isteğe bağlı parametreleri ve işi içeren dosyanın adını sağlayabilirsiniz. Burada, spark-bigquery-connector öğesini işinize dahil etmenize olanak tanıyan --jars parametresini sağlarsınız. Ayrıca, günlük çıkış düzeylerini --driver-log-levels root=FATAL kullanarak da ayarlayabilirsiniz. Bu işlem, hatalar hariç tüm günlük çıkışını engeller. Spark günlükleri genelde karmaşıktır.

Bu çalıştırılması birkaç dakika sürer ve nihai çıkışınız şuna benzer olacaktır:

6c185228db47bb18.png

8. Dataproc ve Spark kullanıcı arayüzlerini keşfetme

Dataproc'ta Spark işlerini çalıştırırken işlerinizin / kümelerinizin durumunu kontrol etmek için iki kullanıcı arayüzüne erişebilirsiniz. İlki, Dataproc kullanıcı arayüzü. Bu arayüzü, menü simgesini tıklayıp aşağı kaydırarak Dataproc'a giderek bulabilirsiniz. Burada, mevcut belleğin yanı sıra bekleyen belleği ve çalışan sayısını görebilirsiniz.

6f2987346d15c8e2.png

Tamamlanan işleri görmek için işler sekmesini de tıklayabilirsiniz. Belirli bir işin İş Kimliği'ni tıklayarak bu işlerin günlükleri ve çıkışı gibi iş ayrıntılarını görebilirsiniz. 114d90129b0e4c88.png.

1b2160f0f484594a.png

Spark kullanıcı arayüzünü de görüntüleyebilirsiniz. İş sayfasından geri okunu ve ardından Web Arayüzleri'ni tıklayın. Bileşen ağ geçidi altında birkaç seçenek göreceksiniz. Bunların birçoğu, kümenizi ayarlarken İsteğe Bağlı Bileşenler aracılığıyla etkinleştirilebilir. Bu laboratuvar için "Spark History Server

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Aşağıdaki pencere açılır:

8f6786760f994fe8.png

Tamamlanan tüm işler burada gösterilir. Bir app_id öğesini tıklayarak o iş hakkında daha fazla bilgi edinebilirsiniz. Benzer şekilde, "Tamamlanmayan Uygulamaları Göster"i de tıklayabilirsiniz 'nı kullanabilirsiniz.

9. Dolgu İşinizi Çalıştırma

Şimdi belleğe veri yükleyen, gerekli bilgileri çıkaran ve çıktıyı bir Google Cloud Storage paketine aktaran bir iş çalıştıracaksınız. "Başlık", "gövde" ve (ham metin) ve "zaman damgası oluşturuldu" tıklayın. Daha sonra, bu verileri alıp CSV'ye dönüştürecek, sıkıştıracak ve gs://${BUCKET_NAME}/reddit_post/YYYY/MM/food.csv.gz URI'siyle birlikte bir pakete yükleyeceksiniz.

cloud-dataproc/codelabs/spark-bigquery/backfill.py içindeki kodu yürüten bir sarmalayıcı komut dosyası olan cloud-dataproc/codelabs/spark-bigquery/backfill.sh kodunu okumak için Cloud Editor'a tekrar bakabilirsiniz.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Kısa süre içinde birkaç iş tamamlama mesajı göreceksiniz. İşin tamamlanması 15 dakika kadar sürebilir. Ayrıca, gsutil'i kullanarak başarılı veri çıkışını doğrulamak için depolama alanı grubunuzu tekrar kontrol edebilirsiniz. Tüm işler tamamlandıktan sonra aşağıdaki komutu çalıştırın:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Aşağıdaki çıkışı göreceksiniz:

a7c3c7b2e82f9fca.png

Tebrikler, reddit yorum verilerinizin doldurulmasını başarıyla tamamladınız! Bu verilerin üzerinde model oluşturmak istiyorsanız lütfen Spark-NLP codelab'ine geçin.

10. Temizleme

Bu hızlı başlangıç işlemi tamamlandıktan sonra GCP hesabınızdan gereksiz ücretlerle karşılaşmamak için:

  1. Ortam için oluşturduğunuz ve oluşturduğunuz Cloud Storage paketini silin.
  2. Dataproc ortamını silin.

Sadece bu codelab için proje oluşturduysanız dilerseniz projeyi silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Lisans ve Apache 2.0 lisansı altında lisanslanmıştır.