1. Genel Bakış
Bu codelab'de, Apache Spark ile Dataproc'u kullanarak Google Cloud Platform'da veri işleme ardışık düzeni oluşturma konusu ele alınacaktır. Veri biliminde ve veri mühendisliğinde, verileri bir depolama konumundan okuma, üzerinde dönüşümler gerçekleştirme ve başka bir depolama konumuna yazma yaygın bir kullanım alanıdır. Yaygın dönüşümler arasında verilerin içeriğini değiştirme, gereksiz bilgileri kaldırma ve dosya türlerini değiştirme yer alır.
Bu codelab'de Apache Spark hakkında bilgi edinecek, Dataproc'u PySpark (Apache Spark'ın Python API'si), BigQuery, Google Cloud Storage ve Reddit'ten alınan verilerle kullanarak örnek bir ardışık düzen çalıştıracaksınız.
2. Apache Spark'a giriş (isteğe bağlı)
Web sitesine göre "Apache Spark, büyük ölçekli veri işleme için birleşik bir analiz motorudur." Verileri paralel ve bellek içi olarak analiz edip işlemenize olanak tanır. Bu sayede, birden fazla farklı makine ve düğümde büyük ölçekli paralel hesaplama yapılabilir. İlk olarak 2014'te geleneksel MapReduce'e yükseltme olarak yayınlanan Spark, büyük ölçekli hesaplamalar yapmak için hâlâ en popüler çerçevelerden biridir. Apache Spark, Scala ile yazılmıştır ve Scala, Java, Python ve R'de API'leri vardır. Veriler üzerinde SQL sorguları yürütmek için Spark SQL, verileri yayınlamak için Spark Streaming, makine öğrenimi için MLlib ve grafik işleme için GraphX gibi çok sayıda kitaplık içerir. Bu kitaplıkların tümü Apache Spark motorunda çalışır.

Spark kendi başına çalışabilir veya ölçeklendirme için Yarn, Mesos ya da Kubernetes gibi bir kaynak yönetimi hizmetinden yararlanabilir. Bu codelab'de Yarn'ı kullanan Dataproc'u kullanacaksınız.
Spark'taki veriler başlangıçta belleğe RDD (esnek dağıtımlı veri kümesi) adı verilen bir yapıya yükleniyordu. Spark'taki geliştirme çalışmaları kapsamında, iki yeni sütun tarzı veri türü eklenmiştir: türü belirtilmiş olan Veri Kümesi ve türü belirtilmemiş olan Veri Çerçevesi. Kabaca söylemek gerekirse RDD'ler her tür veri için uygundur. Veri kümeleri ve veri çerçeveleri ise tablo biçimindeki veriler için optimize edilmiştir. Veri kümeleri yalnızca Java ve Scala API'leriyle kullanılabildiğinden bu codelab'de PySpark Dataframe API'yi kullanmaya devam edeceğiz. Daha fazla bilgi için lütfen Apache Spark belgelerini inceleyin.
3. Kullanım Örneği
Veri mühendisleri genellikle verilerin veri bilimciler tarafından kolayca erişilebilir olmasını ister. Ancak veriler genellikle başlangıçta kirlidir (mevcut durumda analiz için kullanılması zordur) ve çok fazla kullanılabilmesi için temizlenmesi gerekir. Bunun bir örneği, web'den kopyalanmış ve garip kodlamalar veya gereksiz HTML etiketleri içerebilen verilerdir.
Bu laboratuvarda, Reddit gönderileri biçiminde BigQuery'den bir veri kümesini Dataproc'ta barındırılan bir Spark kümesine yükleyecek, yararlı bilgileri ayıklayacak ve işlenen verileri Google Cloud Storage'da sıkıştırılmış CSV dosyaları olarak depolayacaksınız.

Şirketinizdeki baş veri bilimci, ekiplerinin farklı doğal dil işleme sorunları üzerinde çalışmasını istiyor. Özellikle "r/food" adlı alt dizindeki verileri analiz etmek istiyorlar. Ocak 2017'den Ağustos 2019'a kadar olan bir geri doldurma işlemiyle başlayarak veri dökümü için bir ardışık düzen oluşturacaksınız.
4. BigQuery Storage API aracılığıyla BigQuery'ye erişme
tabledata.list API yöntemini kullanarak BigQuery'den veri çekmek, veri miktarı arttıkça zaman alıcı ve verimli olmayabilir. Bu yöntem, JSON nesnelerinin bir listesini döndürür ve tüm veri kümesini okumak için sayfaların tek tek okunmasını gerektirir.
BigQuery Storage API, RPC tabanlı bir protokol kullanarak BigQuery'deki verilere erişimde önemli iyileştirmeler sağlar. Paralel veri okuma ve yazma işlemlerinin yanı sıra Apache Avro ve Apache Arrow gibi farklı serileştirme biçimlerini destekler. Bu, genel olarak özellikle daha büyük veri kümelerinde performansın önemli ölçüde iyileştirilmesi anlamına gelir.
Bu codelab'de, BigQuery ile Spark arasında veri okuma ve yazma işlemleri için spark-bigquery-connector'ı kullanacaksınız.
5. Proje oluşturma
console.cloud.google.com adresinden Google Cloud Platform Console'da oturum açın ve yeni bir proje oluşturun:



Ardından, Google Cloud kaynaklarını kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir.
Bu codelab'i tamamlamak birkaç dolardan fazla tutmaz. Ancak daha fazla kaynak kullanmaya karar verirseniz veya kaynakları çalışır durumda bırakırsanız maliyet artabilir. Bu codelab'in son bölümünde projenizi temizleme adımları açıklanmaktadır.
Google Cloud Platform'un yeni kullanıcıları 300 ABD doları değerinde ücretsiz deneme sürümünden yararlanabilir.
6. Ortamınızı Kurma
Şimdi ortamınızı kurmak için aşağıdaki adımları uygulayacaksınız:
- Compute Engine, Dataproc ve BigQuery Storage API'lerini etkinleştirme
- Proje ayarlarını yapılandırma
- Dataproc kümesi oluşturma
- Google Cloud Storage paketi oluşturma
API'leri Etkinleştirme ve Ortamınızı Yapılandırma
Cloud Console'unuzun sağ üst köşesindeki düğmeye basarak Cloud Shell'i açın.

Cloud Shell yüklendikten sonra Compute Engine, Dataproc ve BigQuery Storage API'lerini etkinleştirmek için aşağıdaki komutları çalıştırın:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
Projenizin proje kimliğini ayarlayın. Proje seçimi sayfasına gidip projenizi arayarak bulabilirsiniz. Bu ad, proje adınızla aynı olmayabilir.


Proje kimliğinizi ayarlamak için aşağıdaki komutu çalıştırın:
gcloud config set project <project_id>
Bölge ayarını yapmak için buradaki listeden bir bölge seçin. Örneğin, us-central1.
gcloud config set dataproc/region <region>
Dataproc kümeniz için bir ad seçin ve bu adla ilgili bir ortam değişkeni oluşturun.
CLUSTER_NAME=<cluster_name>
Dataproc kümesi oluşturma
Aşağıdaki komutu çalıştırarak bir Dataproc kümesi oluşturun:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
Bu komutun tamamlanması birkaç dakika sürer. Komutu parçalamak için:
Bu işlem, daha önce sağladığınız adla bir Dataproc kümesinin oluşturulmasını başlatır. beta API'yi kullanmak, Component Gateway gibi Dataproc'un beta özelliklerini etkinleştirir.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
Bu işlem, çalışanlarınız için kullanılacak makine türünü ayarlar.
--worker-machine-type n1-standard-8
Bu işlem, kümenizin sahip olacağı çalışan sayısını belirler.
--num-workers 8
Bu işlem, Dataproc'un görüntü sürümünü ayarlar.
--image-version 1.5-debian
Bu işlem, kümede kullanılacak ilk kullanıma hazırlama işlemlerini yapılandırır. Burada, pip ilk kullanıma hazırlama işlemini dahil ediyorsunuz.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
Bu, kümeye eklenecek meta verilerdir. Burada, pip başlatma işlemi için meta veri sağlıyorsunuz.
--metadata 'PIP_PACKAGES=google-cloud-storage'
Bu işlem, kümeye yüklenecek isteğe bağlı bileşenleri ayarlar.
--optional-components=ANACONDA
Bu işlem, Zeppelin, Jupyter veya Spark Geçmişi gibi yaygın kullanıcı arayüzlerini görüntülemek için Dataproc'un Bileşen Ağ Geçidi'ni kullanmanıza olanak tanıyan bileşen ağ geçidini etkinleştirir.
--enable-component-gateway
Dataproc'a daha ayrıntılı bir giriş için lütfen bu codelab'i inceleyin.
Google Cloud Storage paketi oluşturma
İş çıkışınız için bir Google Cloud Storage paketi gerekir. Paketiniz için benzersiz bir ad belirleyin ve yeni bir paket oluşturmak üzere aşağıdaki komutu çalıştırın. Paket adları, tüm kullanıcılar için tüm Google Cloud projelerinde benzersizdir. Bu nedenle, farklı adlarla birkaç kez denemeniz gerekebilir. ServiceException almazsanız paket başarıyla oluşturulur.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. Keşfedici Veri Analizi
Ön işleme yapmadan önce, ele aldığınız verilerin yapısı hakkında daha fazla bilgi edinmeniz gerekir. Bunu yapmak için iki veri keşfi yöntemini inceleyeceksiniz. Öncelikle BigQuery web arayüzünü kullanarak bazı ham verileri görüntüleyecek, ardından PySpark ve Dataproc'u kullanarak her bir alt dizin için gönderi sayısını hesaplayacaksınız.
BigQuery Web Kullanıcı Arayüzünü kullanma
Verilerinizi görüntülemek için BigQuery web kullanıcı arayüzünü kullanarak başlayın. Cloud Console'daki menü simgesinden aşağı kaydırıp "BigQuery"ye basarak BigQuery web kullanıcı arayüzünü açın.

Ardından, BigQuery web kullanıcı arayüzü sorgu düzenleyicisinde aşağıdaki komutu çalıştırın. Bu işlev, Ocak 2017'den itibaren 10 tam veri satırı döndürür:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;

Kullanılabilir tüm sütunları ve bazı örnekleri görmek için sayfayı kaydırabilirsiniz. Özellikle, her yayının metin içeriğini temsil eden iki sütun görürsünüz: "title" (başlık) ve "selftext" (yayın gövdesi). Ayrıca, gönderinin oluşturulduğu UTC saati olan "created_utc" ve gönderinin bulunduğu alt dizin olan "subreddit" gibi diğer sütunlara da dikkat edin.
PySpark işi yürütme
Örnek kodu içeren depoyu klonlamak ve doğru dizine gitmek için Cloud Shell'de aşağıdaki komutları çalıştırın:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
Her bir alt dizin için kaç gönderi olduğunu belirlemek üzere PySpark'ı kullanabilirsiniz. Cloud Editor'ı açabilir ve bir sonraki adımda çalıştırmadan önce komut dosyasını cloud-dataproc/codelabs/spark-bigquery okuyabilirsiniz:


Cloud Shell'e geri dönmek için Cloud Editor'da "Open Terminal" (Terminali Aç) düğmesini tıklayın ve ilk PySpark işinizi çalıştırmak için aşağıdaki komutu çalıştırın:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
Bu komut, Jobs API aracılığıyla Dataproc'a iş göndermenize olanak tanır. Burada iş türünü pyspark olarak belirtiyorsunuz. Küme adını, isteğe bağlı parametreleri ve işi içeren dosyanın adını sağlayabilirsiniz. Burada, --jars parametresini sağlıyorsunuz. Bu parametre, spark-bigquery-connector öğesini işinize dahil etmenize olanak tanır. Hatalar hariç tüm günlük çıkışlarını bastıracak olan --driver-log-levels root=FATAL kullanarak günlük çıkış düzeylerini de ayarlayabilirsiniz. Spark günlükleri genellikle oldukça gürültülüdür.
Bu işlemin çalışması birkaç dakika sürer ve nihai çıkışınız aşağıdaki gibi görünür:

8. Dataproc ve Spark kullanıcı arayüzlerini keşfetme
Dataproc'ta Spark işleri çalıştırırken işlerinizin / kümelerinizin durumunu kontrol etmek için iki kullanıcı arayüzüne erişebilirsiniz. İlk seçenek olan Dataproc kullanıcı arayüzünü, menü simgesini tıklayıp Dataproc'a kaydırarak bulabilirsiniz. Burada, kullanılabilir mevcut belleğin yanı sıra bekleyen bellek ve çalışan sayısını görebilirsiniz.

Tamamlanan işleri görmek için işler sekmesini de tıklayabilirsiniz. Belirli bir işin iş kimliğini tıklayarak bu işlerin günlükleri ve çıkışları gibi iş ayrıntılarını görebilirsiniz. 

Spark kullanıcı arayüzünü de görüntüleyebilirsiniz. İş sayfasında geri okunu ve ardından Web Arayüzleri'ni tıklayın. Bileşen ağ geçidi altında çeşitli seçenekler görürsünüz. Bunların çoğu, kümenizi oluştururken İsteğe Bağlı Bileşenler aracılığıyla etkinleştirilebilir. Bu laboratuvarda "Spark History Server"ı tıklayın.


Aşağıdaki pencere açılır:

Tamamlanan tüm işler burada gösterilir. İş hakkında daha fazla bilgi edinmek için herhangi bir application_id değerini tıklayabilirsiniz. Benzer şekilde, şu anda yayınlanan tüm işleri görüntülemek için açılış sayfasının en altındaki "Eksik başvuruları göster"i tıklayabilirsiniz.
9. Doldurma işinizi çalıştırma
Artık verileri belleğe yükleyen, gerekli bilgileri ayıklayan ve çıkışı bir Google Cloud Storage paketine boşaltan bir iş çalıştıracaksınız. Her Reddit yorumu için "başlık", "gövde" (ham metin) ve "oluşturma zaman damgası" bilgilerini ayıklayacaksınız. Ardından bu verileri alıp CSV'ye dönüştürür, sıkıştırır ve gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz URI'sine sahip bir pakete yüklersiniz.
cloud-dataproc/codelabs/spark-bigquery/backfill.sh için kodu okumak üzere Cloud Editor'a tekrar başvurabilirsiniz. Bu kod, cloud-dataproc/codelabs/spark-bigquery/backfill.py içinde kodu yürütmek için kullanılan bir sarmalayıcı komut dosyasıdır.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
Kısa süre içinde bir dizi iş tamamlama mesajı görürsünüz. İşin tamamlanması 15 dakikayı bulabilir. Başarılı veri çıkışını doğrulamak için gsutil'i kullanarak depolama paketinizi de kontrol edebilirsiniz. Tüm işler tamamlandıktan sonra aşağıdaki komutu çalıştırın:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
Aşağıdaki çıkışı göreceksiniz:

Tebrikler, Reddit yorumları verileriniz için yedek doldurma işlemini başarıyla tamamladınız. Bu verilerin üzerine nasıl modeller oluşturabileceğinizi öğrenmek istiyorsanız lütfen Spark-NLP codelab'ine geçin.
10. Temizleme
Bu hızlı başlangıç kılavuzu tamamlandıktan sonra GCP hesabınızın gereksiz yere ücretlendirilmesini önlemek için:
- Ortam için Cloud Storage paketini silin ve oluşturduğunuz
- Dataproc ortamını silin.
Bu codelab için özel olarak bir proje oluşturduysanız isteğe bağlı olarak projeyi de silebilirsiniz:
- GCP Console'da Projeler sayfasına gidin.
- Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
- Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.
Lisans
Bu çalışma, Creative Commons Attribution 3.0 Genel Amaçlı Lisans ve Apache 2.0 lisansı ile lisanslanmıştır.