# Copyright 2021 The Kubeflow Authors## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License."""Base class for KFP components."""importabcfromtypingimportListfromkfp.dslimportpipeline_contextfromkfp.dslimportpipeline_taskfromkfp.dslimportstructuresfromkfp.dsl.typesimporttype_utilsfromkfp.pipeline_specimportpipeline_spec_pb2
[docs]classBaseComponent(abc.ABC):"""Base class for a component. **Note:** ``BaseComponent`` is not intended to be used to construct components directly. Use ``@kfp.dsl.component`` or ``kfp.components.load_component_from_*()`` instead. Attributes: name: Name of the component. component_spec: Component definition. """def__init__(self,component_spec:structures.ComponentSpec):"""Init function for BaseComponent. Args: component_spec: The component definition. """self.component_spec=component_specself.name=component_spec.nameself.description=component_spec.descriptionorNone# Arguments typed as PipelineTaskFinalStatus are special arguments that# do not count as user inputs. Instead, they are reserved to for the# (backend) system to pass a value.self._component_inputs={input_nameforinput_name,input_specin(self.component_spec.inputsor{}).items()ifnottype_utils.is_task_final_status_type(input_spec.type)}def_prevent_using_output_lists_of_artifacts(self):"""This method should be called at the end of __init__ for PythonComponent and ContainerComponent subclasses to temporarily block outputting lists of artifacts from a component."""# TODO: remove when output lists of artifacts from primitive components is supportedforoutput_name,output_specin(self.component_spec.outputsor{}).items():ifoutput_spec.is_artifact_list:raiseValueError(f'Output lists of artifacts are only supported for pipelines. Got output list of artifacts for output parameter {output_name!r} of component {self.name!r}.')def__call__(self,*args,**kwargs)->pipeline_task.PipelineTask:"""Creates a PipelineTask object. The arguments are generated on the fly based on component input definitions. """task_inputs={}ifargs:raiseTypeError('Components must be instantiated using keyword arguments. Positional 'f'parameters are not allowed (found {len(args)} such parameters for 'f'component "{self.name}").')fork,vinkwargs.items():ifknotinself._component_inputs:raiseTypeError(f'{self.name}() got an unexpected keyword argument "{k}".')task_inputs[k]=v# Skip optional inputs and arguments typed as PipelineTaskFinalStatus.missing_arguments=[argforarginself.required_inputsifargnotinkwargs]ifmissing_arguments:argument_or_arguments='argument'iflen(missing_arguments)==1else'arguments'arguments=', '.join(arg_name.replace('-','_')forarg_nameinmissing_arguments)raiseTypeError(f'{self.name}() missing {len(missing_arguments)} required 'f'{argument_or_arguments}: {arguments}.')returnpipeline_task.PipelineTask(component_spec=self.component_spec,args=task_inputs,execute_locally=pipeline_context.Pipeline.get_default_pipeline()isNone,execution_caching_default=pipeline_context.Pipeline.get_execution_caching_default(),)@propertydefpipeline_spec(self)->pipeline_spec_pb2.PipelineSpec:"""Returns the pipeline spec of the component."""withBlockPipelineTaskRegistration():returnself.component_spec.to_pipeline_spec()@propertydefplatform_spec(self)->pipeline_spec_pb2.PlatformSpec:"""Returns the PlatformSpec of the component. Useful when the component is a GraphComponent, else will be empty per component_spec.platform_spec default. """returnself.component_spec.platform_spec
[docs]@abc.abstractmethoddefexecute(self,**kwargs):"""Executes the component locally if implemented by the inheriting subclass."""
classBlockPipelineTaskRegistration:"""Temporarily stop registering tasks to the default pipeline. Handles special, uncommon functions that decorate and mutate a component, possibly by using the component's .pipeline_spec attribute. This is exhibited in the version of google_cloud_pipeline_components compatible with KFP SDK v2. """# TODO: this handles the special case of a compiled component (when compiled inside a pipeline), which should not have any concept of a default pipeline. Perhaps there is a way to unify component/pipeline compilation concepts to remove this workaround?def__enter__(self):self.task_handler,pipeline_task.PipelineTask._register_task_handler=pipeline_task.PipelineTask._register_task_handler,pipeline_task._register_task_handlerdef__exit__(self,*args):pipeline_task.PipelineTask._register_task_handler=self.task_handler