1. Descripción general
En este lab, aprenderás a configurar y usar Apache Spark y notebooks de Jupyter en Cloud Dataproc.
Los notebooks de Jupyter se usan mucho para el análisis exploratorio de datos y la creación de modelos de aprendizaje automático, ya que te permiten ejecutar tu código de manera interactiva y ver los resultados de inmediato.
Sin embargo, configurar y usar Apache Spark y los notebooks de Jupyter puede ser complicado.
Cloud Dataproc facilita y agiliza esta tarea, ya que te permite crear un clúster de Dataproc con Apache Spark, un componente de Jupyter y una puerta de enlace de componentes en unos 90 segundos.
Qué aprenderás
En este codelab, aprenderás a hacer lo siguiente:
- Crea un bucket de Google Cloud Storage para tu clúster
- Crear un clúster de Dataproc con Jupyter y la puerta de enlace de componentes.
- Accede a la IU web de JupyterLab en Dataproc
- Crea un notebook usando el conector de almacenamiento Spark BigQuery
- Ejecutar un trabajo de Spark y trazar los resultados
El costo total de la ejecución de este lab en Google Cloud es de aproximadamente $1. Puedes encontrar información completa sobre los precios de Cloud Dataproc aquí.
2. Crea un proyecto
Accede a la consola de Google Cloud Platform en console.cloud.google.com y crea un proyecto nuevo:
A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.
La ejecución de este codelab no debería costar más que unos pocos dólares, pero podría ser más si decides usar más recursos o si los dejas ejecutándose. En la última sección de este codelab, aprenderás a limpiar tu proyecto.
Los usuarios nuevos de Google Cloud son aptos para una prueba gratuita de$300.
3. Configura tu entorno
Primero, haz clic en el botón en la esquina superior derecha de la consola de Cloud para abrir Cloud Shell:
Después de que se cargue Cloud Shell, ejecuta el siguiente comando para establecer el ID del proyecto del paso anterior**:**
gcloud config set project <project_id>
También puedes encontrar el ID del proyecto haciendo clic en tu proyecto en la esquina superior izquierda de la consola de Cloud:
A continuación, habilita las APIs de Dataproc, Compute Engine y BigQuery Storage.
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
Como alternativa, puedes hacerlo en la consola de Cloud. Haz clic en el ícono de menú ubicado en la parte superior izquierda de la pantalla.
Selecciona el Administrador de API en el menú desplegable.
Haz clic en Habilitar APIs y servicios.
Busca y habilita las siguientes APIs:
- API de Compute Engine
- API de Dataproc
- API de BigQuery
- API de BigQuery Storage
4. Crear un bucket de GCS
Crea un bucket de Google Cloud Storage en la región más cercana a tus datos y asígnale un nombre único.
Se usará para el clúster de Dataproc.
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
Deberías ver el siguiente resultado
Creating gs://<your-bucket-name>/...
5. Crea tu clúster de Dataproc con Jupyter y Puerta de enlace del componente
Crea tu clúster
Configura las variables de entorno para tu clúster
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
Luego, ejecuta este comando de gcloud para crear tu clúster con todos los componentes necesarios para trabajar con Jupyter en tu clúster.
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
Deberías ver el siguiente resultado mientras se crea el clúster.
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
Crear tu clúster debería tomar alrededor de 90 segundos y, una vez que esté listo, podrás acceder a él desde la IU de la consola de Cloud Dataproc.
Mientras esperas, puedes continuar con la lectura a continuación para obtener más información sobre las marcas que se usan en el comando de gcloud.
Deberías obtener el siguiente resultado una vez que se haya creado el clúster:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
Marcas usadas en el comando gcloud dataproc create
Este es un desglose de las marcas usadas en el comando gcloud dataproc create
--region=${REGION}
Especifica la región y la zona en las que se creará el clúster. Puedes ver la lista de regiones disponibles aquí.
--image-version=1.4
La versión de la imagen que se usará en tu clúster. Consulte la lista de versiones disponibles aquí.
--bucket=${BUCKET_NAME}
Especifica el bucket de Google Cloud Storage que creaste anteriormente para usarlo en el clúster. Si no proporcionas un bucket de GCS, se creará por ti.
Aquí también se guardarán tus notebooks, incluso si borras tu clúster, ya que el bucket de GCS no se borra.
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Los tipos de máquina que se usarán en tu clúster de Dataproc. Puedes ver una lista de los tipos de máquinas disponibles aquí.
De forma predeterminada, se crean 1 nodo principal y 2 nodos trabajadores si no configuras la marca –num-workers.
--optional-components=ANACONDA,JUPYTER
Si configuras estos valores para los componentes opcionales, se instalarán todas las bibliotecas necesarias para Jupyter y Anaconda (que se requieren para los notebooks de Jupyter) en tu clúster.
--enable-component-gateway
Habilitar Component Gateway crea un vínculo a App Engine con Apache Knox y el proxy de inversión, lo que brinda un acceso sencillo, seguro y autenticado a las interfaces web de Jupyter y JupyterLab, lo que significa que ya no necesitarás crear túneles SSH.
También creará vínculos para otras herramientas del clúster, incluidos el administrador de recursos YARN y el servidor de historial de Spark, que son útiles para ver el rendimiento de los trabajos y los patrones de uso del clúster.
6. Crea un notebook de Apache Spark
Accede a la interfaz web de JupyterLab
Una vez que el clúster esté listo, podrás encontrar el vínculo de la puerta de enlace del componente a la interfaz web de JupyterLab. Para ello, ve a Clústeres de Dataproc - consola de Cloud, haz clic en el clúster que creaste y, luego, ve a la pestaña Interfaces web.
Notarás que tienes acceso a Jupyter, que es la interfaz clásica de notebook, o a JupyterLab, que se describe como la IU de nueva generación para Project Jupyter.
Hay muchas funciones nuevas de IU en JupyterLab, por lo que, si es la primera vez que usas notebooks o buscas las mejoras más recientes, te recomendamos que uses JupyterLab, ya que, con el tiempo, reemplazará la interfaz clásica de Jupyter de acuerdo con los documentos oficiales.
Crea un notebook con un kernel de Python 3
En la pestaña del selector, haz clic en el ícono del notebook de Python 3 para crear un notebook con un kernel de Python 3 (no el kernel de PySpark) que te permita configurar la SparkSession en el notebook y que incluya el spark-bigquery-connector necesario para usar la API de BigQuery Storage.
Cambia el nombre del notebook
Haz clic con el botón derecho en el nombre del notebook en la barra lateral de la izquierda o en la parte superior y cambia el nombre del notebook a “BigQuery Storage & Spark DataFrames.ipynb
Ejecuta tu código de Spark en el notebook
En este notebook, usarás spark-bigquery-connector, que es una herramienta para leer y escribir datos entre BigQuery y Spark mediante la API de BigQuery Storage.
La API de BigQuery Storage ofrece mejoras significativas en el acceso a los datos en BigQuery mediante un protocolo basado en RPC. Admite operaciones de lectura y escritura de datos en paralelo, así como diferentes formatos de serialización, como Apache Avro y Apache Arrow. A grandes rasgos, esto se traduce en un rendimiento significativamente mejorado, especialmente en conjuntos de datos más grandes.
En la primera celda, verifique la versión de Scala de su clúster para que pueda incluir la versión correcta del archivo jar spark-bigquery-connector.
Entrada [1]:
!scala -version
Resultado [1]: Crea una sesión de Spark y, luego, incluye el paquete spark-bigquery-connector.
Si tu versión de Scala es la 2.11, usa el siguiente paquete.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Si tu versión de Scala es la 2.12, usa el siguiente paquete.
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
Entrada [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
Habilitar repl.eagerEval
Esto mostrará los resultados de DataFrames en cada paso sin la necesidad de mostrar df.show() y también mejorará el formato del resultado.
Entrada [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
Lee la tabla de BigQuery en Spark DataFrame
Crear un DataFrame de Spark mediante la lectura de datos de un conjunto de datos públicos de BigQuery Esto utiliza el spark-bigquery-connector y la API de BigQuery Storage para cargar los datos en el clúster de Spark.
Crear un DataFrame de Spark y cargar datos desde el conjunto de datos públicos de BigQuery para las páginas vistas de Wikipedia. Notarás que no estás ejecutando una consulta sobre los datos, ya que usas el spark-bigquery-connector para cargar los datos en Spark, donde se procesará el procesamiento. Cuando se ejecute este código, no cargará la tabla, ya que es una evaluación diferida en Spark, y la ejecución se producirá en el siguiente paso.
Entrada [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
Resultado [4]:
Selecciona las columnas obligatorias y aplica un filtro con where()
, que es un alias de filter()
.
Cuando se ejecuta este código, se activa una acción de Spark y los datos se leen desde el almacenamiento de BigQuery en este punto.
Entrada [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
Resultado [5]:
Agrupa por título y orden por vistas de página para ver las principales páginas
Entrada [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
Resultado [6]:
7. Usa las bibliotecas de trazado de Python en el notebook
Puedes usar las diversas bibliotecas de trazado que están disponibles en Python para trazar el resultado de tus trabajos de Spark.
Convierte DataFrame de Spark en DataFrame de Pandas
Convertir el DataFrame de Spark en DataFrame de Pandas y establecer datehour como el índice. Esto es útil si quieres trabajar con los datos directamente en Python y trazarlos usando las diversas bibliotecas de trazado de Python disponibles.
Entrada [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Resultado [7]:
Traza un DataFrame de Pandas
Importa la biblioteca de matplotlib que se requiere para mostrar los diagramas en el notebook
Entrada [8]:
import matplotlib.pyplot as plt
Usa la función de trazado de Pandas para crear un gráfico de líneas a partir del DataFrame de Pandas.
Entrada [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
Resultado [9]:
Comprueba que el notebook se guardó en GCS
Ahora deberías tener tu primer notebook de Jupyter en funcionamiento en tu clúster de Dataproc. Asígnale un nombre a tu notebook, y se guardará automáticamente en el bucket de GCS que se usó cuando se creó el clúster.
Puedes verificar esto con el comando gsutil en Cloud Shell
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
Deberías ver el siguiente resultado
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. Sugerencia de optimización: Cómo almacenar datos en caché en la memoria
Puede haber situaciones en las que desees los datos en la memoria en lugar de leerlos desde el almacenamiento de BigQuery cada vez.
Este trabajo leerá los datos de BigQuery y enviará el filtro a BigQuery. Luego, la agregación se procesará en Apache Spark.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Puedes modificar el trabajo anterior para incluir una caché de la tabla y, ahora, Apache Spark aplicará el filtro en la columna de Wiki en la memoria.
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
Luego, puedes filtrar por otro lenguaje wiki usando los datos almacenados en caché en lugar de volver a leer los datos del almacenamiento de BigQuery y, por lo tanto, su funcionamiento será mucho más rápido.
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
Para quitar la caché, ejecuta
df_wiki_all.unpersist()
9. Notebooks de ejemplo para más casos de uso
El repositorio de GitHub de Cloud Dataproc incluye notebooks de Jupyter con patrones comunes de Apache Spark para cargar, guardar y trazar tus datos con varios productos y herramientas de código abierto de Google Cloud Platform:
10. Limpia
Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:
- Borra el bucket de Cloud Storage para el entorno y que creaste
- Borra el entorno de Dataproc.
Si creaste un proyecto solo para este codelab, también puedes borrarlo de manera opcional:
- En GCP Console, ve a la página Proyectos.
- En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
- En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.
Licencia
Esta obra se ofrece bajo una licencia genérica de Creative Commons Attribution 3.0 y Apache 2.0.