Welcome to kung-fu-pipelines’s documentation!

class kungfupipelines.cli.Step(name: str, function: Callable, arguments: List[str], fullname: str = None, description: str = None)

Bases: object

Represents a step in an Argo/Kubeflow pipeline. This class lets you provide a function, along with the set of arguments that the function accepts, and then perform a variety of tasks related to pipeline generation:

1) Generate a dsl.ContainerOp object which can be used to compile a pipeline specification via KFP.
2) Generate a command line tool that can be used to invoke your function. This is helpful because pipeline steps run as some command invoked on a container, usually invoking a script that you wrote containing the code to run (see the documentation for StepSwitch).
3) Chain Steps together in a Workflow (see the documentation for Workflow).

Args:
name: The name of the Step.
function: The function to be called when this Step is invoked.
arguments: A list containing the names of the arguments expected by this function.
fullname: (optional) The full name of the step.
description: (optional) The description of the step.
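To illustrate what a Step carries, here is a minimal stand-alone sketch of a Step-like class; it mirrors the constructor arguments above but is a hypothetical stand-in, not the actual kungfupipelines.cli.Step implementation, and the make_dataset function is an invented example.

```python
from typing import Callable, List, Optional


class Step:
    """Illustrative stand-in for kungfupipelines.cli.Step (not the real class)."""

    def __init__(self, name: str, function: Callable, arguments: List[str],
                 fullname: Optional[str] = None, description: Optional[str] = None):
        self.name = name
        self.function = function
        self.arguments = arguments          # names of the arguments the function accepts
        self.fullname = fullname or name
        self.description = description

    def __call__(self, *args, **kwargs):
        # Running a Step simply invokes the wrapped function.
        return self.function(*args, **kwargs)


def make_dataset(source: str, destination: str) -> str:
    # Hypothetical step function: pretend to materialize a dataset.
    return f"copied {source} -> {destination}"


step = Step("make-dataset", make_dataset, ["source", "destination"])
print(step("s3://raw", "/data/master.csv"))  # copied s3://raw -> /data/master.csv
```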
dslContainerOp(image, script_path, **kwargs) → kfp.dsl._container_op.ContainerOp

Returns a dsl.ContainerOp that runs the Step function.
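Conceptually, the ContainerOp returned here amounts to "run this step's command inside this image". The sketch below shows one plausible way such a command could be assembled from the image, script path, and step name; the exact command layout, image URI, and script path are assumptions for illustration, not the library's actual behavior.

```python
from typing import Dict, List


def build_step_command(image: str, script_path: str, step_name: str,
                       args: List[str]) -> Dict[str, object]:
    # A container op is, at heart, a command run in an image; the step name
    # selects which Step the script should execute (see StepSwitch below).
    return {
        "image": image,
        "command": ["python", script_path, step_name, *args],
    }


op_spec = build_step_command(
    "gcr.io/my-project/pipeline:latest",   # hypothetical image URI
    "/app/myscript.py",                    # hypothetical script path
    "make-dataset",
    ["s3://raw", "/data/master.csv"],
)
print(op_spec["command"])
```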

class kungfupipelines.cli.StepSwitch(name: str, steps: List[kungfupipelines.cli.Step])

Bases: object

This is essentially a collection of Steps. When called, it reads in command line arguments and runs the appropriate Step with the provided arguments. E.g., if you ran python myscript.py step1 a b c --d, and myscript.py simply creates and calls a StepSwitch, the result would be to call the Step named 'step1' with positional arguments a, b, c and the flag d=True.
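The dispatch behavior described above can be sketched with the standard library alone. The parsing rules here (bare values become positional arguments, --name flags become keyword arguments set to True) are inferred from the example and are an assumption, not the library's exact implementation.

```python
from typing import Callable, Dict, List


def dispatch(steps: Dict[str, Callable], argv: List[str]):
    """Illustrative StepSwitch-style dispatch: first token picks the step,
    bare tokens are positional args, --flags become keyword args set to True."""
    step_name, *rest = argv
    positional = [a for a in rest if not a.startswith("--")]
    flags = {a[2:]: True for a in rest if a.startswith("--")}
    return steps[step_name](*positional, **flags)


def step1(a, b, c, d=False):
    return (a, b, c, d)


# Equivalent to: python myscript.py step1 a b c --d
result = dispatch({"step1": step1}, ["step1", "a", "b", "c", "--d"])
print(result)  # ('a', 'b', 'c', True)
```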

class kungfupipelines.workflow.BasicMLWorkflow(name: str, image: str, script_path: str, make_dataset: kungfupipelines.cli.Step, train_test_split: kungfupipelines.cli.Step, preprocess: kungfupipelines.cli.Step, train: kungfupipelines.cli.Step, postprocess_ops: List[kungfupipelines.cli.Step] = [], description: str = None)

Bases: kungfupipelines.workflow.Workflow

This specifies a simple pipeline for machine learning. It consists of the following steps taking place one after another:

1) Create/download/acquire the master dataset.
2) Perform a train/test split.
3) Apply any preprocessing logic.
4) Train your model.
5) Apply any post-processing logic using the test set, including computing accuracy, ROC, etc.

Args:
name: Name to use for the compiled pipeline.
image: Docker container URI containing all of the scripts.
script_path: Path to the script that runs the steps.
make_dataset: The Step to use for making the dataset.
train_test_split: The Step to use for the train/test split.
preprocess: The Step to use for preprocessing.
train: The Step to use for model training.
postprocess_ops: A list of Steps to perform post-training operations with.
description: An optional string describing your pipeline.
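The fixed ordering a BasicMLWorkflow imposes can be sketched as a plain sequential run. The five functions below are hypothetical placeholders standing in for the Steps you would supply; they only record their execution order.

```python
# Illustrative only: the step functions here are invented placeholders, not
# part of kungfupipelines. A BasicMLWorkflow wires real Steps in this order.
executed = []


def make_dataset():
    executed.append("make_dataset")


def train_test_split():
    executed.append("train_test_split")


def preprocess():
    executed.append("preprocess")


def train():
    executed.append("train")


def compute_metrics():          # an example postprocess op
    executed.append("compute_metrics")


# Each step runs only after the previous one has completed.
for step in (make_dataset, train_test_split, preprocess, train, compute_metrics):
    step()

print(executed)
```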
compile()

Generates a KFP pipeline spec which can be used to generate an Argo workflow.yaml.

Args:

image: The URI of the image to use.
script_path: The path that your script is located at in the image.
class kungfupipelines.workflow.Workflow

Bases: abc.ABC

A Workflow abstracts the connectivity structure between pipeline steps. Individual steps can be provided to a Workflow, and they will then be compiled together so that they happen in a predefined sequence specified by that Workflow. This makes it easy to have 'standardized' pipeline structures in which you can simply swap out individual steps as needed. For example, a machine learning workflow might have slots for dataset preprocessing, training, hyperparameter optimization, etc. You can provide specific pipeline steps to fill in those slots and generate the pipeline spec without having to rewrite the connectivity structure each time.

compile(image: str, script_path: str) → Callable

Generates a KFP pipeline spec which can be used to generate an Argo workflow.yaml.

Args:

image: The URI of the image to use.
script_path: The path that your script is located at in the image.
kungfupipelines.workflow.generate_yaml(self, filename)

Generates an Argo workflow.yaml spec which can be used to submit this workflow to Argo/Kubeflow.

kungfupipelines.workflow.make_sequence(ops: List[kfp.dsl._container_op.ContainerOp]) → None

Links a sequence of pipeline operations so that they are configured to take place one after another.

Args:

ops: A list of dsl.ContainerOp objects to link, in order.
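The linking this function performs can be sketched with a stand-in for dsl.ContainerOp that records dependencies via .after() (in KFP, op.after(other) declares that op runs only once other has finished). The FakeOp class is a mock for illustration, not part of kfp or kungfupipelines.

```python
from typing import List


class FakeOp:
    """Mock stand-in for kfp.dsl.ContainerOp that records .after() links."""

    def __init__(self, name: str):
        self.name = name
        self.upstream: List["FakeOp"] = []

    def after(self, other: "FakeOp") -> "FakeOp":
        # Record that this op depends on `other` having finished.
        self.upstream.append(other)
        return self


def make_sequence(ops: List[FakeOp]) -> None:
    # Link each op to the one before it, forming a strict chain.
    for prev, nxt in zip(ops, ops[1:]):
        nxt.after(prev)


ops = [FakeOp("make_dataset"), FakeOp("train"), FakeOp("evaluate")]
make_sequence(ops)
print([op.upstream[0].name if op.upstream else None for op in ops])
# [None, 'make_dataset', 'train']
```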
