Usa notebooks con Google Cloud Dataflow

1. Introducción

Cloud-Dataflow.png

Google Cloud Dataflow

Última actualización: 5 de julio de 2023

¿Qué es Dataflow?

Dataflow es un servicio administrado para ejecutar una amplia variedad de patrones de procesamiento de datos. En la documentación de este sitio, se muestra cómo implementar tus canalizaciones de procesamiento de datos por lotes y de transmisión con Dataflow, incluidas las instrucciones para usar las funciones del servicio.

El SDK de Apache Beam es un modelo de programación de código abierto que te permite desarrollar canalizaciones de transmisión y por lotes. Creas tus canalizaciones con un programa de Apache Beam y, luego, las ejecutas en el servicio de Dataflow. La documentación de Apache Beam proporciona información conceptual detallada y material de referencia para el modelo de programación de Apache Beam, los SDK y otros ejecutores.

Transmisión de análisis de datos con velocidad

Dataflow permite desarrollar canalizaciones de transmisión de datos de forma rápida y simplificada con una latencia de datos más baja.

Simplifica las operaciones y la administración

Permite que los equipos se enfoquen en programar en lugar de administrar clústeres de servidores, ya que el enfoque sin servidores de Dataflow quita la sobrecarga operativa de las cargas de trabajo de ingeniería de datos.

Reduce el costo total de propiedad

Dataflow combina el ajuste de escala automático de los recursos con las capacidades de procesamiento por lotes con optimización del costo, por lo que puede ofrecer una capacidad prácticamente ilimitada para que administre las cargas de trabajo estacionales y con incrementos bruscos sin gastar de más.

Funciones clave

Administración de recursos automatizada y rebalanceo dinámico de trabajos

Dataflow automatiza el aprovisionamiento y la administración de los recursos de procesamiento para minimizar la latencia y maximizar el uso, de modo que no tengas que iniciar instancias ni reservarlas de forma manual. La partición de trabajo también está automatizada y optimizada para volver a balancear dinámicamente las tareas atrasadas. No es necesario buscar “teclas de acceso rápido” o preprocesar los datos de entrada.

Ajuste de escala automático horizontal

El ajuste de escala automático horizontal de los recursos de los trabajadores para alcanzar una capacidad de procesamiento óptima tiene como resultado una mejor relación general entre precio y rendimiento.

Programación flexible de recursos a un bajo precio para el procesamiento por lotes

A fin de procesar de forma flexible la hora de la programación de los trabajos, como los nocturnos, la programación flexible de recursos (FlexRS) ofrece un precio más bajo para el procesamiento por lotes. Estos trabajos flexibles se posicionan en una cola que garantiza su recuperación para ejecutarlos en un período de seis horas.

Qué ejecutarás como parte de esto

Usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab te permite desarrollar canalizaciones de forma iterativa, inspeccionar tu grafo de canalización y analizar PCollections individuales en un flujo de trabajo REPL (bucle de lectura-evaluación-impresión). Estos notebooks de Apache Beam están disponibles a través de Vertex AI Workbench, un servicio administrado que aloja máquinas virtuales de notebooks preinstaladas con los frameworks más recientes de ciencia de datos y aprendizaje automático.

Este codelab se enfoca en la funcionalidad que presentan los notebooks de Apache Beam.

Qué aprenderás

  • Cómo crear una instancia de notebook
  • Crea una canalización básica
  • Lee datos de una fuente no delimitada
  • Visualiza los datos
  • Inicia un trabajo de Dataflow desde el notebook
  • Guarda un notebook

Requisitos

  • Un proyecto de Google Cloud Platform con la facturación habilitada
  • Tener habilitados Google Cloud Dataflow y Google Cloud Pub/Sub.

2. Cómo prepararte

  1. En la consola de Cloud, en la página del selector de proyectos, selecciona o crea un proyecto de Cloud.

Asegúrate de tener habilitadas las siguientes APIs:

  • API de Dataflow
  • API de Cloud Pub/Sub
  • Compute Engine
  • API de Notebooks

Para verificarlo, consulta las APIs y en la página Servicios.

En esta guía, leeremos datos de una suscripción a Pub/Sub. Por lo tanto, asegúrate de que la cuenta de servicio predeterminada de Compute Engine tenga el rol de editor, o bien otorga el rol de editor de Pub/Sub.

3. Comienza a usar los notebooks de Apache Beam

Inicia una instancia de notebook de Apache Beam

  1. Inicia Dataflow en la consola:

  1. Selecciona la página Workbench en el menú de la izquierda.
  2. Asegúrate de estar en la pestaña Notebooks administrados por el usuario.
  3. En la barra de herramientas, haz clic en Nuevo notebook.
  4. Selecciona Apache Beam > Sin GPU.
  5. En la página Nuevo notebook, selecciona una subred para la VM del notebook y haz clic en Crear.
  6. Haz clic en Abrir JupyterLab cuando se active el vínculo. Vertex AI Workbench crea una nueva instancia de notebook de Apache Beam.

4. Crea la canalización

Crea una instancia de notebook

Navega a Archivo > Nuevo > Notebook y selecciona un kernel que sea Apache Beam 2.47 o una versión posterior.

Comienza a agregar código a tu notebook

  • Copia y pega el código de cada sección en una celda nueva de tu notebook.
  • Ejecuta la celda

6bd3dd86cc7cf802.png

Usar el ejecutor interactivo de Apache Beam con notebooks de JupyterLab te permite desarrollar canalizaciones de forma iterativa, inspeccionar tu grafo de canalización y analizar PCollections individuales en un flujo de trabajo REPL (bucle de lectura-evaluación-impresión).

Apache Beam está instalado en tu instancia de notebook, así que incluye los módulos interactive_runner y interactive_beam en el notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Si tu notebook usa otros servicios de Google, agrega las siguientes sentencias de importación:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

Configura opciones de interactividad

A continuación, se establece la duración de la captura de datos en 60 segundos. Si deseas iterar más rápido, configúralo con una duración menor, por ejemplo, “10s”.

ib.options.recording_duration = '60s'

Para obtener más opciones interactivas, consulta la claseinteractive_beam.options.

Inicializa la canalización mediante un objeto InteractiveRunner.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

Lee y visualiza los datos

En el siguiente ejemplo, se muestra una canalización de Apache Beam que crea una suscripción al tema de Pub/Sub determinado y lee desde la suscripción.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

La canalización cuenta las palabras por ventanas desde la fuente. Crea ventanas fijas de 10 segundos de duración cada uno.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

Una vez que los datos se organizaron en ventanas, las palabras se cuentan por ventana.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

Visualiza los datos

Con el método show(), se visualiza la PCollection resultante en el notebook.

ib.show(windowed_word_counts, include_window_info=True)

El método show con el que se visualiza una PCollection en formato de tabla

Para mostrar visualizaciones de tus datos, pasa visualize_data=True al método show(). Agrega una celda nueva:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

Puedes aplicar varios filtros a las visualizaciones. La siguiente visualización te permite filtrar por etiqueta y eje:

Método show con el que se muestra una PCollection como un conjunto enriquecido de elementos de IU que se pueden filtrar.

5. Usa un DataFrame de Pandas

Otra visualización útil en los notebooks de Apache Beam es un DataFrame de Pandas. En el siguiente ejemplo, primero se convierten las palabras en minúsculas y, luego, se calcula la frecuencia de cada palabra.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

El método collect() proporciona el resultado en un DataFrame de Pandas.

ib.collect(windowed_lower_word_counts, include_window_info=True)

El método collect que representa una PCollection en un DataFrame de Pandas

6. Inicia trabajos de Dataflow desde tu notebook (opcional)

  1. Para ejecutar trabajos en Dataflow, necesitas permisos adicionales. Asegúrate de que la cuenta de servicio predeterminada de Compute Engine tenga el rol Editor, o bien otorga los siguientes roles de IAM:
  • Administrador de Dataflow
  • Trabajador de Dataflow
  • Administrador de almacenamiento
  • Usuario de cuenta de servicio (roles/iam.serviceAccountUser)

Obtén más información sobre los roles en la documentación.

  1. (Opcional) Antes de usar tu notebook para ejecutar trabajos de Dataflow, reinicia el kernel, vuelve a ejecutar todas las celdas y verifica el resultado.
  2. Quita las siguientes instrucciones de importación:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. Agrega la siguiente instrucción de importación:
from apache_beam.runners import DataflowRunner
  1. Quita la siguiente opción de duración de grabación:
ib.options.recording_duration = '60s'
  1. Agrega lo siguiente a tus opciones de canalización. Deberás ajustar la ubicación de Cloud Storage para que apunte a un bucket que ya te pertenezca, o bien puedes crear un bucket nuevo con este fin. También puedes cambiar el valor de la región de us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. En el constructor de beam.Pipeline(), reemplaza InteractiveRunner por DataflowRunner. p es el objeto de canalización que se crea tu canalización.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. Quita las llamadas interactivas del código. Por ejemplo, quita show(), collect(), head(), show_graph() y watch() del código.
  2. Para poder ver todos los resultados, debes agregar un receptor. En la sección anterior, visualizamos los resultados en el notebook, pero esta vez estamos ejecutando el trabajo fuera de este notebook, en Dataflow. Por lo tanto, necesitamos una ubicación externa para nuestros resultados. En este ejemplo, escribiremos los resultados en archivos de texto almacenados en GCS (Google Cloud Storage). Dado que esta es una canalización de transmisión, con un sistema de ventanas de datos, deberemos crear un archivo de texto por ventana. Para lograrlo, agrega los siguientes pasos a tu canalización:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. Agrega p.run() al final del código de canalización.
  2. Ahora, revisa el código de tu notebook para confirmar que incorporaste todos los cambios. Debería verse similar al siguiente ejemplo:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. Ejecuta las celdas.
  2. Deberías ver un resultado similar al siguiente:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. Para validar si el trabajo se está ejecutando, ve a la página Trabajos de Dataflow. Deberías ver un trabajo nuevo en la lista. El trabajo tardará entre 5 y 10 minutos en comenzar a procesar los datos.
  2. Una vez que se estén procesando los datos, ve a Cloud Storage y navega al directorio en el que Dataflow almacena los resultados (tu output_gcs_location definido). Deberías ver una lista de archivos de texto, con un archivo por ventana. bfcc5ce9e46a8b14.png
  3. Descarga el archivo y, luego, inspecciona el contenido. Debe contener la lista de palabras vinculadas con su recuento. De manera alternativa, usa la interfaz de línea de comandos para inspeccionar los archivos. Para hacerlo, ejecuta el siguiente comando en una celda nueva de tu notebook:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. Verás un resultado similar al siguiente:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. Eso es todo. No olvides limpiar y detener el trabajo que creaste (consulta el paso final de este codelab).

Para obtener un ejemplo sobre cómo realizar esta conversión en un notebook interactivo, consulta el notebook “Conteo de palabras de Dataflow” en la instancia de notebook.

Como alternativa, puedes exportar tu notebook como una secuencia de comandos ejecutable, modificar el archivo .py generado con los pasos anteriores y, luego, implementar tu canalización en el servicio de Dataflow.

7. Guarda el notebook

Los notebooks que creas se guardan de forma local en la instancia de notebook en ejecución. Si restableces o cierras la instancia de notebook durante el desarrollo, los notebooks nuevos se conservarán siempre que se creen en el directorio /home/jupyter. Sin embargo, si se borra una instancia de notebook, esos notebooks también se borran.

Si quieres conservar tus notebooks para usarlos en el futuro, descárgalos de forma local en tu estación de trabajo, guárdalos en GitHub o expórtalos a un formato de archivo diferente.

8. Realice una limpieza

Una vez que termines de usar la instancia de notebook de Apache Beam, limpia los recursos que creaste en Google Cloud. Para ello, cierra la instancia de notebook y detiene el trabajo de transmisión, si ejecutaste uno.

Como alternativa, si creaste un proyecto para el único propósito de este codelab, también puedes cerrar el proyecto por completo.