Utiliser Notebooks avec Google Cloud Dataflow

1. Introduction

Cloud-Dataflow.png

Google Cloud Dataflow

Dernière mise à jour:5 juillet 2023

Qu'est-ce que Dataflow ?

Dataflow est un service géré permettant d'exécuter une grande variété de modèles de traitement des données. La documentation disponible sur ce site explique comment déployer vos pipelines de traitement de données par lot et par flux à l'aide de Dataflow, y compris des instructions sur l'utilisation des fonctionnalités du service.

Le SDK Apache Beam est un modèle de programmation Open Source qui vous permet de développer des pipelines de traitement par lot et par flux. Vous créez vos pipelines avec un programme Apache Beam, puis vous les exécutez sur le service Dataflow. La documentation Apache Beam fournit des informations conceptuelles détaillées et des supports de référence pour le modèle de programmation Apache Beam, les SDK et les autres exécuteurs.

Analyse rapide de flux de données

Dataflow accélère et simplifie le développement de pipelines de flux de données tout en réduisant la latence des données.

Opérations et gestion simplifiées

Permettez aux équipes de se concentrer sur la programmation plutôt que sur la gestion des clusters de serveurs, car l'approche sans serveur de Dataflow élimine les coûts opérationnels liés aux charges de travail d'ingénierie des données.

Réduire le coût total de possession

En associant l'autoscaling des ressources à un traitement par lot économique, Dataflow offre des capacités presque illimitées pour gérer vos charges de travail saisonnières et les pics d'activité, sans dépasser le budget.

Principales caractéristiques

Gestion automatisée des ressources et rééquilibrage dynamique des tâches

Dataflow automatise le provisionnement et la gestion des ressources de traitement afin de réduire la latence et d'optimiser l'utilisation. Ainsi, vous n'avez pas besoin de lancer ni de réserver des instances manuellement. Le partitionnement des tâches est également automatisé et optimisé pour permettre un rééquilibrage dynamique des tâches ralenties. Inutile d'utiliser les raccourcis clavier ou prétraiter vos données d'entrée.

Autoscaling horizontal

L'autoscaling horizontal des ressources de nœuds de calcul pour un débit optimal se traduit par un meilleur rapport performances-prix.

Tarification du traitement par lots dans le cadre de la planification flexible des ressources

La planification flexible des ressources (FlexRS) réduit le coût du traitement par lot afin de garantir une certaine souplesse dans le traitement des tâches (celles exécutées la nuit, par exemple) pendant la période planifiée. Ces jobs flexibles sont placés dans une file d'attente avec la garantie qu'ils seront récupérés pour être exécutés dans un délai de six heures.

Objectifs de l'atelier

L'utilisation de l'exécuteur interactif Apache Beam avec les notebooks JupyterLab vous permet de développer un pipeline de manière itérative, d'inspecter le graphique du pipeline et d'analyser chaque PCollection dans un workflow REPL (read-eval-print-loop). Ces notebooks Apache Beam sont disponibles via Vertex AI Workbench, un service géré qui héberge des machines virtuelles de notebooks pré-installées avec les derniers frameworks de data science et de machine learning.

Cet atelier de programmation porte sur la fonctionnalité introduite par les notebooks Apache Beam.

Points abordés

  • Créer une instance de notebook
  • Créer un pipeline de base
  • Lire des données à partir d'une source illimitée
  • Visualiser les données
  • Lancer une tâche Dataflow à partir du notebook
  • Enregistrer un notebook

Prérequis

  • Un projet Google Cloud Platform pour lequel la facturation est activée.
  • Google Cloud Dataflow et Google Cloud PubSub activés.

2. Configuration

  1. Dans la console Cloud, sur la page de sélection du projet, sélectionnez ou créez un projet Cloud.

Assurez-vous que les API suivantes sont activées:

  • API Dataflow
  • API Cloud Pub/Sub
  • Compute Engine
  • API Notebooks

Vous pouvez le vérifier en consultant les & Page "Services".

Dans ce guide, nous allons lire les données d'un abonnement Pub/Sub. Par conséquent, assurez-vous que le compte de service Compute Engine par défaut dispose du rôle d'éditeur, ou accordez-lui le rôle d'éditeur Pub/Sub.

3. Premiers pas avec les notebooks Apache Beam

Lancer une instance de notebook Apache Beam

  1. Lancez Dataflow sur la console:

  1. Sélectionnez la page Workbench dans le menu de gauche.
  2. Vérifiez que vous êtes bien dans l'onglet Notebooks gérés par l'utilisateur.
  3. Dans la barre d'outils, cliquez sur Nouveau notebook.
  4. Sélectionnez Apache Beam > Sans GPU.
  5. Sur la page Nouveau notebook, sélectionnez un sous-réseau pour la VM de notebook, puis cliquez sur Créer.
  6. Cliquez sur Ouvrir JupyterLab lorsque le lien devient actif. Vertex AI Workbench crée une instance de notebook Apache Beam.

4. Créer le pipeline

Créer une instance de notebook

Accédez à Fichier > Nouveau > Notebook, puis sélectionnez un noyau Apache Beam 2.47 ou une version ultérieure.

Commencez à ajouter du code à votre notebook

  • Copiez et collez le code de chaque section dans une nouvelle cellule de votre notebook
  • Exécuter la cellule

6bd3dd86cc7cf802.png

L'utilisation de l'exécuteur interactif Apache Beam avec les notebooks JupyterLab vous permet de développer un pipeline de manière itérative, d'inspecter le graphique du pipeline et d'analyser chaque PCollection dans un workflow REPL (read-eval-print-loop).

Apache Beam est installé sur votre instance de notebook. Par conséquent, incluez les modules interactive_runner et interactive_beam dans votre notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Si votre notebook utilise d'autres services Google, ajoutez les instructions d'importation suivantes:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Définir les options d'interactivité

Le code ci-dessous définit la durée de capture des données sur 60 secondes. Si vous souhaitez accélérer l'itération, définissez une durée plus courte, par exemple "10s".

ib.options.recording_duration = '60s'

Pour découvrir d'autres options interactives, consultez la classe interactive_beam.options.

Initialisez le pipeline à l'aide d'un objet InteractiveRunner.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Lire et visualiser les données

L'exemple suivant montre un pipeline Apache Beam qui crée un abonnement au sujet Pub/Sub donné et lit les données à partir de l'abonnement.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

Le pipeline compte les mots par fenêtres à partir de la source. Il crée un fenêtrage fixe, chaque fenêtre ayant une durée de 10 secondes.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Une fois les données fenêtrées, les mots sont comptés par fenêtre.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Visualiser les données

La méthode show() permet de visualiser la PCollection obtenue dans le notebook.

ib.show(windowed_word_counts, include_window_info=True)

Méthode d'affichage permettant de visualiser une PCollection sous forme de tableau.

Pour afficher les visualisations de vos données, transmettez visualize_data=True à la méthode show(). Ajouter une nouvelle cellule:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Vous pouvez appliquer plusieurs filtres à vos visualisations. La visualisation suivante vous permet de filtrer par libellé et par axe :

Méthode d'affichage permettant de visualiser une PCollection sous la forme d'un ensemble complet d'éléments d'interface utilisateur filtrables.

5. Utiliser un DataFrame Pandas

Le DataFrame Pandas est une autre visualisation utile dans les notebooks Apache Beam. L'exemple suivant convertit d'abord les mots en minuscules, puis calcule la fréquence de chaque mot.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

La méthode collect() fournit le résultat dans un DataFrame pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

Méthode de collecte représentant une PCollection dans un DataFrame pandas.

6. (Facultatif) Lancer des jobs Dataflow à partir de votre notebook

  1. Pour exécuter des jobs sur Dataflow, vous devez disposer d'autorisations supplémentaires. Assurez-vous que le compte de service Compute Engine par défaut dispose du rôle Éditeur ou accordez-lui les rôles IAM suivants:
  • Administrateur Dataflow
  • Nœud de calcul Dataflow
  • Administrateur de l'espace de stockage
  • Utilisateur du compte de service (roles/iam.serviceAccountUser)

Pour en savoir plus sur les rôles, consultez la documentation.

  1. (Facultatif) Avant d'utiliser votre notebook pour exécuter des tâches Dataflow, redémarrez le noyau, réexécutez toutes les cellules et vérifiez le résultat.
  2. Supprimez les instructions d'importation suivantes :
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Ajoutez la déclaration d'importation suivante :
from apache_beam.runners import DataflowRunner
  1. Supprimez l'option de durée d'enregistrement suivante:
ib.options.recording_duration = '60s'
  1. Ajoutez les éléments suivants à vos options de pipeline. Vous devrez ajuster l'emplacement Cloud Storage pour qu'il pointe vers un bucket que vous possédez déjà, ou vous pouvez créer un bucket à cette fin. Vous pouvez également remplacer la valeur de la région par us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. Dans le constructeur de beam.Pipeline(), remplacez InteractiveRunner par DataflowRunner. p est l'objet de pipeline lors de la création du pipeline.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Supprimez les appels interactifs de votre code. Par exemple, supprimez show(), collect(), head(), show_graph() et watch() de votre code.
  2. Pour afficher les résultats, vous devrez ajouter un récepteur. Dans la section précédente, nous avons visualisé les résultats dans le notebook, mais cette fois, nous exécutons le job en dehors de ce notebook, dans Dataflow. Par conséquent, nous avons besoin d'un emplacement externe pour nos résultats. Dans cet exemple, nous allons écrire les résultats dans des fichiers texte dans GCS (Google Cloud Storage). Comme il s'agit d'un pipeline de flux de données, avec le fenêtrage des données, nous allons créer un fichier texte par fenêtre. Pour ce faire, ajoutez les étapes suivantes à votre pipeline:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Ajoutez p.run() à la fin du code de votre pipeline.
  2. Examinez maintenant le code de votre notebook pour vérifier que vous avez bien intégré toutes les modifications. Le résultat doit ressembler à ceci:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Exécutez les cellules.
  2. Vous devriez obtenir un résultat semblable à celui-ci:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Pour vérifier si le job est en cours d'exécution, accédez à la page Jobs (Tâches) de Dataflow. Une nouvelle tâche doit s'afficher dans la liste. Le traitement des données prend environ 5 à 10 minutes.
  2. Une fois les données traitées, accédez à Cloud Storage et accédez au répertoire dans lequel Dataflow stocke les résultats (votre output_gcs_location défini). Vous devriez voir une liste de fichiers texte, avec un fichier par fenêtre. bfcc5ce9e46a8b14.png
  3. Téléchargez le fichier et examinez le contenu. Il doit contenir la liste des mots associés à leur nombre. Vous pouvez également utiliser l'interface de ligne de commande pour inspecter les fichiers. Pour ce faire, exécutez la commande suivante dans une nouvelle cellule de votre notebook:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Un résultat semblable à celui-ci s'affiche:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Et voilà ! N'oubliez pas de nettoyer et d'arrêter le job que vous avez créé (reportez-vous à la dernière étape de cet atelier de programmation).

Pour obtenir un exemple sur la façon d'exécution de cette conversion sur un notebook interactif, consultez le notebook "Nombre de mots Dataflow" de votre instance de notebook.

Vous pouvez également exporter votre notebook en tant que script exécutable, modifier le fichier .py généré à l'aide des étapes précédentes, puis déployer votre pipeline sur le service Dataflow.

7. Enregistrer votre notebook

Les notebooks que vous créez sont enregistrés en local dans votre instance de notebook en cours d'exécution. Si vous réinitialisez ou arrêtez l'instance de notebook pendant le développement, ces nouveaux notebooks sont conservés tant qu'ils sont créés dans le répertoire /home/jupyter. Toutefois, si une instance de notebook est supprimée, ces notebooks sont également supprimés.

Pour conserver vos notebooks en vue d'une utilisation ultérieure, téléchargez-les localement sur votre poste de travail, enregistrez-les sur GitHub ou exportez-les dans un autre format de fichier.

8. Nettoyer

Une fois que vous avez fini d'utiliser votre instance de notebook Apache Beam, nettoyez les ressources que vous avez créées sur Google Cloud en arrêtant l'instance de notebook et en arrêtant le job de diffusion, le cas échéant.

Si vous avez créé un projet destiné uniquement aux besoins de cet atelier de programmation, vous pouvez également l'arrêter complètement.