In this lab, you will have some hands-on practice with Kubeflow Pipelines. As mentioned in the lectures, modern ML engineering is moving towards pipeline automation for rapid iteration and experiment tracking. This is especially useful in production deployments where models need to be frequently retrained to catch trends in newer data.
Kubeflow Pipelines is one component of the Kubeflow suite of tools for machine learning workflows. It is deployed on top of a Kubernetes cluster and builds an infrastructure for orchestrating ML pipelines and monitoring inputs and outputs of each component. You will use this tool in Google Cloud Platform in the first assignment this week and this lab will help prepare you for that by exploring its features on a local deployment. In particular, you will:
Let's begin!
You will need these tools installed on your local machine to complete the exercises:
Docker - platform for building and running containerized applications. You should already have this installed from the previous ungraded labs. If not, you can see the instructions here. If you are using Docker Desktop (Mac or Windows), you may need to increase the resource limits to start Kubeflow Pipelines later. You can click on the Docker icon in your task bar, choose Preferences, and adjust the CPU to 4, Storage to 50GB, and the Memory to at least 4GB (8GB recommended). Just make sure you are not maxing out any of these limits (i.e. the sliders should ideally be at the midpoint or less) since that can make your machine slow or unresponsive. If you're constrained on resources, don't worry. You can still use this notebook as a reference since we'll show the expected outputs at each step. The important thing is to become familiar with Kubeflow Pipelines before you get more hands-on in the assignment.
kubectl - tool for running commands on Kubernetes clusters. This should also be installed from the previous labs. If not, please see the instructions here.
kind - a tool for running local Kubernetes clusters using Docker container nodes. Please follow the instructions here to install kind and create a local cluster. (NOTE: This lab currently does not support Kubernetes v1.22 and above. Please check the default Kubernetes image used by the kind version you are about to download here. If it is using v1.22 or higher, consider downloading an older version or using the --image flag when creating the cluster (e.g. kind create cluster --image=kindest/node:v1.19.1). After creating the cluster, you can check the Kubernetes version with the command kubectl version. This lab was tested using kind v0.9 running Kubernetes v1.19.1.)
Kubeflow Pipelines (KFP) - a platform for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers. Once you've created a local cluster using kind, you can deploy Kubeflow Pipelines with these commands. (NOTE: This lab was tested using KFP v1.7.0).
export PIPELINE_VERSION=1.7.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION&timeout=300"
kubectl wait --for condition=established --timeout=300s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION&timeout=300"
You can enter the commands above one line at a time. These will set up all the deployments and spin up the pods for the entire application, all under the kubeflow namespace. After running the last command, it can take a while (around 30 minutes) for all the deployments to become ready. You can run kubectl get deploy -n kubeflow a few times to check the status. You should see all deployments reporting READY (i.e. 1/1) before you proceed to the next section.
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
cache-deployer-deployment         1/1     1            1           21h
cache-server                      1/1     1            1           21h
metadata-envoy-deployment         1/1     1            1           21h
metadata-grpc-deployment          1/1     1            1           21h
metadata-writer                   1/1     1            1           21h
minio                             1/1     1            1           21h
ml-pipeline                       1/1     1            1           21h
ml-pipeline-persistenceagent      1/1     1            1           21h
ml-pipeline-scheduledworkflow     1/1     1            1           21h
ml-pipeline-ui                    1/1     1            1           21h
ml-pipeline-viewer-crd            1/1     1            1           21h
ml-pipeline-visualizationserver   1/1     1            1           21h
mysql                             1/1     1            1           21h
workflow-controller               1/1     1            1           21h
When everything is ready, you can run the following command to access the ml-pipeline-ui service.
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
The terminal should respond with something like this:
Forwarding from 127.0.0.1:8080 -> 3000
Forwarding from [::1]:8080 -> 3000
You can then open your browser and go to http://localhost:8080 to see the user interface.

As you know, generating a trained model involves executing a sequence of steps. Here is a high level overview of what these steps might look like:

Think back to the very first model you ever built; more likely than not, your code also followed a similar flow. In essence, building an ML pipeline mainly involves implementing these steps, but you will need to optimize your operations to deliver value to your team. Platforms such as Kubeflow help you build ML pipelines that are automated, reproducible, and easy to monitor. You will see this for yourself as you build your pipeline in the next sections.
The main building blocks of your ML pipeline are referred to as components. In the context of Kubeflow, these are containerized applications that run a specific task in the pipeline. Moreover, these components generate and consume artifacts from other components. For example, a download task will generate a dataset artifact and this will be consumed by a data splitting task. If you go back to the simple pipeline image above and describe it using tasks and artifacts, it will look something like this:

This relationship between tasks and their artifacts is what constitutes a pipeline, and it is also called a directed acyclic graph (DAG).
Kubeflow Pipelines lets you create components either by building the component specification directly or through Python functions. For this lab, you will use the latter since it is more intuitive and allows for quick iteration. As you gain more experience, you can explore building the component specification directly, especially if you want to use languages other than Python.
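To give you a rough idea of what the specification-based approach looks like, here is a minimal sketch that loads a component from a hand-written spec string (you can try it after installing the SDK in the next step). The Echo message component, the message input, and the alpine image are purely illustrative and are not part of this lab's pipeline:
# Minimal sketch of a specification-based component (illustrative only).
# This loads a component from a spec string instead of a Python function.
import kfp

echo_op = kfp.components.load_component_from_text("""
name: Echo message
description: Prints a message using a plain container image.
inputs:
- {name: message, type: String}
implementation:
  container:
    image: alpine
    command: [echo, {inputValue: message}]
""")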
You will begin by installing the Kubeflow Pipelines SDK. Remember to restart the runtime to load the newly installed modules in Colab.
# Install the KFP SDK
!pip install --upgrade kfp==1.7.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kfp==1.7.0
Downloading kfp-1.7.0.tar.gz (231 kB)
|████████████████████████████████| 231 kB 5.4 MB/s
Collecting absl-py<=0.11,>=0.9
Downloading absl_py-0.11.0-py3-none-any.whl (127 kB)
|████████████████████████████████| 127 kB 31.9 MB/s
Collecting PyYAML<6,>=5.3
Downloading PyYAML-5.4.1-cp37-cp37m-manylinux1_x86_64.whl (636 kB)
|████████████████████████████████| 636 kB 39.1 MB/s
Collecting google-cloud-storage<2,>=1.20.0
Downloading google_cloud_storage-1.44.0-py2.py3-none-any.whl (106 kB)
|████████████████████████████████| 106 kB 12.1 MB/s
Collecting kubernetes<13,>=8.0.0
Downloading kubernetes-12.0.1-py2.py3-none-any.whl (1.7 MB)
|████████████████████████████████| 1.7 MB 32.5 MB/s
Requirement already satisfied: google-api-python-client<2,>=1.7.8 in /usr/local/lib/python3.7/dist-packages (from kfp==1.7.0) (1.12.11)
Requirement already satisfied: google-auth<2,>=1.6.1 in /usr/local/lib/python3.7/dist-packages (from kfp==1.7.0) (1.35.0)
Collecting requests-toolbelt<1,>=0.8.0
Downloading requests_toolbelt-0.9.1-py2.py3-none-any.whl (54 kB)
|████████████████████████████████| 54 kB 1.3 MB/s
Requirement already satisfied: cloudpickle<2,>=1.3.0 in /usr/local/lib/python3.7/dist-packages (from kfp==1.7.0) (1.5.0)
Collecting kfp-server-api<2.0.0,>=1.1.2
Downloading kfp-server-api-1.8.5.tar.gz (58 kB)
|████████████████████████████████| 58 kB 1.9 MB/s
Collecting jsonschema<4,>=3.0.1
Downloading jsonschema-3.2.0-py2.py3-none-any.whl (56 kB)
|████████████████████████████████| 56 kB 4.1 MB/s
Requirement already satisfied: tabulate<1,>=0.8.6 in /usr/local/lib/python3.7/dist-packages (from kfp==1.7.0) (0.8.10)
Requirement already satisfied: click<8,>=7.1.1 in /usr/local/lib/python3.7/dist-packages (from kfp==1.7.0) (7.1.2)
Collecting Deprecated<2,>=1.2.7
Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Collecting strip-hints<1,>=0.1.8
Downloading strip-hints-0.1.10.tar.gz (29 kB)
Collecting docstring-parser<1,>=0.7.3
Downloading docstring_parser-0.14.1-py3-none-any.whl (33 kB)
Collecting kfp-pipeline-spec<0.2.0,>=0.1.8
Downloading kfp_pipeline_spec-0.1.16-py3-none-any.whl (19 kB)
Collecting fire<1,>=0.3.1
Downloading fire-0.4.0.tar.gz (87 kB)
|████████████████████████████████| 87 kB 6.4 MB/s
Requirement already satisfied: protobuf<4,>=3.13.0 in /usr/local/lib/python3.7/dist-packages (from kfp==1.7.0) (3.17.3)
Requirement already satisfied: six in /usr/local/lib/python3.7/dist-packages (from absl-py<=0.11,>=0.9->kfp==1.7.0) (1.15.0)
Requirement already satisfied: wrapt<2,>=1.10 in /usr/local/lib/python3.7/dist-packages (from Deprecated<2,>=1.2.7->kfp==1.7.0) (1.14.1)
Requirement already satisfied: termcolor in /usr/local/lib/python3.7/dist-packages (from fire<1,>=0.3.1->kfp==1.7.0) (1.1.0)
Requirement already satisfied: google-api-core<3dev,>=1.21.0 in /usr/local/lib/python3.7/dist-packages (from google-api-python-client<2,>=1.7.8->kfp==1.7.0) (1.31.6)
Requirement already satisfied: httplib2<1dev,>=0.15.0 in /usr/local/lib/python3.7/dist-packages (from google-api-python-client<2,>=1.7.8->kfp==1.7.0) (0.17.4)
Requirement already satisfied: google-auth-httplib2>=0.0.3 in /usr/local/lib/python3.7/dist-packages (from google-api-python-client<2,>=1.7.8->kfp==1.7.0) (0.0.4)
Requirement already satisfied: uritemplate<4dev,>=3.0.0 in /usr/local/lib/python3.7/dist-packages (from google-api-python-client<2,>=1.7.8->kfp==1.7.0) (3.0.1)
Requirement already satisfied: pytz in /usr/local/lib/python3.7/dist-packages (from google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (2022.2.1)
Requirement already satisfied: requests<3.0.0dev,>=2.18.0 in /usr/local/lib/python3.7/dist-packages (from google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (2.23.0)
Requirement already satisfied: setuptools>=40.3.0 in /usr/local/lib/python3.7/dist-packages (from google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (57.4.0)
Requirement already satisfied: googleapis-common-protos<2.0dev,>=1.6.0 in /usr/local/lib/python3.7/dist-packages (from google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (1.56.4)
Requirement already satisfied: packaging>=14.3 in /usr/local/lib/python3.7/dist-packages (from google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (21.3)
Requirement already satisfied: rsa<5,>=3.1.4 in /usr/local/lib/python3.7/dist-packages (from google-auth<2,>=1.6.1->kfp==1.7.0) (4.9)
Requirement already satisfied: cachetools<5.0,>=2.0.0 in /usr/local/lib/python3.7/dist-packages (from google-auth<2,>=1.6.1->kfp==1.7.0) (4.2.4)
Requirement already satisfied: pyasn1-modules>=0.2.1 in /usr/local/lib/python3.7/dist-packages (from google-auth<2,>=1.6.1->kfp==1.7.0) (0.2.8)
Collecting google-resumable-media<3.0dev,>=1.3.0
Downloading google_resumable_media-2.3.3-py2.py3-none-any.whl (76 kB)
|████████████████████████████████| 76 kB 4.9 MB/s
Collecting google-cloud-core<3.0dev,>=1.6.0
Downloading google_cloud_core-2.3.2-py2.py3-none-any.whl (29 kB)
Collecting google-crc32c<2.0dev,>=1.0
Downloading google_crc32c-1.5.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (32 kB)
Requirement already satisfied: importlib-metadata in /usr/local/lib/python3.7/dist-packages (from jsonschema<4,>=3.0.1->kfp==1.7.0) (4.12.0)
Requirement already satisfied: pyrsistent>=0.14.0 in /usr/local/lib/python3.7/dist-packages (from jsonschema<4,>=3.0.1->kfp==1.7.0) (0.18.1)
Requirement already satisfied: attrs>=17.4.0 in /usr/local/lib/python3.7/dist-packages (from jsonschema<4,>=3.0.1->kfp==1.7.0) (22.1.0)
Requirement already satisfied: urllib3>=1.15 in /usr/local/lib/python3.7/dist-packages (from kfp-server-api<2.0.0,>=1.1.2->kfp==1.7.0) (1.24.3)
Requirement already satisfied: certifi in /usr/local/lib/python3.7/dist-packages (from kfp-server-api<2.0.0,>=1.1.2->kfp==1.7.0) (2022.6.15)
Requirement already satisfied: python-dateutil in /usr/local/lib/python3.7/dist-packages (from kfp-server-api<2.0.0,>=1.1.2->kfp==1.7.0) (2.8.2)
Requirement already satisfied: requests-oauthlib in /usr/local/lib/python3.7/dist-packages (from kubernetes<13,>=8.0.0->kfp==1.7.0) (1.3.1)
Collecting websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0
Downloading websocket_client-1.4.1-py3-none-any.whl (55 kB)
|████████████████████████████████| 55 kB 2.7 MB/s
Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /usr/local/lib/python3.7/dist-packages (from packaging>=14.3->google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (3.0.9)
Requirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /usr/local/lib/python3.7/dist-packages (from pyasn1-modules>=0.2.1->google-auth<2,>=1.6.1->kfp==1.7.0) (0.4.8)
Requirement already satisfied: idna<3,>=2.5 in /usr/local/lib/python3.7/dist-packages (from requests<3.0.0dev,>=2.18.0->google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (2.10)
Requirement already satisfied: chardet<4,>=3.0.2 in /usr/local/lib/python3.7/dist-packages (from requests<3.0.0dev,>=2.18.0->google-api-core<3dev,>=1.21.0->google-api-python-client<2,>=1.7.8->kfp==1.7.0) (3.0.4)
Requirement already satisfied: wheel in /usr/local/lib/python3.7/dist-packages (from strip-hints<1,>=0.1.8->kfp==1.7.0) (0.37.1)
Requirement already satisfied: zipp>=0.5 in /usr/local/lib/python3.7/dist-packages (from importlib-metadata->jsonschema<4,>=3.0.1->kfp==1.7.0) (3.8.1)
Requirement already satisfied: typing-extensions>=3.6.4 in /usr/local/lib/python3.7/dist-packages (from importlib-metadata->jsonschema<4,>=3.0.1->kfp==1.7.0) (4.1.1)
Requirement already satisfied: oauthlib>=3.0.0 in /usr/local/lib/python3.7/dist-packages (from requests-oauthlib->kubernetes<13,>=8.0.0->kfp==1.7.0) (3.2.0)
Building wheels for collected packages: kfp, fire, kfp-server-api, strip-hints
Building wheel for kfp (setup.py) ... done
Created wheel for kfp: filename=kfp-1.7.0-py3-none-any.whl size=320983 sha256=a133f1f39192a4d02897e40f12f784a19de91ec307b83a67af35e84697f4a2c0
Stored in directory: /root/.cache/pip/wheels/2c/4e/8e/7c38c0cefe4701caf621009fe1b44d07c0a4e2caba3856b288
Building wheel for fire (setup.py) ... done
Created wheel for fire: filename=fire-0.4.0-py2.py3-none-any.whl size=115942 sha256=7fd5b775d1df4e399f690fc675f380bba9503c12df120acba3c5c9d22ae23c3b
Stored in directory: /root/.cache/pip/wheels/8a/67/fb/2e8a12fa16661b9d5af1f654bd199366799740a85c64981226
Building wheel for kfp-server-api (setup.py) ... done
Created wheel for kfp-server-api: filename=kfp_server_api-1.8.5-py3-none-any.whl size=99715 sha256=e34a7c60a8e28d72e0f991e908d78a2a1ce96206b2dd1e4b7edec133b68ef170
Stored in directory: /root/.cache/pip/wheels/77/0e/7b/ed385d69453b7b754834c01d83fa9f5708ba66b4f6ed5d6a35
Building wheel for strip-hints (setup.py) ... done
Created wheel for strip-hints: filename=strip_hints-0.1.10-py2.py3-none-any.whl size=22302 sha256=33f40d1fc5e96fd0af8b7a4638220cceaefb66d59f24a731497bc2c8cc2ee04c
Stored in directory: /root/.cache/pip/wheels/5e/14/c3/6e44e9b2545f2d570b03f5b6d38c00b7534aa8abb376978363
Successfully built kfp fire kfp-server-api strip-hints
Installing collected packages: google-crc32c, websocket-client, PyYAML, google-resumable-media, google-cloud-core, strip-hints, requests-toolbelt, kubernetes, kfp-server-api, kfp-pipeline-spec, jsonschema, google-cloud-storage, fire, docstring-parser, Deprecated, absl-py, kfp
Attempting uninstall: PyYAML
Found existing installation: PyYAML 6.0
Uninstalling PyYAML-6.0:
Successfully uninstalled PyYAML-6.0
Attempting uninstall: google-resumable-media
Found existing installation: google-resumable-media 0.4.1
Uninstalling google-resumable-media-0.4.1:
Successfully uninstalled google-resumable-media-0.4.1
Attempting uninstall: google-cloud-core
Found existing installation: google-cloud-core 1.0.3
Uninstalling google-cloud-core-1.0.3:
Successfully uninstalled google-cloud-core-1.0.3
Attempting uninstall: jsonschema
Found existing installation: jsonschema 4.3.3
Uninstalling jsonschema-4.3.3:
Successfully uninstalled jsonschema-4.3.3
Attempting uninstall: google-cloud-storage
Found existing installation: google-cloud-storage 1.18.1
Uninstalling google-cloud-storage-1.18.1:
Successfully uninstalled google-cloud-storage-1.18.1
Attempting uninstall: absl-py
Found existing installation: absl-py 1.2.0
Uninstalling absl-py-1.2.0:
Successfully uninstalled absl-py-1.2.0
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-cloud-translate 1.5.0 requires google-cloud-core<2.0dev,>=1.0.0, but you have google-cloud-core 2.3.2 which is incompatible.
google-cloud-firestore 1.7.0 requires google-cloud-core<2.0dev,>=1.0.3, but you have google-cloud-core 2.3.2 which is incompatible.
google-cloud-datastore 1.8.0 requires google-cloud-core<2.0dev,>=1.0.0, but you have google-cloud-core 2.3.2 which is incompatible.
google-cloud-bigquery 1.21.0 requires google-cloud-core<2.0dev,>=1.0.3, but you have google-cloud-core 2.3.2 which is incompatible.
google-cloud-bigquery 1.21.0 requires google-resumable-media!=0.4.0,<0.5.0dev,>=0.3.1, but you have google-resumable-media 2.3.3 which is incompatible.
Successfully installed Deprecated-1.2.13 PyYAML-5.4.1 absl-py-0.11.0 docstring-parser-0.14.1 fire-0.4.0 google-cloud-core-2.3.2 google-cloud-storage-1.44.0 google-crc32c-1.5.0 google-resumable-media-2.3.3 jsonschema-3.2.0 kfp-1.7.0 kfp-pipeline-spec-0.1.16 kfp-server-api-1.8.5 kubernetes-12.0.1 requests-toolbelt-0.9.1 strip-hints-0.1.10 websocket-client-1.4.1
Note: Please do not proceed to the next steps without restarting the Runtime after installing kfp. You can do that by either pressing the Restart Runtime button at the end of the cell output above, or going to the Runtime menu in the Colab toolbar and selecting Restart Runtime.
Now you will import the modules you will be using to construct the Kubeflow pipeline. You will learn more about what each of these is for in the next sections.
# Import the modules you will use
import kfp
# For creating the pipeline
from kfp.v2 import dsl
# For building components
from kfp.v2.dsl import component
# Type annotations for the component artifacts
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
    Model,
    Metrics
)
In this lab, you will build a pipeline to train a multi-output model on the Energy Efficiency dataset from the UCI Machine Learning Repository. It uses the building features (e.g. wall area, roof area) as inputs and has two outputs: Cooling Load and Heating Load. You will follow the five-task graph above with some slight differences in the generated artifacts.
You will now build the component to load your data into the pipeline. The code is shown below and we will discuss the syntax in more detail after running it.
@component(
    packages_to_install=["pandas", "openpyxl"],
    output_component_file="download_data_component.yaml"
)
def download_data(url: str, output_csv: Output[Dataset]):
    import pandas as pd

    # Use pandas excel reader
    df = pd.read_excel(url)
    df = df.sample(frac=1).reset_index(drop=True)
    df.to_csv(output_csv.path, index=False)
When building a component, it's good to first determine its inputs and outputs.
The dataset you want to download is an Excel file hosted by UCI here and you can load it using Pandas. Instead of hardcoding the URL in your code, you can design your function to accept an input string parameter so you can use other URLs in case the data is moved.
For the output, you will want to pass the downloaded dataset to the next task (i.e. data splitting). You should annotate this parameter as an Output type and specify what kind of artifact it is. Kubeflow provides several of these such as Dataset, Model, Metrics, etc. All artifacts are saved by Kubeflow to a storage server. For local deployments, the default is a MinIO server. The path property returns the location where this artifact will be saved, and that's what you used above when you called df.to_csv(output_csv.path, index=False).
The inputs and outputs are declared as parameters in the function definition. As you can see in the code, we defined a url parameter with a str type and an output_csv parameter with an Output[Dataset] type.
Lastly, you'll need to use the component decorator to specify that this is a Kubeflow Pipelines component. The documentation shows several parameters you can set and two of them are used in the code above. As the name suggests, the packages_to_install argument declares any extra packages outside the base image that are needed to run your code. As of writing, the default base image is python:3.7, so you'll need pandas and openpyxl to load the Excel file.
The output_component_file is an output file that contains the specification for your newly built component. You should see it in the Colab file explorer once you've run the cell above. You'll see your code there along with other settings that pertain to your component. You can use this file when building other pipelines if necessary, so you don't have to rewrite the code in a notebook for your next project as long as you have this YAML file. You can also pass it to your team members or use it on another machine. Kubeflow also hosts other reusable modules in their repo here. For example, if you want a file downloader component in one of your projects, you can load the component from that repo using the load_component_from_url function as shown below. The YAML file of that component should tell you the inputs and outputs so you can use it accordingly.
web_downloader_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/web/Download/component-sdk-v2.yaml')
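In the same way, you can reuse the component you just built by loading its local YAML file instead of a URL. This is a minimal sketch assuming download_data_component.yaml sits in the notebook's current working directory (which it should after running the cell above):
# Load the component specification generated earlier
# (assumes download_data_component.yaml is in the current working directory)
download_data_op = kfp.components.load_component_from_file('download_data_component.yaml')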
Next, you will build the next component in the pipeline. Like in the previous step, you should design it first with inputs and outputs in mind. You know that the input of this component will come from the artifact generated by the download_data() function above. To declare input artifacts, you can annotate your parameter with the Input[Dataset] data type as shown below. For the outputs, you want to have two: train and test datasets. You can see the implementation below:
@component(
    packages_to_install=["pandas", "sklearn"],
    output_component_file="split_data_component.yaml"
)
def split_data(input_csv: Input[Dataset], train_csv: Output[Dataset], test_csv: Output[Dataset]):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_csv.path)
    train, test = train_test_split(df, test_size=0.2)

    train.to_csv(train_csv.path, index=False)
    test.to_csv(test_csv.path, index=False)
Now that you have at least two components, you can try building a pipeline just to quickly see how it works. The code is shown below. Basically, you just define a function with the sequence of steps and then use the dsl.pipeline decorator. Notice in the last line (i.e. split_data_task) that to get a particular artifact from a previous step, you will need to use the outputs dictionary and use the parameter name as the key.
@dsl.pipeline(
    name="my-pipeline",
)
def my_pipeline(url: str):
    download_data_task = download_data(url=url)
    split_data_task = split_data(input_csv=download_data_task.outputs['output_csv'])
To generate your pipeline specification file, you need to compile your pipeline function using the Compiler class as shown below.
kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')
/usr/local/lib/python3.7/dist-packages/kfp/compiler/compiler.py:76: UserWarning: V2_COMPATIBLE execution mode is at Beta quality. Some pipeline features may not work as expected.
warnings.warn('V2_COMPATIBLE execution mode is at Beta quality.'
After running the cell, you'll see a pipeline.yaml file in the Colab file explorer. Please download that because it will be needed in the next step.
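If you prefer, you can also trigger the download programmatically instead of using the file explorer. A minimal sketch, assuming you are running this notebook in Colab:
# Optional: download pipeline.yaml from Colab programmatically
from google.colab import files

files.download('pipeline.yaml')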
You can run a pipeline programmatically or from the UI. For this exercise, you will do it from the UI and you will see how it is done programmatically in the Qwiklabs later this week.
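For reference only, a programmatic run would look roughly like the sketch below. It assumes the kubectl port-forward command from earlier is still active so the Kubeflow Pipelines API is reachable at localhost:8080, and that the code runs on the same machine as the port-forward (so not from Colab):
# Rough sketch of running the compiled pipeline programmatically (not needed for this exercise).
# Assumes `kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80` is still running locally.
import kfp

client = kfp.Client(host='http://localhost:8080')

client.create_run_from_pipeline_package(
    pipeline_file='pipeline.yaml',
    arguments={'url': 'https://archive.ics.uci.edu/ml/machine-learning-databases/00242/ENB2012_data.xlsx'}
)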
Please go back to the Kubeflow Pipelines UI and click Upload Pipelines from the Pipelines page.
Next, select Upload a file and choose the pipeline.yaml you downloaded earlier then click Create. This will open a screen showing your simple DAG (just two tasks).
Click Create Run then scroll to the bottom to input the URL of the Excel file: https://archive.ics.uci.edu/ml/machine-learning-databases/00242/ENB2012_data.xlsx . Then click Start.
Select the topmost entry in the Runs page and you should see the progress of your run. You can click on the download-data box to see more details about that particular task (i.e. the URL input and the container logs). After it turns green, you should also see the output artifact and you can download it if you want by clicking the minio link.
Eventually, both tasks will turn green indicating that the run completed successfully. Nicely done!
Now that you've seen a sample workflow, you can build the rest of the components for preprocessing, model training, and model evaluation. The functions will be longer because the task is more complex. Nonetheless, it follows the same principles as before such as declaring inputs and outputs, and specifying the additional packages.
In the eval_model() function, you'll notice the use of log_metric() to record the results. You'll see these in the Visualizations tab of that task after it has completed.
@component(
    packages_to_install=["pandas", "numpy"],
    output_component_file="preprocess_data_component.yaml"
)
def preprocess_data(input_train_csv: Input[Dataset], input_test_csv: Input[Dataset],
                    output_train_x: Output[Dataset], output_test_x: Output[Dataset],
                    output_train_y: Output[Artifact], output_test_y: Output[Artifact]):
    import pandas as pd
    import numpy as np
    import pickle

    def format_output(data):
        y1 = data.pop('Y1')
        y1 = np.array(y1)
        y2 = data.pop('Y2')
        y2 = np.array(y2)
        return y1, y2

    def norm(x, train_stats):
        return (x - train_stats['mean']) / train_stats['std']

    train = pd.read_csv(input_train_csv.path)
    test = pd.read_csv(input_test_csv.path)

    train_stats = train.describe()

    # Get Y1 and Y2 as the 2 outputs and format them as np arrays
    train_stats.pop('Y1')
    train_stats.pop('Y2')
    train_stats = train_stats.transpose()

    train_Y = format_output(train)
    with open(output_train_y.path, "wb") as file:
        pickle.dump(train_Y, file)

    test_Y = format_output(test)
    with open(output_test_y.path, "wb") as file:
        pickle.dump(test_Y, file)

    # Normalize the training and test data
    norm_train_X = norm(train, train_stats)
    norm_test_X = norm(test, train_stats)

    norm_train_X.to_csv(output_train_x.path, index=False)
    norm_test_X.to_csv(output_test_x.path, index=False)
@component(
    packages_to_install=["tensorflow", "pandas"],
    output_component_file="train_model_component.yaml"
)
def train_model(input_train_x: Input[Dataset], input_train_y: Input[Artifact],
                output_model: Output[Model], output_history: Output[Artifact]):
    import pandas as pd
    import tensorflow as tf
    import pickle

    from tensorflow.keras.models import Model
    from tensorflow.keras.layers import Dense, Input

    norm_train_X = pd.read_csv(input_train_x.path)

    with open(input_train_y.path, "rb") as file:
        train_Y = pickle.load(file)

    def model_builder(train_X):
        # Define model layers.
        input_layer = Input(shape=(len(train_X.columns),))
        first_dense = Dense(units=128, activation='relu')(input_layer)
        second_dense = Dense(units=128, activation='relu')(first_dense)

        # Y1 output will be fed directly from the second dense
        y1_output = Dense(units=1, name='y1_output')(second_dense)
        third_dense = Dense(units=64, activation='relu')(second_dense)

        # Y2 output will come via the third dense
        y2_output = Dense(units=1, name='y2_output')(third_dense)

        # Define the model with the input layer and a list of output layers
        model = Model(inputs=input_layer, outputs=[y1_output, y2_output])

        print(model.summary())

        return model

    model = model_builder(norm_train_X)

    # Specify the optimizer, and compile the model with loss functions for both outputs
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)
    model.compile(optimizer=optimizer,
                  loss={'y1_output': 'mse', 'y2_output': 'mse'},
                  metrics={'y1_output': tf.keras.metrics.RootMeanSquaredError(),
                           'y2_output': tf.keras.metrics.RootMeanSquaredError()})

    # Train the model for 100 epochs
    history = model.fit(norm_train_X, train_Y, epochs=100, batch_size=10)

    model.save(output_model.path)

    with open(output_history.path, "wb") as file:
        pickle.dump(history.history, file)
@component(
    packages_to_install=["tensorflow", "pandas"],
    output_component_file="eval_model_component.yaml"
)
def eval_model(input_model: Input[Model], input_history: Input[Artifact],
               input_test_x: Input[Dataset], input_test_y: Input[Artifact],
               MLPipeline_Metrics: Output[Metrics]):
    import pandas as pd
    import tensorflow as tf
    import pickle

    model = tf.keras.models.load_model(input_model.path)

    norm_test_X = pd.read_csv(input_test_x.path)

    with open(input_test_y.path, "rb") as file:
        test_Y = pickle.load(file)

    # Test the model and print loss and rmse for both outputs
    loss, Y1_loss, Y2_loss, Y1_rmse, Y2_rmse = model.evaluate(x=norm_test_X, y=test_Y)
    print("Loss = {}, Y1_loss = {}, Y1_rmse = {}, Y2_loss = {}, Y2_rmse = {}".format(loss, Y1_loss, Y1_rmse, Y2_loss, Y2_rmse))

    MLPipeline_Metrics.log_metric("loss", loss)
    MLPipeline_Metrics.log_metric("Y1_loss", Y1_loss)
    MLPipeline_Metrics.log_metric("Y2_loss", Y2_loss)
    MLPipeline_Metrics.log_metric("Y1_rmse", Y1_rmse)
    MLPipeline_Metrics.log_metric("Y2_rmse", Y2_rmse)
You can then build and run the entire pipeline as you did earlier. It will take around 20 minutes for all the tasks to complete, and you can check the Logs tab of each task to see how it's going. For instance, you can see there the model training epochs as you would normally see in a notebook environment.
# Define a pipeline and create a task from a component:
@dsl.pipeline(
    name="my-pipeline",
)
def my_pipeline(url: str):
    download_data_task = download_data(url=url)

    split_data_task = split_data(input_csv=download_data_task.outputs['output_csv'])

    preprocess_data_task = preprocess_data(input_train_csv=split_data_task.outputs['train_csv'],
                                           input_test_csv=split_data_task.outputs['test_csv'])

    train_model_task = train_model(input_train_x=preprocess_data_task.outputs["output_train_x"],
                                   input_train_y=preprocess_data_task.outputs["output_train_y"])

    eval_model_task = eval_model(input_model=train_model_task.outputs["output_model"],
                                 input_history=train_model_task.outputs["output_history"],
                                 input_test_x=preprocess_data_task.outputs["output_test_x"],
                                 input_test_y=preprocess_data_task.outputs["output_test_y"])

kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')
After you've uploaded and run the entire pipeline, you should see all green boxes and the training metrics in the Visualizations tab of the eval-model task.

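If you want to inspect your runs outside the UI, the same kfp.Client used for programmatic runs can also list them. A minimal sketch, again assuming the port-forward to localhost:8080 is still active and the code runs on the same machine:
# Minimal sketch: list recent pipeline runs through the KFP client
# (assumes the port-forward to localhost:8080 is still active)
import kfp

client = kfp.Client(host='http://localhost:8080')

response = client.list_runs(page_size=5)
for run in response.runs or []:
    print(run.name, run.status)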
If you're done experimenting with the software and want to free up resources, you can execute the commands below to delete Kubeflow Pipelines from your system:
export PIPELINE_VERSION=1.7.0
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
kubectl delete -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
You can delete the cluster for kind with the following:
kind delete cluster
This lab demonstrated how you can use Kubeflow Pipelines to build and orchestrate your ML workflows. Having automated, shareable, and modular pipelines is a very useful feature in production deployments so you and your team can monitor and maintain your system more effectively. In the first Qwiklabs this week, you will use Kubeflow Pipelines as part of the Google Cloud AI Platform. You'll see more features implemented there such as integration with Tensorboard and more output visualizations from each component. If you want to know more, you can start with the Kubeflow Pipelines documentation and start conversations in Discourse.
Great job and on to the next part of the course!