Dataproc'ta PySpark ile BigQuery verilerini ön işleme

1. Genel Bakış

Bu kod laboratuvarında, Google Cloud Platform'da Dataproc ile Apache Spark kullanılarak veri işleme ardışık düzeninin nasıl oluşturulacağı ele alınmaktadır. Veri bilimi ve veri mühendisliğinde, verileri bir depolama konumundan okumak, üzerinde dönüşümler gerçekleştirmek ve başka bir depolama konumuna yazmak yaygın bir kullanım alanıdır. Verilerin içeriğini değiştirme, gereksiz bilgileri kaldırma ve dosya türlerini değiştirme yaygın dönüşümler arasındadır.

Bu codelab'de Apache Spark hakkında bilgi edinecek, PySpark (Apache Spark'ın Python API'si) ile Dataproc'i kullanarak örnek bir ardışık düzen çalıştıracak, BigQuery, Google Cloud Storage ve Reddit'ten alınan verileri kullanacaksınız.

2. Apache Spark'a giriş (İsteğe bağlı)

Web sitesine göre, "Apache Spark, büyük ölçekli veri işleme için birleşik bir analiz motorudur." Verileri paralel ve bellekte analiz etmenize ve işlemenize olanak tanır. Bu sayede, birden fazla farklı makine ve düğümde büyük ölçekli paralel hesaplama yapabilirsiniz. İlk olarak 2014'te geleneksel MapReduce'un yükseltmesi olarak kullanıma sunulan bu çerçeve, büyük ölçekli hesaplamalar yapmak için en popüler çerçevelerden biri olmaya devam etmektedir. Apache Spark, Scala'da yazılmıştır ve Scala, Java, Python ve R'de API'lere sahiptir. Veriler üzerinde SQL sorguları yürütmek için Spark SQL, veri akışı için Spark Streaming, makine öğrenimi için MLlib ve grafik işleme için GraphX gibi çok sayıda kitaplık içerir. Bunların tümü Apache Spark motorunda çalışır.

32add0b6a47bafbc.png

Spark kendi başına çalışabilir veya ölçeklendirme için Yarn, Mesos ya da Kubernetes gibi bir kaynak yönetimi hizmetinden yararlanabilir. Bu codelab için Yarn'ı kullanan Dataproc'i kullanacaksınız.

Spark'taki veriler başlangıçta belleğe RDD veya esnek dağıtılmış veri kümesi olarak yükleniyordu. O zamandan beri Spark'ta geliştirilen iki yeni sütun stili veri türü eklendi: türü belirtilmiş veri kümesi ve türü belirtilmemiş veri çerçevesi. Genel olarak, RDD'ler her tür veri için mükemmeldir. Veri kümeleri ve veri çerçeveleri ise tablo biçimindeki veriler için optimize edilmiştir. Veri kümeleri yalnızca Java ve Scala API'leriyle kullanılabildiğinden bu kod laboratuvarında PySpark Dataframe API'yi kullanmaya devam edeceğiz. Daha fazla bilgi için lütfen Apache Spark dokümanlarına bakın.

3. Kullanım Örneği

Veri mühendisleri genellikle verilerin veri bilimcilerin kolayca erişebileceği şekilde düzenlenmesini ister. Ancak veriler genellikle başlangıçta kirlidir (mevcut haliyle analizler için kullanılması zordur) ve çok faydalı olabilmesi için temizlenmesi gerekir. Buna örnek olarak, web'den kopyalanan ve garip kodlamalar veya alakasız HTML etiketleri içerebilen veriler verilebilir.

Bu laboratuvarda, BigQuery'deki Reddit yayınları biçimindeki bir veri kümesini Dataproc'te barındırılan bir Spark kümesine yükleyecek, yararlı bilgileri ayıklayacak ve işlenen verileri Google Cloud Storage'da sıkıştırılmış CSV dosyaları olarak depolayacaksınız.

be2a4551ece63bfc.png

Şirketinizdeki baş veri bilimci, ekiplerinin farklı doğal dil işleme sorunları üzerinde çalışmasını istiyor. Özellikle "r/food" alt dizinindeki verileri analiz etmek istiyorlar. Ocak 2017 ile Ağustos 2019 arasındaki bir veri doldurma işlemiyle başlayan bir veri dökümü ardışık düzeni oluşturursunuz.

4. BigQuery Storage API üzerinden BigQuery'ye erişme

tabledata.list API yöntemini kullanarak BigQuery'den veri çekmek, veri miktarı arttıkça zaman alıcı ve verimli olmayabilir. Bu yöntem, JSON nesnelerinin bir listesini döndürür ve veri kümesinin tamamını okumak için her seferinde bir sayfanın sırayla okunmasını gerektirir.

BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişme konusunda önemli iyileştirmeler sunar. Paralel veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Genel olarak bu, özellikle daha büyük veri kümelerinde önemli ölçüde iyileştirilmiş performans anlamına gelir.

Bu codelab'de, BigQuery ile Spark arasında veri okumak ve yazmak için spark-bigquery-connector'ı kullanacaksınız.

5. Proje oluşturma

console.cloud.google.com adresinde Google Cloud Platform Console'da oturum açın ve yeni bir proje oluşturun:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

Ardından, Google Cloud kaynaklarını kullanabilmek için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.

Bu kod laboratuvarını çalıştırmak birkaç dolardan fazlaya mal olmaz ancak daha fazla kaynak kullanmaya karar verirseniz veya kaynakları çalışır durumda bırakırsanız maliyet daha yüksek olabilir. Bu codelab'in son bölümünde, projenizi temizleme konusunda size yol gösterilecektir.

Google Cloud Platform'un yeni kullanıcıları 300 ABD doları değerindeki ücretsiz deneme sürümünden yararlanabilir.

6. Ortamınızı kurma

Ardından, ortamınızı ayarlamak için aşağıdakileri yapmanız gerekir:

  • Compute Engine, Dataproc ve BigQuery Storage API'lerini etkinleştirme
  • Proje ayarlarını yapılandırma
  • Dataproc kümesi oluşturma
  • Google Cloud Storage paketi oluşturma

API'leri etkinleştirme ve ortamınızı yapılandırma

Cloud Console'unuzun sağ üst köşesindeki düğmeye basarak Cloud Shell'i açın.

a10c47ee6ca41c54.png

Cloud Shell yüklendikten sonra Compute Engine, Dataproc ve BigQuery Storage API'lerini etkinleştirmek için aşağıdaki komutları çalıştırın:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Projenizin proje kimliğini ayarlayın. Proje seçim sayfasına gidip projenizi arayarak bulabilirsiniz. Bu ad, projenizin adıyla aynı olmayabilir.

e682e8227aa3c781.png

76d45fb295728542.png

Proje kimliğinizi ayarlamak için aşağıdaki komutu çalıştırın:

gcloud config set project <project_id>

Buradaki listeden birini seçerek projenizin bölgesini ayarlayın. Örneğin, us-central1 olabilir.

gcloud config set dataproc/region <region>

Dataproc kümeniz için bir ad seçin ve bunun için bir ortam değişkeni oluşturun.

CLUSTER_NAME=<cluster_name>

Dataproc Kümesi Oluşturma

Aşağıdaki komutu yürüterek bir Dataproc kümesi oluşturun:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Bu komutun tamamlanması birkaç dakika sürer. Komutu ayrıntılı olarak incelemek için:

Bu işlem, daha önce belirttiğiniz adla bir Dataproc kümesinin oluşturulmasını başlatır. beta API'yi kullanmak, Dataproc'in Bileşen Ağ Geçidi gibi beta özelliklerini etkinleştirir.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Bu işlem, çalışanlarınız için kullanılacak makinenin türünü belirler.

--worker-machine-type n1-standard-8

Bu işlem, kümenizdeki çalışan sayısını belirler.

--num-workers 8

Bu işlem, Dataproc'in görüntü sürümünü ayarlar.

--image-version 1.5-debian

Bu işlem, kümede kullanılacak ilk kullanıma hazırlama işlemlerini yapılandırır. Burada pip ilk kullanıma hazırlama işlemini dahil ediyorsunuz.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Bu, kümeye eklenecek meta verilerdir. Burada, pip başlatma işlemi için meta veri sağlıyorsunuz.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Bu işlem, kümeye yüklenecek isteğe bağlı bileşenleri ayarlar.

--optional-components=ANACONDA

Bu işlem, Zeppelin, Jupyter veya Spark Geçmişi gibi ortak kullanıcı arayüzlerini görüntülemek için Dataproc'in Bileşen Ağ Geçidi'ni kullanmanıza olanak tanıyan bileşen ağ geçidini etkinleştirir.

--enable-component-gateway

Dataproc'e daha ayrıntılı bir giriş için lütfen bu codelab'e göz atın.

Google Cloud Storage paketi oluşturma

İş çıkışınız için bir Google Cloud Storage paketine ihtiyacınız vardır. Paketiniz için benzersiz bir ad belirleyin ve yeni bir paket oluşturmak için aşağıdaki komutu çalıştırın. Paket adları, tüm Google Cloud projeleri ve kullanıcılar için benzersizdir. Bu nedenle, bunu farklı adlarla birkaç kez denemeniz gerekebilir. ServiceException almazsanız paket başarıyla oluşturulur.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Keşfedici Veri Analizi

Ön işleme işleminizi gerçekleştirmeden önce, kullandığınız verilerin yapısı hakkında daha fazla bilgi edinmeniz gerekir. Bunu yapmak için iki veri keşif yöntemini inceleyeceksiniz. Öncelikle BigQuery web arayüzünü kullanarak bazı ham verileri görüntüleyecek, ardından PySpark ve Dataproc'i kullanarak her alt dizin için gönderi sayısını hesaplayacaksınız.

BigQuery Web Kullanıcı Arayüzünü kullanma

Verilerinizi görüntülemek için BigQuery web kullanıcı arayüzünü kullanmaya başlayın. Cloud Console'daki menü simgesinden aşağı kaydırın ve BigQuery web kullanıcı arayüzünü açmak için "BigQuery"ye basın.

242a597d7045b4da.png

Ardından, BigQuery web kullanıcı arayüzü sorgu düzenleyicisinde aşağıdaki komutu çalıştırın. Bu işlem, Ocak 2017'ye ait 10 tam veri satırı döndürür:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Sayfayı kaydırarak mevcut tüm sütunları ve bazı örnekleri görebilirsiniz. Özellikle, her yayındaki metin içeriğini temsil eden iki sütun görürsünüz: "title" ve "selftext". "selftext", yayının gövdesidir. Ayrıca, bir yayının oluşturulduğu UTC saati olan "created_utc" ve yayının bulunduğu alt dizin olan "subreddit" gibi diğer sütunlara da dikkat edin.

PySpark işi yürütme

Depoyu örnek kodla klonlamak ve doğru dizine gitmek için Cloud Shell'inizde aşağıdaki komutları çalıştırın:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Her alt dizinde kaç tane yayın olduğunu belirlemek için PySpark'ı kullanabilirsiniz. Cloud Düzenleyici'yi açıp bir sonraki adımda çalıştırmadan önce komut dosyasını cloud-dataproc/codelabs/spark-bigquery okuyabilirsiniz:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Cloud Shell'inize geri dönmek için Cloud Düzenleyici'deki "Terminali Aç" düğmesini tıklayın ve ilk PySpark işinizi yürütmek için aşağıdaki komutu çalıştırın:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Bu komut, Jobs API aracılığıyla Dataproc'e iş göndermenize olanak tanır. Burada iş türünü pyspark olarak belirtiyorsunuz. Küme adını, isteğe bağlı parametreleri ve işi içeren dosyanın adını sağlayabilirsiniz. Burada, spark-bigquery-connector parametresini --jars parametresi ile birlikte işinize eklemenize olanak tanıyan --jars parametresini sağlıyorsunuz. Günlük çıkış seviyelerini --driver-log-levels root=FATAL kullanarak da ayarlayabilirsiniz. Bu, hatalar hariç tüm günlük çıkışlarını engeller. Spark günlükleri genellikle oldukça fazla bilgi içerir.

Bu işlemin çalışması birkaç dakika sürer ve nihai çıkışınız aşağıdaki gibi görünür:

6c185228db47bb18.png

8. Dataproc ve Spark kullanıcı arayüzlerini keşfetme

Dataproc'te Spark işleri çalıştırırken işlerinizin / kümelerinizin durumunu kontrol etmek için iki kullanıcı arayüzüne erişebilirsiniz. Bunlardan ilki, menü simgesini tıklayıp aşağı kaydırarak Dataproc'e ulaşabileceğiniz Dataproc kullanıcı arayüzüdür. Burada, mevcut belleğin yanı sıra bekleyen belleği ve çalışan sayısını görebilirsiniz.

6f2987346d15c8e2.png

Tamamlanan işleri görmek için işler sekmesini de tıklayabilirsiniz. Belirli bir işin iş kimliğini tıklayarak günlükler ve bu işlerin çıkışı gibi iş ayrıntılarını görebilirsiniz. 114d90129b0e4c88.png

1b2160f0f484594a.png

Spark kullanıcı arayüzünü de görüntüleyebilirsiniz. İş sayfasında geri oku ve ardından Web Arayüzleri'ni tıklayın. Bileşen ağ geçidi altında çeşitli seçenekler görürsünüz. Bunların çoğu, kümenizi oluştururken İsteğe Bağlı Bileşenler aracılığıyla etkinleştirilebilir. Bu laboratuvar için "Spark Geçmiş Sunucusu"nu tıklayın.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Bu işlemle aşağıdaki pencere açılır:

8f6786760f994fe8.png

Tamamlanmış tüm işler burada gösterilir. İş hakkında daha fazla bilgi edinmek için herhangi bir application_id değerini tıklayabilirsiniz. Benzer şekilde, şu anda çalışan tüm işleri görüntülemek için açılış sayfasının en altındaki "Tamamlanmamış Başvuruları Göster"i tıklayabilirsiniz.

9. Doldurma işinizi çalıştırma

Artık verileri belleğe yükleyen, gerekli bilgileri ayıklayan ve çıkışı bir Google Cloud Storage paketine aktaran bir iş çalıştıracaksınız. Her Reddit yorumu için "title", "body" (ham metin) ve "timestamp created" değerlerini ayıklayacaksınız. Ardından bu verileri alıp CSV'ye dönüştürür, sıkıştırır ve gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz URI'sine sahip bir pakete yüklersiniz.

cloud-dataproc/codelabs/spark-bigquery/backfill.py'teki kodu yürüten bir sarmalayıcı komut dosyası olan cloud-dataproc/codelabs/spark-bigquery/backfill.sh için kodu okumak üzere Cloud Düzenleyici'ye tekrar göz atabilirsiniz.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Kısa süre içinde bir dizi iş tamamlanma mesajı görürsünüz. İşin tamamlanması 15 dakikayı bulabilir. Verilerin başarıyla aktarıldığını doğrulamak için gsutil'i kullanarak depolama paketinizi tekrar kontrol edebilirsiniz. Tüm işler tamamlandığında aşağıdaki komutu çalıştırın:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Aşağıdaki çıkışı göreceksiniz:

a7c3c7b2e82f9fca.png

Tebrikler, Reddit yorum verileriniz için doldurma işlemini başarıyla tamamladınız. Bu verileri temel alarak nasıl model oluşturabileceğinizi öğrenmek istiyorsanız lütfen Spark-NLP codelab'ine geçin.

10. Temizleme

Bu hızlı başlangıç kılavuzunun tamamlanmasının ardından GCP hesabınızdan gereksiz ücretler alınmasını önlemek için:

  1. Ortam için oluşturduğunuz Cloud Storage paketini silin.
  2. Dataproc ortamını silin.

Yalnızca bu kod laboratuvarının kullanımı için bir proje oluşturduysanız dilerseniz projeyi de silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve projeyi silmek için Kapat'ı tıklayın.

Lisans

Bu çalışma, Creative Commons Attribution 3.0 Genel Amaçlı Lisans ve Apache 2.0 Lisansı ile lisanslanmıştır.