1. Descripción general
En este codelab, se explica cómo crear una canalización de procesamiento de datos con Apache Spark y Dataproc en Google Cloud Platform. Es un caso de uso común en la ciencia y la ingeniería de datos para leerlos desde una ubicación de almacenamiento, realizar transformaciones y escribirlos en otra ubicación de almacenamiento. Entre las transformaciones comunes, se incluyen cambiar el contenido de los datos, quitar la información innecesaria y cambiar los tipos de archivos.
En este codelab, aprenderás sobre Apache Spark, ejecutarás una canalización de muestra con Dataproc y PySpark (la API de Python de Apache Spark), BigQuery, Google Cloud Storage y datos de Reddit.
2. Introducción a Apache Spark (opcional)
Según el sitio web, “Apache Spark es un motor de estadísticas unificado para el procesamiento de datos a gran escala”. Te permite analizar y procesar datos en paralelo y en la memoria, lo que permite un procesamiento masivo en paralelo en varias máquinas y nodos diferentes. Se lanzó originalmente en 2014 como una actualización de MapReduce tradicional y sigue siendo uno de los frameworks más populares para realizar cálculos a gran escala. Apache Spark está escrito en Scala y, posteriormente, tiene APIs en Scala, Java, Python y R. Contiene una gran cantidad de bibliotecas, como Spark SQL para realizar consultas de SQL en los datos, Spark Streaming para transmitir datos, MLlib para el aprendizaje automático y GraphX para el procesamiento de gráficos, que se ejecutan en el motor de Apache Spark.
Spark puede ejecutarse por sí solo o aprovechar un servicio de administración de recursos, como Yarn, Mesos o Kubernetes para el escalamiento. En este codelab, usarás Dataproc, que utiliza Yarn.
Los datos de Spark se cargaron originalmente en la memoria en lo que se denomina RDD, o conjunto de datos resilientes y distribuidos. Desde entonces, el desarrollo en Spark incluyó la adición de dos nuevos tipos de datos de estilo de columna: el conjunto de datos, que está escrito, y el dataframe, que no tiene formato. En términos generales, los RDD son excelentes para cualquier tipo de datos, mientras que los conjuntos de datos y los marcos de datos están optimizados para los datos tabulares. Como los conjuntos de datos solo están disponibles con las APIs de Java y Scala, usaremos la API de Dataframe de PySpark para este codelab. Para obtener más información, consulta la documentación de Apache Spark.
3. Caso de uso
Los ingenieros de datos suelen necesitar que los científicos de datos puedan acceder fácilmente a los datos. Sin embargo, a menudo, los datos son incorrectos al principio (difíciles de usar para las estadísticas en su estado actual) y deben limpiarse antes de que puedan ser de gran utilidad. Un ejemplo de esto son los datos que se extrajeron de la Web y que pueden contener codificaciones extrañas o etiquetas HTML superfluas.
En este lab, cargarás un conjunto de datos de BigQuery en forma de publicaciones de Reddit en un clúster de Spark alojado en Dataproc, extraerás información útil y almacenarás los datos procesados como archivos CSV comprimidos en Google Cloud Storage.
El científico de datos principal de tu empresa está interesado en que sus equipos trabajen en diferentes problemas de procesamiento de lenguaje natural. Específicamente, le interesa analizar los datos del subreddit “r/food”. Crearás una canalización para un volcado de datos que comenzará con un reabastecimiento desde enero de 2017 hasta agosto de 2019.
4. Cómo acceder a BigQuery a través de la API de BigQuery Storage
Extraer datos de BigQuery con el método de la API tabledata.list puede ser lento y poco eficiente a medida que aumenta la cantidad de datos. Este método muestra una lista de objetos JSON y requiere la lectura secuencial de una página a la vez para leer un conjunto de datos completo.
La API de BigQuery Storage ofrece mejoras significativas en el acceso a los datos en BigQuery mediante un protocolo basado en RPC. Admite operaciones de lectura y escritura de datos en paralelo, así como diferentes formatos de serialización, como Apache Avro y Apache Arrow. A un nivel alto, esto se traduce en un rendimiento significativamente mejorado, especialmente en conjuntos de datos más grandes.
En este codelab, usarás el conector de spark-bigquery para leer y escribir datos entre BigQuery y Spark.
5. Crea un proyecto
Accede a la consola de Google Cloud Platform en console.cloud.google.com y crea un proyecto nuevo:
A continuación, deberás habilitar la facturación en la consola de Cloud para usar los recursos de Google Cloud.
Ejecutar este codelab no debería costar más que unos pocos dólares, pero podría ser más si decides usar más recursos o si los dejas en ejecución. En la última sección de este codelab, se te indicará cómo limpiar tu proyecto.
Los usuarios nuevos de Google Cloud Platform son aptos para obtener una prueba gratuita de USD 300.
6. Configura el entorno
Ahora, configurarás tu entorno de la siguiente manera:
- Habilita las APIs de Compute Engine, Dataproc y BigQuery Storage
- Configura la configuración del proyecto
- Crea un clúster de Dataproc
- Crea un bucket de Google Cloud Storage
Habilita las APIs y configura tu entorno
Para abrir Cloud Shell, presiona el botón que se encuentra en la esquina superior derecha de la consola de Cloud.
Después de que se cargue Cloud Shell, ejecuta los siguientes comandos para habilitar las APIs de Compute Engine, Dataproc y BigQuery Storage:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
Establece el ID del proyecto. Para encontrarlo, ve a la página de selección de proyectos y busca tu proyecto. Es posible que no sea el mismo que el nombre de tu proyecto.
Ejecuta el siguiente comando para establecer el ID del proyecto:
gcloud config set project <project_id>
Establece la región de tu proyecto eligiendo una de la lista aquí. Un ejemplo podría ser us-central1
.
gcloud config set dataproc/region <region>
Elige un nombre para tu clúster de Dataproc y crea una variable de entorno para él.
CLUSTER_NAME=<cluster_name>
Cómo crear un clúster de Dataproc
Para crear un clúster de Dataproc, ejecuta el siguiente comando:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
Este comando tardará unos minutos en completarse. Para desglosar el comando, sigue estos pasos:
Se iniciará la creación de un clúster de Dataproc con el nombre que proporcionaste antes. El uso de la API de beta
habilitará las funciones beta de Dataproc, como la puerta de enlace de componentes.
gcloud beta dataproc clusters create ${CLUSTER_NAME}
Esto establecerá el tipo de máquina que se usará para tus trabajadores.
--worker-machine-type n1-standard-8
Esto establecerá la cantidad de trabajadores que tendrá tu clúster.
--num-workers 8
Esto establecerá la versión de la imagen de Dataproc.
--image-version 1.5-debian
Esto configurará las acciones de inicialización que se usarán en el clúster. Aquí, incluyes la acción de inicialización pip.
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
Estos son los metadatos que se incluirán en el clúster. Aquí, proporcionas metadatos para la acción de inicialización pip
.
--metadata 'PIP_PACKAGES=google-cloud-storage'
Esto establecerá los componentes opcionales que se instalarán en el clúster.
--optional-components=ANACONDA
Esto habilitará la puerta de enlace de componentes, que te permitirá usar la puerta de enlace de componentes de Dataproc para ver IUs comunes, como Zeppelin, Jupyter o el historial de Spark.
--enable-component-gateway
Para obtener una introducción más detallada a Dataproc, consulta este codelab.
Cómo crear un bucket de Google Cloud Storage
Necesitarás un bucket de Google Cloud Storage para el resultado de tu trabajo. Determina un nombre único para tu bucket y ejecuta el siguiente comando para crear uno nuevo. Los nombres de bucket son únicos en todos los proyectos de Google Cloud para todos los usuarios, por lo que es posible que debas intentar esto varias veces con nombres diferentes. Se crea correctamente un bucket si no recibes un ServiceException
.
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. Análisis de datos exploratorio
Antes de realizar el procesamiento previo, debes obtener más información sobre la naturaleza de los datos con los que estás trabajando. Para ello, explorarás dos métodos de exploración de datos. Primero, verás algunos datos sin procesar con la IU web de BigQuery y, luego, calcularás la cantidad de publicaciones por subreddit con PySpark y Dataproc.
Usa la IU web de BigQuery
Comienza por usar la IU web de BigQuery para ver tus datos. En el ícono de menú de la consola de Cloud, desplázate hacia abajo y presiona "BigQuery" para abrir la IU web de BigQuery.
A continuación, ejecuta el siguiente comando en el editor de consultas de la IU web de BigQuery. Esto mostrará 10 filas completas de los datos de enero de 2017:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
Puedes desplazarte por la página para ver todas las columnas disponibles, así como algunos ejemplos. En particular, verás dos columnas que representan el contenido textual de cada publicación: "title" y "selftext", que es el cuerpo de la publicación. También observa otras columnas, como "created_utc", que es la hora UTC en la que se realizó una publicación, y "subreddit", que es el subreddit en el que existe la publicación.
Ejecuta un trabajo de PySpark
Ejecuta los siguientes comandos en Cloud Shell para clonar el repositorio con el código de muestra y cambiar al directorio correcto:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
Puedes usar PySpark para determinar cuántas publicaciones existen para cada subreddit. Puedes abrir Cloud Editor y leer la secuencia de comandos cloud-dataproc/codelabs/spark-bigquery
antes de ejecutarla en el siguiente paso:
Haz clic en el botón “Abrir terminal” en el Editor de Cloud para volver a Cloud Shell y ejecuta el siguiente comando para ejecutar tu primera tarea de PySpark:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
Este comando te permite enviar trabajos a Dataproc a través de la API de Jobs. Aquí, indicas el tipo de trabajo como pyspark
. Puedes proporcionar el nombre del clúster, los parámetros opcionales y el nombre del archivo que contiene la tarea. Aquí, proporcionas el parámetro --jars
, que te permite incluir el spark-bigquery-connector
con tu trabajo. También puedes establecer los niveles de salida de registro con --driver-log-levels root=FATAL
, que suprimirá todos los resultados de registro, excepto los errores. Los registros de Spark suelen ser bastante ruidosos.
La ejecución debería tardar unos minutos, y el resultado final debería ser similar al siguiente:
8. Explora las IU de Dataproc y Spark
Cuando ejecutas trabajos de Spark en Dataproc, tienes acceso a dos IUs para verificar el estado de tus trabajos o clústeres. La primera es la IU de Dataproc, que puedes encontrar haciendo clic en el ícono de menú y desplazándote hacia abajo hasta Dataproc. Aquí, puedes ver la memoria actual disponible, la memoria pendiente y la cantidad de trabajadores.
También puedes hacer clic en la pestaña de trabajos para ver los trabajos completados. Para ver los detalles de un trabajo, como los registros y el resultado, haz clic en el ID de un trabajo en particular.
También puedes ver la IU de Spark. En la página del trabajo, haz clic en la flecha hacia atrás y, luego, en Interfaces web. Deberías ver varias opciones en la puerta de enlace de componentes. Muchos de ellos se pueden habilitar a través de Componentes opcionales cuando configuras tu clúster. En este lab, haz clic en “Spark History Server”.
Se debería abrir la siguiente ventana:
Aquí aparecerán todos los trabajos completados, y puedes hacer clic en cualquier application_id para obtener más información sobre ellos. Del mismo modo, puedes hacer clic en "Mostrar aplicaciones incompletas" en la parte inferior de la página de destino para ver todos los trabajos que se están ejecutando.
9. Ejecuta tu trabajo de reabastecimiento
Ahora, ejecutarás un trabajo que cargue datos en la memoria, extraiga la información necesaria y vuelque el resultado en un bucket de Google Cloud Storage. Extraerás los campos "title", "body" (texto sin procesar) y "timestamp created" de cada comentario de Reddit. Luego, tomarás estos datos, los convertirás en un archivo CSV, los comprimirás y los cargarás en un bucket con un URI de gs://${BUCKET_NAME}/reddit_posts/AAAA/MM/comida.csv.gz.
Puedes consultar el Editor de Cloud nuevamente para leer el código de cloud-dataproc/codelabs/spark-bigquery/backfill.sh
, que es una secuencia de comandos de wrapper para ejecutar el código en cloud-dataproc/codelabs/spark-bigquery/backfill.py
.
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
En breve, deberías ver varios mensajes de finalización de trabajos. La tarea puede tardar hasta 15 minutos en completarse. También puedes volver a verificar tu bucket de almacenamiento para verificar que se hayan generado datos correctamente con gsutil. Una vez que se hayan completado todas las tareas, ejecuta el siguiente comando:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
Deberías ver el siguiente resultado:
¡Felicitaciones! Completaste correctamente un reabastecimiento de tus datos de comentarios de Reddit. Si te interesa saber cómo puedes crear modelos a partir de estos datos, continúa con el codelab Spark-NLP.
10. Limpieza
Para evitar que se generen cargos innecesarios en tu cuenta de GCP después de completar esta guía de inicio rápido, haz lo siguiente:
- Borra el bucket de Cloud Storage para el entorno que creaste.
- Borra el entorno de Dataproc.
Si creaste un proyecto solo para este codelab, también puedes borrarlo de forma opcional:
- En GCP Console, ve a la página Proyectos.
- En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
- En el cuadro, escribe el ID del proyecto y haz clic en Cerrar para borrarlo.
Licencia
Esta obra se ofrece bajo la licencia Atribución 3.0 Genérica de Creative Commons y la licencia Apache 2.0.