1. Présentation

Qu'est-ce que Dataflow ?
Dataflow est un service géré permettant d'exécuter une grande variété de modèles de traitement des données. La documentation sur ce site explique comment déployer les pipelines de traitement de données par lots et par flux à l'aide de Dataflow. Elle contient également des instructions concernant l'utilisation des fonctionnalités du service.
Le SDK Apache Beam est un modèle de programmation Open Source qui vous permet de développer des pipelines par lots et en flux continu. Vous créez des pipelines avec un programme Apache Beam, puis vous les exécutez sur le service Dataflow. La documentation Apache Beam fournit des informations conceptuelles détaillées et des documents de référence pour le modèle de programmation Apache Beam, les SDK et les autres exécuteurs.
Analyse rapide des flux de données
Dataflow accélère et simplifie le développement de pipelines de flux de données tout en réduisant la latence des données.
Simplifier les opérations et la gestion
L'approche sans serveur de Dataflow supprime les coûts d'exploitation liés aux charges de travail des ingénieurs en données. Les équipes peuvent ainsi se concentrer sur la programmation plutôt que sur la gestion des clusters de serveurs.
Réduire le coût total de possession
En associant l'autoscaling des ressources à un traitement par lot économique, Dataflow offre des capacités presque illimitées pour gérer les charges de travail des périodes saisonnières et des pics d'activité, sans dépasser le budget.
Principales caractéristiques
Gestion automatisée des ressources et rééquilibrage dynamique des tâches
Dataflow automatise le provisionnement et la gestion des ressources de traitement pour optimiser leur utilisation et réduire la latence au maximum. Vous n'avez donc pas besoin de lancer ni de réserver des instances manuellement. Le partitionnement des tâches est également automatisé et optimisé pour permettre un rééquilibrage dynamique des tâches ralenties. Résultat : inutile de mémoriser les raccourcis clavier ni de prétraiter les données de saisie.
Autoscaling horizontal
L'autoscaling horizontal des ressources de nœuds de calcul pour un débit optimal se traduit par un meilleur rapport performances-prix.
Tarification du traitement par lot dans le cadre de la planification flexible des ressources
La planification flexible des ressources (FlexRS) réduit le coût des traitements par lot grâce à une certaine souplesse dans le traitement des tâches (celles exécutées la nuit, par exemple) pendant la période planifiée. Ces tâches flexibles sont placées dans une file d'attente avec la garantie qu'elles seront récupérées pour être exécutées dans un délai de six heures.
Ce tutoriel est adapté de https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.
Points abordés
- Créer un projet Maven avec Apache Beam à l'aide du SDK Java
- Exécuter un exemple de pipeline dans la console Google Cloud Platform
- Supprimer le bucket Cloud Storage associé et son contenu
Prérequis
Comment allez-vous utiliser ce tutoriel ?
Quel est votre niveau d'expérience avec les services Google Cloud Platform ?
2. Préparation
Configuration de l'environnement d'auto-formation
- Connectez-vous à Cloud Console, puis créez un projet ou réutilisez un projet existant. (Si vous n'avez pas encore de compte Gmail ou G Suite, vous devez en créer un.)
Mémorisez l'ID du projet. Il s'agit d'un nom unique permettant de différencier chaque projet Google Cloud (le nom ci-dessus est déjà pris ; vous devez en trouver un autre). Il sera désigné par le nom PROJECT_ID tout au long de cet atelier de programmation.
- Vous devez ensuite activer la facturation dans Cloud Console pour pouvoir utiliser les ressources Google Cloud.
L'exécution de cet atelier de programmation est très peu coûteuse, voire sans frais. Veillez à suivre les instructions de la section "Nettoyer" qui indique comment désactiver les ressources afin d'éviter les frais une fois ce tutoriel terminé. Les nouveaux utilisateurs de Google Cloud peuvent participer au programme d'essai sans frais pour bénéficier d'un crédit de 300 $.
Activer les API
Cliquez sur l'icône de menu en haut à gauche de l'écran.

Sélectionnez API et services > Tableau de bord dans le menu déroulant.

Sélectionnez + Activer les API et les services.

Saisissez "Compute Engine" dans le champ de recherche. Cliquez sur "API Compute Engine" dans la liste des résultats qui s'affiche.

Sur la page Google Compute Engine, cliquez sur Activer.

Une fois l'API activée, cliquez sur la flèche pour revenir à la page précédente.
Recherchez maintenant les API ci-dessous et activez-les :
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- JSON Cloud Storage
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- API Cloud Resource Manager
3. Créer un bucket Cloud Storage
Dans la console Google Cloud Platform, cliquez sur l'icône Menu en haut à gauche de l'écran :

Faites défiler la page vers le bas et sélectionnez Cloud Storage > Navigateur dans la sous-section Stockage :

Vous devriez à présent voir l'explorateur Cloud Storage. Si vous utilisez un projet qui ne comporte actuellement aucun bucket Cloud Storage, vous verrez une invitation à en créer un. Appuyez sur le bouton Créer un bucket pour en créer un :

Saisissez un nom pour ce bucket. Comme indiqué dans la boîte de dialogue, les noms de buckets doivent être uniques dans l'ensemble de Cloud Storage. Par conséquent, si vous choisissez un nom évident, tel que "test", il est probable que quelqu'un d'autre ait déjà créé un bucket portant ce nom et que vous receviez un message d'erreur.
Il existe également des règles concernant les caractères autorisés dans les noms de buckets. Si vous commencez et terminez le nom de votre bucket par une lettre ou un chiffre, et que vous n'utilisez que des tirets au milieu, tout ira bien. Si vous essayez d'utiliser des caractères spéciaux, ou de commencer ou de terminer le nom de votre bucket par autre chose qu'une lettre ou un chiffre, la boîte de dialogue vous rappellera les règles.

Saisissez un nom unique pour votre bucket, puis cliquez sur Créer. Si vous choisissez un nom déjà utilisé, le message d'erreur ci-dessus s'affiche. Une fois le bucket créé, vous serez redirigé vers ce nouveau bucket vide dans le navigateur :

Le nom du bucket que vous voyez sera bien sûr différent, car il doit être unique dans tous les projets.
4. Démarrer Cloud Shell
Activer Cloud Shell
- Dans Cloud Console, cliquez sur Activer Cloud Shell
.
Si vous n'avez encore jamais démarré Cloud Shell, un écran intermédiaire s'affiche en dessous de la ligne de séparation pour décrire de quoi il s'agit. Si tel est le cas, cliquez sur Continuer (cet écran ne s'affiche qu'une seule fois). Voici à quoi il ressemble :
Le provisionnement et la connexion à Cloud Shell ne devraient pas prendre plus de quelques minutes.
Cette machine virtuelle contient tous les outils de développement nécessaires. Elle intègre un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances réseau et l'authentification. Vous pouvez réaliser une grande partie, voire la totalité, des activités de cet atelier dans un simple navigateur ou sur votre Chromebook.
Une fois connecté à Cloud Shell, vous êtes en principe authentifié et le projet est défini avec votre ID de projet.
- Exécutez la commande suivante dans Cloud Shell pour vérifier que vous êtes authentifié :
gcloud auth list
Résultat de la commande
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
Résultat de la commande
[core] project = <PROJECT_ID>
Si vous obtenez un résultat différent, exécutez cette commande :
gcloud config set project <PROJECT_ID>
Résultat de la commande
Updated property [core/project].
5. Créer un projet Maven
Une fois Cloud Shell lancé, commençons par créer un projet Maven à l'aide du SDK Java pour Apache Beam.
Apache Beam est un modèle de programmation Open Source pour les pipelines de données. Vous définissez ces pipelines avec un programme Apache Beam et pouvez choisir un exécuteur, tel que Dataflow, pour les lancer.
Exécutez la commande mvn archetype:generate dans votre interface système, comme indiqué ci-dessous :
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Après avoir exécuté la commande, vous devriez constater que votre répertoire actuel comporte un nouveau répertoire nommé first-dataflow. first-dataflow contient un projet Maven incluant le SDK Cloud Dataflow pour Java, ainsi que des exemples de pipelines.
6. Exécuter un pipeline de traitement de texte dans Cloud Dataflow
Commençons par enregistrer l'ID de notre projet et les noms de nos buckets Cloud Storage en tant que variables d'environnement. Vous pouvez effectuer cette opération dans Cloud Shell. Veillez à remplacer <your_project_id> par l'ID de votre projet.
export PROJECT_ID=<your_project_id>
Nous allons maintenant faire de même pour le bucket Cloud Storage. N'oubliez pas de remplacer <your_bucket_name> par le nom unique que vous avez utilisé pour créer votre bucket lors d'une étape précédente.
export BUCKET_NAME=<your_bucket_name>
Accédez au répertoire first-dataflow/.
cd first-dataflow
Nous allons exécuter un pipeline intitulé WordCount, qui lit du texte, segmente les lignes en mots individuels et compte le nombre de fois où chacun de ces mots apparaît. Nous allons d'abord exécuter le pipeline, puis examiner chaque étape de l'exécution.
Démarrez le pipeline en exécutant la commande mvn compile exec:java dans la fenêtre de votre interface système ou de votre terminal. Pour les arguments --project, --stagingLocation, et --output, la commande ci-dessous fait référence aux variables d'environnement que vous avez configurées précédemment dans cette étape.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Pendant que le job s'exécute, vérifiez qu'il apparaît dans la liste des jobs.
Ouvrez l'interface utilisateur Web Cloud Dataflow dans la console Google Cloud Platform. La tâche "wordcount" s'affiche et son état indique Running (En cours d'exécution) :

Examinons maintenant les paramètres du pipeline. Commencez par cliquer sur le nom de votre tâche :

Lorsque vous sélectionnez un job, vous pouvez afficher le graphique d'exécution. Sur le graphique d'exécution d'un pipeline, chaque transformation est représentée sous la forme d'une case qui indique son nom et certaines informations sur son état. Vous pouvez cliquer sur la flèche en haut à droite de chaque étape pour afficher plus de détails :

Nous allons à présent détailler les transformations apportées aux données par le pipeline à chaque étape :
- Lire : lors de cette étape, le pipeline lit les données à partir d'une source d'entrée. Dans ce cas, il s'agit d'un fichier texte provenant de Cloud Storage et contenant l'intégralité du texte de la pièce de théâtre de Shakespeare Le Roi Lear. Notre pipeline lit le fichier ligne par ligne et génère une
PCollectionpour chaque ligne de notre fichier texte. - CountWords : l'étape
CountWordss'effectue en deux parties. Tout d'abord, il utilise une fonction Do parallèle (ParDo) nomméeExtractWordspour segmenter chaque ligne en mots individuels. La sortie d'ExtractWords est une nouvelle PCollection où chaque élément est un mot. L'étape suivante,Count, utilise une transformation fournie par le SDK Java qui renvoie des paires clé/valeur, où la clé est un mot unique et la valeur est le nombre de fois où il apparaît. Voici comment implémenter la méthodeCountWords. Vous pouvez consulter l'intégralité du fichier WordCount.java sur GitHub :
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements : appelle
FormatAsTextFn, copié ci-dessous, qui met en forme chaque paire clé/valeur en une chaîne imprimable.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts : dans cette étape, nous écrivons les chaînes imprimables dans plusieurs fichiers texte fragmentés.
Nous allons examiner les résultats du pipeline dans quelques minutes.
Consultez maintenant la page Job info (Informations sur le job) à droite du graphique, qui présente les paramètres de pipeline inclus dans la commande mvn compile exec:java.


Les compteurs personnalisés pour le pipeline s'affichent également dans cette section. Dans notre exemple, ils indiquent le nombre de lignes vides détectées jusqu'à présent pendant l'exécution. Vous pouvez ajouter des compteurs à votre pipeline pour suivre des métriques spécifiques à l'application.

Vous pouvez cliquer sur l'icône Journaux en bas de la console pour afficher les messages d'erreur spécifiques.

Par défaut, le panneau affiche les messages du journal des tâches, qui indiquent l'état de la tâche dans son ensemble. Vous pouvez utiliser le sélecteur "Niveau de gravité minimal" pour filtrer les messages concernant la progression et l'état de la tâche.

La sélection d'une étape du pipeline dans le graphique remplace la vue affichée par les journaux générés par votre code et le code généré s'exécutant dans le cadre de cette étape du pipeline.
Pour revenir à la vue "Journaux des tâches", désélectionnez l'étape en cliquant à l'extérieur du graphique ou en utilisant le bouton "Fermer" du panneau de droite.
Vous pouvez utiliser le bouton Journaux des nœuds de calcul dans l'onglet "Journaux" pour afficher les journaux des nœuds de calcul des instances Compute Engine qui exécutent votre pipeline. Les journaux des tâches comportent des lignes de journal générées par votre code et le code Dataflow généré qui l'exécute.
Si vous essayez de déboguer un échec dans le pipeline, vous trouverez souvent des journaux supplémentaires dans les journaux du nœud de calcul qui vous aideront à résoudre le problème. N'oubliez pas que ces journaux sont agrégés pour tous les nœuds de calcul. Vous pouvez les filtrer et les rechercher.

L'interface de surveillance de Dataflow affiche uniquement les messages de journal les plus récents. Vous pouvez afficher tous les journaux en cliquant sur le lien Google Cloud Observability à droite du volet des journaux.

Voici un récapitulatif des différents types de journaux consultables à partir de la page "Surveillance → Journaux" :
- Les journaux job-message contiennent des messages au niveau des tâches générés par plusieurs composants de Dataflow. Il peut s'agir de la configuration de l'autoscaling, du démarrage ou de l'arrêt des nœuds de calcul, de la progression d'une étape de la tâche et des erreurs concernant la tâche. Les erreurs au niveau des nœuds de calcul qui sont dues au plantage du code utilisateur et qui s'affichent dans les journaux worker se propagent également dans les journaux job-message.
- Les journaux worker sont produits par les nœuds de calcul Dataflow. Les nœuds de calcul effectuent la majeure partie du travail en pipeline (par exemple, en appliquant vos ParDo aux données). Les journaux worker contiennent des messages enregistrés par votre code et par Dataflow.
- Les journaux worker-startup sont présents sur la plupart des tâches Dataflow et peuvent capturer les messages liés au processus de démarrage. Le processus de démarrage comprend le téléchargement des fichiers JAR d'un job à partir de Cloud Storage, puis le démarrage des nœuds de calcul. Il est recommandé d'examiner ces journaux en cas de problème lié au démarrage des nœuds de calcul.
- Les journaux shuffler contiennent les messages des nœuds de calcul qui consolident les résultats des opérations de pipeline exécutées en parallèle.
- Les journaux docker et kubelet contiennent les messages liés à ces technologies publiques, qui sont utilisées sur les nœuds de calcul Dataflow.
À l'étape suivante, nous vérifierons que votre job a bien été exécuté.
7. Vérifier la bonne exécution de votre job
Ouvrez l'interface utilisateur Web Cloud Dataflow dans la console Google Cloud Platform.
Votre tâche "wordcount" présente au début l'état Running (En cours d'exécution), puis l'état Succeeded (Réussie) :

L'exécution de la tâche prend entre trois et quatre minutes.
Vous vous souvenez de l'étape où vous avez exécuté le pipeline et spécifié un bucket de sortie ? Examinons le résultat, car nous voulons évidemment savoir combien de fois apparaît chaque mot de la pièce Le Roi Lear. Revenez au navigateur Cloud Storage dans la console Google Cloud Platform. Dans votre bucket, vous devriez voir les fichiers de sortie et les fichiers intermédiaires créés par votre tâche :

8. Arrêter vos ressources
Vous pouvez arrêter vos ressources depuis la console Google Cloud Platform.
Ouvrez le navigateur Cloud Storage dans la console Google Cloud Platform.

Cochez la case à côté du bucket que vous avez créé, puis cliquez sur SUPPRIMER pour supprimer définitivement le bucket et son contenu.


9. Félicitations !
Vous avez appris à créer un projet Maven à l'aide du SDK Cloud Dataflow, à exécuter un exemple de pipeline dans la console Google Cloud Platform, ainsi qu'à supprimer le bucket Cloud Storage associé et son contenu.
En savoir plus
- Documentation relative à Dataflow : https://cloud.google.com/dataflow/docs/
Licence
Ce contenu est concédé sous licence Creative Commons Attribution 3.0 Generic et Apache 2.0.