Introdução ao Vertex Pipelines

1. Visão geral

Neste laboratório, você vai aprender a criar e executar pipelines de ML com o Vertex Pipelines.

Conteúdo do laboratório

Você vai aprender a:

  • usar o SDK do Kubeflow Pipelines para criar pipelines de ML escalonáveis;
  • criar e executar um pipeline introdutório de três etapas que recebe uma entrada de texto;
  • criar e executar um pipeline que treina, avalia e implanta um modelo de classificação do AutoML;
  • Usar componentes pré-criados para interagir com serviços da Vertex AI, disponibilizados pela biblioteca google_cloud_pipeline_components
  • agendar um job de pipeline com o Cloud Scheduler.

O custo total da execução deste laboratório no Google Cloud é de aproximadamente US$25.

2. Introdução à Vertex AI

Este laboratório usa a mais nova oferta de produtos de IA disponível no Google Cloud. A Vertex AI integra as ofertas de ML do Google Cloud em uma experiência de desenvolvimento intuitiva. Anteriormente, modelos treinados com o AutoML e modelos personalizados eram acessíveis por serviços separados. A nova oferta combina ambos em uma única API, com outros novos produtos. Você também pode migrar projetos existentes para a Vertex AI.

Além de oferecer treinamento de modelos e serviços de implantação, a Vertex AI também inclui uma variedade de produtos de MLOps, como o Vertex Pipelines (o assunto desse laboratório), Monitoramento de modelos, Feature Store e outros. Confira todos as ofertas de produtos da Vertex AI no diagrama abaixo.

Visão geral do produto Vertex

Se você tiver algum feedback, consulte a página de suporte.

Por que pipelines de ML são úteis?

Antes de começar, primeiro entenda em quais situações você usaria um pipeline. Imagine que você está criando um fluxo de trabalho de ML que inclui processamento de dados, treinamento de modelos, ajuste de hiperparâmetros, avaliação e implantação de modelos. Cada uma dessas etapas pode ter dependências diferentes, o que pode ser difícil de administrar se você tratar o fluxo inteiro como algo monolítico. Ao começar a escalonar seu processo de ML, você pode querer compartilhar seu fluxo de trabalho de ML com outras pessoas da sua equipe para que elas possam executar e colaborar com código. Sem um processo confiável e possível de ser reproduzido, isso pode ser difícil. Com pipelines, cada etapa do seu processo de ML é um contêiner próprio, o que permite que você desenvolva as etapas independentemente e rastreie a entrada e a saída de cada etapa de maneira possível de ser reproduzida. Você também pode agendar ou acionar execuções do seu pipeline com base em outros eventos do seu ambiente do Cloud, como iniciar uma execução de pipeline quando novos dados de treinamento estiverem disponíveis.

Resumo: os pipelines ajudam a automatizar e reproduzir seu fluxo de trabalho de ML.

3. configure o ambiente do Cloud

Para executar este codelab, você vai precisar de um projeto do Google Cloud Platform com o faturamento ativado. Para criar um projeto, siga estas instruções.

Etapa 1: iniciar o Cloud Shell

Neste laboratório, você vai trabalhar em uma sessão do Cloud Shell, que é um interpretador de comandos hospedado por uma máquina virtual em execução na nuvem do Google. A sessão também pode ser executada localmente no seu computador, mas se você usar o Cloud Shell, todas as pessoas vão ter acesso a uma experiência reproduzível em um ambiente consistente. Após concluir o laboratório, é uma boa ideia testar a sessão no seu computador.

Autorizar o Cloud Shell

Ativar o Cloud Shell

No canto superior direito do Cloud Console, clique no botão abaixo para ativar o Cloud Shell:

Ativar o Cloud Shell

Se você nunca iniciou o Cloud Shell, vai aparecer uma tela intermediária (abaixo da dobra) com a descrição dele. Se esse for o caso, clique em Continuar e você não vai encontrar essa tela novamente. Esta é a aparência dessa tela única:

Configuração do Cloud Shell

Leva apenas alguns instantes para provisionar e se conectar ao Cloud Shell.

Inicialização do Cloud Shell

Essa máquina virtual contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal persistente de 5 GB, além de ser executada no Google Cloud. Isso aprimora o desempenho e a autenticação da rede. Praticamente todo o seu trabalho neste codelab pode ser feito em um navegador ou no seu Chromebook.

Depois de se conectar ao Cloud Shell, você já estará autenticado e o projeto já estará configurado com seu ID do projeto.

Execute o seguinte comando no Cloud Shell para confirmar se a conta está autenticada:

gcloud auth list

A saída do comando vai ser parecida com esta:

Saída do Cloud Shell

Execute o comando a seguir no Cloud Shell para confirmar se o comando gcloud sabe sobre seu projeto:

gcloud config list project

Resposta ao comando

[core]
project = <PROJECT_ID>

Se o projeto não estiver configurado, configure-o usando este comando:

gcloud config set project <PROJECT_ID>

Resposta ao comando

Updated property [core/project].

O Cloud Shell tem algumas variáveis de ambiente, incluindo GOOGLE_CLOUD_PROJECT, que contém o nome do nosso projeto atual do Cloud. Vamos usar esses dados várias vezes neste laboratório. É possível ver essa variável ao executar:

echo $GOOGLE_CLOUD_PROJECT

Etapa 2: ativar as APIs

Nas próximas etapas, você vai entender onde e por que esses serviços são necessários. Por enquanto, apenas execute este comando para conceder ao seu projeto acesso aos serviços do Compute Engine, Container Registry e Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Uma mensagem semelhante a esta vai aparecer:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Etapa 3: criar um bucket do Cloud Storage

Para executar um job de treinamento na Vertex AI, precisamos de um bucket de armazenamento para armazenar os recursos de modelo salvos. O bucket precisa ser regional. Estamos usando us-central aqui, mas você pode usar qualquer outra, é só fazer a substituição durante o laboratório. Se você já tem um bucket, pule esta etapa.

Execute os comandos a seguir no terminal do Cloud Shell para criar um bucket:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Depois, vamos conceder à conta de serviço do Compute acesso a esse bucket. Isso garante que o Vertex Pipelines tenha as permissões necessárias para gravar arquivos no bucket. Execute o comando a seguir para adicionar essa permissão:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Etapa 4: criar uma instância do Vertex AI Workbench

Na seção Vertex AI do Console do Cloud, clique em "Workbench":

Menu da Vertex AI

Em Notebooks gerenciados pelo usuário, clique em Novo notebook:

Criar um novo notebook

Em seguida, selecione o tipo de instância TensorFlow Enterprise 2.3 (com LTS) sem GPUs:

Instância do TFE

Use as opções padrão e clique em Criar.

Etapa 5: abrir o notebook

Depois que a instância for criada, selecione Abrir JupyterLab:

Abrir Notebook

4. Configuração do Vertex Pipelines

É preciso instalar algumas bibliotecas adicionais para usar o Vertex Pipelines:

  • Kubeflow Pipelines: o SDK que vamos usar para criar o pipeline. O Vertex Pipelines suporta pipelines em execução criados com o Kubeflow Pipelines ou o TFX.
  • Componentes de pipeline do Google Cloud: essa biblioteca oferece componentes pré-criados que facilitam a interação com os serviços da Vertex AI nas etapas do seu pipeline.

Etapa 1: criar o notebook Python e instalar as bibliotecas

Primeiro, no menu de início da sua instância do notebook, selecione Python 3 para criar um notebook:

Criar notebook Python3

Para acessar o menu inicial, clique no sinal de + no canto superior esquerdo da sua instância do notebook.

Para instalar os serviços que vamos usar neste laboratório, primeiro configure a sinalização do usuário em uma célula do notebook:

USER_FLAG = "--user"

Em seguida, execute o seguinte no seu notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Depois de instalar esses pacotes, será necessário reiniciar o kernel:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Por fim, verifique se os pacotes foram instalados corretamente. A versão do SDK do KFP precisa ser igual ou mais recente do que 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Etapa 2: configurar o ID do projeto e o bucket

Neste laboratório, você vai fazer referência ao ID do projeto do Cloud e ao bucket que você criou anteriormente. Em seguida, criaremos variáveis para cada um deles.

Se não souber o ID do projeto, você pode consegui-lo executando o seguinte:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Caso contrário, defina-o aqui:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Depois, crie uma variável para armazenar o nome do seu bucket. Se você criou neste laboratório, as próximas etapas vão funcionar. Caso contrário, configure manualmente o que é mostrado a seguir:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Etapa 3: importar as bibliotecas

Adicione o seguinte para importar as bibliotecas que vamos usar durante este codelab:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Etapa 4: definir as constantes

Por último, precisamos definir algumas variáveis constantes antes de criar o pipeline. PIPELINE_ROOT é o caminho do Cloud Storage em que os artefatos criados pelo pipeline serão gravados. Aqui, estamos usando us-central1 como a região, mas, se você usou uma região diferente quando criou o bucket, atualize a variável REGION no código abaixo:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Depois de executar o código acima, você verá o diretório raiz do seu pipeline. Esse é o local do Cloud Storage em que os artefatos do seu pipeline serão gravados. Ele estará no formato gs://YOUR-BUCKET-NAME/pipeline_root/

5. Como criar seu primeiro pipeline

Para se familiarizar com o funcionamento do Vertex Pipelines, primeiro vamos criar um pipeline curto usando o SDK do KFP. Esse pipeline não faz nada relacionado a ML (não se preocupe, vamos chegar lá!). Este exercício ensina a você o seguinte:

  • Como criar componentes personalizados no SDK do KFP
  • Como executar e monitorar um pipeline no Vertex Pipelines

Vamos criar um pipeline que mostra uma sentença usando duas saídas: um nome de produto e uma descrição de emoji. O pipeline vai consistir de três componentes:

  • product_name: esse componente usa um nome de produto (ou qualquer substantivo que você queira) como entrada e retorna essa string como a saída
  • emoji: este componente usa a descrição em texto de um emoji e a converte no emoji. Por exemplo, o código de texto de ✨ é "sparkles". Este componente usa uma biblioteca de emojis para mostrar a você como gerenciar dependências externas no seu pipeline.
  • build_sentence: esse último componente consome a saída dos dois anteriores para criar uma frase que usa o emoji. Por exemplo, a saída resultante pode ser "Vertex Pipelines is ✨".

Vamos começar a programar.

Etapa 1: criar um componente baseado em função do Python

Usando o SDK do KFP, podemos criar componentes baseados em funções do Python. Vamos usar isso para os três componentes do nosso primeiro pipeline. Primeiro, vamos criar o componente product_name, que simplesmente usa uma string como entrada e retorna essa string. Adicione o seguinte ao seu notebook:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Vamos ver mais detalhes da sintaxe:

  • O decorador @component compila essa função em um componente quando o pipeline é executado. Você vai usar esse decorador sempre que criar um componente personalizado.
  • O parâmetro base_image especifica a imagem de contêiner que esse componente vai usar.
  • O parâmetro output_component_file é opcional e especifica o arquivo yaml em que o componente compilado vai ser criado. Depois de executar a célula, você deve ver o arquivo gravado na sua instância do notebook. Se você quiser compartilhar esse componente com alguém, pode enviar o arquivo yaml gerado e pedir para a pessoa carregar com o seguinte:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • O -> str após a definição da função especifica o tipo de saída desse componente.

Etapa 2: criar dois componentes adicionais

Para concluir nosso pipeline, vamos criar mais dois componentes. O primeiro que vamos definir usa uma string como entrada e a converte no emoji correspondente, se houver um. O componente retorna uma tupla com o texto de entrada transmitido e o emoji resultante:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Este componente é um pouco mais complexo que o anterior. Veja os detalhes dele:

  • O parâmetro packages_to_install informa ao componente sobre quaisquer dependências de bibliotecas externas do contêiner. Neste caso, estamos usando uma biblioteca chamada emoji.
  • Esse componente retorna uma NamedTuple chamada Outputs. Observe que cada uma das strings dessa tupla tem chaves: emoji_text e emoji. Vamos usá-las no próximo componente para acessar a saída.

O último componente desse pipeline consome a saída dos dois primeiros e as combina para retornar uma string:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Você pode se perguntar como esse componente sabe que é preciso usar a saída das etapas anteriores que você definiu. Essa é uma boa pergunta. Vamos unir tudo na próxima etapa.

Etapa 3: unir os componentes em um pipeline

As configurações do componente definidas acima criaram funções de fábrica que podem ser usadas em uma definição de pipeline para criar etapas. Para configurar um pipeline, use o decorador @pipeline, dê um nome e uma descrição ao pipeline e forneça o caminho raiz em que os artefatos do seu pipeline serão gravados. Artefatos são quaisquer arquivos de saída gerados pelo seu pipeline. Este pipeline introdutório não gera nenhum artefato, mas o próximo sim.

No próximo bloco de código, vamos definir uma função intro_pipeline. Nela, vamos especificar as entradas das etapas iniciais do pipeline e como conectar umas às outras:

  • product_task usa um nome de produto como entrada. Aqui, você está transmitindo "Vertex Pipelines", mas pode mudar para o que quiser.
  • emoji_task usa o código de texto de um emoji como entrada. Também é possível alterar para o que você quiser. Por exemplo, "party_face" se refere ao emoji 🥳. Observe que, como esse componente e o product_task não têm etapas que fornecem entrada, especificamos manualmente a entrada para eles quando definimos nosso pipeline.
  • A última etapa do pipeline, consumer_task, tem três parâmetros de entrada:
    • Saída de product_task. Como esta etapa só produz uma saída, podemos fazer referência a ela usando product_task.output.
    • A saída emoji da etapa emoji_task. Consulte o componente emoji definido acima, em que nomeamos os parâmetros de saída.
    • Da mesma forma, a saída nomeada emoji_text do componente emoji. Caso o pipeline receba texto que não corresponda a um emoji, ele usará esse texto para criar uma sentença.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Etapa 4: compilar e executar o pipeline

Com seu pipeline definido, está tudo pronto para compilá-lo. O código a seguir gera um arquivo JSON que você vai usar para executar o pipeline:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Em seguida, crie uma variável TIMESTAMP. Vamos usar isso no nosso ID de trabalho:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Em seguida, defina o job do pipeline:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Por fim, execute o job para criar uma nova execução de pipeline:

job.submit()

Depois de executar esta célula, você vai encontrar registros com um link para conferir a execução do pipeline no console:

Registros de jobs de pipeline

Acesse esse link. Seu pipeline vai ficar assim quando concluído:

Pipeline de introdução concluído

Esse pipeline vai levar de 5 a 6 minutos para ser executado. Quando concluído, você pode clicar no componente build-sentence para conferir a saída final:

Saída do pipeline de introdução

Agora que você sabe como o SDK do KFP e o Vertex Pipelines funcionam, você já pode criar um pipeline que cria e implanta um modelo de ML usando outros serviços da Vertex AI. Vamos lá.

6. Crie um pipeline de ML completo

É hora de criar seu primeiro pipeline de ML. Neste pipeline, vamos usar o conjunto de dados Dry beans (feijões secos) do UCI Machine Learning, de: KOKLU, M. e OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques."In Computers and Electronics in Agriculture, 174, 105507. DOI.

Esse é um conjunto de dados tabular que você vai usar no seu pipeline para treinar, avaliar e implantar um modelo de AutoML que classifica grãos em um dos sete tipos com base nas características.

Esse pipeline vai fazer o seguinte:

  • Crie um conjunto de dados em
  • Treinar um modelo de classificação tabular com o AutoML
  • receber métricas de avaliação sobre o modelo;
  • com base nas métricas de avaliação, decidir se é preciso implantar o modelo usando a lógica condicional no Vertex Pipelines;
  • Implantar o modelo em um endpoint usando o Vertex Prediction

Cada uma das etapas descritas será um componente. A maioria das etapas do pipeline usará componentes pré-criados para serviços da Vertex AI usando a biblioteca google_cloud_pipeline_components que importamos anteriormente neste codelab. Nesta seção, vamos primeiro definir um componente personalizado e só depois definir as outras etapas do pipeline usando componentes pré-criados. Com componentes pré-criados, é mais fácil acessar serviços da Vertex AI como o treinamento de modelos e a implantação.

Etapa 1: um componente personalizado para avaliação de modelos

O componente personalizado que vamos definir será usado ao final do pipeline assim que o treinamento de modelos for concluído. Esse componente faz algumas coisas:

  • recebe métricas de avaliação do modelo de classificação treinado do AutoML;
  • analisa as métricas e as renderiza na IU do Vertex Pipelines;
  • compara as métricas com um limite para determinar se o modelo deve ser implantado.

Antes de definir o componente, vamos entender os parâmetros de entrada e saída dele. Como entrada, esse pipeline usa alguns metadados do nosso projeto do Cloud, o modelo treinado resultante (vamos definir esse componente mais tarde), as métricas de avaliação do modelo e um thresholds_dict_str. O thresholds_dict_str será definido quando executarmos o pipeline. No caso deste modelo de classificação, ele será o valor da área sob a curva ROC em que queremos implantar o modelo. Por exemplo, se transmitirmos 0,95, isso significa que o pipeline só vai implantar o modelo se a métrica estiver acima de 95%.

Nosso componente de avaliação retorna uma string que indica se o modelo deve ser implantado ou não. Adicione o seguinte em uma célula do notebook para criar o componente personalizado:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Etapa 2: adicionar componentes pré-criados do Google Cloud

Nesta etapa, vamos definir os componentes restantes do pipeline e ver como todos se encaixam. Primeiro, defina o nome de exibição da sua execução de pipeline usando um carimbo de data/hora:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Depois, copie o seguinte em uma nova célula do notebook:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Vamos conferir o que está acontecendo neste código:

  • Primeiro, assim como no pipeline anterior, definimos os parâmetros de entrada que esse pipeline aceita. É preciso selecioná-los manualmente, porque eles não dependem da saída de outras etapas do pipeline.
  • O restante do pipeline usa alguns componentes pré-criados para interagir com os serviços da Vertex AI:
    • TabularDatasetCreateOp cria um conjunto de dados tabular na Vertex AI quando recebe uma origem de banco de dados no Cloud Storage ou no BigQuery. Neste pipeline, estamos transmitindo os dados usando um URL de tabela do BigQuery.
    • AutoMLTabularTrainingJobRunOp inicia um job de treinamento do AutoML para um conjunto de dados tabular. Transmitimos alguns parâmetros de configuração para esse componente, incluindo o tipo de modelo (nesse caso, classificação), alguns dados nas colunas, por quanto tempo queremos executar o treinamento e um ponteiro para o conjunto de dados. Para transmitir o conjunto de dados a esse componente, fornecemos a saída do componente anterior usando dataset_create_op.outputs["dataset"].
    • EndpointCreateOp cria um endpoint na Vertex AI. O endpoint criado nesta etapa será transmitido como entrada para o próximo componente.
    • ModelDeployOp implanta um modelo específico em um endpoint da Vertex AI. Neste caso, estamos usando o endpoint criado na etapa anterior. Há outras opções de configuração disponíveis, mas aqui fornecemos o tipo de máquina e o modelo do endpoint que queremos implantar. Transmitimos o modelo acessando as saídas da etapa de treinamento no nosso pipeline.
  • Este pipeline também usa a lógica condicional, um recurso do Vertex Pipelines que permite definir uma condição, junto com diferentes ramificações baseadas no resultado dessa condição. Lembre-se de que, ao definir o pipeline, transferimos um parâmetro thresholds_dict_str. Esse é o limite de precisão que usamos para determinar se o modelo será implantado em um endpoint. Para implementar isso, usamos a classe Condition do SDK do KFP. A condição transmitida é a saída do componente de avaliação personalizada que você definiu anteriormente neste codelab. Se essa condição for verdadeira, o pipeline continuará executando o componente deploy_op. Se a acurácia não atingir o limite predefinido, o pipeline para aqui e não implanta um modelo.

Etapa 3: compilar e executar o pipeline de ML completo

Com o pipeline completo definido, é hora de compilá-lo:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Em seguida, defina o job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Por fim, execute o job:

ml_pipeline_job.submit()

Para conferir o pipeline no console, navegue até o link mostrado nos registros depois de executar a célula acima. A execução desse pipeline vai levar pouco mais de uma hora. A maior parte do tempo é gasta na etapa de treinamento do AutoML. O pipeline concluído parecerá com este:

Pipeline concluído do AutoML

Se você acionar o botão "Expandir artefatos" na parte de cima da tela, poderá conferir detalhes dos diferentes artefatos criados usando seu pipeline. Por exemplo, se você clicar no artefato dataset, verá detalhes sobre o conjunto de dados da Vertex AI que foi criado. Você pode clicar no link mostrado aqui para acessar a página do conjunto de dados:

Conjunto de dados do pipeline

Da mesma maneira, para conferir as visualizações de métricas resultantes do nosso componente de avaliação personalizado, clique no artefato chamado metricsc. No lado direito do seu painel, você pode ver a matriz de confusão do modelo:

Visualização das métricas

Para conferir o modelo e o endpoint criados nessa execução do pipeline, acesse a seção de modelos e clique no modelo chamado automl-beans. Você vai conferir esse modelo implantado em um endpoint:

Model-endpoint

Também é possível acessar essa página clicando no artefato endpoint no gráfico do seu pipeline.

Além de conferir o gráfico do pipeline no console, é possível usar o Vertex Pipelines para fazer o Rastreamento de linhagem. O rastreamento de linhagem quer dizer rastrear os artefatos criados em todo o pipeline. Isso pode nos ajudar a entender onde os artefatos foram criados e como eles estão sendo usados em um fluxo de trabalho de ML. Por exemplo, para ver o rastreamento de linhagem para o conjunto de dados criado neste pipeline, clique no artefato de conjunto de dados e em Ver linhagem:

Ver linhagem

Isso mostra todos os lugares em que o artefato está sendo usado:

Detalhes da linhagem

Etapa 4: comparar métricas entre as execuções de pipeline

Se você executar o pipeline muitas vezes, talvez queira compartilhar as métricas entre execuções. É possível usar o método aiplatform.get_pipeline_df() para acessar os metadados da execução. Aqui, você vai receber metadados de todas as execuções desse pipeline e carregá-los em um DataFrame do Pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Com isso, você concluiu o laboratório.

Parabéns! 🎉

Você aprendeu a usar a Vertex AI para:

  • Usar o SDK do Kubeflow Pipelines para criar pipelines completos com componentes personalizados
  • Executar pipelines no Vertex Pipelines e iniciar execuções do pipeline com o SDK
  • Acessar e analisar o gráfico do Vertex Pipelines no console
  • Usar componentes de pipeline pré-criados para adicionar serviços da Vertex AI ao pipeline
  • Programar jobs recorrentes com pipelines

Para saber mais sobre as diferentes partes da Vertex, consulte a documentação.

7. Limpeza

Para que você não seja cobrado, recomendamos que exclua os recursos criados neste laboratório.

Etapa 1: interromper ou excluir a instância do Notebooks

Se você quiser continuar usando o notebook que criou neste laboratório, é recomendado que você o desligue quando não estiver usando. A partir da interface de Notebooks no seu Console do Cloud, selecione o notebook e depois clique em Parar. Para excluir a instância completamente, selecione Excluir:

Interromper instância

Etapa 2: excluir o endpoint

Para excluir o endpoint implantado, acesse a seção Endpoints do console da Vertex AI e clique no ícone de exclusão:

Excluir endpoint

Depois, clique em Cancelar a implantação no seguinte prompt:

Cancelar a implantação do modelo

Por fim, acesse a seção Modelos do console, encontre o modelo e, no menu de três pontos à direita, clique em Excluir modelo:

Excluir modelo

Etapa 3: excluir o bucket do Cloud Storage

Para excluir o bucket do Storage, use o menu de navegação do console do Cloud, acesse o Storage, selecione o bucket e clique em "Excluir":

Excluir armazenamento