Procesa previamente datos de BigQuery con PySpark en Dataproc

1. Descripción general

En este codelab, repasaremos cómo crear una canalización de procesamiento de datos usando Apache Spark con Dataproc en Google Cloud Platform. Es un caso de uso común en la ciencia y la ingeniería de datos para leerlos desde una ubicación de almacenamiento, realizar transformaciones y escribirlos en otra ubicación de almacenamiento. Las transformaciones comunes incluyen cambiar el contenido de los datos, quitar la información innecesaria y cambiar los tipos de archivos.

En este codelab, aprenderás sobre Apache Spark, ejecutarás una canalización de muestra mediante Dataproc con PySpark (API de Python de Apache Spark), BigQuery, Google Cloud Storage y datos de Reddit.

2. Introducción a Apache Spark (opcional)

Según el sitio web, " Apache Spark es un motor de estadísticas unificado para el procesamiento de datos a gran escala”. Te permite analizar y procesar datos en paralelo y en memoria, lo que permite realizar un procesamiento paralelo masivo en varias máquinas y nodos diferentes. Se lanzó originalmente en 2014 como una actualización de MapReduce tradicional y sigue siendo uno de los frameworks más populares para realizar cálculos a gran escala. Apache Spark está escrito en Scala y, luego, tiene APIs en Scala, Java, Python y R. Contiene una gran cantidad de bibliotecas, como Spark SQL para realizar consultas en SQL sobre los datos, Spark Streaming para datos de transmisión, MLlib para aprendizaje automático y GraphX para procesamiento de gráficos, que se ejecutan en el motor de Apache Spark.

32add0b6a47bafbc.png

Spark puede ejecutarse por sí solo o puede aprovechar un servicio de administración de recursos, como Yarn, Mesos o Kubernetes para el escalamiento. En este codelab, que usa Yarn, usarás Dataproc.

Originalmente, los datos en Spark se cargaron en la memoria, en lo que se conoce como RDD, o conjunto de datos resilientes y distribuidos. Desde entonces, el desarrollo en Spark ha incluido la adición de dos nuevos tipos de datos de estilo de columnas: el conjunto de datos, que está escrito, y el DataFrame, que no tiene tipo. En términos generales, los RDD son excelentes para cualquier tipo de datos, mientras que los conjuntos de datos y DataFrames están optimizados para datos tabulares. Como los conjuntos de datos solo están disponibles con las APIs de Java y Scala, procederemos a usar la API de DataFrame de PySpark para este codelab. Para obtener más información, consulta la documentación de Apache Spark.

3. Caso de uso

Los ingenieros de datos a menudo necesitan que los datos sean de fácil acceso para los científicos de datos. Sin embargo, al principio, los datos suelen estar sucios (difíciles de usar para el análisis en su estado actual) y deben limpiarse antes de que puedan ser de gran utilidad. Un ejemplo de esto son los datos que se extrajeron de la Web y que pueden contener codificaciones extrañas o etiquetas HTML irrelevantes.

En este lab, cargarás un conjunto de datos de BigQuery en forma de publicaciones de Reddit en un clúster de Spark alojado en Dataproc, extraerás información útil y almacenarás los datos procesados como archivos CSV comprimidos en Google Cloud Storage.

be2a4551ece63bfc.png

Al director científico de datos de tu empresa le interesa que sus equipos trabajen en diferentes problemas de procesamiento de lenguaje natural. En particular, le interesa analizar los datos del subreddit "r/food". Crearás una canalización para un volcado de datos comenzando con un reabastecimiento desde enero de 2017 hasta agosto de 2019.

4. Accede a BigQuery a través de la API de almacenamiento de BigQuery

Extraer datos de BigQuery con el método de la API tabladata.list puede llevar mucho tiempo y no ser eficiente a medida que aumenta la cantidad de datos. Este método muestra una lista de objetos JSON y requiere la lectura secuencial de una página a la vez para leer un conjunto de datos completo.

La API de BigQuery Storage ofrece mejoras significativas para acceder a los datos en BigQuery a través de un protocolo basado en RPC. Admite operaciones de lectura y escritura de datos en paralelo, así como diferentes formatos de serialización, como Apache Avro y Apache Arrow. A grandes rasgos, esto se traduce en un rendimiento significativamente mejorado, especialmente en conjuntos de datos más grandes.

En este codelab, usarás spark-bigquery-connector para leer y escribir datos entre BigQuery y Spark.

5. Crea un proyecto

Accede a la consola de Google Cloud Platform en console.cloud.google.com y crea un proyecto nuevo:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.

La ejecución de este codelab no debería costar más que unos pocos dólares, pero podría ser más si decides usar más recursos o si los dejas ejecutándose. En la última sección de este codelab, aprenderás a limpiar tu proyecto.

Los usuarios nuevos de Google Cloud son aptos para una prueba gratuita de$300.

6. Configura el entorno

Ahora deberás configurar tu entorno de la siguiente manera:

  • Habilita las APIs de almacenamiento de Compute Engine, Dataproc y BigQuery
  • Establece la configuración del proyecto
  • Crea un clúster de Dataproc
  • Crea un bucket de Google Cloud Storage

Habilita las APIs y configura tu entorno

Abre Cloud Shell presionando el botón en la esquina superior derecha de la consola de Cloud.

a10c47ee6ca41c54.png

Después de que se cargue Cloud Shell, ejecuta los siguientes comandos para habilitar las APIs de Compute Engine, Dataproc y BigQuery Storage:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

Configura el ID del proyecto de tu proyecto. Para encontrarlo, ve a la página de selección de proyectos y busca tu proyecto. Es posible que este no sea el mismo que el nombre de tu proyecto.

e682e8227aa3c781.png

76d45fb295728542.png

Ejecuta el siguiente comando para configurar tu ID del proyecto:

gcloud config set project <project_id>

Elige una de la región de tu proyecto aquí para establecer la región. Un ejemplo podría ser us-central1.

gcloud config set dataproc/region <region>

Elige un nombre para tu clúster de Dataproc y crea una variable de entorno para él.

CLUSTER_NAME=<cluster_name>

Cómo crear un clúster de Dataproc

Crea un clúster de Dataproc mediante la ejecución del siguiente comando:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

Este comando tardará unos minutos en completarse. Para desglosar el comando, haz lo siguiente:

Esto iniciará la creación de un clúster de Dataproc con el nombre que proporcionaste anteriormente. Usar la API de beta habilitará las funciones beta de Dataproc, como Component Gateway.

gcloud beta dataproc clusters create ${CLUSTER_NAME}

Esto configurará el tipo de máquina que usarás para tus trabajadores.

--worker-machine-type n1-standard-8

Esto establecerá la cantidad de trabajadores que tendrá tu clúster.

--num-workers 8

Esto configurará la versión de imagen de Dataproc.

--image-version 1.5-debian

Esto configurará las acciones de inicialización que se usarán en el clúster. Aquí, incluirás la acción de inicialización pip.

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

Estos son los metadatos que se incluirán en el clúster. Aquí, debes proporcionar metadatos para la acción de inicialización pip.

--metadata 'PIP_PACKAGES=google-cloud-storage'

Esto configurará los Componentes opcionales que se instalarán en el clúster.

--optional-components=ANACONDA

Esto habilitará la puerta de enlace de componentes que te permite usar la puerta de enlace de componentes de Dataproc para ver IUs comunes como Zeppelin, Jupyter o el historial de Spark

--enable-component-gateway

Para obtener una introducción más detallada a Dataproc, consulta este codelab.

Crea un bucket de Google Cloud Storage

Necesitarás un bucket de Google Cloud Storage para el resultado de tu trabajo. Determina un nombre único para tu bucket y ejecuta el siguiente comando para crear un bucket nuevo. Los nombres de los buckets son únicos en todos los proyectos de Google Cloud para todos los usuarios, por lo que es posible que debas intentarlo varias veces con nombres diferentes. Si no recibes un ServiceException, se crea correctamente un bucket.

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. Análisis de datos exploratorio

Antes de realizar el procesamiento previo, debes aprender más sobre la naturaleza de los datos con los que trabajas. Para ello, explorarás dos métodos de exploración de datos. Primero, verás algunos datos sin procesar con la IU web de BigQuery y, luego, calcularás la cantidad de publicaciones por subreddit con PySpark y Dataproc.

Usa la IU web de BigQuery

Comienza por usar la IU web de BigQuery para ver tus datos. En el ícono de menú de la consola de Cloud, desplázate hacia abajo y presiona “BigQuery”. para abrir la IU web de BigQuery.

242a597d7045b4da.png

A continuación, ejecuta el siguiente comando en el Editor de consultas de la IU web de BigQuery. Esto mostrará 10 filas completas de los datos de enero de 2017:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

Puedes desplazarte por la página para ver todas las columnas disponibles, así como algunos ejemplos. En particular, verás dos columnas que representan el contenido textual de cada entrada: "title". y "selftext", este último es el cuerpo de la entrada. También observa otras columnas como "created_utc" que es el momento en que se creó una publicación y “subreddit” que es el subreddit en el que existe la publicación.

Ejecuta un trabajo de PySpark

Ejecuta los siguientes comandos en Cloud Shell para clonar el repo con el código de muestra y cambiar al directorio correcto:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

Puedes usar PySpark para determinar un recuento de la cantidad de publicaciones que existen para cada subreddit. Puedes abrir Cloud Editor y leer la secuencia de comandos cloud-dataproc/codelabs/spark-bigquery antes de ejecutarla en el siguiente paso:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Haz clic en “Abrir terminal” en Cloud Editor para volver a Cloud Shell y ejecutar el siguiente comando para ejecutar tu primer trabajo de PySpark:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

Este comando te permite enviar trabajos a Dataproc a través de la API de Jobs. Aquí indicas el tipo de trabajo como pyspark. Puedes proporcionar el nombre del clúster, los parámetros opcionales y el nombre del archivo que contiene el trabajo. Aquí, proporcionas el parámetro --jars, que te permite incluir spark-bigquery-connector con tu trabajo. También puedes establecer los niveles de salida del registro con --driver-log-levels root=FATAL, que suprimirá todos los resultados del registro, excepto los errores. Los registros de Spark suelen ser bastante ruidosos.

Esto debería tardar unos minutos en ejecutarse y el resultado final debería ser similar al siguiente:

6c185228db47bb18.png

8. Explora las IU de Dataproc y Spark

Cuando ejecutas trabajos de Spark en Dataproc, tienes acceso a dos IU para verificar el estado de tus trabajos o clústeres. La primera es la IU de Dataproc, que puedes encontrar haciendo clic en el ícono de menú y desplazándote hacia abajo hasta Dataproc. Aquí puedes ver la memoria disponible actual, así como la memoria pendiente y la cantidad de trabajadores.

6f2987346d15c8e2.png

También puedes hacer clic en la pestaña de trabajos para ver los trabajos completados. Puedes ver los detalles del trabajo, como los registros y los resultados, si haces clic en el ID de trabajo de un trabajo en particular. 114d90129b0e4c88.png

1b2160f0f484594a.png

También puedes ver la IU de Spark. Desde la página del trabajo, haz clic en la flecha hacia atrás y luego en Interfaces web. Deberías ver varias opciones en la puerta de enlace de componentes. Muchos de ellos pueden habilitarse mediante los componentes opcionales cuando configures tu clúster. Para este lab, haz clic en el servidor de historial de Spark.

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

Se debería abrir la siguiente ventana:

8f6786760f994fe8.png

Todos los trabajos completados se mostrarán aquí, y puedes hacer clic en cualquier application_id para obtener más información sobre el trabajo. Del mismo modo, puedes hacer clic en "Mostrar aplicaciones incompletas" en la parte inferior de la página de destino para ver todos los trabajos que se están ejecutando.

9. Cómo ejecutar tu trabajo de reabastecimiento

Ahora, ejecutarás un trabajo que carga datos en la memoria, extrae la información necesaria y vuelca el resultado en un bucket de Google Cloud Storage. Extraerás el "título", el "cuerpo" (texto sin procesar) y "marca de tiempo de creación" para cada comentario de Reddit. Luego, tomarás estos datos, los convertirás en un archivo CSV, los comprimirás y cargarás en un bucket con el URI gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz.

Puedes consultar Cloud Editor otra vez para leer el código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh, que es una secuencia de comandos wrapper para ejecutar el código en cloud-dataproc/codelabs/spark-bigquery/backfill.py.

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

Pronto deberías ver un montón de mensajes de finalización de trabajos. El trabajo puede tardar hasta 15 minutos en completarse. También puedes volver a revisar tu bucket de almacenamiento para comprobar la salida correcta de los datos con gsutil. Cuando se completen todos los trabajos, ejecuta el siguiente comando:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

Deberías ver el siguiente resultado:

a7c3c7b2e82f9fca.png

¡Felicitaciones! Completaste correctamente el reabastecimiento de tus datos de comentarios de Reddit. Si te interesa cómo puedes compilar modelos a partir de estos datos, continúa con el codelab Spark-NLP.

10. Limpieza

Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:

  1. Borra el bucket de Cloud Storage para el entorno y que creaste
  2. Borra el entorno de Dataproc.

Si creaste un proyecto solo para este codelab, también puedes borrarlo de manera opcional:

  1. En GCP Console, ve a la página Proyectos.
  2. En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
  3. En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.

Licencia

Esta obra se ofrece bajo una licencia genérica de Creative Commons Attribution 3.0 y Apache 2.0.