Notebooks de Apache Spark y Jupyter en Cloud Dataproc

1. Descripción general

En este lab, aprenderás a configurar y usar Apache Spark y notebooks de Jupyter en Cloud Dataproc.

Los notebooks de Jupyter se usan mucho para el análisis exploratorio de datos y la creación de modelos de aprendizaje automático, ya que te permiten ejecutar tu código de manera interactiva y ver los resultados de inmediato.

Sin embargo, configurar y usar Apache Spark y los notebooks de Jupyter puede ser complicado.

b9ed855863c57d6.png

Cloud Dataproc facilita y agiliza esta tarea, ya que te permite crear un clúster de Dataproc con Apache Spark, un componente de Jupyter y una puerta de enlace de componentes en unos 90 segundos.

Qué aprenderás

En este codelab, aprenderás a hacer lo siguiente:

  • Crea un bucket de Google Cloud Storage para tu clúster
  • Crear un clúster de Dataproc con Jupyter y la puerta de enlace de componentes.
  • Accede a la IU web de JupyterLab en Dataproc
  • Crea un notebook usando el conector de almacenamiento Spark BigQuery
  • Ejecutar un trabajo de Spark y trazar los resultados

El costo total de la ejecución de este lab en Google Cloud es de aproximadamente $1. Puedes encontrar información completa sobre los precios de Cloud Dataproc aquí.

2. Crea un proyecto

Accede a la consola de Google Cloud Platform en console.cloud.google.com y crea un proyecto nuevo:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.

La ejecución de este codelab no debería costar más que unos pocos dólares, pero podría ser más si decides usar más recursos o si los dejas ejecutándose. En la última sección de este codelab, aprenderás a limpiar tu proyecto.

Los usuarios nuevos de Google Cloud son aptos para una prueba gratuita de$300.

3. Configura tu entorno

Primero, haz clic en el botón en la esquina superior derecha de la consola de Cloud para abrir Cloud Shell:

a10c47ee6ca41c54.png

Después de que se cargue Cloud Shell, ejecuta el siguiente comando para establecer el ID del proyecto del paso anterior**:**

gcloud config set project <project_id>

También puedes encontrar el ID del proyecto haciendo clic en tu proyecto en la esquina superior izquierda de la consola de Cloud:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

A continuación, habilita las APIs de Dataproc, Compute Engine y BigQuery Storage.

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

Como alternativa, puedes hacerlo en la consola de Cloud. Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.

2bfc27ef9ba2ec7d.png

Selecciona el Administrador de API en el menú desplegable.

408af5f32c4b7c25.png

Haz clic en Habilitar APIs y servicios.

a9c0e84296a7ba5b.png

Busca y habilita las siguientes APIs:

  • API de Compute Engine
  • API de Dataproc
  • API de BigQuery
  • API de BigQuery Storage

4. Crear un bucket de GCS

Crea un bucket de Google Cloud Storage en la región más cercana a tus datos y asígnale un nombre único.

Se usará para el clúster de Dataproc.

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

Deberías ver el siguiente resultado

Creating gs://<your-bucket-name>/...

5. Crea tu clúster de Dataproc con Jupyter y Puerta de enlace del componente

Crea tu clúster

Configura las variables de entorno para tu clúster

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

Luego, ejecuta este comando de gcloud para crear tu clúster con todos los componentes necesarios para trabajar con Jupyter en tu clúster.

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

Deberías ver el siguiente resultado mientras se crea el clúster.

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

Crear tu clúster debería tomar alrededor de 90 segundos y, una vez que esté listo, podrás acceder a él desde la IU de la consola de Cloud Dataproc.

Mientras esperas, puedes continuar con la lectura a continuación para obtener más información sobre las marcas que se usan en el comando de gcloud.

Deberías obtener el siguiente resultado una vez que se haya creado el clúster:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

Marcas usadas en el comando gcloud dataproc create

Este es un desglose de las marcas usadas en el comando gcloud dataproc create

--region=${REGION}

Especifica la región y la zona en las que se creará el clúster. Puedes ver la lista de regiones disponibles aquí.

--image-version=1.4

La versión de la imagen que se usará en tu clúster. Consulte la lista de versiones disponibles aquí.

--bucket=${BUCKET_NAME}

Especifica el bucket de Google Cloud Storage que creaste anteriormente para usarlo en el clúster. Si no proporcionas un bucket de GCS, se creará por ti.

Aquí también se guardarán tus notebooks, incluso si borras tu clúster, ya que el bucket de GCS no se borra.

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Los tipos de máquina que se usarán en tu clúster de Dataproc. Puedes ver una lista de los tipos de máquinas disponibles aquí.

De forma predeterminada, se crean 1 nodo principal y 2 nodos trabajadores si no configuras la marca –num-workers.

--optional-components=ANACONDA,JUPYTER

Si configuras estos valores para los componentes opcionales, se instalarán todas las bibliotecas necesarias para Jupyter y Anaconda (que se requieren para los notebooks de Jupyter) en tu clúster.

--enable-component-gateway

Habilitar Component Gateway crea un vínculo a App Engine con Apache Knox y el proxy de inversión, lo que brinda un acceso sencillo, seguro y autenticado a las interfaces web de Jupyter y JupyterLab, lo que significa que ya no necesitarás crear túneles SSH.

También creará vínculos para otras herramientas del clúster, incluidos el administrador de recursos YARN y el servidor de historial de Spark, que son útiles para ver el rendimiento de los trabajos y los patrones de uso del clúster.

6. Crea un notebook de Apache Spark

Accede a la interfaz web de JupyterLab

Una vez que el clúster esté listo, podrás encontrar el vínculo de la puerta de enlace del componente a la interfaz web de JupyterLab. Para ello, ve a Clústeres de Dataproc - consola de Cloud, haz clic en el clúster que creaste y, luego, ve a la pestaña Interfaces web.

afc40202d555de47.png

Notarás que tienes acceso a Jupyter, que es la interfaz clásica de notebook, o a JupyterLab, que se describe como la IU de nueva generación para Project Jupyter.

Hay muchas funciones nuevas de IU en JupyterLab, por lo que, si es la primera vez que usas notebooks o buscas las mejoras más recientes, te recomendamos que uses JupyterLab, ya que, con el tiempo, reemplazará la interfaz clásica de Jupyter de acuerdo con los documentos oficiales.

Crea un notebook con un kernel de Python 3

a463623f2ebf0518.png

En la pestaña del selector, haz clic en el ícono del notebook de Python 3 para crear un notebook con un kernel de Python 3 (no el kernel de PySpark) que te permita configurar la SparkSession en el notebook y que incluya el spark-bigquery-connector necesario para usar la API de BigQuery Storage.

Cambia el nombre del notebook

196a3276ed07e1f3.png

Haz clic con el botón derecho en el nombre del notebook en la barra lateral de la izquierda o en la parte superior y cambia el nombre del notebook a “BigQuery Storage & Spark DataFrames.ipynb

Ejecuta tu código de Spark en el notebook

fbac38062e5bb9cf.png

En este notebook, usarás spark-bigquery-connector, que es una herramienta para leer y escribir datos entre BigQuery y Spark mediante la API de BigQuery Storage.

La API de BigQuery Storage ofrece mejoras significativas en el acceso a los datos en BigQuery mediante un protocolo basado en RPC. Admite operaciones de lectura y escritura de datos en paralelo, así como diferentes formatos de serialización, como Apache Avro y Apache Arrow. A grandes rasgos, esto se traduce en un rendimiento significativamente mejorado, especialmente en conjuntos de datos más grandes.

En la primera celda, verifique la versión de Scala de su clúster para que pueda incluir la versión correcta del archivo jar spark-bigquery-connector.

Entrada [1]:

!scala -version

Resultado [1]:f580e442576b8b1f.png Crea una sesión de Spark y, luego, incluye el paquete spark-bigquery-connector.

Si tu versión de Scala es la 2.11, usa el siguiente paquete.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Si tu versión de Scala es la 2.12, usa el siguiente paquete.

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

Entrada [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

Habilitar repl.eagerEval

Esto mostrará los resultados de DataFrames en cada paso sin la necesidad de mostrar df.show() y también mejorará el formato del resultado.

Entrada [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Lee la tabla de BigQuery en Spark DataFrame

Crear un DataFrame de Spark mediante la lectura de datos de un conjunto de datos públicos de BigQuery Esto utiliza el spark-bigquery-connector y la API de BigQuery Storage para cargar los datos en el clúster de Spark.

Crear un DataFrame de Spark y cargar datos desde el conjunto de datos públicos de BigQuery para las páginas vistas de Wikipedia. Notarás que no estás ejecutando una consulta sobre los datos, ya que usas el spark-bigquery-connector para cargar los datos en Spark, donde se procesará el procesamiento. Cuando se ejecute este código, no cargará la tabla, ya que es una evaluación diferida en Spark, y la ejecución se producirá en el siguiente paso.

Entrada [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

Resultado [4]:

c107a33f6fc30ca.png

Selecciona las columnas obligatorias y aplica un filtro con where(), que es un alias de filter().

Cuando se ejecuta este código, se activa una acción de Spark y los datos se leen desde el almacenamiento de BigQuery en este punto.

Entrada [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

Resultado [5]:

ad363cbe510d625a.png

Agrupa por título y orden por vistas de página para ver las principales páginas

Entrada [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

Resultado [6]:f718abd05afc0f4.png

7. Usa las bibliotecas de trazado de Python en el notebook

Puedes usar las diversas bibliotecas de trazado que están disponibles en Python para trazar el resultado de tus trabajos de Spark.

Convierte DataFrame de Spark en DataFrame de Pandas

Convertir el DataFrame de Spark en DataFrame de Pandas y establecer datehour como el índice. Esto es útil si quieres trabajar con los datos directamente en Python y trazarlos usando las diversas bibliotecas de trazado de Python disponibles.

Entrada [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

Resultado [7]:

3df2aaa2351f028d.png

Traza un DataFrame de Pandas

Importa la biblioteca de matplotlib que se requiere para mostrar los diagramas en el notebook

Entrada [8]:

import matplotlib.pyplot as plt

Usa la función de trazado de Pandas para crear un gráfico de líneas a partir del DataFrame de Pandas.

Entrada [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

Resultado [9]:bade7042c3033594.png

Comprueba que el notebook se guardó en GCS

Ahora deberías tener tu primer notebook de Jupyter en funcionamiento en tu clúster de Dataproc. Asígnale un nombre a tu notebook, y se guardará automáticamente en el bucket de GCS que se usó cuando se creó el clúster.

Puedes verificar esto con el comando gsutil en Cloud Shell

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

Deberías ver el siguiente resultado

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. Sugerencia de optimización: Cómo almacenar datos en caché en la memoria

Puede haber situaciones en las que desees los datos en la memoria en lugar de leerlos desde el almacenamiento de BigQuery cada vez.

Este trabajo leerá los datos de BigQuery y enviará el filtro a BigQuery. Luego, la agregación se procesará en Apache Spark.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Puedes modificar el trabajo anterior para incluir una caché de la tabla y, ahora, Apache Spark aplicará el filtro en la columna de Wiki en la memoria.

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

Luego, puedes filtrar por otro lenguaje wiki usando los datos almacenados en caché en lugar de volver a leer los datos del almacenamiento de BigQuery y, por lo tanto, su funcionamiento será mucho más rápido.

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

Para quitar la caché, ejecuta

df_wiki_all.unpersist()

9. Notebooks de ejemplo para más casos de uso

El repositorio de GitHub de Cloud Dataproc incluye notebooks de Jupyter con patrones comunes de Apache Spark para cargar, guardar y trazar tus datos con varios productos y herramientas de código abierto de Google Cloud Platform:

10. Limpia

Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:

  1. Borra el bucket de Cloud Storage para el entorno y que creaste
  2. Borra el entorno de Dataproc.

Si creaste un proyecto solo para este codelab, también puedes borrarlo de manera opcional:

  1. En GCP Console, ve a la página Proyectos.
  2. En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
  3. En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

Licencia

Esta obra se ofrece bajo una licencia genérica de Creative Commons Attribution 3.0 y Apache 2.0.