1. Descripción general

¿Qué es Dataflow?
Dataflow es un servicio administrado para ejecutar una amplia variedad de patrones de procesamiento de datos. La documentación de este sitio te muestra cómo implementar las canalizaciones de procesamiento de datos por lotes y de transmisión con Dataflow, incluidas las instrucciones para usar las características del servicio.
El SDK de Apache Beam es un modelo de programación de código abierto que te permite desarrollar canalizaciones de transmisión y por lotes. Puedes crear tus canalizaciones con un programa de Apache Beam y, luego, ejecutarlas en el servicio de Dataflow. La documentación de Apache Beam proporciona información conceptual detallada y material de referencia sobre el modelo de programación de Apache Beam, los SDK y otros ejecutores.
Transmite estadísticas de datos con velocidad
Dataflow permite desarrollar canalizaciones de transmisión de datos de forma simplificada y rápida con una latencia de datos más baja.
Simplifica las operaciones y la administración
Permite que los equipos se centren en programar en lugar de administrar clústeres de servidores, ya que el enfoque sin servidores de Dataflow quita la sobrecarga operativa de las cargas de trabajo de ingeniería de datos.
Reduce el costo total de propiedad
Dataflow combina el ajuste de escala automático de los recursos con las capacidades de procesamiento por lotes con optimización del costo, por lo que puede ofrecer una capacidad prácticamente ilimitada para que administre las cargas de trabajo estacionales y con incrementos bruscos sin gastar de más.
Funciones clave
Administración de recursos automatizada y rebalanceo dinámico de trabajos
Dataflow automatiza el aprovisionamiento y la administración de los recursos de procesamiento para minimizar la latencia y maximizar el uso, de modo que no tengas que iniciar instancias manualmente ni reservarlas. La partición de trabajo también está automatizada y optimizada para volver a balancear de forma dinámica las tareas atrasadas. No es necesario buscar “teclas de acceso rápido” ni procesar previamente los datos de entrada.
Ajuste de escala automático horizontal
El ajuste de escala automático horizontal de los recursos de los trabajadores para alcanzar una capacidad de procesamiento óptima tiene como resultado una mejor relación general entre precio y rendimiento.
Programación flexible de recursos a un bajo precio para el procesamiento por lotes
A fin de procesar los trabajos, como los nocturnos, de forma flexible y según su programación, la programación flexible de recursos (FlexRS) ofrece un precio más bajo para el procesamiento por lotes. Estos trabajos flexibles se posicionan en una fila que garantiza su recuperación para ejecutarlos en un período de seis horas.
Este instructivo se adaptó de https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
Qué aprenderás
- Cómo crear un proyecto de Maven con Apache Beam usando el SDK de Java
- Cómo ejecutar una canalización de ejemplo con Google Cloud Platform Console
- Cómo borrar el bucket de Cloud Storage asociado y sus contenidos
Requisitos
¿Cómo usarás este instructivo?
¿Cómo calificarías tu experiencia con el uso de los servicios de Google Cloud Platform?
2. Configuración y requisitos
Configuración del entorno de autoaprendizaje
- Accede a la consola de Cloud y crea un proyecto nuevo o reutiliza uno existente. (Si todavía no tienes una cuenta de Gmail o de G Suite, debes crear una).
Recuerde el ID de proyecto, un nombre único en todos los proyectos de Google Cloud (el nombre anterior ya se encuentra en uso y no lo podrá usar). Se mencionará más adelante en este codelab como PROJECT_ID.
- A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud recursos.
Ejecutar este codelab no debería costar mucho, tal vez nada. Asegúrate de seguir las instrucciones de la sección “Realiza una limpieza”, en la que se aconseja cómo cerrar recursos para que no se te facture más allá de este instructivo. Los usuarios nuevos de Google Cloud son aptos para participar en el programa Prueba gratuita de $300.
Habilitar las API
Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.

En el menú desplegable, selecciona APIs y servicios > Panel.

Selecciona + Habilitar APIs y servicios.

Busca "Compute Engine" en el cuadro de búsqueda. Haz clic en "Compute Engine API" en la lista de resultados que aparece.

En la página de Google Compute Engine, haz clic en Habilitar.

Una vez habilitada, haz clic en la flecha para volver.
Ahora busca las siguientes APIs y habilítalas también:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- JSON de Cloud Storage
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- APIs de Cloud Resource Manager
3. Crear un nuevo bucket de Cloud Storage
En Google Cloud Platform Console, haz clic en el ícono de menú en la parte superior izquierda de la pantalla:

Desplázate hacia abajo y selecciona Cloud Storage > Navegador en la subsección Almacenamiento:

Ahora deberías ver el navegador de Cloud Storage y, si usas un proyecto que actualmente no tiene ningún bucket de Cloud Storage, verás una invitación para crear uno nuevo. Presiona el botón Crear bucket para crear uno:

Ingresa un nombre para tu bucket. Como se indica en el cuadro de diálogo, los nombres de los buckets deben ser únicos en todo Cloud Storage. Por lo tanto, si eliges un nombre obvio, como "prueba", es probable que otra persona ya haya creado un bucket con ese nombre y recibas un error.
También hay algunas reglas sobre los caracteres permitidos en los nombres de los buckets. Si comienzas y terminas el nombre de tu bucket con una letra o un número, y solo usas guiones en el medio, no tendrás problemas. Si intentas usar caracteres especiales o comenzar o terminar el nombre del bucket con algo que no sea una letra o un número, el cuadro de diálogo te recordará las reglas.

Ingresa un nombre único para tu bucket y presiona Crear. Si eliges algo que ya está en uso, verás el mensaje de error que se muestra arriba. Cuando hayas creado correctamente un bucket, se te redirigirá a tu nuevo bucket vacío en el navegador:

Por supuesto, el nombre del bucket que verás será diferente, ya que debe ser único en todos los proyectos.
4. Inicie Cloud Shell
Activar Cloud Shell
- En la consola de Cloud, haz clic en Activar Cloud Shell
.
Si nunca ha iniciado Cloud Shell, aparecerá una pantalla intermedia (debajo de la mitad inferior de la página) que describe qué es. Si ese es el caso, haz clic en Continuar (y no volverás a verlo). Así es como se ve la pantalla única:
El aprovisionamiento y la conexión a Cloud Shell solo tomará unos minutos.
Esta máquina virtual está cargada con todas las herramientas de desarrollo que necesitarás. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud, lo que permite mejorar considerablemente el rendimiento de la red y la autenticación. Gran parte de tu trabajo en este codelab, si no todo, se puede hacer simplemente con un navegador o tu Chromebook.
Una vez conectado a Cloud Shell, debería ver que ya se autenticó y que el proyecto ya se configuró con tu ID del proyecto.
- En Cloud Shell, ejecuta el siguiente comando para confirmar que está autenticado:
gcloud auth list
Resultado del comando
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
Resultado del comando
[core] project = <PROJECT_ID>
De lo contrario, puedes configurarlo con el siguiente comando:
gcloud config set project <PROJECT_ID>
Resultado del comando
Updated property [core/project].
5. Cree un proyecto de Maven
Después de que se inicie Cloud Shell, comencemos por crear un proyecto de Maven con el SDK de Java para Apache Beam.
Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir estas canalizaciones con un programa Apache Beam y puedes elegir un ejecutor, como Dataflow, para ejecutar tu canalización.
Ejecuta el comando mvn archetype:generate en tu shell de la siguiente manera:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
Después de ejecutar el comando, deberías ver un directorio nuevo llamado first-dataflow en tu directorio actual. first-dataflow contiene un proyecto de Maven que incluye el SDK de Cloud Dataflow para Java y canalizaciones de ejemplo.
6. Ejecute una canalización de procesamiento de texto en Cloud Dataflow
Comencemos por guardar el ID del proyecto y los nombres de los buckets de Cloud Storage como variables de entorno. Puedes hacer esto en Cloud Shell. Asegúrate de reemplazar <your_project_id> por el ID de tu proyecto.
export PROJECT_ID=<your_project_id>
Ahora haremos lo mismo con el bucket de Cloud Storage. Recuerda reemplazar <your_bucket_name> por el nombre único que usaste para crear tu bucket en un paso anterior.
export BUCKET_NAME=<your_bucket_name>
Cambia al directorio first-dataflow/.
cd first-dataflow
A continuación, ejecutaremos una canalización denominada WordCount, que lee texto, convierte las líneas de texto en tokens de palabras individuales y cuenta la frecuencia con la que aparece cada una. Primero, ejecutaremos la canalización y, mientras se ejecuta, analizaremos lo que sucede en cada paso.
Para iniciar la canalización, ejecuta el comando mvn compile exec:java en tu shell o ventana de terminal. Para los argumentos --project, --stagingLocation, y --output, el siguiente comando hace referencia a las variables de entorno que configuraste anteriormente en este paso.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Mientras se ejecuta el trabajo, busquémoslo en la lista de trabajos.
Abre la IU web de Cloud Dataflow en Google Cloud Platform Console. Deberías ver tu trabajo de conteo de palabras con el estado En ejecución:

Ahora, veamos los parámetros de la canalización. Comience por hacer clic en el nombre de su trabajo:

Cuando seleccionas un trabajo, puedes ver el grafo de ejecución. El gráfico de ejecución de una canalización representa cada una de sus transformaciones como un cuadro que contiene el nombre de la transformación y algo de información de su estado. Puede hacer clic en el símbolo que aparece en la esquina superior derecha de cada paso para ver más detalles:

Veamos cómo la canalización transforma los datos en cada paso:
- Lectura: En este paso, la canalización lee de una fuente de entrada. En este caso, es un archivo de texto de Cloud Storage con el texto completo de la obra de Shakespeare El rey Lear. Nuestra canalización lee el archivo línea por línea y genera cada una como una
PCollection, en la que cada línea de nuestro archivo de texto es un elemento de la colección. - CountWords: El paso
CountWordsse divide en dos partes. Primero, usa una función Do paralela (ParDo) llamadaExtractWordspara dividir cada línea en palabras individuales. La salida de ExtractWords es una PCollection nueva en la que cada elemento es una palabra. El siguiente paso,Count, utiliza una transformación proporcionada por el SDK de Java que devuelve pares clave-valor en los que la clave es una palabra única y el valor es la cantidad de veces que aparece. Este es el método que implementaCountWords. Puedes consultar el archivo WordCount.java completo en GitHub:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: Invoca el
FormatAsTextFn, que se copia a continuación, que da formato a cada par clave-valor en una cadena imprimible.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: En este paso, escribimos las cadenas imprimibles en varios archivos de texto fragmentados.
En unos minutos, veremos los resultados obtenidos de la canalización.
Ahora, veamos la página Información del trabajo a la derecha del gráfico, que incluye los parámetros de canalización que incluimos en el comando mvn compile exec:java.


También puede ver los Contadores personalizados de la canalización, que, en este caso, muestran cuántas líneas vacías se encontraron hasta el momento durante la ejecución. Puedes agregar contadores nuevos a tu canalización para hacer un seguimiento de las métricas específicas de la aplicación.

Puedes hacer clic en el ícono de Registros en la parte inferior de la consola para ver los mensajes de error específicos.

De forma predeterminada, el panel muestra mensajes de registro de trabajos que informan el estado del trabajo en su totalidad. Puedes usar el selector Gravedad mínima para filtrar los mensajes de estado y de progreso del trabajo.

Seleccionar un paso de canalización en el grafo cambia la vista a los registros que genera tu código y al código generado que se ejecuta en el paso de la canalización.
Para volver a Registros de trabajos, anula la selección del paso haciendo clic fuera del gráfico o con el botón Cerrar en el panel lateral derecho.
Puedes usar el botón Registros de trabajador en la pestaña de registros para ver los registros de trabajador de las instancias de Compute Engine que ejecutan tu canalización. Los registros de trabajador constan de líneas de registro generadas por su código y el código generado por Dataflow que lo ejecuta.
Si intentas depurar una falla en la canalización, muchas veces habrá registros adicionales en los Registros de trabajador que te ayudarán a resolver el problema. Ten en cuenta que estos registros se agregan en todos los trabajadores y se pueden filtrar y buscar.

La Interfaz de supervisión de Dataflow solo muestra los mensajes de registro más recientes. Para ver todos los registros, haz clic en el vínculo de Google Cloud Observability que se encuentra en el lado derecho del panel de registros.

Aquí hay un resumen de los diferentes tipos de registro disponibles para ver en la página Supervisión → Registros:
- Los registros de job-message contienen mensajes a nivel del trabajo que generan varios componentes de Dataflow. Los ejemplos incluyen la configuración del ajuste de escala automático, cuando los trabajadores inician o cierran una instancia, el avance en el paso del trabajo y los errores del trabajo. Los errores a nivel del trabajador que se originan en una falla del código del usuario y que están presentes en los registros del trabajador también se propagan a los registros job-message.
- Los trabajadores de Dataflow producen los registros de worker. Los trabajadores realizan la mayor parte del trabajo de la canalización (por ejemplo, aplicar tus ParDos a los datos). Los registros de worker contienen mensajes registrados por tu código y Dataflow.
- Los registros de worker-startup están presentes en la mayoría de los trabajos de Dataflow y pueden capturar los mensajes que se relacionan con el proceso de inicio. El proceso de inicio incluye la descarga de los archivos JAR de un trabajo desde Cloud Storage y, luego, el inicio de los trabajadores. Si hay un problema cuando se inician los trabajadores, estos registros son un buen lugar para buscar información.
- Los registros de shuffler contienen mensajes de los trabajadores que consolidan los resultados de las operaciones de canalización paralelas.
- Los registros de docker y kubelet contienen mensajes relacionados con estas tecnologías públicas, que se usan en los trabajadores de Dataflow.
En el siguiente paso, verificaremos que el trabajo se haya ejecutado correctamente.
7. Comprueba si el trabajo se ejecutó de forma correcta.
Abre la IU web de Cloud Dataflow en Google Cloud Platform Console.
Primero, deberías ver el trabajo de conteo de palabras con el estado En ejecución y, luego, Correcto:

El trabajo tardará entre 3 y 4 minutos en ejecutarse.
¿Recuerda cuando ejecutó la canalización y especificó el bucket para los resultados? Veamos los resultados (¿o no quiere saber cuántas veces aparece cada palabra en El rey Lear?). Vuelve al navegador de Cloud Storage en Google Cloud Platform Console. En el bucket, deberías ver los archivos generados y los archivos de etapa de pruebas que creó el trabajo:

8. Cierra tus recursos
Puedes apagar tus recursos desde Google Cloud Platform Console.
Abre el navegador de Cloud Storage en Google Cloud Platform Console.

Selecciona la casilla de verificación junto al bucket que creaste y haz clic en BORRAR para borrar de forma permanente el bucket y su contenido.


9. ¡Felicitaciones!
Aprendió a crear un proyecto de Maven con el SDK de Cloud Dataflow, ejecutar una canalización de ejemplo con Google Cloud Platform Console y borrar el bucket de Cloud Storage asociado y sus contenidos.
Más información
- Documentación de Dataflow: https://cloud.google.com/dataflow/docs/
Licencia
Esta obra se ofrece bajo una licencia Creative Commons Atribución 3.0 genérica y una licencia Apache 2.0.