Build Your Own Pipeline Components
This page is for advanced users. It describes how to build your own pipeline components. For an easier start, try building a pipeline with the provided samples.
Overview of pipeline components
Kubeflow Pipelines components are implementations of pipeline tasks. Each task takes one or more artifacts as input and may produce one or more artifacts as output.
Each task usually includes two parts:
Client code
The code that talks to endpoints to submit jobs. For example, code to talk to
the Google Dataproc API to submit a Spark job.
Runtime code
The code that does the actual job and usually runs in the cluster. For
example, Spark code that transforms raw data into preprocessed data.
Note the naming convention for client code and runtime code. For a task named “mytask”:
- The mytask.py program contains the client code.
- The mytask directory contains all the runtime code.
A component consists of an interface (inputs/outputs), the implementation (a Docker container image and command-line arguments) and metadata (name, description).
Components can be instantiated inside the pipeline
function to create tasks.
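As a rough mental model (not the Kubeflow Pipelines API), the three parts of a component and its instantiation into a task can be sketched with plain Python data classes. The names ComponentSpec, Task, and instantiate are hypothetical and exist only for this illustration, and the gs:// values are placeholders.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ComponentSpec:
    """Hypothetical model of a component; not a kfp class."""
    # Metadata
    name: str
    description: str
    # Interface
    inputs: List[str]
    outputs: List[str]
    # Implementation
    image: str
    command: List[str]


@dataclass
class Task:
    """One use of a component inside a pipeline, with concrete input values."""
    component: ComponentSpec
    arguments: Dict[str, str] = field(default_factory=dict)


def instantiate(component: ComponentSpec, **arguments: str) -> Task:
    """Create a task, checking that every declared input is supplied."""
    missing = [name for name in component.inputs if name not in arguments]
    if missing:
        raise ValueError(f"missing inputs: {missing}")
    return Task(component=component, arguments=dict(arguments))


confusion_matrix = ComponentSpec(
    name="confusion-matrix",
    description="Creates confusion matrix data from prediction results.",
    inputs=["predictions", "output_path"],
    outputs=["label"],
    image="gcr.io/project-id/ml-pipeline-local-confusion-matrix:v1",
    command=["python", "/ml/confusion_matrix.py"],
)

task = instantiate(
    confusion_matrix,
    predictions="gs://example-bucket/predictions.csv",  # placeholder value
    output_path="gs://example-bucket/output",           # placeholder value
)
```

The same ComponentSpec can be instantiated several times with different arguments, which mirrors how one component can back several tasks in a pipeline.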
There are multiple ways to author components:
- Wrap an existing Docker container image using ContainerOp, as described below.
- Create a lightweight Python component from a Python function.
- Build a new Docker container image from a Python function.
Example: XGBoost DataProc components
Requirements for building a component
Install Docker.
Step One: Create a container for each component
In most cases, you need to create your own container image that includes your
program. See the
container building examples.
(In the directory, go to any subdirectory and then go to the containers
directory.)
If your component creates outputs to be fed as inputs to downstream
components, each output must be a string and must be written to a separate local
text file by the container image. For example, if a trainer component needs to
output the trained model path, it writes the path into a local file such as
/output.txt. In the Python class (in step two), you specify how to map the
content of local files to component outputs.
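A minimal sketch of this file-based output convention, using only the standard library. The helper names write_output and collect_outputs are hypothetical: the first stands in for what the program inside the container does, and the second for the mapping from local files to named outputs. A temporary directory replaces the container's /output.txt so that the example runs anywhere, and the gs:// path is a placeholder.

```python
import os
import tempfile
from typing import Dict


def write_output(path: str, value: str) -> None:
    """Runtime side: write one output value, as a string, to its own file."""
    with open(path, "w") as f:
        f.write(value)


def collect_outputs(file_outputs: Dict[str, str]) -> Dict[str, str]:
    """Read each mapped file and return {output name: file content}."""
    outputs = {}
    for name, path in file_outputs.items():
        with open(path) as f:
            outputs[name] = f.read()
    return outputs


with tempfile.TemporaryDirectory() as workdir:
    output_file = os.path.join(workdir, "output.txt")
    # A trainer would write the location of the trained model here.
    write_output(output_file, "gs://example-bucket/model")  # placeholder path
    outputs = collect_outputs({"model_path": output_file})
```

Keeping one value per file means that each output can be mapped to its own name without any parsing.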
Step Two: Create a Python class for your component
The Python classes describe the interactions with the Docker container image created in step one. For example, a component to create confusion matrix data from prediction results looks like this:
import kfp.dsl


class ConfusionMatrixOp(kfp.dsl.ContainerOp):

    def __init__(self, name, predictions, output_path):
        super(ConfusionMatrixOp, self).__init__(
            name=name,
            image='gcr.io/project-id/ml-pipeline-local-confusion-matrix:v1',
            command=['python', '/ml/confusion_matrix.py'],
            arguments=[
                '--output', '%s/{{workflow.name}}/confusionmatrix' % output_path,
                '--predictions', predictions
            ],
            # Map the output label to the local file written by the container.
            file_outputs={'label': '/output.txt'})
Note:
- Each component needs to inherit from kfp.dsl.ContainerOp.
- If you already defined ENTRYPOINT in the container image, you don’t need to provide command unless you want to override it.
- In the init arguments, there can be Python native types (such as str, int) and kfp.dsl.PipelineParam types. Each kfp.dsl.PipelineParam represents a parameter whose value is usually only known at run time. It might be a pipeline parameter whose value is provided at pipeline run time by the user, or it can be an output from an upstream component. In the above case, predictions and output_path are kfp.dsl.PipelineParam types.
- Although the value of each PipelineParam is only available at run time, you can still use the parameters inline in the argument (note the “%s”). This means at run time the argument contains the value of the param inline.
- file_outputs lists a map between labels and local file paths. In the above case, the content of /output.txt is gathered as a string output of the operator. To reference the output in code:
    op = ConfusionMatrixOp(...)
    op.outputs['label']
  If there is only one output then you can also do op.output.
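The inline use of run-time parameters can be pictured as placeholder substitution. The sketch below is an illustration only and does not reproduce the Kubeflow Pipelines implementation: the Placeholder class, its `{{param:...}}` marker format, and the resolve function are all hypothetical, and the gs:// values are placeholders.

```python
import re
from typing import Dict, List, Sequence


class Placeholder:
    """Hypothetical stand-in for a parameter whose value is unknown until run time."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __str__(self) -> str:
        # Formatting with %s embeds this marker instead of a real value.
        return "{{param:%s}}" % self.name


def resolve(arguments: Sequence[object], values: Dict[str, str]) -> List[str]:
    """Replace every marker in every argument with its run-time value."""
    pattern = re.compile(r"\{\{param:([\w-]+)\}\}")
    return [pattern.sub(lambda m: values[m.group(1)], str(arg)) for arg in arguments]


output_path = Placeholder("output_path")
predictions = Placeholder("predictions")

# Built at pipeline definition time: the values are not known yet.
arguments = [
    "--output", "%s/confusionmatrix" % output_path,
    "--predictions", predictions,
]

# At run time, the markers are swapped for actual values.
resolved = resolve(arguments, {
    "output_path": "gs://example-bucket/run-1",      # placeholder value
    "predictions": "gs://example-bucket/preds.csv",  # placeholder value
})
```

Because the marker is an ordinary string once formatted, it can sit inside a longer argument and still be replaced later.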
Step Three: Create your workflow as a Python function
Each pipeline is identified as a Python function. For example:
@kfp.dsl.pipeline(
    name='TFX Trainer',
    description='A trainer that does end-to-end training for TFX models.'
)
def train(
    output_path,
    train_data=kfp.dsl.PipelineParam('train-data',
        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),
    eval_data=kfp.dsl.PipelineParam('eval-data',
        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),
    schema=kfp.dsl.PipelineParam('schema',
        value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/schema.json'),
    target=kfp.dsl.PipelineParam('target', value='tips'),
    learning_rate=kfp.dsl.PipelineParam('learning-rate', value=0.1),
    hidden_layer_size=kfp.dsl.PipelineParam('hidden-layer-size', value='100,50'),
    steps=kfp.dsl.PipelineParam('steps', value=1000),
    slice_columns=kfp.dsl.PipelineParam('slice-columns', value='trip_start_hour'),
    true_class=kfp.dsl.PipelineParam('true-class', value='true'),
    need_analysis=kfp.dsl.PipelineParam('need-analysis', value='true'),
):
    # Instantiate components here to create the pipeline's tasks.
    pass
Note:
- @kfp.dsl.pipeline is a required decorator that takes name and description properties.
- Input arguments show up as pipeline parameters in the Kubeflow Pipelines UI. As a Python rule, positional args go first and keyword args go next.
- Each function argument is of type kfp.dsl.PipelineParam. The default values should all be of that type. The default values show up in the Kubeflow Pipelines UI but can be overwritten.
See an example.
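To see how a function signature can double as a parameter list, the sketch below uses the standard inspect module to read argument names and defaults and then applies user overrides. This is an illustration under stated assumptions, not the Kubeflow Pipelines implementation: list_parameters, apply_overrides, and the shortened train signature with plain default values are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict


def train(output_path, target="tips", learning_rate=0.1, steps=1000):
    """Shortened stand-in for the pipeline function; body omitted."""


def list_parameters(func: Callable) -> Dict[str, Any]:
    """Map each argument name to its default, or None when it has no default."""
    params = {}
    for name, param in inspect.signature(func).parameters.items():
        has_default = param.default is not inspect.Parameter.empty
        params[name] = param.default if has_default else None
    return params


def apply_overrides(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return run parameters: defaults replaced by any user-supplied values."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"unknown parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


defaults = list_parameters(train)
run_params = apply_overrides(
    defaults,
    {"output_path": "gs://example-bucket/out", "steps": 2000},  # placeholder values
)
```

Here output_path has no default, so a value must be supplied for each run, while the remaining parameters keep their defaults unless overridden.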
Lightweight Python components
You can also build lightweight components from Python functions. See the guide to lightweight Python components.
Export metrics
See the guide to pipeline metrics.