Source code for kfp.kubernetes.config_map
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Union
from google.protobuf import json_format
from kfp.dsl import PipelineTask, pipeline_channel
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb
[docs]
def use_config_map_as_env(
task: PipelineTask,
config_map_name: Union[pipeline_channel.PipelineParameterChannel, str],
config_map_key_to_env: Dict[str, str],
optional: bool = False,
) -> PipelineTask:
"""Use a Kubernetes ConfigMap as an environment variable as described by the `Kubernetes documentation
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#define-container-environment-variables-using-configmap-data` _.
Args:
task: Pipeline task.
config_map_name: Name of the ConfigMap.
config_map_key_to_env: Dictionary of ConfigMap key to environment variable name. For example, ``{'foo': 'FOO'}`` sets the value of the ConfigMap's foo field to the environment variable ``FOO``.
optional: Optional field specifying whether the ConfigMap must be defined.
Returns:
Task object with updated ConfigMap configuration.
"""
msg = common.get_existing_kubernetes_config_as_message(task)
key_to_env = [
pb.ConfigMapAsEnv.ConfigMapKeyToEnvMap(
config_map_key=config_map_key,
env_var=env_var,
) for config_map_key, env_var in config_map_key_to_env.items()
]
config_map_as_env = pb.ConfigMapAsEnv(key_to_env=key_to_env, optional=optional)
config_map_name_parameter = common.parse_k8s_parameter_input(config_map_name, task)
config_map_as_env.config_map_name_parameter.CopyFrom(config_map_name_parameter)
# deprecated: for backwards compatibility
if isinstance(config_map_name, str):
config_map_as_env.config_map_name = config_map_name
msg.config_map_as_env.append(config_map_as_env)
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)
return task
[docs]
def use_config_map_as_volume(
task: PipelineTask,
config_map_name: Union[pipeline_channel.PipelineParameterChannel, str],
mount_path: str,
optional: bool = False,
) -> PipelineTask:
"""Use a Kubernetes ConfigMap by mounting its data to the task's container as
described by the `Kubernetes documentation <https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#add-configmap-data-to-a-volume>`_.
Args:
task: Pipeline task.
config_map_name: Name of the ConfigMap.
mount_path: Path to which to mount the ConfigMap data.
optional: Optional field specifying whether the ConfigMap must be defined.
Returns:
Task object with updated ConfigMap configuration.
"""
msg = common.get_existing_kubernetes_config_as_message(task)
config_map_as_vol = pb.ConfigMapAsVolume(
mount_path=mount_path,
optional=optional,
)
config_map_name_parameter = common.parse_k8s_parameter_input(config_map_name, task)
config_map_as_vol.config_map_name_parameter.CopyFrom(config_map_name_parameter)
# deprecated: for backwards compatibility
if isinstance(config_map_name, str):
config_map_as_vol.config_map_name = config_map_name
msg.config_map_as_volume.append(config_map_as_vol)
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)
return task