Ejecuta una canalización de procesamiento de texto de macrodatos en Cloud Dataflow

1. Descripción general

Cloud-Dataflow.png

¿Qué es Dataflow?

Dataflow es un servicio administrado para ejecutar una amplia variedad de patrones de procesamiento de datos. En la documentación de este sitio, se muestra cómo implementar tus canalizaciones de procesamiento de datos por lotes y de transmisión con Dataflow, incluidas las instrucciones para usar las funciones del servicio.

El SDK de Apache Beam es un modelo de programación de código abierto que te permite desarrollar canalizaciones de transmisión y por lotes. Creas tus canalizaciones con un programa de Apache Beam y, luego, las ejecutas en el servicio de Dataflow. La documentación de Apache Beam proporciona información conceptual detallada y material de referencia para el modelo de programación de Apache Beam, los SDK y otros ejecutores.

Transmisión de análisis de datos con velocidad

Dataflow permite desarrollar canalizaciones de transmisión de datos de forma rápida y simplificada con una latencia de datos más baja.

Simplifica las operaciones y la administración

Permite que los equipos se enfoquen en programar en lugar de administrar clústeres de servidores, ya que el enfoque sin servidores de Dataflow quita la sobrecarga operativa de las cargas de trabajo de ingeniería de datos.

Reduce el costo total de propiedad

Dataflow combina el ajuste de escala automático de los recursos con las capacidades de procesamiento por lotes con optimización del costo, por lo que puede ofrecer una capacidad prácticamente ilimitada para que administre las cargas de trabajo estacionales y con incrementos bruscos sin gastar de más.

Funciones clave

Administración de recursos automatizada y rebalanceo dinámico de trabajos

Dataflow automatiza el aprovisionamiento y la administración de los recursos de procesamiento para minimizar la latencia y maximizar el uso, de modo que no tengas que iniciar instancias ni reservarlas de forma manual. La partición de trabajo también está automatizada y optimizada para volver a balancear dinámicamente las tareas atrasadas. No es necesario buscar “teclas de acceso rápido” o preprocesar los datos de entrada.

Ajuste de escala automático horizontal

El ajuste de escala automático horizontal de los recursos de los trabajadores para alcanzar una capacidad de procesamiento óptima tiene como resultado una mejor relación general entre precio y rendimiento.

Programación flexible de recursos a un bajo precio para el procesamiento por lotes

A fin de procesar de forma flexible la hora de la programación de los trabajos, como los nocturnos, la programación flexible de recursos (FlexRS) ofrece un precio más bajo para el procesamiento por lotes. Estos trabajos flexibles se posicionan en una cola que garantiza su recuperación para ejecutarlos en un período de seis horas.

Este instructivo está adaptado de https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

Qué aprenderás

  • Cómo crear un proyecto de Maven con Apache Beam mediante el SDK de Java
  • Cómo ejecutar una canalización de ejemplo con Google Cloud Platform Console
  • Cómo borrar el bucket de Cloud Storage asociado y sus contenidos

Requisitos

¿Cómo usarás este instructivo?

Ler Leer y completar los ejercicios

¿Cómo calificarías tu experiencia en el uso de los servicios de Google Cloud Platform?

Principiante Intermedio Avanzado .
.

2. Configuración y requisitos

Configuración del entorno de autoaprendizaje

  1. Accede a la consola de Cloud y crea un proyecto nuevo o reutiliza uno existente. (Si todavía no tienes una cuenta de Gmail o de G Suite, debes crear una).

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Recuerde el ID de proyecto, un nombre único en todos los proyectos de Google Cloud (el nombre anterior ya se encuentra en uso y no lo podrá usar). Se mencionará más adelante en este codelab como PROJECT_ID.

  1. A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud recursos.

Ejecutar este codelab no debería costar mucho, tal vez nada. Asegúrate de seguir las instrucciones de la sección “Realiza una limpieza” en la que se aconseja cómo cerrar recursos para no incurrir en facturación más allá de este instructivo. Los usuarios nuevos de Google Cloud son aptos para participar en el programa Prueba gratuita de $300.

Habilitar las API

Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.

2bfc27ef9ba2ec7d.png

Selecciona APIs y Servicios > Panel de control en el menú desplegable.

5b65523a6cc0afa6.png

Selecciona + Habilitar APIs y servicios.

81ed72192c0edd96.png

Busca "Compute Engine". en el cuadro de búsqueda. Haz clic en “API de Compute Engine” en la lista de resultados que aparece.

3f201e991c7b4527.png

En la página de Google Compute Engine, haz clic en Habilitar.

ac121653277fa7bb.png

Una vez que se habilite, haz clic en la flecha para volver.

Ahora busca las siguientes APIs y habilítalas también:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • JSON de Cloud Storage
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • APIs de Cloud Resource Manager

3. Cree un nuevo bucket de Cloud Storage

En la consola de Google Cloud, haz clic en el ícono de menú en la parte superior izquierda de la pantalla:

2bfc27ef9ba2ec7d.png

Desplázate hacia abajo y selecciona Cloud Storage > Browser en la subsección Storage:

2b6c3a2a92b47015.png

Ahora deberías ver el navegador de Cloud Storage. Si estás usando un proyecto que actualmente no tiene buckets de Cloud Storage, verás una invitación para crear un bucket nuevo. Presiona el botón Crear bucket para crear uno:

a711016d5a99dc37.png

Ingresa un nombre para tu bucket. Como se indica en el cuadro de diálogo, los nombres de los buckets deben ser únicos en todo Cloud Storage. Por lo tanto, si eliges un nombre obvio, como “prueba”, probablemente descubrirás que alguien más ya creó un bucket con ese nombre y recibas un error.

También hay algunas reglas sobre qué caracteres están permitidos en los nombres de los buckets. Si comienzas y terminas el nombre de tu bucket con una letra o un número, y solo usas guiones en el medio, no tendrás problemas. Si intentas utilizar caracteres especiales o empezar o terminar el nombre de tu bucket con algo que no sea una letra o un número, el cuadro de diálogo te recordará las reglas.

3a5458648cfe3358.png

Ingresa un nombre único para tu bucket y presiona Crear. Si eliges un dispositivo que ya está en uso, verás el mensaje de error que aparece arriba. Una vez que hayas creado correctamente un bucket, se te dirigirá a tu nuevo bucket vacío en el navegador:

3bda986ae88c4e71.png

Por supuesto, el nombre del bucket será diferente, ya que debe ser único en todos los proyectos.

4. Inicie Cloud Shell

Activar Cloud Shell

  1. En la consola de Cloud, haz clic en Activar Cloud ShellH7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ.

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Si nunca ha iniciado Cloud Shell, aparecerá una pantalla intermedia (debajo de la mitad inferior de la página) que describe qué es. Si ese es el caso, haz clic en Continuar (y no volverás a verlo). Así es como se ve la pantalla única:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

El aprovisionamiento y la conexión a Cloud Shell solo tomará unos minutos.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

Esta máquina virtual está cargada con todas las herramientas de desarrollo que necesitarás. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud, lo que permite mejorar considerablemente el rendimiento de la red y la autenticación. Gran parte de tu trabajo en este codelab, si no todo, se puede hacer simplemente con un navegador o tu Chromebook.

Una vez conectado a Cloud Shell, debería ver que ya se autenticó y que el proyecto ya se configuró con tu ID del proyecto.

  1. En Cloud Shell, ejecuta el siguiente comando para confirmar que está autenticado:
gcloud auth list

Resultado del comando

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Resultado del comando

[core]
project = <PROJECT_ID>

De lo contrario, puedes configurarlo con el siguiente comando:

gcloud config set project <PROJECT_ID>

Resultado del comando

Updated property [core/project].

5. Cree un proyecto de Maven

Después del inicio de Cloud Shell, comencemos creando un proyecto de Maven con el SDK de Java para Apache Beam.

Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Estas canalizaciones se definen con un programa de Apache Beam y se puede elegir un ejecutor, como Dataflow, para ejecutar la canalización.

Ejecuta el comando mvn archetype:generate en tu shell de la siguiente manera:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Después de ejecutar el comando, deberías ver un directorio nuevo llamado first-dataflow en tu directorio actual. first-dataflow contiene un proyecto de Maven que incluye el SDK de Cloud Dataflow para Java y canalizaciones de ejemplo.

6. Ejecute una canalización de procesamiento de texto en Cloud Dataflow

Para comenzar, guardemos el ID de nuestro proyecto y los nombres de los buckets de Cloud Storage como variables de entorno. Puedes hacer esto en Cloud Shell. Asegúrate de reemplazar <your_project_id> por el ID de tu proyecto.

 export PROJECT_ID=<your_project_id>

Ahora, haremos lo mismo con el bucket de Cloud Storage. Recuerda reemplazar <your_bucket_name> por el nombre único que usaste para crear tu bucket en un paso anterior.

 export BUCKET_NAME=<your_bucket_name>

Cambia al directorio first-dataflow/.

 cd first-dataflow

A continuación, ejecutaremos una canalización denominada WordCount, que lee texto, convierte las líneas de texto en tokens de palabras individuales y cuenta la frecuencia con la que aparece cada una. Primero, ejecutaremos la canalización y, mientras se ejecuta, veremos lo que sucede en cada paso.

Para iniciar la canalización, ejecuta el comando mvn compile exec:java en tu shell o ventana de terminal. Para los argumentos --project, --stagingLocation, y --output, el siguiente comando hace referencia a las variables de entorno que configuraste antes en este paso.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Mientras se ejecuta el trabajo, búscalo en la lista de trabajos.

Abre la IU web de Cloud Dataflow en Google Cloud Platform Console. Deberías ver el trabajo de conteo de palabras con el estado En ejecución:

3623be74922e3209.png

Ahora, veamos los parámetros de la canalización. Comience por hacer clic en el nombre de su trabajo:

816d8f59c72797d7.png

Cuando seleccionas un trabajo, puedes ver el gráfico de ejecución. El gráfico de ejecución de una canalización representa cada una de sus transformaciones como un cuadro que contiene el nombre de la transformación y algo de información de su estado. Puede hacer clic en el símbolo que aparece en la esquina superior derecha de cada paso para ver más detalles:

80a972dd19a6f1eb.png

Veamos cómo la canalización transforma los datos en cada paso:

  • Read: En este paso, la canalización lee desde una fuente de entrada. En este caso, es un archivo de texto de Cloud Storage con el texto completo de la obra de teatro de Shakespeare El rey Lear. Nuestra canalización lee el archivo línea por línea y genera un PCollection para cada uno, en el que cada línea del archivo de texto es un elemento de la colección.
  • CountWords: El paso CountWords se divide en dos partes. Primero, usa una función de tarea paralela (ParDo) denominada ExtractWords para asignar un token a cada línea en palabras individuales. Como resultado de ExtractWords, se genera una PCollection nueva en la que cada elemento es una palabra. En el siguiente paso, Count, se utiliza una transformación proporcionada por el SDK de Java que muestra pares clave-valor en los que la clave es una palabra única y el valor es la cantidad de veces que aparece. Este es el método que implementa CountWords. Puedes consultar el archivo WordCount.java completo en GitHub:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: Esto invoca el elemento FormatAsTextFn, que se copia a continuación, que formatea cada par clave-valor y lo convierte en una string imprimible.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: En este paso, escribimos las cadenas imprimibles en varios archivos de texto fragmentados.

En unos minutos, veremos los resultados obtenidos de la canalización.

Ahora, observa la página Información del trabajo a la derecha del gráfico, que incluye los parámetros de canalización que incluimos en el comando mvn compile exec:java.

9723815a1f5bf08b.png

208a7f0d6973acf6.png

También puede ver los Contadores personalizados de la canalización, que, en este caso, muestran cuántas líneas vacías se encontraron hasta el momento durante la ejecución. Puedes agregar contadores nuevos a tu canalización para realizar un seguimiento de las métricas específicas de la aplicación.

a2e2800e2c6893f8.png

Puedes hacer clic en el ícono Registros en la parte inferior de la consola para ver los mensajes de error específicos.

23c64138a1027f8.png

De forma predeterminada, el panel muestra mensajes de registro de trabajos que informan el estado del trabajo en su totalidad. Puedes usar el selector de gravedad mínima para filtrar los mensajes de estado y el progreso del trabajo.

94ba42015fdafbe2.png

Seleccionar un paso de canalización en el grafo cambia la vista a los registros que genera tu código y al código generado que se ejecuta en el paso de la canalización.

Para volver a Job Logs, anula la selección del paso haciendo clic fuera del gráfico o usando el botón Close (cerrar) en el panel lateral derecho.

Puedes usar el botón Registros de trabajador de la pestaña Registros para ver los registros de trabajadores de las instancias de Compute Engine que ejecutan tu canalización. Los registros de trabajador constan de líneas de registro generadas por su código y el código generado por Dataflow que lo ejecuta.

Si intentas depurar una falla en la canalización, a menudo habrá registros adicionales en los Registros de trabajador que ayudan a resolver el problema. Ten en cuenta que estos registros se agregan a todos los trabajadores y se pueden filtrar y buscar.

5a53c244f28d5478.png

La interfaz de supervisión de Dataflow muestra solo los mensajes de registro más recientes. Para ver todos los registros, haz clic en el vínculo de Google Cloud Observability en el lado derecho del panel de registros.

2bc704a4d6529b31.png

Aquí hay un resumen de los diferentes tipos de registros disponibles para ver en la página Monitoring→Registros:

  • Los registros de job-message contienen mensajes a nivel del trabajo que generan varios componentes de Dataflow. Los ejemplos incluyen la configuración del ajuste de escala automático, el momento en que los trabajadores se inician o cierran, el progreso en el paso del trabajo y los errores del trabajo. Los errores a nivel del trabajador que se originan en una falla del código del usuario y que están presentes en los registros de worker también se propagan a los registros de job-message.
  • Los trabajadores de Dataflow producen los registros de worker. Los trabajadores hacen la mayor parte del trabajo de canalización (por ejemplo, aplicar tus ParDo a los datos). Los registros de Worker contienen mensajes registrados por tu código y Dataflow.
  • Los registros de worker-startup están presentes en la mayoría de los trabajos de Dataflow y pueden capturar mensajes relacionados con el proceso de inicio. El proceso de inicio incluye descargar los archivos jar de un trabajo desde Cloud Storage y, luego, iniciar los trabajadores. Si hay un problema cuando se inician los trabajadores, estos registros son un buen lugar para buscarlos.
  • Los registros de shuffler contienen mensajes de los trabajadores que consolidan los resultados de las operaciones de canalización paralelas.
  • Los registros de docker y kubelet contienen mensajes relacionados con estas tecnologías públicas, que se usan en los trabajadores de Dataflow.

En el siguiente paso, comprobaremos que el trabajo se haya realizado correctamente.

7. Comprueba si el trabajo se ejecutó de forma correcta.

Abre la IU web de Cloud Dataflow en Google Cloud Platform Console.

Deberías ver el trabajo de conteo de palabras con estado Running primero y, luego, Succeeded:

4c408162416d03a2.png

El trabajo tardará entre 3 y 4 minutos en ejecutarse.

¿Recuerda cuando ejecutó la canalización y especificó el bucket para los resultados? Veamos los resultados (¿o no quiere saber cuántas veces aparece cada palabra en El rey Lear?). Regresa al navegador de Cloud Storage en Google Cloud Platform Console. En el bucket, deberías ver los archivos generados y los archivos de etapa de pruebas que creó el trabajo:

25a5d3d4b5d0b567.png

8. Detén tus recursos

Puedes cerrar tus recursos desde Google Cloud Platform Console.

Abre el navegador de Cloud Storage en Google Cloud Platform Console.

2b6c3a2a92b47015.png

Selecciona la casilla de verificación junto al bucket que creaste y haz clic en BORRAR para borrar permanentemente el bucket y su contenido.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. ¡Felicitaciones!

Aprendió a crear un proyecto de Maven con el SDK de Cloud Dataflow, ejecutar una canalización de ejemplo con Google Cloud Platform Console y borrar el bucket de Cloud Storage asociado y sus contenidos.

Más información

Licencia

Esta obra se ofrece bajo una licencia genérica de Creative Commons Attribution 3.0 y Apache 2.0.