Dataproc Serverless

1. Descripción general - Google Dataproc

Dataproc es un servicio completamente administrado y altamente escalable para ejecutar Apache Spark, Apache Flink, Presto y muchas otras herramientas y frameworks de código abierto. Usa Dataproc para modernizar los data lakes, ETL y ELT, y ciencia de datos segura a escala mundial. Dataproc también está completamente integrado en varios servicios de Google Cloud, incluidos BigQuery, Cloud Storage, Vertex AI y Dataplex.

Dataproc está disponible en tres variantes:

  • Dataproc sin servidores te permite ejecutar trabajos de PySpark sin necesidad de configurar infraestructura ni ajuste de escala automático. Dataproc Serverless admite cargas de trabajo por lotes, sesiones y notebooks de PySpark.
  • Dataproc en Google Compute Engine te permite administrar un clúster de Hadoop YARN para cargas de trabajo de Spark basadas en YARN, además de herramientas de código abierto como Flink y Presto. Puedes adaptar tus clústeres basados en la nube con el escalamiento vertical u horizontal que desees, incluido el ajuste de escala automático.
  • Dataproc en Google Kubernetes Engine te permite configurar clústeres virtuales de Dataproc en tu infraestructura de GKE para enviar trabajos de Spark, PySpark, SparkR o Spark SQL.

En este codelab, aprenderás varias formas diferentes de consumir Dataproc Serverless.

Originalmente, Apache Spark se creó para ejecutarse en clústeres de Hadoop y usaba YARN como su administrador de recursos. Mantener los clústeres de Hadoop requiere un conjunto específico de experiencia y garantizar que muchos controles diferentes de los clústeres estén configurados correctamente. Esto se suma a un conjunto separado de controles que Spark también debe configurar el usuario. Esto genera muchas situaciones en las que los desarrolladores pasan más tiempo configurando su infraestructura en lugar de trabajar en el código de Spark.

Dataproc Serverless quita la necesidad de configurar manualmente clústeres de Hadoop o Spark. Dataproc Serverless no se ejecuta en Hadoop y usa su propia asignación dinámica de recursos para determinar sus requisitos de recursos, incluido el ajuste de escala automático. Un pequeño subconjunto de propiedades de Spark aún se puede personalizar con Dataproc Serverless. Sin embargo, en la mayoría de los casos, no será necesario modificarlos.

2. Configurar

Comenzarás por configurar el entorno y los recursos que usarás en este codelab.

Crea un proyecto de Google Cloud. Puedes usar uno existente.

Para abrir Cloud Shell, haz clic en la barra de herramientas de la consola de Cloud.

ba0bb17945a73543.png

Cloud Shell proporciona un entorno de Shell listo para usar que puedes utilizar en este codelab.

68c4ebd2a8539764.png

Cloud Shell configurará el nombre del proyecto de forma predeterminada. Ejecuta echo $GOOGLE_CLOUD_PROJECT para volver a verificarlo. Si no ves tu ID del proyecto en el resultado, configúralo.

export GOOGLE_CLOUD_PROJECT=<your-project-id>

Configura una región de Compute Engine para tus recursos, como us-central1 o europe-west2.

export REGION=<your-region>

Habilita las APIs

En el codelab, se usan las siguientes APIs:

  • BigQuery
  • Dataproc

Habilita las API necesarias. El proceso tardará aproximadamente un minuto y, cuando se complete, aparecerá un mensaje que indica que la operación se realizó correctamente.

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

Configurar el acceso a la red

Dataproc Serverless requiere que el Acceso privado a Google esté habilitado en la región en la que ejecutarás tus trabajos de Spark, ya que los controladores y ejecutores de Spark solo tienen IP privadas. Ejecuta lo siguiente para habilitarlo en la subred default.

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

Puedes verificar que el acceso privado a Google esté habilitado a través de lo siguiente, que mostrará como resultado True o False.

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

Crea un bucket de almacenamiento

Crea un bucket de almacenamiento que se usará para almacenar los recursos creados en este codelab.

Elige un nombre para tu bucket. Los nombres de los buckets deben ser únicos a nivel global entre todos los usuarios.

export BUCKET=<your-bucket-name>

Crea el bucket en la región en la que quieres ejecutar tus trabajos de Spark.

gsutil mb -l ${REGION} gs://${BUCKET}

Puedes ver que tu bucket está disponible en la consola de Cloud Storage. También puedes ejecutar gsutil ls para ver tu bucket.

Crea un servidor de historial persistente

La IU de Spark proporciona un amplio conjunto de herramientas de depuración y estadísticas sobre los trabajos de Spark. Para ver la IU de Spark de los trabajos completados de Dataproc Serverless, debes crear un clúster de Dataproc de nodo único para usarlo como servidor de historial persistente.

Establece un nombre para tu servidor de historial persistente.

PHS_CLUSTER_NAME=my-phs

Ejecuta el siguiente comando:

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

La IU de Spark y el servidor de historial persistente se analizarán con más detalle más adelante en el codelab.

3. Ejecuta trabajos de Spark sin servidores con lotes de Dataproc

En esta muestra, trabajarás con un conjunto de datos del conjunto de datos públicos de viajes en bicicletas de la ciudad de Nueva York (NYC). NYC Citi Bikes es un sistema pagado de bicicletas compartidas dentro de Nueva York. Realizarás algunas transformaciones simples y, además, imprimirás los diez IDs de estaciones de Citi Bike más populares. Este ejemplo también utiliza en particular el spark-bigquery-connector de código abierto para leer y escribir datos sin problemas entre Spark y BigQuery.

Clona el siguiente repositorio de GitHub y cd en el directorio que contiene el archivo citibike.py.

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

Envía el trabajo a Spark sin servidores con el SDK de Cloud, disponible en Cloud Shell de forma predeterminada. Ejecuta el siguiente comando en tu shell que usa el SDK de Cloud y la API de lotes de Dataproc para enviar trabajos de Spark sin servidores.

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

Para desglosarlo, sigue estos pasos:

  • gcloud dataproc batches submit hace referencia a la API de lotes de Dataproc.
  • pyspark indica que envías un trabajo de PySpark.
  • --batch es el nombre del trabajo. Si no se proporciona, se usará un UUID generado de forma aleatoria.
  • --region=${REGION} es la región geográfica en la que se procesará el trabajo.
  • --deps-bucket=${BUCKET} es el lugar al que se sube el archivo de Python local antes de ejecutarlo en el entorno sin servidores.
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar incluye el jar para el spark-bigquery-connector en el entorno de ejecución de Spark.
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} es el nombre completamente calificado del servidor de historial persistente. Aquí es donde se almacenan los datos de eventos de Spark (independientes del resultado de la consola) y se pueden ver desde la IU de Spark.
  • El -- al final denota que todo lo que esté más allá de esto serán argumentos de tiempo de ejecución para el programa. En este caso, enviarás el nombre de tu bucket, tal como lo requiere el trabajo.

Cuando se envíe el lote, verás el siguiente resultado.

Batch [citibike-job] submitted.

Después de un par de minutos, verás el siguiente resultado junto con los metadatos del trabajo.

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

En la siguiente sección, aprenderás a ubicar los registros para este trabajo.

Características adicionales

Con Spark sin servidores, tienes opciones adicionales para ejecutar tus trabajos.

  • Puedes crear una imagen personalizada de Docker en la que se ejecute tu trabajo. Esta es una excelente manera de incluir dependencias adicionales, incluidas las bibliotecas Python y R.
  • Puedes conectar una instancia de Dataproc Metastore a tu trabajo para acceder a los metadatos de Hive.
  • Para obtener un control adicional, Dataproc Serverless admite la configuración de un pequeño conjunto de propiedades de Spark.

4. Métricas y observabilidad de Dataproc

En la consola de lotes de Dataproc, se enumeran todos tus trabajos de Dataproc Serverless. En la consola, verás el ID de lote, la ubicación, el estado, la Hora de creación, el tiempo transcurrido y el Tipo de cada trabajo. Haz clic en el ID de lote de tu trabajo para ver más información sobre este.

En esta página, verás información como Monitoring, que muestra cuántos ejecutores de Spark por lotes usó tu trabajo a lo largo del tiempo (lo que indica el ajuste de escala automático).

En la pestaña Detalles, verás más metadatos sobre el trabajo, incluidos los argumentos y los parámetros que se enviaron con el trabajo.

También puedes acceder a todos los registros desde esta página. Cuando se ejecutan trabajos de Dataproc Serverless, se generan tres conjuntos diferentes de registros:

  • Nivel de servicio
  • Resultado de la consola
  • Registro de eventos de Spark

Nivel de servicio, incluye los registros que generó el servicio Dataproc Serverless. Por ejemplo, Dataproc Serverless solicita CPU adicionales para el ajuste de escala automático. Para verlos, haz clic en Ver registros, que abrirá Cloud Logging.

El resultado de la consola se puede ver en Salida. Este es el resultado que genera el trabajo, incluidos los metadatos que Spark imprime cuando comienza un trabajo o cualquier declaración de impresión incorporada al trabajo.

Se puede acceder al registro de eventos de Spark desde la IU de Spark. Debido a que le proporcionaste a tu trabajo de Spark un servidor de historial persistente, puedes acceder a la IU de Spark haciendo clic en Ver el servidor de historial de Spark, que contiene información para los trabajos de Spark que ejecutaste con anterioridad. Puedes obtener más información sobre la IU de Spark en la documentación oficial de Spark.

5. Plantillas de Dataproc: BQ -> GCS

Las plantillas de Dataproc son herramientas de código abierto que ayudan a simplificar aún más las tareas de procesamiento de datos en la nube. Estos funcionan como wrappers para Dataproc Serverless. Incluyen plantillas para muchas tareas de importación y exportación de datos, como las siguientes:

  • BigQuerytoGCS y GCStoBigQuery
  • GCStoBigTable
  • GCStoJDBC y JDBCtoGCS
  • HivetoBigQuery
  • MongotoGCS y GCStoMongo

La lista completa está disponible en el archivo README.

En esta sección, usarás plantillas de Dataproc para exportar datos de BigQuery a GCS.

Clone el repositorio

Clona el repositorio y cambia a la carpeta python.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

Configura el entorno

Ahora configurarás variables de entorno. Las plantillas de Dataproc usan la variable de entorno GCP_PROJECT para tu ID de proyecto, así que establece este valor en GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

Tu región debe configurarse en el entorno anterior. De lo contrario, configúralo aquí.

export REGION=<region>

Las plantillas de Dataproc usan spark-bigquery-conector para procesar los trabajos de BigQuery y requieren que el URI se incluya en una variable de entorno JARS. Configura la variable JARS.

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

Configura los parámetros de la plantilla

Establece el nombre de un bucket de etapa de pruebas para que use el servicio.

export GCS_STAGING_LOCATION=gs://${BUCKET}

A continuación, establecerás algunas variables específicas del trabajo. Para la tabla de entrada, volverás a hacer referencia al conjunto de datos de BigQuery NYC Citibike.

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

Puedes elegir csv, parquet, avro o json. En este codelab, elige CSV. En la siguiente sección, aprenderás a usar plantillas de Dataproc para convertir tipos de archivos.

BIGQUERY_GCS_OUTPUT_FORMAT=csv

Establece el modo de salida en overwrite. Puedes elegir entre overwrite, append, ignore o errorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

Establece la ubicación de salida de GCS como una ruta de acceso en tu bucket.

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

Ejecuta la plantilla

Para ejecutar la plantilla BIGQUERYTOGCS, especifícala a continuación y proporciona los parámetros de entrada que establezcas.

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

El resultado será bastante ruidoso, pero después de aproximadamente un minuto verás lo siguiente:

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

Para verificar que los archivos se generaron, ejecuta el siguiente comando.

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

De forma predeterminada, Spark escribe en varios archivos según la cantidad de datos. En este caso, verás aproximadamente 30 archivos generados. Los nombres de los archivos de salida de Spark tienen el formato part, seguido de un número de cinco dígitos (que indica el número de pieza) y una cadena de hash. Para grandes cantidades de datos, Spark generalmente escribe en varios archivos. Un nombre de archivo de ejemplo es part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.

6. Plantillas de Dataproc: CSV a Parquet

Ahora usarás plantillas de Dataproc para convertir datos en GCS de un tipo de archivo a otro con GCSTOGCS. Esta plantilla utiliza SparkSQL y proporciona la opción de enviar también una consulta en SparkSQL para que se procese durante la transformación y se procese adicionalmente.

Confirma las variables de entorno

Confirma que GCP_PROJECT, REGION y GCS_STAGING_BUCKET estén configurados desde la sección anterior.

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

Establecer parámetros de plantilla

Ahora, establecerás los parámetros de configuración para GCStoGCS. Comienza con la ubicación de los archivos de entrada. Ten en cuenta que se trata de un directorio y no de un archivo específico, ya que se procesarán todos los archivos del directorio. Establece esto en BIGQUERY_GCS_OUTPUT_LOCATION.

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

Configura el formato del archivo de entrada.

GCS_TO_GCS_INPUT_FORMAT=csv

Configura el formato de salida deseado. Puedes elegir Parquet, JSON, avro o CSV.

GCS_TO_GCS_OUTPUT_FORMAT=parquet

Establece el modo de salida en overwrite. Puedes elegir entre overwrite, append, ignore o errorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

Establece la ubicación de salida.

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

Ejecuta la plantilla

Ejecuta la plantilla GCStoGCS.

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

El resultado será bastante ruidoso, pero después de un minuto aproximadamente, deberías ver un mensaje de éxito como el que se muestra a continuación.

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

Para verificar que los archivos se generaron, ejecuta el siguiente comando.

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

Con esta plantilla, también tienes la opción de proporcionar consultas en SparkSQL pasando gcs.to.gcs.temp.view.name y gcs.to.gcs.sql.query a la plantilla, lo que permite que se ejecute una consulta en SparkSQL en los datos antes de escribirlos en GCS.

7. Limpia los recursos

Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar este codelab, haz lo siguiente:

  1. Borra el bucket de Cloud Storage del entorno que creaste.
gsutil rm -r gs://${BUCKET}
  1. Borra el clúster de Dataproc que usaste para el servidor de historial persistente.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. Borra los trabajos de Dataproc Serverless. Ve a la Consola de Batch, haz clic en la casilla junto a cada trabajo que quieras borrar y, luego, en BORRAR.

Si creaste un proyecto solo para este codelab, también puedes borrarlo de manera opcional:

  1. En GCP Console, ve a la página Proyectos.
  2. En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
  3. En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

8. ¿Qué sigue?

Los siguientes recursos proporcionan formas adicionales de aprovechar Spark sin servidores: