1. Présentation
Qu'est-ce que Dataflow ?
Dataflow est un service géré permettant d'exécuter une grande variété de modèles de traitement des données. La documentation disponible sur ce site explique comment déployer vos pipelines de traitement de données par lot et par flux à l'aide de Dataflow, y compris des instructions sur l'utilisation des fonctionnalités du service.
Le SDK Apache Beam est un modèle de programmation Open Source qui vous permet de développer des pipelines de traitement par lot et par flux. Vous créez vos pipelines avec un programme Apache Beam, puis vous les exécutez sur le service Dataflow. La documentation Apache Beam fournit des informations conceptuelles détaillées et des supports de référence pour le modèle de programmation Apache Beam, les SDK et les autres exécuteurs.
Analyse rapide de flux de données
Dataflow accélère et simplifie le développement de pipelines de flux de données tout en réduisant la latence des données.
Opérations et gestion simplifiées
Permettez aux équipes de se concentrer sur la programmation plutôt que sur la gestion des clusters de serveurs, car l'approche sans serveur de Dataflow élimine les coûts opérationnels liés aux charges de travail d'ingénierie des données.
Réduire le coût total de possession
En associant l'autoscaling des ressources à un traitement par lot économique, Dataflow offre des capacités presque illimitées pour gérer vos charges de travail saisonnières et les pics d'activité, sans dépasser le budget.
Principales caractéristiques
Gestion automatisée des ressources et rééquilibrage dynamique des tâches
Dataflow automatise le provisionnement et la gestion des ressources de traitement afin de réduire la latence et d'optimiser l'utilisation. Ainsi, vous n'avez pas besoin de lancer ni de réserver des instances manuellement. Le partitionnement des tâches est également automatisé et optimisé pour permettre un rééquilibrage dynamique des tâches ralenties. Inutile d'utiliser les raccourcis clavier ou prétraiter vos données d'entrée.
Autoscaling horizontal
L'autoscaling horizontal des ressources de nœuds de calcul pour un débit optimal se traduit par un meilleur rapport performances-prix.
Tarification du traitement par lot dans le cadre de la planification flexible des ressources
La planification flexible des ressources (FlexRS) réduit le coût du traitement par lot afin de garantir une certaine souplesse dans le traitement des tâches (celles exécutées la nuit, par exemple) pendant la période planifiée. Ces jobs flexibles sont placés dans une file d'attente avec la garantie qu'ils seront récupérés pour être exécutés dans un délai de six heures.
Ce tutoriel est tiré de la page https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.
Points abordés
- Créer un projet Maven avec Apache Beam à l'aide du SDK Java
- Exécuter un exemple de pipeline dans la console Google Cloud Platform
- Supprimer le bucket Cloud Storage associé et son contenu
Prérequis
Comment allez-vous utiliser ce tutoriel ?
Quel est votre niveau d'expérience avec les services Google Cloud Platform ?
<ph type="x-smartling-placeholder">2. Préparation
Configuration de l'environnement d'auto-formation
- Connectez-vous à Cloud Console, puis créez un projet ou réutilisez un projet existant. (Si vous n'avez pas encore de compte Gmail ou G Suite, vous devez en créer un.)
Mémorisez l'ID du projet. Il s'agit d'un nom unique permettant de différencier chaque projet Google Cloud (le nom ci-dessus est déjà pris ; vous devez en trouver un autre). Il sera désigné par le nom PROJECT_ID
tout au long de cet atelier de programmation.
- Vous devez ensuite activer la facturation dans Cloud Console pour pouvoir utiliser les ressources Google Cloud.
L'exécution de cet atelier de programmation est très peu coûteuse, voire gratuite. Veillez à suivre les instructions de la section "Nettoyer" qui indique comment désactiver les ressources afin d'éviter les frais une fois ce tutoriel terminé. Les nouveaux utilisateurs de Google Cloud peuvent participer au programme d'essai sans frais pour bénéficier d'un crédit de 300 $.
Activer les API
Cliquez sur l'icône de menu en haut à gauche de l'écran.
Sélectionnez API et Services > Tableau de bord dans le menu déroulant.
Sélectionnez + Activer les API et les services.
Recherchez "Compute Engine". dans le champ de recherche. Cliquez sur "API Compute Engine". dans la liste des résultats.
Sur la page Google Compute Engine, cliquez sur Activer.
Une fois l'API activée, cliquez sur la flèche pour revenir à la page précédente.
Recherchez maintenant les API ci-dessous et activez-les :
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- JSON Cloud Storage
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- API Cloud Resource Manager
3. Créer un bucket Cloud Storage
Dans la console Google Cloud Platform, cliquez sur l'icône Menu en haut à gauche de l'écran:
Faites défiler la page vers le bas et sélectionnez Cloud Storage > Navigateur dans la sous-section Stockage:
Le navigateur Cloud Storage doit s'afficher. Si vous utilisez un projet qui ne contient actuellement aucun bucket Cloud Storage, vous verrez une invitation à créer un bucket. Appuyez sur le bouton Créer un bucket pour en créer un:
Saisissez un nom pour ce bucket. Comme le montre la boîte de dialogue, les noms de buckets doivent être uniques dans Cloud Storage. Ainsi, si vous choisissez un nom évident, tel que "test", vous constaterez probablement que quelqu'un d'autre a déjà créé un bucket portant ce nom. Vous recevrez alors un message d'erreur.
Il existe également des règles concernant les caractères autorisés dans les noms de buckets. Si le nom de votre bucket commence et se termine par une lettre ou un chiffre, et que vous n'utilisez que des tirets au milieu, tout va bien. Si vous essayez d'utiliser des caractères spéciaux, ou de commencer ou de terminer par une lettre ou un chiffre, la boîte de dialogue vous rappellera les règles.
Saisissez un nom unique pour le bucket, puis cliquez sur Créer. Si vous choisissez un service déjà utilisé, le message d'erreur ci-dessus s'affiche. Une fois le bucket créé, vous êtes redirigé vers celui-ci (qui est vide) dans le navigateur:
Les noms de buckets que vous verrez seront bien sûr différents, car ils doivent être uniques dans tous les projets.
4. Démarrer Cloud Shell
Activer Cloud Shell
- Dans Cloud Console, cliquez sur Activer Cloud Shell .
Si vous n'avez encore jamais démarré Cloud Shell, un écran intermédiaire s'affiche en dessous de la ligne de séparation pour décrire de quoi il s'agit. Si tel est le cas, cliquez sur Continuer (cet écran ne s'affiche qu'une seule fois). Voici à quoi il ressemble :
Le provisionnement et la connexion à Cloud Shell ne devraient pas prendre plus de quelques minutes.
Cette machine virtuelle contient tous les outils de développement nécessaires. Elle intègre un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances réseau et l'authentification. Vous pouvez réaliser une grande partie, voire la totalité, des activités de cet atelier dans un simple navigateur ou sur votre Chromebook.
Une fois connecté à Cloud Shell, vous êtes en principe authentifié et le projet est défini avec votre ID de projet.
- Exécutez la commande suivante dans Cloud Shell pour vérifier que vous êtes authentifié :
gcloud auth list
Résultat de la commande
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
Résultat de la commande
[core] project = <PROJECT_ID>
Si vous obtenez un résultat différent, exécutez cette commande :
gcloud config set project <PROJECT_ID>
Résultat de la commande
Updated property [core/project].
5. Créer un projet Maven
Après le lancement de Cloud Shell, commençons par créer un projet Maven à l'aide du SDK Java pour Apache Beam.
Apache Beam est un modèle de programmation Open Source pour les pipelines de données. Vous définissez ces pipelines avec un programme Apache Beam et pouvez choisir un exécuteur, tel que Dataflow, pour les exécuter.
Exécutez la commande mvn archetype:generate
dans votre interface système comme suit:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Une fois la commande exécutée, vous devriez voir un nouveau répertoire nommé first-dataflow
sous votre répertoire actuel. first-dataflow
contient un projet Maven incluant le SDK Cloud Dataflow pour Java, ainsi que des exemples de pipelines.
6. Exécuter un pipeline de traitement de texte dans Cloud Dataflow
Commençons par enregistrer l'ID de projet et les noms de buckets Cloud Storage en tant que variables d'environnement. Vous pouvez effectuer cette opération dans Cloud Shell. Veillez à remplacer <your_project_id>
par l'ID de votre projet.
export PROJECT_ID=<your_project_id>
Nous allons maintenant faire de même pour le bucket Cloud Storage. N'oubliez pas de remplacer <your_bucket_name>
par le nom unique que vous avez utilisé précédemment pour créer votre bucket.
export BUCKET_NAME=<your_bucket_name>
Accédez au répertoire first-dataflow/
.
cd first-dataflow
Nous allons exécuter un pipeline intitulé WordCount, qui lit du texte, segmente les lignes en mots individuels et compte le nombre de fois où chacun de ces mots apparaît. Nous allons d'abord exécuter le pipeline. Pendant l'exécution, nous allons voir ce qui se passe à chaque étape.
Démarrez le pipeline en exécutant la commande mvn compile exec:java
dans la fenêtre de votre interface système ou de votre terminal. Pour les arguments --project, --stagingLocation,
et --output
, la commande ci-dessous fait référence aux variables d'environnement que vous avez configurées précédemment à cette étape.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Pendant que le job est en cours d'exécution, recherchons-le dans la liste des jobs.
Ouvrez l'interface utilisateur Web de Cloud Dataflow dans la console Google Cloud Platform. La tâche "wordcount" s'affiche et son état indique En cours d'exécution:
Examinons maintenant les paramètres du pipeline. Commencez par cliquer sur le nom de votre tâche :
Lorsque vous sélectionnez une tâche, vous pouvez afficher le graphique d'exécution. Sur le graphique d'exécution d'un pipeline, chaque transformation est représentée sous la forme d'une case qui indique son nom et certaines informations sur son état. Vous pouvez cliquer sur la flèche en haut à droite de chaque étape pour afficher plus de détails :
Nous allons à présent détailler les transformations apportées aux données par le pipeline à chaque étape :
- Read: à cette étape, le pipeline lit les données d'une source d'entrée. Dans le cas présent, il s'agit d'un fichier texte stocké dans Cloud Storage comprenant l'intégralité du texte de la pièce de Shakespeare, Le Roi Lear. Notre pipeline lit le fichier ligne par ligne et génère chacune une
PCollection
, où chaque ligne de notre fichier texte est un élément de la collection. - CountWords: l'étape
CountWords
se compose de deux parties. Tout d'abord, elle utilise une fonction ParDo (Parallel Do) nomméeExtractWords
pour tokeniser chaque ligne en mots individuels. La fonction ExtractWords crée une nouvelle PCollection, où chaque élément correspond à un mot. L'étape suivante,Count
, utilise une transformation fournie par le SDK Java qui renvoie des paires clé/valeur où la clé correspond à un mot unique et la valeur au nombre de fois où il apparaît. Voici la méthode d'implémentation deCountWords
. Vous pouvez consulter l'intégralité du fichier WordCount.java sur GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: cette méthode appelle le
FormatAsTextFn
, copié ci-dessous, qui met en forme chaque paire clé/valeur dans une chaîne imprimable.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: au cours de cette étape, nous écrivons les chaînes imprimables dans plusieurs fichiers texte segmentés.
Nous allons examiner les résultats du pipeline dans quelques minutes.
Examinez maintenant la page Job info (Informations sur la tâche) à droite du graphique. Elle inclut les paramètres du pipeline que nous avons inclus dans la commande mvn compile exec:java
.
Les compteurs personnalisés pour le pipeline s'affichent également dans cette section. Dans notre exemple, ils indiquent le nombre de lignes vides détectées jusqu'à présent pendant l'exécution. Vous pouvez ajouter des compteurs à votre pipeline afin de suivre les métriques spécifiques à une application.
Vous pouvez cliquer sur l'icône Journaux en bas de la console pour afficher les messages d'erreur spécifiques.
Par défaut, le panneau affiche les messages du journal du job qui indiquent l'état du job dans son ensemble. Vous pouvez utiliser le sélecteur de niveau de gravité minimal pour filtrer les messages de progression et d'état de la tâche.
La sélection d'une étape du pipeline dans le graphique remplace la vue affichée par les journaux générés par votre code et par le code généré s'exécutant dans l'étape du pipeline.
Pour revenir aux journaux du job, désélectionnez l'étape en cliquant à l'extérieur du graphique ou en utilisant le bouton "Fermer" dans le panneau latéral droit.
Vous pouvez utiliser le bouton Journaux de nœud de calcul de l'onglet "Journaux" pour afficher les journaux des nœuds de calcul pour les instances Compute Engine qui exécutent votre pipeline. Les journaux des tâches comportent des lignes de journal générées par votre code et le code Dataflow généré qui l'exécute.
Si vous essayez de déboguer une erreur dans le pipeline, une journalisation supplémentaire apparaît généralement dans les journaux de travail pour vous aider à résoudre le problème. Gardez à l'esprit que ces journaux sont agrégés pour l'ensemble des nœuds de calcul, et peuvent être filtrés et recherchés.
L'interface de surveillance de Dataflow n'affiche que les messages de journal les plus récents. Vous pouvez afficher tous les journaux en cliquant sur le lien "Google Cloud Observability" (Observabilité Google Cloud) à droite du volet des journaux.
Voici un récapitulatif des différents types de journaux consultables à partir de la page Monitoring → Journaux:
- Les journaux job-message contiennent des messages au niveau du job générés par divers composants de Dataflow. Il peut s'agir, par exemple, de la configuration de l'autoscaling, du démarrage ou de l'arrêt des nœuds de calcul, de la progression d'une étape de la tâche et des erreurs associées à la tâche. Les erreurs au niveau des nœuds de calcul qui sont dues au plantage du code utilisateur et qui sont présentes dans les journaux worker se propagent également dans les journaux job-message.
- Les journaux worker sont produits par les nœuds de calcul Dataflow. Les nœuds de calcul effectuent la majeure partie du travail en pipeline (par exemple, en appliquant vos ParDo aux données). Les journaux des nœuds de calcul contiennent des messages enregistrés par votre code et par Dataflow.
- Les journaux worker-startup sont présents sur la plupart des jobs Dataflow et peuvent capturer les messages liés au processus de démarrage. Le processus de démarrage comprend le téléchargement des fichiers JAR d'un job à partir de Cloud Storage, puis le démarrage des nœuds de calcul. En cas de problème lors du démarrage des nœuds de calcul, vous pouvez consulter ces journaux.
- Les journaux shuffler contiennent les messages des nœuds de calcul qui consolident les résultats des opérations de pipeline exécutées en parallèle.
- Les journaux docker et kubelet contiennent des messages liés à ces technologies publiques, qui sont utilisées sur les nœuds de calcul Dataflow.
À l'étape suivante, nous allons vérifier que votre job a bien été exécuté.
7. Vérifier la bonne exécution de votre job
Ouvrez l'interface utilisateur Web de Cloud Dataflow dans la console Google Cloud Platform.
La tâche "wordcount" doit s'afficher. Elle présente l'état En cours d'exécution au début, puis l'état Réussie:
L'exécution de la tâche prend entre trois et quatre minutes.
Vous vous souvenez de l'étape où vous avez exécuté le pipeline et spécifié un bucket de sortie ? Examinons le résultat, car nous voulons évidemment savoir combien de fois apparaît chaque mot de la pièce Le Roi Lear. Revenez au navigateur Cloud Storage dans la console Google Cloud Platform. Dans votre bucket, vous devriez voir les fichiers de sortie et les fichiers intermédiaires créés par votre tâche :
8. Arrêter vos ressources
Vous pouvez arrêter vos ressources à partir de la console Google Cloud Platform.
Ouvrez le navigateur Cloud Storage dans la console Google Cloud Platform.
Cochez la case à côté du bucket que vous avez créé, puis cliquez sur SUPPRIMER pour supprimer définitivement le bucket et son contenu.
9. Félicitations !
Vous avez appris à créer un projet Maven à l'aide du SDK Cloud Dataflow, à exécuter un exemple de pipeline dans la console Google Cloud Platform, ainsi qu'à supprimer le bucket Cloud Storage associé et son contenu.
En savoir plus
- Documentation relative à Dataflow : https://cloud.google.com/dataflow/docs/
Licence
Ce contenu est concédé sous licence générique Attribution 3.0 Creative Commons et licence Apache 2.0.