1. Descripción general
En este lab, se explicará cómo configurar y usar Apache Spark y notebooks de Jupyter en Cloud Dataproc.
Los notebooks de Jupyter se usan ampliamente para el análisis exploratorio de datos y la creación de modelos de aprendizaje automático, ya que te permiten ejecutar tu código de forma interactiva y ver los resultados de inmediato.
Sin embargo, configurar y usar Apache Spark y los notebooks de Jupyter puede ser complicado.

Cloud Dataproc hace que esto sea rápido y fácil, ya que te permite crear un clúster de Dataproc con Apache Spark, el componente Jupyter y la puerta de enlace de componentes en aproximadamente 90 segundos.
Qué aprenderás
En este codelab, aprenderás a hacer lo siguiente:
- Crea un bucket de Google Cloud Storage para tu clúster
- Crea un clúster de Dataproc con Jupyter y la puerta de enlace de componentes.
- Accede a la IU web de JupyterLab en Dataproc
- Crea un notebook que use el conector de Spark BigQuery Storage
- Ejecutar un trabajo de Spark y generar un gráfico con los resultados
El costo total de la ejecución de este lab en Google Cloud es de aproximadamente USD 1. Puedes encontrar todos los detalles sobre los precios de Cloud Dataproc aquí.
2. Creando un proyecto
Accede a Google Cloud Platform Console en console.cloud.google.com y crea un proyecto nuevo:



A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.
Ejecutar este codelab debería costar solo unos pocos dólares, pero su costo podría aumentar si decides usar más recursos o si los dejas en ejecución. En la última sección de este codelab, se te guiará para que limpies tu proyecto.
Los usuarios nuevos de Google Cloud Platform son aptos para obtener una prueba gratuita de USD 300.
3. Configura tu entorno
Primero, abre Cloud Shell haciendo clic en el botón de la esquina superior derecha de Cloud Console:

Después de que se cargue Cloud Shell, ejecuta el siguiente comando para establecer el ID del proyecto del paso anterior**:
gcloud config set project <project_id>
También puedes encontrar el ID del proyecto si haces clic en tu proyecto en la parte superior izquierda de la consola de Cloud:


A continuación, habilita las APIs de Dataproc, Compute Engine y BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Como alternativa, puedes hacerlo en la consola de Cloud. Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.

Selecciona API Manager en el menú desplegable.

Haz clic en Habilitar APIs y servicios.

Busca y habilita las siguientes APIs:
- API de Compute Engine
- API de Dataproc
- API de BigQuery
- API de BigQuery Storage
4. Crear un bucket de GCS
Crea un bucket de Google Cloud Storage en la región más cercana a tus datos y asígnale un nombre único.
Se usará para el clúster de Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Deberías ver el siguiente resultado:
Creating gs://<your-bucket-name>/...
5. Crea tu clúster de Dataproc con Jupyter y la puerta de enlace de componentes
Cómo crear tu clúster
Configura las variables de entorno para tu clúster
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Luego, ejecuta este comando de gcloud para crear tu clúster con todos los componentes necesarios para trabajar con Jupyter en él.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Deberías ver el siguiente resultado mientras se crea tu clúster:
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
La creación del clúster debería tardar unos 90 segundos. Cuando esté listo, podrás acceder a él desde la IU de Cloud Console de Dataproc.
Mientras esperas, puedes seguir leyendo a continuación para obtener más información sobre las marcas que se usan en el comando de gcloud.
Deberías ver el siguiente resultado una vez que se cree el clúster:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Marcas que se usan en el comando gcloud dataproc create
A continuación, se incluye un desglose de las marcas que se usan en el comando gcloud dataproc create
--region=${REGION}
Especifica la región y la zona en las que se creará el clúster. Puedes consultar la lista de regiones disponibles aquí.
--image-version=1.4
Es la versión de la imagen que se usará en tu clúster. Puedes consultar la lista de versiones disponibles aquí.
--bucket=${BUCKET_NAME}
Especifica el bucket de Google Cloud Storage que creaste antes para usarlo en el clúster. Si no proporcionas un bucket de GCS, se creará uno para ti.
Aquí también se guardarán tus notebooks, incluso si borras tu clúster, ya que no se borrará el bucket de GCS.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Son los tipos de máquinas que se usarán para tu clúster de Dataproc. Puedes consultar la lista de tipos de máquinas disponibles aquí.
De forma predeterminada, se crean 1 nodo principal y 2 nodos trabajadores si no configuras la marca --num-workers.
--optional-components=ANACONDA,JUPYTER
Si configuras estos valores para los componentes opcionales, se instalarán todas las bibliotecas necesarias para Jupyter y Anaconda (que se requiere para los notebooks de Jupyter) en tu clúster.
--enable-component-gateway
Habilitar Component Gateway crea un vínculo de App Engine con Apache Knox y el proxy de inversión, lo que brinda acceso fácil, seguro y autenticado a las interfaces web de Jupyter y JupyterLab, lo que significa que ya no necesitas crear túneles SSH.
También creará vínculos a otras herramientas en el clúster, como Yarn Resource Manager y Spark History Server, que son útiles para ver el rendimiento de tus trabajos y los patrones de uso del clúster.
6. Crea un notebook de Apache Spark
Cómo acceder a la interfaz web de JupyterLab
Una vez que el clúster esté listo, puedes encontrar el vínculo de Component Gateway a la interfaz web de JupyterLab. Para ello, ve a Clústeres de Dataproc - Consola de Cloud, haz clic en el clúster que creaste y ve a la pestaña Interfaces web.

Notarás que tienes acceso a Jupyter, que es la interfaz clásica de notebook, o a JupyterLab, que se describe como la IU de próxima generación para el proyecto Jupyter.
JupyterLab tiene muchas funciones nuevas y excelentes en la IU, por lo que, si no tienes experiencia en el uso de notebooks o buscas las mejoras más recientes, te recomendamos que uses JupyterLab, ya que, según la documentación oficial, reemplazará a la interfaz clásica de Jupyter.
Crea un notebook con un kernel de Python 3

En la pestaña del selector, haz clic en el ícono del notebook de Python 3 para crear un notebook con un kernel de Python 3 (no el kernel de PySpark) que te permita configurar SparkSession en el notebook y, además, incluir el spark-bigquery-connector necesario para usar la API de BigQuery Storage.
Cambia el nombre del notebook

Haz clic con el botón derecho en el nombre del notebook en la barra lateral de la izquierda o en la navegación superior y cámbiale el nombre a "BigQuery Storage & Spark DataFrames.ipynb".
Ejecuta tu código de Spark en el notebook

En este notebook, usarás el conector de Spark-BigQuery, que es una herramienta para leer y escribir datos entre BigQuery y Spark con la API de BigQuery Storage.
La API de BigQuery Storage aporta mejoras significativas al acceso a los datos en BigQuery a través de un protocolo basado en RPC. Admite lecturas y escrituras de datos en paralelo, así como diferentes formatos de serialización, como Apache Avro y Apache Arrow. En un nivel general, esto se traduce en un rendimiento significativamente mejorado, en especial en conjuntos de datos más grandes.
En la primera celda, verifica la versión de Scala de tu clúster para que puedas incluir la versión correcta del archivo JAR de spark-bigquery-connector.
Entrada [1]:
!scala -version
Salida [1]:
Crea una sesión de Spark y, luego, incluye el paquete spark-bigquery-connector.
Si tu versión de Scala es 2.11, usa el siguiente paquete.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Si tu versión de Scala es 2.12, usa el siguiente paquete.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Entrada [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Habilita repl.eagerEval
Esto generará los resultados de los DataFrames en cada paso sin la necesidad de mostrar df.show() y también mejorará el formato del resultado.
Entrada [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Cómo leer una tabla de BigQuery en un DataFrame de Spark
Crea un DataFrame de Spark leyendo datos de un conjunto de datos públicos de BigQuery. Esto usa el conector spark-bigquery y la API de BigQuery Storage para cargar los datos en el clúster de Spark.
Crea un DataFrame de Spark y carga datos del conjunto de datos públicos de BigQuery sobre vistas de páginas de Wikipedia. Notarás que no estás ejecutando una consulta en los datos, ya que estás usando el conector de Spark-BigQuery para cargar los datos en Spark, donde se realizará el procesamiento de los datos. Cuando se ejecute este código, en realidad no se cargará la tabla, ya que es una evaluación diferida en Spark y la ejecución se producirá en el siguiente paso.
Entrada [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Resultado [4]:

Selecciona las columnas obligatorias y aplica un filtro con where(), que es un alias de filter().
Cuando se ejecuta este código, se activa una acción de Spark y, en este punto, se leen los datos de BigQuery Storage.
Entrada [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Resultado [5]:

Agrupa por título y ordena por vistas de página para ver las páginas principales
Entrada [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Resultado [6]:
7. Usa bibliotecas de trazado de Python en el notebook
Puedes usar las diversas bibliotecas de trazado disponibles en Python para trazar el resultado de tus trabajos de Spark.
Cómo convertir un DataFrame de Spark en un DataFrame de Pandas
Convierte el DataFrame de Spark en un DataFrame de Pandas y establece datehour como el índice. Esto es útil si deseas trabajar con los datos directamente en Python y trazarlos con las numerosas bibliotecas de trazado de Python disponibles.
Entrada [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Resultado [7]:

Cómo generar gráficos con un DataFrame de Pandas
Importa la biblioteca matplotlib, que es necesaria para mostrar los gráficos en el notebook.
Entrada [8]:
import matplotlib.pyplot as plt
Usa la función de trazado de Pandas para crear un gráfico de líneas a partir del DataFrame de Pandas.
Entrada [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Resultado [9]:
Verifica que el notebook se haya guardado en GCS
Ahora deberías tener tu primer notebook de Jupyter en funcionamiento en tu clúster de Dataproc. Asigna un nombre a tu notebook y se guardará automáticamente en el bucket de GCS que se usó cuando se creó el clúster.
Puedes verificarlo con este comando de gsutil en Cloud Shell.
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Deberías ver el siguiente resultado:
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Sugerencia de optimización: Almacena datos en caché en la memoria
Puede haber situaciones en las que desees tener los datos en la memoria en lugar de leerlos de BigQuery Storage cada vez.
Este trabajo leerá los datos de BigQuery y enviará el filtro a BigQuery. Luego, la agregación se calculará en Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Puedes modificar el trabajo anterior para incluir una caché de la tabla, y Apache Spark aplicará el filtro en la columna wiki en la memoria.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Luego, puedes filtrar por otro idioma de la wiki con los datos almacenados en caché en lugar de volver a leer los datos del almacenamiento de BigQuery, por lo que se ejecutará mucho más rápido.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Puedes quitar la caché ejecutando
df_wiki_all.unpersist()
9. Notebooks de ejemplo para más casos de uso
El repositorio de GitHub de Cloud Dataproc incluye notebooks de Jupyter con patrones comunes de Apache Spark para cargar datos, guardar datos y generar gráficos de tus datos con varios productos de Google Cloud Platform y herramientas de código abierto:
10. Limpia
Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:
- Borra el bucket de Cloud Storage para el entorno que creaste.
- Borra el entorno de Dataproc.
Si creaste un proyecto solo para este codelab, también puedes borrarlo de forma opcional:
- En GCP Console, ve a la página Proyectos.
- En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
- En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.
Licencia
Esta obra se ofrece bajo una licencia Creative Commons Atribución 3.0 genérica y una licencia Apache 2.0.