Dataproc kümesinde Hadoop kelime sayısı işi çalıştırma

1. Giriş

İş akışları, veri analizinde yaygın bir kullanım alanıdır. İş akışlarında, anlamlı bilgileri bulmak için veriler alınır, dönüştürülür ve analiz edilir. Google Cloud Platform'da iş akışlarını düzenlemek için kullanılan araç, popüler açık kaynak iş akışı aracı Apache Airflow'un barındırılan sürümü olan Cloud Composer'dır. Bu laboratuvarda, Cloud Composer'ı kullanarak Cloud Dataproc kümesi oluşturan, Cloud Dataproc ve Apache Hadoop ile analiz eden ve ardından Cloud Dataproc kümesini silen basit bir iş akışı oluşturacaksınız.

Cloud Composer nedir?

Cloud Composer, bulutlara ve şirket içi veri merkezlerine yayılan ardışık düzenler yazmanız, planlamanız ve imzalamanız için size destek veren, tümüyle yönetilen bir iş akışı düzenleme hizmetidir. Popüler Apache Airflow açık kaynak projesi temel alınarak geliştirilen ve Python programlama diliyle çalıştırılan Cloud Composer, taahhüt vermeden ve kolayca kullanılır.

Kullanıcılar, Apache Airflow'un yerel bir örneği yerine Cloud Composer'ı kullanarak Airflow'un en iyi özelliklerinden yararlanabilir ve kurulum veya yönetim ek yüküyle uğraşmak zorunda kalmaz.

Apache Airflow nedir?

Apache Airflow, iş akışlarını programatik olarak oluşturmak, planlamak ve izlemek için kullanılan bir açık kaynak aracıdır. Laboratuvar boyunca göreceğiniz Airflow ile ilgili hatırlamanız gereken birkaç temel terim vardır:

  • DAG: DAG (Yönlü Düz Ağaç), planlamak ve çalıştırmak istediğiniz düzenli görevler koleksiyonudur. İş akışları olarak da adlandırılan DAG'ler, standart Python dosyalarında tanımlanır.
  • Operatör: Bir operatör, iş akışındaki tek bir görevi tanımlar.

Cloud Dataproc nedir?

Cloud Dataproc, Google Cloud Platform'un tümüyle yönetilen Apache Spark ve Apache Hadoop hizmetidir. Cloud Dataproc, diğer GCP hizmetleriyle kolayca entegre olarak size veri işleme, analiz ve makine öğrenimi için güçlü ve eksiksiz bir platform sunar.

Neler yapacaksınız?

Bu codelab'de, Cloud Composer'da aşağıdaki görevleri tamamlayan bir Apache Airflow iş akışının nasıl oluşturulacağı ve çalıştırılacağı gösterilmektedir:

  • Cloud Dataproc kümesi oluşturur.
  • Kümede bir Apache Hadoop kelime sayısı işi çalıştırır ve sonuçlarını Cloud Storage'a verir.
  • Kümeyi siler.

Neler öğreneceksiniz?

  • Cloud Composer'da Apache Airflow iş akışı oluşturma ve çalıştırma
  • Bir veri kümesinde analiz çalıştırmak için Cloud Composer ve Cloud Dataproc'u kullanma
  • Google Cloud Platform Console, Cloud SDK ve Airflow web arayüzü üzerinden Cloud Composer ortamınıza erişme

Gerekenler

  • GCP hesabı
  • Temel CLI bilgisi
  • Temel seviyede Python bilgisi

2. GCP'yi ayarlama

Projeyi oluşturma

Bir Google Cloud Platform projesi seçin veya oluşturun.

Sonraki adımlarda kullanacağınız proje kimliğinizi not edin.

Yeni bir proje oluşturuyorsanız proje kimliği, oluşturma sayfasında Proje Adı'nın hemen altında bulunur.

Daha önce bir proje oluşturduysanız kimliği, konsol ana sayfasındaki Proje Bilgileri kartında bulabilirsiniz.

API'leri etkinleştirme

Cloud Composer, Cloud Dataproc ve Cloud Storage API'lerini etkinleştirin.Etkinleştirildikten sonra "Kimlik bilgilerine git" düğmesini yoksayabilir ve eğitimin sonraki adımına geçebilirsiniz.

Composer ortamı oluşturma

Aşağıdaki yapılandırmayla Cloud Composer ortamı oluşturun:

  • Ad: my-composer-environment
  • Konum: us-central1
  • Zone: us-central1-a

Diğer tüm yapılandırmalar varsayılan değerlerinde kalabilir. En altta "Oluştur"u tıklayın.

Cloud Storage paketi oluşturma

Projenizde aşağıdaki yapılandırmaya sahip bir Cloud Storage paketi oluşturun:

  • Ad: <your-project-id>
  • Varsayılan depolama sınıfı: Çok bölgeli
  • Konum: Amerika Birleşik Devletleri
  • Erişim denetimi modeli: ayrıntılı

Hazır olduğunuzda "Oluştur"a basın.

3. Apache Airflow'u kurma

Composer ortamı bilgilerini görüntüleme

GCP Console'da Ortamlar sayfasını açın.

Ayrıntılarını görmek için ortamın adını tıklayın.

Ortam ayrıntıları sayfasında Airflow web arayüzü URL'si, Google Kubernetes Engine küme kimliği, Cloud Storage paketinin adı ve /dags klasörünün yolu gibi bilgiler yer alır.

Airflow'da DAG (Yönlü Düz Ağaç), planlamak ve çalıştırmak istediğiniz düzenli görevler topluluğudur. İş akışları olarak da adlandırılan DAG'ler, standart Python dosyalarında tanımlanır. Cloud Composer yalnızca /dags klasöründeki DAG'leri planlar. /dags klasörü, ortamınızı oluşturduğunuzda Cloud Composer'ın otomatik olarak oluşturduğu Cloud Storage paketindedir.

Apache Airflow ortam değişkenlerini ayarlama

Apache Airflow değişkenleri, ortam değişkenlerinden farklı, Airflow'a özgü bir kavramdır. Bu adımda şu üç Airflow değişkenini ayarlayacaksınız: gcp_project, gcs_bucket ve gce_zone.

gcloud ile Değişkenleri Ayarlama

İlk olarak, Cloud SDK'nın sizin için kolayca yüklendiği Cloud Shell'inizi açın.

Ortam değişkenini COMPOSER_INSTANCE Composer ortamınızın adıyla ayarlayın.

COMPOSER_INSTANCE=my-composer-environment

Gcloud komut satırı aracını kullanarak Airflow değişkenlerini ayarlamak için gcloud composer environments run komutunu variables alt komutuyla birlikte kullanın. Bu gcloud composer komutu, Airflow CLI alt komutu variables'ı yürütür. Alt komut, bağımsız değişkenleri gcloud komut satırı aracına aktarır.

Bu komutu üç kez çalıştıracak ve değişkenleri projenizle alakalı olanlarla değiştireceksiniz.

Aşağıdaki komutu kullanarak gcp_project değerini ayarlayın. <your-project-id> yerine 2. adımda not aldığınız proje kimliğini girin.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Çıkışınız aşağıdaki gibi görünecektir:

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Aşağıdaki komutu kullanarak gcs_bucket değerini ayarlayın. <your-bucket-name> yerine 2. adımda not aldığınız paket kimliğini girin. Önerimizi uyguladıysanız paket adınız proje kimliğinizle aynıdır. Çıkışınız önceki komuta benzer olacaktır.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Aşağıdaki komutu kullanarak gce_zone değerini ayarlayın. Çıkışınız önceki komutlara benzer olacaktır.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(İsteğe bağlı) gcloud değişkenini görüntüleme

Bir değişkenin değerini görmek için get bağımsız değişkeniyle variables Airflow CLI alt komutunu çalıştırın veya Airflow kullanıcı arayüzünü kullanın.

Örneğin:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Bunu, az önce ayarladığınız üç değişkenden herhangi biriyle yapabilirsiniz: gcp_project, gcs_bucket ve gce_zone.

4. Örnek İş Akışı

5. adımda kullanacağımız DAG'nin koduna göz atalım. Şimdilik dosyaları indirme konusunda endişelenmeyin, yalnızca buradaki adımları uygulayın.

Bu kapsamlı bir konu. Şimdi bunu biraz inceleyelim.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Bazı Airflow içe aktarma işlemleriyle başlıyoruz:

  • airflow.models: Airflow veritabanında verilere erişmemize ve veri oluşturmamıza olanak tanır.
  • airflow.contrib.operators - Topluluk operatörlerinin yaşadığı yer. Bu durumda, Cloud Dataproc API'ye erişmek için dataproc_operator izni gerekir.
  • airflow.utils.trigger_rule: Operatörlerimize tetikleyici kuralları eklemek için. Tetikleme kuralları, bir operatörün üst öğelerinin durumuna göre yürütülüp yürütülmeyeceği konusunda ayrıntılı kontrol sağlar.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Bu, çıkış dosyamızın konumunu belirtir. Buradaki önemli satır, Airflow veritabanından gcs_bucket değişken değerini alacak olan models.Variable.get('gcs_bucket') satırıdır.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - Sonunda Cloud Dataproc kümesinde çalıştıracağımız .jar dosyasının konumu. Sizin için GCP'de zaten barındırılıyor.
  • input_file - Hadoop işimizin sonunda üzerinde hesaplama yapacağı verileri içeren dosyanın konumu. 5. adımda verileri bu konuma birlikte yükleyeceğiz.
  • wordcount_args: JAR dosyasına ileteceğimiz bağımsız değişkenler.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

Bu, önceki gün gece yarısını temsil eden eşdeğer bir datetime nesnesi verir. Örneğin, bu işlem 4 Mart saat 11:00'da yürütülürse tarih saat nesnesi 3 Mart saat 00:00'ı temsil eder. Bu durum, Airflow'un planlamayı nasıl ele aldığıyla ilgilidir. Bu konuyla ilgili daha fazla bilgiye buradan ulaşabilirsiniz.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Yeni bir DAG oluşturulduğunda sözlük biçimindeki default_dag_args değişkeni sağlanmalıdır:

  • 'email_on_failure': Bir görev başarısız olduğunda e-posta uyarılarının gönderilip gönderilmeyeceğini gösterir.
  • 'email_on_retry' - Bir görev yeniden denendiğinde e-posta uyarılarının gönderilip gönderilmeyeceğini gösterir.
  • 'retries' - Airflow'un DAG hatası durumunda kaç kez yeniden deneme yapması gerektiğini gösterir.
  • 'retry_delay' - Airflow'un yeniden denemeden önce ne kadar beklemesi gerektiğini gösterir.
  • 'project_id': DAG'ye hangi GCP proje kimliğiyle ilişkilendirileceğini söyler. Bu, daha sonra Dataproc operatörüyle birlikte gereklidir.
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

with models.DAG kullanıldığında komut dosyası, altındaki her şeyi aynı DAG'ye dahil eder. Ayrıca üç bağımsız değişkenin iletildiğini görüyoruz:

  • Bir dize olan ilk değer, oluşturduğumuz DAG'ye verilecek addır. Bu örnekte composer_hadoop_tutorial kullanıyoruz.
  • schedule_interval: Burada bir gün olarak ayarladığımız datetime.timedelta nesnesi. Bu, bu DAG'nin, 'default_dag_args' içinde daha önce ayarlanan 'start_date' tarihinden sonra günde bir kez yürütülmeye çalışacağı anlamına gelir.
  • default_args: DAG için varsayılan bağımsız değişkenleri içeren, daha önce oluşturduğumuz sözlük

Dataproc kümesi oluşturma

Ardından, Cloud Dataproc kümesi oluşturan bir dataproc_operator.DataprocClusterCreateOperator oluşturacağız.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

Bu operatörde birkaç bağımsız değişken görüyoruz. Bunlardan ilki hariç hepsi bu operatöre özeldir:

  • task_id: BashOperator'da olduğu gibi, bu da operatöre atadığımız ve Airflow kullanıcı arayüzünden görüntülenebilen addır.
  • cluster_name - Cloud Dataproc kümesine atadığımız ad. Burada, composer-hadoop-tutorial-cluster-{{ ds_nodash }} olarak adlandırdık (isteğe bağlı ek bilgiler için bilgi kutusuna bakın).
  • num_workers - Cloud Dataproc kümesine ayırdığımız çalışan sayısı
  • zone - Kümenin Airflow veritabanında kaydedildiği şekliyle bulunmasını istediğimiz coğrafi bölge. Bu, 3. adımda ayarladığımız 'gce_zone' değişkenini okur.
  • master_machine_type - Cloud Dataproc ana makinesine ayırmak istediğimiz makine türü
  • worker_machine_type - Cloud Dataproc çalışanına ayırmak istediğimiz makine türü

Apache Hadoop işi gönderme

dataproc_operator.DataProcHadoopOperator, Cloud Dataproc kümesine iş göndermemize olanak tanır.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Çeşitli parametreler sunuyoruz:

  • task_id - DAG'nin bu parçasına atadığımız ad
  • main_jar - Kümede çalıştırmak istediğimiz .jar dosyasının konumu
  • cluster_name - İşin üzerinde çalıştırılacağı kümenin adı. Bunun, önceki operatörde bulduğumuzla aynı olduğunu fark edeceksiniz.
  • arguments - .jar dosyasını komut satırından yürütüyormuş gibi jar dosyasına aktarılan bağımsız değişkenler

Kümeyi silme

Oluşturacağımız son operatör dataproc_operator.DataprocClusterDeleteOperator operatörüdür.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Adından da anlaşılacağı gibi bu operatör, belirli bir Cloud Dataproc kümesini siler. Bu konuda üç argüman görüyoruz:

  • task_id: BashOperator'da olduğu gibi, bu da operatöre atadığımız ve Airflow kullanıcı arayüzünden görüntülenebilen addır.
  • cluster_name - Cloud Dataproc kümesine atadığımız ad. Burada, kümeyi composer-hadoop-tutorial-cluster-{{ ds_nodash }} olarak adlandırdık (isteğe bağlı ek bilgiler için "Dataproc kümesi oluşturma" bölümünden sonraki bilgi kutusuna bakın).
  • trigger_rule - Bu adımın başında içe aktarma işlemleri sırasında tetikleyici kurallardan kısaca bahsetmiştik ancak burada bir tetikleyici kuralın nasıl çalıştığını görüyoruz. Varsayılan olarak, bir Airflow operatörü, yukarı akış operatörlerinin tümü başarıyla tamamlanmadığı sürece yürütülmez. ALL_DONE tetikleme kuralı, başarılı olup olmadıklarına bakılmaksızın yalnızca tüm yukarı akış operatörlerinin tamamlanmasını gerektirir. Burada bu, Hadoop işi başarısız olsa bile kümeyi yine de kapatmak istediğimiz anlamına gelir.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Son olarak, bu operatörlerin belirli bir sırada yürütülmesini istiyoruz ve bunu Python bit kaydırma operatörlerini kullanarak belirtebiliriz. Bu durumda, create_dataproc_cluster her zaman önce, ardından run_dataproc_hadoop ve son olarak delete_dataproc_cluster yürütülür.

Tüm bilgiler bir araya getirildiğinde kod şu şekilde görünür:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Airflow dosyalarını Cloud Storage'a yükleme

DAG'yi /dags klasörünüze kopyalama

  1. İlk olarak, Cloud SDK'nın sizin için kolayca yüklendiği Cloud Shell'inizi açın.
  2. Python örnekleri deposunu klonlayın ve composer/workflows dizinine geçin.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. DAG klasörünüzün adını ortam değişkeni olarak ayarlamak için aşağıdaki komutu çalıştırın:
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Eğitim kodunu /dags klasörünüzün oluşturulduğu yere kopyalamak için aşağıdaki gsutil komutunu çalıştırın.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Çıkışınız aşağıdaki gibi görünecektir:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Airflow kullanıcı arayüzünü kullanma

GCP Console'u kullanarak Airflow web arayüzüne erişmek için:

  1. Ortamlar sayfasını açın.
  2. Ortamın Airflow web sunucusu sütununda yeni pencere simgesini tıklayın. Airflow web kullanıcı arayüzü yeni bir tarayıcı penceresinde açılır.

Airflow kullanıcı arayüzü hakkında bilgi için Web arayüzüne erişme başlıklı makaleyi inceleyin.

Görüntüleme Değişkenleri

Daha önce ayarladığınız değişkenler ortamınızda kalıcı hale getirilir. Airflow kullanıcı arayüzü menü çubuğundan Yönetici > Değişkenler'i seçerek değişkenleri görüntüleyebilirsiniz.

Liste sekmesi seçili ve aşağıdaki anahtarları ve değerleri içeren bir tablo gösteriliyor: anahtar: gcp_project, değer: project-id anahtar: gcs_bucket, değer: gs://bucket-name anahtar: gce_zone, değer: zone

DAG çalıştırmalarını keşfetme

DAG dosyanızı Cloud Storage'daki dags klasörüne yüklediğinizde Cloud Composer dosyayı ayrıştırır. Hata bulunmazsa iş akışının adı DAG listesinde görünür ve iş akışı hemen çalıştırılmak üzere sıraya alınır. DAG'lerinize bakmak için sayfanın üst kısmındaki DAG'ler'i tıklayın.

84a29c71f20bff98.png

DAG ayrıntıları sayfasını açmak için composer_hadoop_tutorial simgesini tıklayın. Bu sayfada, iş akışı görevlerinin ve bağımlılıklarının grafiksel bir gösterimi yer alır.

f4f1663c7a37f47c.png

Şimdi araç çubuğunda Grafik Görünümü'nü tıklayın ve her görevin durumunu görmek için fareyle grafiğin üzerine gelin. Her görevin etrafındaki kenarlığın da durumu gösterdiğini unutmayın (yeşil kenarlık = çalışıyor; kırmızı = başarısız vb.).

4c5a0c6fa9f88513.png

İş akışını Grafik Görünümü'nden tekrar çalıştırmak için:

  1. Airflow kullanıcı arayüzü grafik görünümünde create_dataproc_cluster grafiğini tıklayın.
  2. Üç görevi sıfırlamak için Temizle'yi, ardından onaylamak için Tamam'ı tıklayın.

fd1b23b462748f47.png

Ayrıca, aşağıdaki GCP Console sayfalarına giderek composer-hadoop-tutorial iş akışının durumunu ve sonuçlarını da kontrol edebilirsiniz:

  • Küme oluşturma ve silme işlemlerini izlemek için Cloud Dataproc kümeleri. İş akışı tarafından oluşturulan kümenin kısa ömürlü olduğunu unutmayın. Bu küme yalnızca iş akışı süresince var olur ve son iş akışı görevinin bir parçası olarak silinir.
  • Apache Hadoop kelime sayısı işini görüntülemek veya izlemek için Cloud Dataproc İşleri'ni kullanın. İş günlüğü çıkışını görmek için iş kimliğini tıklayın.
  • Bu codelab için oluşturduğunuz Cloud Storage paketindeki wordcount klasöründe kelime sayısının sonuçlarını görmek için Cloud Storage Tarayıcı'yı kullanın.

7. Temizleme

Bu codelab'de kullanılan kaynaklar için GCP hesabınızın ücretlendirilmesini önlemek amacıyla:

  1. (İsteğe bağlı) Verilerinizi kaydetmek için Cloud Composer ortamına ait Cloud Storage paketinden ve bu codelab için oluşturduğunuz depolama paketinden verileri indirin.
  2. Bu codelab için oluşturduğunuz Cloud Storage paketini silin.
  3. Ortam için Cloud Storage paketini silin.
  4. Cloud Composer ortamını silin. Ortamınızı silmenin, ortamın Storage paketini silmediğini unutmayın.

İsterseniz projeyi de silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuda proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.