A Dummies’ guide to building a Kubeflow Pipeline

Prafful Mishra
Nov 1, 2019

Kubeflow provides a layer of abstraction over Kubernetes that makes data science and ML pipelines easier to run. It allows ML pipelines to become production-ready and to be delivered at scale on Kubernetes, a resilient framework for distributed computing.

We will dive into the details of how to build a machine learning pipeline from the multiple components that make up an ML solution lifecycle.

Before we delve deeper into the implementation of components and stitching of pipelines, let’s have a look at how things are organized in Kubeflow.

Individual components of a pipeline can be created from Python functions or Docker images, and a wrapper (built using ‘kfp’) packages these components into a stitched pipeline.

1. Building pipeline through functions

Let’s start with building the pipeline using simple Python functions. This is also known as creating pipelines through lightweight or non-reusable components.

Let’s take a simple example of the computation of an average i.e.

avg = add(x1,x2,x3,x4.....xn)/n

We will compute the above formula with each mathematical operation as a component of a pipeline. A basic Python function to do so can be written as:

# function 1: sum all the elements of the list
def add(numbers: list) -> int:
    return sum(numbers)

# function 2: divide the total by the number of elements
def div(total: int, numbers: list) -> float:
    return total / len(numbers)

num = [1, 2, 3]
avg = div(add(num), num)

Just to set the context, the above two components would run in separate pods (possibly on separate machines) in the underlying Kubernetes cluster. Hence, we cannot simply pass variables from one function to the other.

Context String

However, to make this stateless execution pseudo-stateful, we will ask both functions to fetch the required variables from a centrally accessible location (this can be Azure storage, Google Cloud storage, or anything else for that matter).

Where to look for the required data is passed as a context string which would look like this:

import json

context = {
    "numbers": "example.com/address/of/numbers",
    "total": "example.com/address/of/numbers",
    "auth": "auth_token_to_access_data"
}
# serializing the dictionary so it can be passed around as a string
context = json.dumps(context)
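
As a rough sketch of how a component might consume this string: it parses the JSON, reads the addresses it needs, records the address of its own result, and serializes the dictionary again for the next step. The helper name `extend_context` and the key and address passed to it are illustrative assumptions, not part of KFP.

```python
import json

# Context string built as in the article (addresses are example values).
context = json.dumps({
    "numbers": "example.com/address/of/numbers",
    "auth": "auth_token_to_access_data",
})

def extend_context(ctx: str, key: str, address: str) -> str:
    """Hypothetical helper: record where a new result lives and
    return the updated context as a JSON string."""
    data = json.loads(ctx)       # string -> dict
    data[key] = address          # add the address of the new result
    return json.dumps(data)      # dict -> string, ready for the next component

updated = extend_context(context, "sum", "example.com/address/of/sum")
print(json.loads(updated)["sum"])
```

Because only a string travels between steps, the data itself never has to leave the shared storage.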

Modified Functions

The functions we declared need to be modified to include:

  • Provision to install necessary packages if required (as every function will run on a new pod which may or may not have the necessary packages)
  • Provision to download and upload data from the shared memory according to the context string

# component 1
def add(ctx: str) -> str:
    """This function calculates the sum of all the elements of a
    list stored in a shared memory and uploads the result"""
    # imports live inside the function, as it runs on its own pod
    import json
    import subprocess

    # loading context string
    context = json.loads(ctx)

    # defining the install function
    def install(name):
        subprocess.call(['pip', 'install', name])

    # install packages (installing numpy for the sake of demo)
    install('numpy')

    # getting the auth token from context
    auth_token = context["auth"]

    # downloading the data required
    # (download and upload are placeholders for your storage client calls)
    numbers = download(context["numbers"], auth_token)

    # uploading the intermediate result
    upload(sum(numbers), "example.com/address/of/sum", auth_token)

    # adding the address of the intermediate result to the context
    context["sum"] = "example.com/address/of/sum"

    # returning the updated context as a string
    return json.dumps(context)

# component 2
def div(ctx: str) -> str:
    """This function calculates the average from sum and total
    number of elements stored in a shared memory and uploads the
    result"""
    # imports live inside the function, as it runs on its own pod
    import json
    import subprocess

    # loading context string
    context = json.loads(ctx)

    # defining the install function
    def install(name):
        subprocess.call(['pip', 'install', name])

    # install packages (installing numpy for the sake of demo)
    install('numpy')

    # getting the auth token from context
    auth_token = context["auth"]

    # downloading the data required
    # (download and upload are placeholders for your storage client calls)
    total = download(context["sum"], auth_token)
    numbers = download(context["numbers"], auth_token)

    # uploading the final result
    upload(total / len(numbers), "example.com/address/of/avg", auth_token)

    # adding the address of the result to the context
    context["avg"] = "example.com/address/of/avg"

    # returning the updated context as a string
    return json.dumps(context)
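
Before moving to a cluster, the flow of the two components above can be checked locally. The sketch below is only an approximation under stated assumptions: an in-memory dictionary (`STORE`) stands in for the shared storage, and `download` and `upload` are hypothetical stand-ins for whatever storage client you use. The package installation step is left out to keep it short.

```python
import json

# Assumption: an in-memory dict plays the role of the shared storage.
STORE = {"example.com/address/of/numbers": [1, 2, 3]}

def download(address, auth_token):
    """Hypothetical stand-in for a storage download call."""
    return STORE[address]

def upload(value, address, auth_token):
    """Hypothetical stand-in for a storage upload call."""
    STORE[address] = value

def add(ctx: str) -> str:
    context = json.loads(ctx)
    numbers = download(context["numbers"], context["auth"])
    upload(sum(numbers), "example.com/address/of/sum", context["auth"])
    context["sum"] = "example.com/address/of/sum"
    return json.dumps(context)

def div(ctx: str) -> str:
    context = json.loads(ctx)
    total = download(context["sum"], context["auth"])
    numbers = download(context["numbers"], context["auth"])
    upload(total / len(numbers), "example.com/address/of/avg", context["auth"])
    context["avg"] = "example.com/address/of/avg"
    return json.dumps(context)

# Chain the steps the way the pipeline would: the output string of add feeds div.
ctx = json.dumps({"numbers": "example.com/address/of/numbers", "auth": "token"})
final_ctx = div(add(ctx))
average = STORE[json.loads(final_ctx)["avg"]]
print(average)
```

Only the context string is handed from `add` to `div`; the numbers, the sum, and the average all stay in the store.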

Once we are ready with our new function definitions, we need to stitch the functions together to form a pipeline from these decoupled lightweight components. This means defining the execution flow of all the components and how the context string is shared between them across pods.

Stitching functions to form a pipeline

Before we move forward, it is suggested that you have a basic understanding of docker (maybe read “Building small Python Docker images, How to?” by Islam Shehata).

We will use KFP (install it using pip first) to convert the functions we defined into container operations and then stitch the pipeline:

from kfp.components import func_to_container_op

# converting functions to container operations
# (replace <base_image> with the image the components should run on)
add_operation = func_to_container_op(add, base_image='<base_image>')
div_operation = func_to_container_op(div, base_image='<base_image>')

Let’s stitch the pipeline from the container operations created above:

# importing KFP pipeline
from kfp.dsl import pipeline

# defining pipeline meta
@pipeline(
    name='Calculate Average',
    description='This pipeline calculates average'
)
# stitch the steps
def average_calculation_pipeline(context: str = context):
    # passing context to step 1
    step_1 = add_operation(context)
    # passing output of step 1 to step 2
    step_2 = div_operation(step_1.output)

2. Building pipelines through docker images

NOTE : Understanding docker and building docker images is important before proceeding further.

Making changes to the functions

To build a pipeline through heavy-weight (reusable) containers, the functions need three modifications:

  • accept the context string as a command-line argument (sys.argv)
  • write the result to a file
  • drop the subprocess installations inside the functions, as the ‘requirements.txt’ used for building the Docker image takes care of that

The function should look something like this:

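import json
import sys

def add(ctx: str):
    # parsing the context string received as a system argument
    context = json.loads(ctx)
    .
    .
    .
    # writing the updated context to the output file
    with open('output.txt', 'w') as out_file:
        out_file.write(json.dumps(context))

# calling the function with the argument passed to the container
add(sys.argv[1])

Similar changes need to be made to the div function, and a separate image needs to be built for each function.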
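
The pattern shared by both entry scripts can be sketched as a small wrapper that reads the context from the argument list, applies a step, and writes the updated context to the output file. This is an illustration under assumptions: `run_step` and `record_avg_address` are hypothetical names, the step only records an address instead of doing the real computation, and the argument list is simulated so the snippet runs outside a container.

```python
import json
import os
import tempfile

def run_step(argv, step, out_path="output.txt"):
    """Parse the context from argv[1], apply `step` to it, and write
    the updated context to `out_path` as JSON."""
    context = json.loads(argv[1])
    context = step(context)
    with open(out_path, "w") as out_file:
        out_file.write(json.dumps(context))
    return out_path

def record_avg_address(context):
    """Placeholder for the real div logic: it only records where the
    average would be stored."""
    context["avg"] = "example.com/address/of/avg"
    return context

# Simulated invocation; inside the container argv would be sys.argv.
fake_argv = ["div.py", json.dumps({"sum": "example.com/address/of/sum"})]
out_path = os.path.join(tempfile.mkdtemp(), "output.txt")
run_step(fake_argv, record_avg_address, out_path)

# Reading the file back, as the next step of the pipeline effectively does.
with open(out_path) as f:
    written = json.load(f)
print(written)
```

Keeping the argument parsing and file writing in one place means each image only has to supply its own step function.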

Building Docker Images

The Docker images can now be built in the usual manner. No changes to the Dockerfile are needed, as KFP takes care of passing the arguments to the ENTRYPOINT.

Uploading Created Images

Once the images are built, we need to upload them to a location accessible by the pods in the cluster (i.e., a private or public Docker registry).

NOTE: For a private Docker registry, the relevant image pull authentication needs to be configured while setting up the cluster (refer: Pull image from private registry)

Stitching the pipeline

Once the images have been pushed we are ready to stitch the pipeline once again.

Unlike with lightweight components, we don’t need to convert functions to container operations; we can directly proceed to stitch the pipeline together.

# importing KFP pipeline and container operation
from kfp.dsl import pipeline
from kfp.dsl import ContainerOp

# defining pipeline meta
@pipeline(
    name='Calculate Average',
    description='This pipeline calculates average'
)
# stitch the steps
def average_calculation_pipeline(context: str = context):
    step_1 = ContainerOp(
        name='add',  # name of the operation
        image='docker.io/avg/add',  # docker location in registry
        arguments=[context],  # passing context as argument
        file_outputs={
            'context': '/output.txt'  # name of the file with result
        }
    )
    step_2 = ContainerOp(
        name='div',  # name of the operation
        image='docker.io/avg/div',  # docker location in registry
        arguments=[step_1.output],  # passing step_1.output as argument
        file_outputs={
            'context': '/output.txt'  # name of the file with result
        }
    )

3. Compiling Kubeflow Pipeline

#importing KFP compiler
from kfp.compiler import Compiler
#compiling the created pipeline
Compiler().compile(average_calculation_pipeline, 'pipeline.zip')

4. Initialising Pipeline Run through Script

The zip file produced by the compilation can either be uploaded through the Kubeflow UI to create a Kubeflow pipeline, or be submitted as a run using the following script.

# importing KFP
import kfp

# initialising client instance
client = kfp.Client()

# creating experiment
experiment = client.create_experiment(
    name="Average Experiment",
    description="This is the average experiment"
)

# define a run name
run_name = "This is test run: 01"

# path to the package produced by the compilation step
pipeline_filename = 'pipeline.zip'

# submit a pipeline run
run_result = client.run_pipeline(
    experiment.id,
    run_name,
    pipeline_filename,
    params={}
)
print(run_result)
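
The script above passes an empty `params`, so the run falls back to the default value of the pipeline's `context` parameter. One possible way to supply a different context per run, sketched below, is to build the `params` dictionary keyed by the pipeline function's parameter name. The helper `build_run_params` and the values passed to it are assumptions for illustration only.

```python
import json

def build_run_params(numbers_address: str, auth_token: str) -> dict:
    """Hypothetical helper: serialize a context and key it by the name of
    the pipeline parameter (`context` in average_calculation_pipeline)."""
    context = {"numbers": numbers_address, "auth": auth_token}
    return {"context": json.dumps(context)}

params = build_run_params(
    "example.com/address/of/other_numbers",  # assumed address
    "auth_token_to_access_data",
)
print(params["context"])
```

The resulting dictionary would then take the place of `params = {}` in the run submission.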

5. Conclusion

This article gives beginners the most basic explanation of how to create a Kubeflow pipeline in order to deliver machine learning at scale.

NOTE : Pipelines can be built using a combination of heavy-weight and lightweight components. Try mixing the two processes explained above to do so.
