1. Descripción general: Google Dataproc
Dataproc es un servicio completamente administrado y con alto escalamiento para ejecutar Apache Spark, Apache Flink, Presto y más herramientas y frameworks de código abierto. Usa Dataproc para la modernización de data lakes, ETL / ELT y ciencia de datos segura a escala mundial. Dataproc también está completamente integrado en varios servicios de Google Cloud, incluidos BigQuery, Cloud Storage, Vertex AI y Dataplex.
Dataproc está disponible en tres variantes:
- Dataproc Serverless te permite ejecutar trabajos de PySpark sin necesidad de configurar la infraestructura ni el ajuste de escala automático. Dataproc Serverless admite cargas de trabajo y sesiones por lotes de PySpark (notebooks).
- Dataproc en Google Compute Engine te permite administrar un clúster de Hadoop YARN para cargas de trabajo de Spark basadas en YARN, además de herramientas de código abierto, como Flink y Presto. Puedes adaptar tus clústeres basados en la nube con el escalamiento vertical u horizontal que desees, incluido el ajuste de escala automático.
- Dataproc en Google Kubernetes Engine te permite configurar clústeres virtuales de Dataproc en tu infraestructura de GKE para enviar trabajos de Spark, PySpark, SparkR o Spark SQL.
En este codelab, aprenderás varias formas diferentes de consumir Dataproc Serverless.
Apache Spark se creó originalmente para ejecutarse en clústeres de Hadoop y se usó YARN como su administrador de recursos. Mantener los clústeres de Hadoop requiere un conjunto específico de conocimientos y garantizar que muchas perillas diferentes de los clústeres estén configuradas correctamente. Esto se suma a un conjunto independiente de perillas que Spark también requiere que el usuario configure. Esto genera muchas situaciones en las que los desarrolladores pasan más tiempo configurando su infraestructura en lugar de trabajar en el código de Spark.
Dataproc Serverless elimina la necesidad de configurar manualmente los clústeres de Hadoop o Spark. Dataproc Serverless no se ejecuta en Hadoop y usa su propia asignación dinámica de recursos para determinar sus requisitos de recursos, incluido el ajuste de escala automático. Un pequeño subconjunto de propiedades de Spark aún se puede personalizar con Dataproc Serverless, pero, en la mayoría de los casos, no necesitarás modificarlas.
2. Configurar
Comenzarás por configurar tu entorno y los recursos que se usan en este codelab.
Crea un proyecto de Google Cloud. Puedes usar uno existente.
Haz clic en Cloud Shell en la barra de herramientas de Cloud Console para abrirlo.

Cloud Shell proporciona un entorno de shell listo para usar que puedes usar para este codelab.

Cloud Shell establecerá el nombre de tu proyecto de forma predeterminada. Vuelve a verificar ejecutando echo $GOOGLE_CLOUD_PROJECT. Si no ves el ID de tu proyecto en el resultado, configúralo.
export GOOGLE_CLOUD_PROJECT=<your-project-id>
Configura una región de Compute Engine para tus recursos, como us-central1 o europe-west2.
export REGION=<your-region>
Habilita las APIs
En el codelab, se usan las siguientes APIs:
- BigQuery
- Dataproc
Habilita las APIs necesarias. Esto tardará aproximadamente un minuto y aparecerá un mensaje de éxito cuando se complete.
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
Configurar el acceso a la red
Dataproc Serverless requiere que habilites Google Private Access en la región en la que ejecutarás tus trabajos de Spark, ya que los controladores y los ejecutores de Spark solo tienen IPs privadas. Ejecuta lo siguiente para habilitarlo en la subred default.
gcloud compute networks subnets update default \
--region=${REGION} \
--enable-private-ip-google-access
Puedes verificar que Google Private Access esté habilitado con lo siguiente, que generará True o False.
gcloud compute networks subnets describe default \
--region=${REGION} \
--format="get(privateIpGoogleAccess)"
Crea un bucket de almacenamiento
Crea un bucket de almacenamiento que se usará para almacenar los recursos creados en este codelab.
Selecciona un nombre para tu bucket. Los nombres de los buckets deben ser únicos a nivel global para todos los usuarios.
export BUCKET=<your-bucket-name>
Crea el bucket en la región en la que deseas ejecutar tus trabajos de Spark.
gsutil mb -l ${REGION} gs://${BUCKET}
Puedes ver que tu bucket está disponible en la consola de Cloud Storage consola. También puedes ejecutar gsutil ls para ver tu bucket.
Crea un servidor de historial persistente
La IU de Spark proporciona un conjunto enriquecido de herramientas de depuración y estadísticas sobre los trabajos de Spark. Para ver la IU de Spark para los trabajos completados de Dataproc Serverless, debes crear un clúster de Dataproc de un solo nodo para usarlo como un servidor de historial persistente.
Establece un nombre para tu servidor de historial persistente.
PHS_CLUSTER_NAME=my-phs
Ejecuta lo siguiente.
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
--region=${REGION} \
--single-node \
--enable-component-gateway \
--properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
La IU de Spark y el servidor de historial persistente se explorarán con más detalle más adelante en el codelab.
3. Ejecuta trabajos de Spark sin servidores con lotes de Dataproc
En este ejemplo, trabajarás con un conjunto de datos del conjunto de datos públicos Citi Bike Trips de la ciudad de Nueva York (NYC). NYC Citi Bikes es un sistema pagado de bicicletas compartidas de la ciudad de Nueva York. Realizarás algunas transformaciones simples y, luego, imprimirás los diez IDs de estaciones de Citi Bike más populares. En particular, este ejemplo también usa el conector de código abierto spark-bigquery-connector para leer y escribir datos sin problemas entre Spark y BigQuery.
Clona el siguiente repositorio de GitHub y usa cd para ir al directorio que contiene el archivo citibike.py.
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
Envía el trabajo a Spark sin servidores con el SDK de Cloud, disponible en Cloud Shell de forma predeterminada. Ejecuta el siguiente comando en tu shell, que usa el SDK de Cloud y la API de Batches de Dataproc para enviar trabajos de Spark sin servidores.
gcloud dataproc batches submit pyspark citibike.py \
--batch=citibike-job \
--region=${REGION} \
--deps-bucket=gs://${BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
-- ${BUCKET}
Para desglosar esto, haz lo siguiente:
gcloud dataproc batches submithace referencia a la API de Batches de Dataproc.pysparkindica que envías un trabajo de PySpark.--batches el nombre del trabajo. Si no se proporciona, se usará un UUID generado de forma aleatoria.--region=${REGION}es la región geográfica en la que se procesará el trabajo.--deps-bucket=${BUCKET}es el lugar donde se sube tu archivo local de Python antes de ejecutarse en el entorno sin servidores.--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jarincluye el archivo jar para el spark-bigquery-connector en el entorno de ejecución de Spark.--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}es el nombre completamente calificado del servidor de historial persistente. Aquí es donde se almacenan los datos de eventos de Spark (separados del resultado de la consola) y se pueden ver desde la IU de Spark.- El
--final indica que todo lo que esté más allá serán argumentos de tiempo de ejecución para el programa. En este caso, envías el nombre de tu bucket, según lo requiere el trabajo.
Cuando se envíe el lote, verás el siguiente resultado.
Batch [citibike-job] submitted.
Después de un par de minutos, verás el siguiente resultado junto con los metadatos del trabajo.
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
En la siguiente sección, aprenderás a ubicar los registros de este trabajo.
Funciones adicionales
Con Spark Serverless, tienes opciones adicionales para ejecutar tus trabajos.
- Puedes crear una imagen de Docker personalizada en la que se ejecute tu trabajo. Esta es una excelente manera de incluir dependencias adicionales, incluidas las bibliotecas de Python y R.
- Puedes conectar una instancia de Dataproc Metastore a tu trabajo para acceder a los metadatos de Hive.
- Para obtener más control, Dataproc Serverless admite la configuración de un pequeño conjunto de propiedades de Spark.
4. Métricas y observabilidad de Dataproc
En la consola de lotes de Dataproc, se enumeran todos tus trabajos de Dataproc Serverless. En la consola, verás el ID del lote, la ubicación, el estado, la hora de creación, el tiempo transcurrido y el tipo de cada trabajo. Haz clic en el ID del lote de tu trabajo para ver más información al respecto.
En esta página, verás información como Supervisión , que muestra cuántos ejecutores de Spark por lotes usó tu trabajo con el paso del tiempo (lo que indica cuánto se ajustó la escala automáticamente).
En la pestaña Detalles , verás más metadatos sobre el trabajo, incluidos los argumentos y parámetros que se enviaron con el trabajo.
También puedes acceder a todos los registros desde esta página. Cuando se ejecutan trabajos de Dataproc Serverless, se generan tres conjuntos diferentes de registros:
- A nivel de servicio
- Resultado de la consola
- Registro de eventos de Spark
A nivel de servicio, incluye registros que generó el servicio de Dataproc Serverless. Estos incluyen elementos como Dataproc Serverless que solicita CPUs adicionales para el ajuste de escala automático. Para verlos, haz clic en Ver registros , que abrirá Cloud Logging.
El resultado de la consola se puede ver en Resultado. Este es el resultado que genera el trabajo, incluidos los metadatos que Spark imprime cuando comienza un trabajo o cualquier instrucción de impresión incorporada en el trabajo.
Se puede acceder al registro de eventos de Spark desde la IU de Spark. Como proporcionaste a tu trabajo de Spark un servidor de historial persistente, puedes acceder a la IU de Spark haciendo clic en Ver servidor de historial de Spark, que contiene información sobre tus trabajos de Spark ejecutados anteriormente. Puedes obtener más información sobre la IU de Spark en la documentación oficial de Spark.
5. Plantillas de Dataproc: BQ -> GCS
Las plantillas de Dataproc son herramientas de código abierto que ayudan a simplificar aún más las tareas de procesamiento de datos en la nube. Estas sirven como wrapper para Dataproc Serverless y, además, incluyen plantillas para muchas tareas de importación y exportación de datos, incluidas las siguientes:
BigQuerytoGCSyGCStoBigQueryGCStoBigTableGCStoJDBCyJDBCtoGCSHivetoBigQueryMongotoGCSyGCStoMongo
La lista completa está disponible en el archivo README.
En esta sección, usarás las plantillas de Dataproc para exportar datos de BigQuery a GCS.
Clone el repositorio
Clona el repositorio y cambia a la carpeta python.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
Configura el entorno
Ahora, establecerás variables de entorno. Las plantillas de Dataproc usan la variable de entorno GCP_PROJECT para el ID del proyecto, por lo que debes establecerla como GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
Tu región debería estar configurada en el entorno anterior. De lo contrario, configúrala aquí.
export REGION=<region>
Las plantillas de Dataproc usan el conector spark-bigquery-connector para procesar trabajos de BigQuery y requieren que el URI se incluya en una variable de entorno JARS. Configura la variable JARS.
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
Configura los parámetros de la plantilla
Establece el nombre de un bucket de etapa de pruebas para que lo use el servicio.
export GCS_STAGING_LOCATION=gs://${BUCKET}
A continuación, establecerás algunas variables específicas del trabajo. Para la tabla de entrada, volverás a hacer referencia al conjunto de datos de BigQuery NYC Citibike.
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
Puedes elegir csv, parquet, avro o json. Para este codelab, elige CSV. En la siguiente sección, se explica cómo usar las plantillas de Dataproc para convertir tipos de archivos.
BIGQUERY_GCS_OUTPUT_FORMAT=csv
Establece el modo de salida en overwrite. Puedes elegir entre overwrite, append, ignore o errorifexists.
BIGQUERY_GCS_OUTPUT_MODE=overwrite
Establece la ubicación de salida de GCS como una ruta de acceso en tu bucket.
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
Ejecuta la plantilla
Ejecuta la plantilla BIGQUERYTOGCS. Para ello, especifícala a continuación y proporciona los parámetros de entrada que estableciste.
./bin/start.sh \
-- --template=BIGQUERYTOGCS \
--bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
--bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
--bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
--bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
El resultado será bastante ruidoso, pero, después de aproximadamente un minuto, verás lo siguiente.
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
Para verificar que se generaron los archivos, ejecuta lo siguiente.
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
De forma predeterminada, Spark escribe en varios archivos, según la cantidad de datos. En este caso, verás aproximadamente 30 archivos generados. Los nombres de los archivos de salida de Spark tienen el formato part- seguido de un número de cinco dígitos (que indica el número de parte) y una cadena de hash. Para grandes cantidades de datos, Spark suele escribir en varios archivos. Un ejemplo de nombre de archivo es part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv.
6. Plantillas de Dataproc: CSV a Parquet
Ahora, usarás las plantillas de Dataproc para convertir datos en GCS de un tipo de archivo a otro con el GCSTOGCS. Esta plantilla usa SparkSQL y proporciona la opción de enviar también una consulta de SparkSQL para que se procese durante la transformación para un procesamiento adicional.
Confirma las variables de entorno
Confirma que GCP_PROJECT, REGION y GCS_STAGING_BUCKET estén configuradas en la sección anterior.
echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}
Establece los parámetros de la plantilla
Ahora, establecerás los parámetros de configuración para GCStoGCS. Comienza con la ubicación de los archivos de entrada. Ten en cuenta que este es un directorio y no un archivo específico, ya que se procesarán todos los archivos del directorio. Configura esto como BIGQUERY_GCS_OUTPUT_LOCATION.
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
Establece el formato del archivo de entrada.
GCS_TO_GCS_INPUT_FORMAT=csv
Establece el formato de salida deseado. Puedes elegir Parquet, JSON, Avro o CSV.
GCS_TO_GCS_OUTPUT_FORMAT=parquet
Establece el modo de salida en overwrite. Puedes elegir entre overwrite, append, ignore o errorifexists.
GCS_TO_GCS_OUTPUT_MODE=overwrite
Establece la ubicación de salida.
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
Ejecuta la plantilla
Ejecuta la plantilla GCStoGCS.
./bin/start.sh \
-- --template=GCSTOGCS \
--gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
--gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
--gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
--gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
--gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
El resultado será bastante ruidoso, pero, después de aproximadamente un minuto, deberías ver un mensaje de éxito como el siguiente.
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
Para verificar que se generaron los archivos, ejecuta lo siguiente.
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
Con esta plantilla, también tienes la opción de proporcionar consultas de SparkSQL pasando gcs.to.gcs.temp.view.name y gcs.to.gcs.sql.query a la plantilla, lo que permite que se ejecute una consulta de SparkSQL en los datos antes de escribir en GCS.
7. Limpia los recursos
Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar este codelab, haz lo siguiente:
- Borra el bucket de Cloud Storage para el entorno que creaste.
gsutil rm -r gs://${BUCKET}
- Borra el clúster de Dataproc que se usó para tu servidor de historial persistente.
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
--region=${REGION}
- Borra los trabajos de Dataproc Serverless. Ve a la consola de lotes, haz clic en la casilla junto a cada trabajo que deseas borrar y, luego, haz clic en BORRAR.
Si creaste un proyecto solo para este codelab, también puedes borrarlo de forma opcional:
- En GCP Console, ve a la página Proyectos.
- En la lista de proyectos, elige el proyecto que deseas borrar y haz clic en Borrar.
- En el cuadro, escribe el ID del proyecto y haz clic en Cerrar para borrar el proyecto.
8. ¿Qué sigue?
En los siguientes recursos, se proporcionan formas adicionales de aprovechar Spark sin servidores:
- Obtén información para organizar flujos de trabajo de Dataproc Serverless con Cloud Composer.
- Obtén información para integrar Dataproc Serverless con canalizaciones de Kubeflow.