Not defterlerini Google Cloud Dataflow ile Kullanma

1. Giriş

Cloud-Dataflow.png

Google Cloud Dataflow

Son güncelleme: 5 Temmuz 2023

Dataflow nedir?

Dataflow, çok çeşitli veri işleme kalıpları yürütmek için yönetilen bir hizmettir. Bu sitedeki belgeler, hizmet özelliklerini kullanma talimatları da dahil olmak üzere toplu veri ve akış verisi işleme ardışık düzenlerinizi Dataflow kullanarak nasıl dağıtacağınızı gösterir.

Apache Beam SDK, hem toplu hem de akış ardışık düzenleri geliştirmenizi sağlayan açık kaynaklı bir programlama modelidir. Ardışık düzenlerinizi Apache Beam programıyla oluşturur ve bunları Dataflow hizmetinde çalıştırırsınız. Apache Beam belgeleri; Apache Beam programlama modeli, SDK'lar ve diğer koşucular için ayrıntılı kavramsal bilgiler ve referans malzemeleri sağlar.

Hızlı akışlı veri analizi

Dataflow, daha düşük veri gecikmesiyle hızlı ve basitleştirilmiş akış veri ardışık düzeni geliştirilebilmesini sağlar.

Operasyonları ve yönetimi basitleştirme

Dataflow'un sunucusuz yaklaşımının veri mühendisliği iş yüklerindeki işlem ek yükünü ortadan kaldırması sayesinde ekiplerin sunucu kümelerini yönetmek yerine programlamaya odaklanmasını sağlayın.

Toplam sahip olma maliyetini azaltma

Otomatik kaynak ölçeklendirme ile maliyet açısından optimize edilmiş toplu işleme özelliğinin bir arada sunulması, Dataflow'un sezonluk ve ani artışlar gösteren iş yüklerinizi aşırı harcama yapmadan yönetme konusunda neredeyse sınırsız bir kapasite sunduğu anlamına gelir.

Temel özellikler

Otomatik kaynak yönetimi ve dinamik iş dengeleme

Dataflow, gecikmeyi en aza indirmek ve kullanımı en üst düzeye çıkarmak için kaynak işleme süreçlerinin sağlanmasını ve yönetimini otomatik hale getirir. Böylece, örnekleri hızlandırmanız veya elle ayırmanız gerekmez. İş bölümlendirme, geciken işleri dinamik olarak yeniden dengelemek için otomatik hale getirilmiş ve optimize edilmiştir. "Kısayol tuşları" aramanıza gerek yok veya giriş verilerinizi önceden işleyin.

Yatay otomatik ölçeklendirme

Optimum işleme hızı için çalışan kaynaklarının yatay olarak otomatik ölçeklendirilmesi, genel fiyat-performans oranını artırır.

Toplu işlem için esnek kaynak planlama fiyatlandırması

Gece devam etmesi gereken işlerde olduğu gibi iş planlama süresinde esneklik ile işlem yapmak için esnek kaynak planlama (FlexRS) imkanı, toplu işlem için daha düşük bir fiyat sunar. Bu esnek işler, altı saatlik süre içinde yürütmeye alınacakları garantisiyle sıraya sokulur.

Bu program kapsamında uygulayacağınız adımlar

Apache Beam etkileşimli çalıştırıcısını JupyterLab not defterleriyle kullanmanız, ardışık düzenleri geliştirmenize, ardışık düzen grafiğinizi incelemenize ve PCollection'ları bir okuma-eval-yazdırma döngüsü (REPL) iş akışı ile ayrıştırmanıza olanak tanır. Bu Apache Beam not defterleri, en son veri bilimi ve makine öğrenimi çerçeveleriyle önceden yüklenmiş not defteri sanal makinelerini barındıran yönetilen bir hizmet olan Vertex AI Workbench aracılığıyla kullanıma sunulmuştur.

Bu codelab'de, Apache Beam not defterlerinin kullanıma sunduğu işlevlere odaklanılmaktadır.

Neler öğreneceksiniz?

  • Not defteri örneği oluşturma
  • Temel ardışık düzen oluşturma
  • Sınırsız kaynaktan veri okuma
  • Verileri görselleştirme
  • Not defterinden Dataflow İşi başlatma
  • Not defteri kaydetme

Gerekenler

  • Faturalandırmanın etkin olduğu bir Google Cloud Platform projesi.
  • Google Cloud Dataflow ve Google Cloud PubSub etkinleştirilir.

2. Kurulum

  1. Cloud Console'daki proje seçici sayfasında bir Cloud projesi seçin veya oluşturun.

Aşağıdaki API'leri etkinleştirdiğinizden emin olun:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

Bunu, API'nin ve Hizmetler sayfası.

Bu kılavuzda Pub/Sub aboneliğinden verileri okuyacağız. Bu nedenle, Compute Engine varsayılan hizmet hesabına Düzenleyici rolüne sahip olduğundan emin olun veya bu hesaba Pub/Sub Düzenleyici rolü verin.

3. Apache Beam not defterlerini kullanmaya başlama

Apache Beam not defterleri örneği başlatma

  1. Konsolda Dataflow'u başlatın:

  1. Soldaki menüyü kullanarak Workbench sayfasını seçin.
  2. Kullanıcı tarafından yönetilen not defterleri sekmesinde olduğunuzdan emin olun.
  3. Araç çubuğunda Yeni Not Defteri'ni tıklayın.
  4. Apache Beam > öğesini seçin. GPU'suz.
  5. New notbook (Yeni not defteri) sayfasında, not defteri sanal makinesi için bir alt ağ seçin ve Create'i (Oluştur) tıklayın.
  6. Bağlantı etkin hale geldiğinde JupyterLab'i aç'ı tıklayın. Vertex AI Workbench, yeni bir Apache Beam not defteri örneği oluşturur.

4. Ardışık düzeni oluşturma

Not defteri örneği oluşturma

Dosya > Yeni > Not Defteri'ni tıklayın ve Apache Beam 2.47 veya sonraki sürümlere sahip bir çekirdek seçin.

Not defterinize kod eklemeye başlama

  • Her bölümdeki kodu kopyalayıp not defterinizdeki yeni hücrelere yapıştırın
  • Hücreyi çalıştır

6bd3dd86cc7cf802.png

Apache Beam etkileşimli çalıştırıcısını JupyterLab not defterleriyle kullanmanız, ardışık düzenleri geliştirmenize, ardışık düzen grafiğinizi incelemenize ve PCollection'ları bir okuma-eval-yazdırma döngüsü (REPL) iş akışı ile ayrıştırmanıza olanak tanır.

Apache Beam, not defteri örneğinize yüklü olduğu için not defterinize interactive_runner ve interactive_beam modüllerini ekleyin.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Not defteriniz başka Google hizmetlerini kullanıyorsa aşağıdaki aktarma ifadelerini ekleyin:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Etkileşim seçeneklerini ayarlama

Aşağıda, veri yakalama süresi 60 saniye olarak ayarlanmaktadır. Daha hızlı iterasyon yapmak istiyorsanız daha düşük bir süreye (ör. "10 sn") ayarlayın.

ib.options.recording_duration = '60s'

Etkileşimli ek seçenekler için activate_beam.options sınıfına bakın.

InteractiveRunner nesnesi kullanarak ardışık düzeni başlatın.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Verileri okuma ve görselleştirme

Aşağıdaki örnekte, belirtilen Pub/Sub konusuna abonelik oluşturan ve abonelikten okuma yapan bir Apache Beam ardışık düzeni gösterilmektedir.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Ardışık düzen, kelimeleri kaynaktaki pencerelere göre sayar. Her pencerenin 10 saniye uzunluğunda olacak şekilde sabit pencereler oluşturur.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Veriler pencere açıldıktan sonra kelimeler aralıkla sayılır.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Verileri görselleştirme

show() yöntemi, not defterinde elde edilen PCollection'ı görselleştirir.

ib.show(windowed_word_counts, include_window_info=True)

PCollection'ı tablo biçiminde görselleştiren şov yöntemi.

Verilerinizin görselleştirmelerini görüntülemek için visualize_data=True öğesini show() yöntemine iletin. Yeni bir hücre ekleyin:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Görselleştirmelerinize birden fazla filtre uygulayabilirsiniz. Aşağıdaki görselleştirme, etikete ve eksene göre filtreleme yapmanıza olanak tanır:

PCollection'ı zengin bir filtrelenebilir kullanıcı arayüzü öğeleri grubu olarak görselleştiren show yöntemi.

5. Pandas Veri Çerçevesini Kullanma

Apache Beam not defterlerindeki kullanışlı diğer bir görselleştirme de Pandas DataFrame'dir. Aşağıdaki örnekte ilk olarak kelimeler küçük harfe dönüştürülür ve ardından her kelimenin sıklığı hesaplanır.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

collect() yöntemi, çıkışı bir Pandas DataFrame'de sağlar.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Pandas DataFrame'de PCollection'ı temsil eden toplama yöntemi.

6. (İsteğe bağlı) Not defterinizden Dataflow işlerini başlatma

  1. Dataflow'da iş çalıştırmak için ek izinlere ihtiyacınız var. Compute Engine varsayılan hizmet hesabının Düzenleyici rolüne sahip olduğundan emin olun veya hesaba aşağıdaki IAM rollerini verin:
  • Dataflow Yöneticisi
  • Dataflow Çalışanı
  • Storage Yöneticisi ve
  • Hizmet Hesabı Kullanıcısı (roles/iam.serviceAccountUser)

Belgelerde roller hakkında daha fazla bilgi edinin.

  1. (İsteğe bağlı) Dataflow işlerini çalıştırmak için not defterinizi kullanmadan önce çekirdeği yeniden başlatın, tüm hücreleri tekrar çalıştırın ve çıkışı doğrulayın.
  2. Aşağıdaki içe aktarma ifadelerini kaldırın:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Aşağıdaki içe aktarma deyimini ekleyin:
from apache_beam.runners import DataflowRunner
  1. Aşağıdaki kayıt süresi seçeneğini kaldırın:
ib.options.recording_duration = '60s'
  1. Ardışık düzen seçeneklerinize aşağıdakileri ekleyin. Cloud Storage konumunu, sahip olduğunuz bir pakete işaret edecek şekilde ayarlamanız gerekir. Dilerseniz bu amaçla yeni bir paket de oluşturabilirsiniz. us-central1 olan bölge değerini de değiştirebilirsiniz.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. beam.Pipeline() öğesinin oluşturucusunda InteractiveRunner değerini DataflowRunner ile değiştirin. p, ardışık düzeninizi oluşturan ardışık düzen nesnesidir.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Etkileşimli çağrıları kodunuzdan kaldırın. Örneğin, show(), collect(), head(), show_graph() ve watch() değerlerini kodunuzdan kaldırın.
  2. Sonuçları görmek için havuz eklemeniz gerekir. Önceki bölümde not defterindeki sonuçları görselleştiriyorduk ancak bu kez işi bu not defterinin dışında, yani Dataflow'da çalıştırıyoruz. Bu nedenle, sonuçlarımız için harici bir konuma ihtiyacımız var. Bu örnekte, sonuçları GCS'deki (Google Cloud Storage) metin dosyalarına yazacağız. Bu, veri pencereli bir akış ardışık düzeni olduğundan her pencere için bir metin dosyası oluşturmak isteyeceğiz. Bunun için ardışık düzeninize aşağıdaki adımları ekleyin:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Ardışık düzen kodunuzun sonuna p.run() ekleyin.
  2. Şimdi tüm değişiklikleri dahil ettiğinizden emin olmak için not defteri kodunuzu gözden geçirin. Şuna benzer bir görünümde olacaktır:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Hücreleri çalıştırın.
  2. Şuna benzer bir çıkış alırsınız:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. İşin çalışıp çalışmadığını doğrulamak için Dataflow'un İşler sayfasına gidin. Listede yeni bir iş görünecektir. İşin verileri işlemeye başlaması yaklaşık 5-10 dakika sürer.
  2. Veriler işlendikten sonra, Cloud Storage'a gidin ve Dataflow'un sonuçları depoladığı dizine (tanımladığınız output_gcs_location) gidin. Pencere başına bir dosya olacak şekilde bir metin dosyası listesi gösterilir. bfcc5ce9e46a8b14.png
  3. Dosyayı indirin ve içeriği inceleyin. Sayıları ile eşlenen kelimelerin listesini içermelidir. Alternatif olarak, dosyaları incelemek için komut satırı arayüzünü kullanın. Bu işlemi, aşağıdakini not defterinizdeki yeni bir hücrede çalıştırarak yapabilirsiniz:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Şuna benzer bir çıkış görürsünüz:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. İşte bu kadar. Oluşturduğunuz işi temizlemeyi ve durdurmayı unutmayın (bu codelab'in son adımına bakın).

Bu dönüşümün etkileşimli bir not defterinde nasıl gerçekleştirileceğiyle ilgili bir örnek için not defteri örneğinizdeki Dataflow Kelime Sayısı not defterine bakın.

Dilerseniz not defterinizi yürütülebilir komut dosyası olarak dışa aktarabilir, oluşturulan .py dosyasını önceki adımları kullanarak değiştirebilir ve ardından Dataflow hizmetine ardışık düzeninizi dağıtabilirsiniz.

7. Not defteriniz kaydediliyor

Oluşturduğunuz not defterleri, çalışan not defteri örneğinize yerel olarak kaydedilir. Geliştirme sırasında not defteri örneğini sıfırlar veya kapatırsanız bu yeni not defterleri, /home/jupyter dizini altında oluşturuldukları sürece korunur. Ancak bir not defteri örneği silindiğinde bu not defterleri de silinir.

Not defterlerinizi gelecekte kullanmak üzere saklamak için yerel olarak iş istasyonunuza indirin, GitHub'a kaydedin veya farklı bir dosya biçimine aktarın.

8. Temizleme

Apache Beam not defteri örneğinizi kullanmayı bitirdikten sonra, not defteri örneğini kapatarak ve çalıştırdıysanız akış işini durdurarak Google Cloud'da oluşturduğunuz kaynakları temizleyin.

Yalnızca bu codelab'de kullanmak için proje oluşturduysanız projeyi tamamen kapatabilirsiniz.