kfp.kubernetes

Functions:

add_ephemeral_volume(task, volume_name, ...)

Add a `generic ephemeral volume.

add_node_selector(task, label_key, label_value)

Add a constraint to the task Pod's nodeSelector.

add_node_selector_json(task, node_selector_json)

Add a pod nodeSelector constraint to the task Pod's `nodeSelector'

add_node_affinity(task[, match_expressions, ...])

Add a constraint to the task Pod's nodeAffinity.

add_node_affinity_json(task, node_affinity_json)

Add a node affinity constraint to the task Pod's nodeAffinity using a JSON struct or pipeline parameter.

add_pod_annotation(task, annotation_key, ...)

Add an annotation to the task Pod's metadata.

add_pod_label(task, label_key, label_value)

Add a label to the task Pod's metadata.

add_toleration(task[, key, operator, value, ...])

Add a `toleration<https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/>`_.

add_toleration_json(task, toleration_json)

Add a Pod Toleration in the form of a Pipeline Input JSON to a task.

CreatePVC(name, access_modes, size[, ...])

Create a PersistentVolumeClaim, which can be used by downstream tasks.

DeletePVC(pvc_name)

Delete a PersistentVolumeClaim.

empty_dir_mount(task, volume_name, mount_path)

Mount an EmptyDir volume to the task's container.

mount_pvc(task, pvc_name, mount_path[, sub_path])

Mount a PersistentVolumeClaim to the task's container.

set_image_pull_policy(task, policy)

Set image pull policy for the container.

use_field_path_as_env(task, env_name, field_path)

Use a Kubernetes Field Path as an environment variable as described in https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information

set_image_pull_secrets(task, secret_names)

Set image pull secrets for a Kubernetes task.

set_security_context(task[, run_as_user, ...])

Set the security context for the task's container.

set_timeout(task, seconds)

Add timeout to the task Pod's active_deadline_seconds.

use_config_map_as_env(task, config_map_name, ...)

Use a Kubernetes ConfigMap as an environment variable as described by the Kubernetes documentation https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#define-container-environment-variables-using-configmap-data _.

use_config_map_as_volume(task, ...[, optional])

Use a Kubernetes ConfigMap by mounting its data to the task's container as described by the Kubernetes documentation.

use_secret_as_env(task, secret_name, ...[, ...])

Use a Kubernetes Secret as an environment variable as described by the `Kubernetes documentation https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables `_.

use_secret_as_volume(task, secret_name, ...)

Use a Kubernetes Secret by mounting its data to the task's container as described by the Kubernetes documentation.

kfp.kubernetes.add_ephemeral_volume(task: PipelineTask, volume_name: str, mount_path: str, access_modes: list[str], size: str, storage_class_name: str | None = None, labels: dict[str, str] = None, annotations: dict[str, str] = None)[source]

Add a generic ephemeral volume.. to a task.

Parameters:
task: PipelineTask

Pipeline task.

volume_name: str

name to be given to the created ephemeral volume. Corresponds to Pod.spec.volumes[*].name

mount_path: str

local path in the main container where the PVC should be mounted as a volume

access_modes: list[str]

AccessModes to request for the provisioned PVC. May be one or more of 'ReadWriteOnce', 'ReadOnlyMany', 'ReadWriteMany', or``’ReadWriteOncePod’``. Corresponds to Pod.spec.volumes[*].ephemeral.volumeClaimTemplate.spec.accessModes.

size: str

The size of storage requested by the PVC that will be provisioned. For example, '5Gi'. Corresponds to Pod.spec.volumes[*].ephemeral.volumeClaimTemplate.spec.resources.requests.storage.

storage_class_name: str | None = None

Name of StorageClass from which to provision the PV to back the PVC. None indicates to use the cluster’s default storage_class_name.

labels: dict[str, str] = None

The labels to attach to the created PVC. Corresponds to `Pod.spec.volumes[*].ephemeral.volumeClaimTemplate.metadata.labels

annotations: dict[str, str] = None

The annotation to attach to the created PVC. Corresponds to `Pod.spec.volumes[*].ephemeral.volumeClaimTemplate.metadata.annotations

Returns:

Task object with added toleration.

kfp.kubernetes.add_node_selector(task: PipelineTask, label_key: str, label_value: str) PipelineTask[source]

Add a constraint to the task Pod’s nodeSelector.

Each constraint is a key-value pair, corresponding to the PodSpec’s nodeSelector field.

For the task’s Pod to be eligible to run on a node, the node’s labels must satisfy the constraint.

Parameters:
task: PipelineTask

Pipeline task.

label_key: str

Key of the nodeSelector label.

label_value: str

Value of the nodeSelector label.

Returns:

Task object with an added nodeSelector constraint.

kfp.kubernetes.add_node_selector_json(task: PipelineTask, node_selector_json: PipelineParameterChannel | dict) PipelineTask[source]

Add a pod nodeSelector constraint to the task Pod’s `nodeSelector’

Parameters:
task: PipelineTask

Pipeline task.

node_selector_json: PipelineParameterChannel | dict

node selector provided as dict or input parameter. Takes precedence over label_key and label_value. Only one node_selector_json is applicable to a task and can contain multiple key/value pairs.

Returns:

Task object with an added nodeSelector constraint.

kfp.kubernetes.add_node_affinity(task: PipelineTask, match_expressions: list[dict] | None = None, match_fields: list[dict] | None = None, weight: int | None = None) PipelineTask[source]

Add a constraint to the task Pod’s nodeAffinity.

Each constraint is specified as a match expression (key, operator, values) or match field, corresponding to the PodSpec’s nodeAffinity field.

Parameters:
task: PipelineTask

Pipeline task.

match_expressions: list[dict] | None = None

List of dicts for matchExpressions (keys: key, operator, values).

match_fields: list[dict] | None = None

List of dicts for matchFields (keys: key, operator, values).

weight: int | None = None

If set, this affinity is preferred (K8s weight 1-100); otherwise required.

Returns:

Task object with added node affinity.

kfp.kubernetes.add_node_affinity_json(task: PipelineTask, node_affinity_json: PipelineParameterChannel | dict) PipelineTask[source]

Add a node affinity constraint to the task Pod’s nodeAffinity using a JSON struct or pipeline parameter.

This allows parameterized node affinity to be specified, matching the Kubernetes NodeAffinity schema: https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#nodeaffinity

Parameters:
task: PipelineTask

Pipeline task.

node_affinity_json: PipelineParameterChannel | dict

Dict or pipeline parameter for node affinity. Should match K8s NodeAffinity schema.

Returns:

Task object with added node affinity.

kfp.kubernetes.add_pod_annotation(task: PipelineTask, annotation_key: str, annotation_value: str) PipelineTask[source]

Add an annotation to the task Pod’s metadata.

Each annotation is a key-value pair, corresponding to the metadata’s `ObjectMeta <https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta`_ field.

Parameters:
task: PipelineTask

Pipeline task.

annotation_key: str

Key of the metadata annotation.

annotation_value: str

Value of the metadata annotation.

Returns:

Task object with an added metadata annotation.

kfp.kubernetes.add_pod_label(task: PipelineTask, label_key: str, label_value: str) PipelineTask[source]

Add a label to the task Pod’s metadata.

Each label is a key-value pair, corresponding to the metadata’s `ObjectMeta <https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta`_ field.

Parameters:
task: PipelineTask

Pipeline task.

label_key: str

Key of the metadata label.

label_value: str

Value of the metadata label.

Returns:

Task object with an added metadata label.

kfp.kubernetes.add_toleration(task: PipelineTask, key: str | None = None, operator: 'Equal' | 'Exists' | None = None, value: str | None = None, effect: 'NoExecute' | 'NoSchedule' | 'PreferNoSchedule' | None = None, toleration_seconds: int | None = None)[source]

Add a `toleration<https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/>`_. to a task.

Parameters:
task: PipelineTask

Pipeline task.

key: str | None = None

key is the taint key that the toleration applies to. Empty means match all taint keys. If the key is empty, operator must be “Exists”; this combination means to match all values and all keys.

operator: 'Equal' | 'Exists' | None = None

operator represents a key’s relationship to the value. Valid operators are “Exists” and “Equal”. Defaults to “Equal”. “Exists” is equivalent to wildcard for value, so that a pod can tolerate all taints of a particular category.

value: str | None = None

value is the taint value the toleration matches to. If the operator is “Exists”, the value should be empty, otherwise just a regular string.

effect: 'NoExecute' | 'NoSchedule' | 'PreferNoSchedule' | None = None

effect indicates the taint effect to match. Empty means match all taint effects. When specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute.

toleration_seconds: int | None = None

toleration_seconds represents the period of time the toleration (which must be of effect NoExecute, otherwise this field is ignored) tolerates the taint. By default, it is not set, which means tolerate the taint forever (do not evict). Zero and negative values will be treated as 0 (evict immediately) by the system.

Returns:

Task object with added toleration.

kfp.kubernetes.add_toleration_json(task: PipelineTask, toleration_json: PipelineParameterChannel | list | dict)[source]

Add a Pod Toleration in the form of a Pipeline Input JSON to a task.

Parameters:
task: PipelineTask

Pipeline task.

toleration_json: PipelineParameterChannel | list | dict

a toleration that is a pipeline input parameter, dict, or list. The input parameter must be of type dict or list.

If it is an input parameter of type dict, it must be a single toleration object. For example a pipeline input parameter in this case could be:

{

“key”: “key1”, “operator”: “Equal”, “value”: “value1”, “effect”: “NoSchedule”

}

If it is an input parameter of type list, it must be list of toleration objects. For example a pipeline input parameter in this case could be:

[
{

“key”: “key1”, “operator”: “Equal”, “value”: “value1”, “effect”: “NoSchedule”

}, {

”key”: “key2”, “operator”: “Exists”, “effect”: “NoExecute”

}

]

In the case of static list or dicts, the call wraps add_toleration.

Returns:

Task object with added toleration.

kfp.kubernetes.CreatePVC(name: <kfp.dsl.types.type_annotations.OutputPath object at 0x7bd12bc20590>, access_modes: ~typing.List[str], size: str, pvc_name: str | None = None, pvc_name_suffix: str | None = None, storage_class_name: str | None = '', volume_name: str | None = None, annotations: ~typing.Dict[str, str] | None = None)[source]

Create a PersistentVolumeClaim, which can be used by downstream tasks. See PersistentVolume and PersistentVolumeClaim documentation for more information about the component input parameters.

Parameters:
access_modes

AccessModes to request for the provisioned PVC. May be one or more of 'ReadWriteOnce', 'ReadOnlyMany', 'ReadWriteMany', or 'ReadWriteOncePod'. Corresponds to PersistentVolumeClaim.spec.accessModes.

size

The size of storage requested by the PVC that will be provisioned. For example, '5Gi'. Corresponds to PersistentVolumeClaim.spec.resources.requests.storage.

pvc_name

Name of the PVC. Corresponds to PersistentVolumeClaim.metadata.name. Only one of pvc_name and pvc_name_suffix can be provided.

pvc_name_suffix

Prefix to use for a dynamically generated name, which will take the form <argo-workflow-name>-<pvc_name_suffix>. Only one of pvc_name and pvc_name_suffix can be provided.

storage_class_name

Name of StorageClass from which to provision the PV to back the PVC. None indicates to use the cluster’s default storage_class_name. Set to '' for a statically specified PVC.

volume_name

Pre-existing PersistentVolume that should back the provisioned PersistentVolumeClaim. Used for statically specified PV only. Corresponds to PersistentVolumeClaim.spec.volumeName.

annotations

Annotations for the PVC’s metadata. Corresponds to PersistentVolumeClaim.metadata.annotations.

Returns:

name: str

Name of the generated PVC.

kfp.kubernetes.DeletePVC(pvc_name: str)[source]

Delete a PersistentVolumeClaim.

Parameters:
pvc_name: str

Name of the PVC to delete. Supports passing a runtime-generated name, such as a name provided by kubernetes.CreatePvcOp().outputs['name'].

kfp.kubernetes.empty_dir_mount(task: PipelineTask, volume_name: str, mount_path: str, medium: str | None = None, size_limit: str | None = None) PipelineTask[source]

Mount an EmptyDir volume to the task’s container.

Parameters:
task: PipelineTask

Pipeline task.

volume_name: str

Name of the EmptyDir volume.

mount_path: str

Path within the container at which the EmptyDir should be mounted.

medium: str | None = None

Storage medium to back the EmptyDir. Must be one of Memory or HugePages. Defaults to None.

size_limit: str | None = None

Maximum size of the EmptyDir. For example, 5Gi. Defaults to None.

Returns:

Task object with updated EmptyDir mount configuration.

kfp.kubernetes.mount_pvc(task: PipelineTask, pvc_name: str | PipelineChannel, mount_path: str, sub_path: str = '') PipelineTask[source]

Mount a PersistentVolumeClaim to the task’s container.

Parameters:
task: PipelineTask

Pipeline task.

pvc_name: str | PipelineChannel

Name of the PVC to mount. Supports passing a runtime-generated name, such as a name provided by kubernetes.CreatePvcOp().outputs['name'].

mount_path: str

Path to which the PVC should be mounted as a volume.

sub_path: str = ''

Path within the volume from which the container’s volume should be mounted. Defaults to “” (volume’s root).

Returns:

Task object with updated PVC mount configuration.

kfp.kubernetes.set_image_pull_policy(task: PipelineTask, policy: str) PipelineTask[source]

Set image pull policy for the container.

Parameters:
task: PipelineTask

Pipeline task.

policy: str

One of Always, Never, IfNotPresent.

Returns:

Task object with an added ImagePullPolicy specification.

kfp.kubernetes.use_field_path_as_env(task: PipelineTask, env_name: str, field_path: str) PipelineTask[source]

Use a Kubernetes Field Path as an environment variable as described in https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information

Parameters:
task: PipelineTask

Pipeline task.

env_name: str

Name of the enviornment variable.

field_path: str

Kubernetes field path to expose as the enviornment variable.

Returns:

Task object with updated field path as the enviornment variable.

kfp.kubernetes.set_image_pull_secrets(task: PipelineTask, secret_names: list[str] | list[PipelineParameterChannel]) PipelineTask[source]

Set image pull secrets for a Kubernetes task.

Parameters:
task: PipelineTask

Pipeline task.

secret_names: list[str] | list[PipelineParameterChannel]

List of image pull secret names.

Returns:

Task object with updated image pull secret configuration.

kfp.kubernetes.set_security_context(task: PipelineTask, run_as_user: int | None = None, run_as_group: int | None = None, run_as_non_root: bool | None = None) PipelineTask[source]

Set the security context for the task’s container.

Sets identity fields (runAsUser, runAsGroup, runAsNonRoot) on the container’s securityContext.

All capabilities are automatically dropped to comply with Pod Security Standards (PSS) baseline.

Note

Platform security defaults (allowPrivilegeEscalation=false, drop ALL capabilities, seccompProfile=RuntimeDefault) are enforced separately by the compiler and are not affected by this function. If an administrator or the compiler has already set runAsUser, runAsGroup, or runAsNonRoot, the values provided here will be ignored and a warning will be logged by the backend. Admin-set security context values cannot be overridden by the SDK.

Parameters:
task: PipelineTask

Pipeline task.

run_as_user: int | None = None

The UID to run the container process as.

run_as_group: int | None = None

The GID to run the container process as.

run_as_non_root: bool | None = None

Whether the container must run as a non-root user.

Returns:

Task object with an updated security context.

kfp.kubernetes.set_timeout(task: PipelineTask, seconds: int) PipelineTask[source]

Add timeout to the task Pod’s active_deadline_seconds.

Timeout an integer greater than 0, corresponding to the podspec active_deadline_seconds <https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#podspec-v1-core`_ field. Integer 0 means removing the timeout fields from previous functions.

Parameters:
task: PipelineTask

Pipeline task.

seconds: int

Value of the active_deadline_seconds.

Returns:

Task object with an updated active_deadline_seconds.

kfp.kubernetes.use_config_map_as_env(task: PipelineTask, config_map_name: PipelineParameterChannel | str, config_map_key_to_env: dict[str, str], optional: bool = False) PipelineTask[source]

Use a Kubernetes ConfigMap as an environment variable as described by the Kubernetes documentation https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#define-container-environment-variables-using-configmap-data _.

Parameters:
task: PipelineTask

Pipeline task.

config_map_name: PipelineParameterChannel | str

Name of the ConfigMap.

config_map_key_to_env: dict[str, str]

Dictionary of ConfigMap key to environment variable name. For example, {'foo': 'FOO'} sets the value of the ConfigMap’s foo field to the environment variable FOO.

optional: bool = False

Optional field specifying whether the ConfigMap must be defined.

Returns:

Task object with updated ConfigMap configuration.

kfp.kubernetes.use_config_map_as_volume(task: PipelineTask, config_map_name: PipelineParameterChannel | str, mount_path: str, optional: bool = False) PipelineTask[source]

Use a Kubernetes ConfigMap by mounting its data to the task’s container as described by the Kubernetes documentation.

Parameters:
task: PipelineTask

Pipeline task.

config_map_name: PipelineParameterChannel | str

Name of the ConfigMap.

mount_path: str

Path to which to mount the ConfigMap data.

optional: bool = False

Optional field specifying whether the ConfigMap must be defined.

Returns:

Task object with updated ConfigMap configuration.

kfp.kubernetes.use_secret_as_env(task: PipelineTask, secret_name: PipelineParameterChannel | str, secret_key_to_env: dict[str, str], optional: bool = False) PipelineTask[source]

Use a Kubernetes Secret as an environment variable as described by the `Kubernetes documentation https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables `_.

Parameters:
task: PipelineTask

Pipeline task.

secret_name: PipelineParameterChannel | str

Name of the Secret.

secret_key_to_env: dict[str, str]

Dictionary of Secret data key to environment variable name. For example, {'password': 'PASSWORD'} sets the data of the Secret’s password field to the environment variable PASSWORD.

optional: bool = False

Optional field specifying whether the Secret must be defined.

Returns:

Task object with updated secret configuration.

kfp.kubernetes.use_secret_as_volume(task: PipelineTask, secret_name: PipelineParameterChannel | str, mount_path: str, optional: bool = False) PipelineTask[source]

Use a Kubernetes Secret by mounting its data to the task’s container as described by the Kubernetes documentation.

Parameters:
task: PipelineTask

Pipeline task.

secret_name: PipelineParameterChannel | str

Name of the Secret.

mount_path: str

Path to which to mount the Secret data.

optional: bool = False

Optional field specifying whether the Secret must be defined.

Returns:

Task object with updated secret configuration.