ModularPipelineBlocks
ModularPipelineBlocks is the basic building block of a ModularPipeline. It defines the components, inputs/outputs, and computation a block should perform for a specific step in a pipeline. A ModularPipelineBlocks connects with other blocks, through shared state, to enable the modular construction of workflows.
A ModularPipelineBlocks on its own can’t be executed. It is a blueprint for what a step should do in a pipeline. To actually run, the ModularPipelineBlocks needs to be converted into a ModularPipeline.
This guide will show you how to create a ModularPipelineBlocks.
Inputs and outputs
Refer to the States guide if you aren’t familiar with how state works in Modular Diffusers.
A ModularPipelineBlocks requires inputs, intermediate_inputs, and intermediate_outputs.
inputs are values provided by a user and retrieved from the PipelineState. This is useful because some workflows resize an image, but the original image is still required. The PipelineState maintains the original image.

Use InputParam to define inputs.

from diffusers.modular_pipelines import InputParam

user_inputs = [
    InputParam(name="image", type_hint="PIL.Image", description="raw input image to process")
]
intermediate_inputs are values typically created by a previous block, but they can also be provided directly if no preceding block generates them. Unlike inputs, intermediate_inputs can be modified.

Use InputParam to define intermediate_inputs.

user_intermediate_inputs = [
    InputParam(name="processed_image", type_hint="torch.Tensor", description="image that has been preprocessed and normalized"),
]
intermediate_outputs are new values created by a block and added to the PipelineState. The intermediate_outputs are available as intermediate_inputs for subsequent blocks or as the final output from running the pipeline.

Use OutputParam to define intermediate_outputs.

from diffusers.modular_pipelines import OutputParam

user_intermediate_outputs = [
    OutputParam(name="image_latents", description="latents representing the image")
]
The intermediate inputs and outputs share data to connect blocks. They are accessible at any point, allowing you to track the workflow’s progress.
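As a rough illustration of how these parameters sit on a block, the sketch below declares the inputs and intermediate_outputs from the snippets above as properties of a block class. This is a minimal sketch under assumptions: the ModularPipelineBlocks import path and the property names (inputs, intermediate_outputs) follow the patterns in this guide rather than a verified API surface.

from diffusers.modular_pipelines import ModularPipelineBlocks, InputParam, OutputParam

# Hypothetical block skeleton; the property names are assumptions, not confirmed API
class ImagePreprocessBlock(ModularPipelineBlocks):
    @property
    def inputs(self):
        # user-provided values read from the PipelineState
        return [InputParam(name="image", type_hint="PIL.Image", description="raw input image to process")]

    @property
    def intermediate_outputs(self):
        # new values this block adds to the PipelineState for later blocks
        return [OutputParam(name="processed_image", description="image that has been preprocessed and normalized")]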
Computation logic
The computation a block performs is defined in the __call__ method, and it follows a specific structure.
1. Retrieve the BlockState to get a local view of the inputs and intermediate_inputs.
2. Implement the computation logic on the inputs and intermediate_inputs.
3. Update the PipelineState to push changes from the local BlockState back to the global PipelineState.
4. Return the components and state, which become available to the next block.
def __call__(self, components, state):
# Get a local view of the state variables this block needs
block_state = self.get_block_state(state)
# Your computation logic here
# block_state contains all your inputs and intermediate_inputs
# Access them like: block_state.image, block_state.processed_image
    # Update the pipeline state with your updated block_state
self.set_block_state(state, block_state)
return components, state
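As a hedged example, here is what the skeleton above might look like when filled in for a simple preprocessing step. Only get_block_state, set_block_state, and attribute access on block_state come from this guide; the tensor conversion itself and the processed_image attribute are illustrative assumptions.

import numpy as np
import torch

def __call__(self, components, state):
    block_state = self.get_block_state(state)

    # Illustrative computation (hypothetical): turn the raw PIL image into a
    # normalized, batched tensor and store it for later blocks
    image = np.array(block_state.image).astype(np.float32) / 255.0
    block_state.processed_image = torch.from_numpy(image).permute(2, 0, 1).unsqueeze(0)

    # Push the local changes back to the global PipelineState
    self.set_block_state(state, block_state)
    return components, state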
Components and configs
The components and pipeline-level configs a block needs are specified in ComponentSpec and ConfigSpec.

- ComponentSpec contains the expected components used by a block. You need the name of the component and ideally a type_hint that specifies exactly what the component is.
- ConfigSpec contains pipeline-level settings that control behavior across all blocks.
from diffusers import ComponentSpec, ConfigSpec, EulerDiscreteScheduler, UNet2DConditionModel
expected_components = [
ComponentSpec(name="unet", type_hint=UNet2DConditionModel),
ComponentSpec(name="scheduler", type_hint=EulerDiscreteScheduler)
]
expected_config = [
ConfigSpec("force_zeros_for_empty_prompt", True)
]
When the blocks are converted into a pipeline, the components become available to the block as the first argument in __call__.
def __call__(self, components, state):
# Access components using dot notation
unet = components.unet
vae = components.vae
scheduler = components.scheduler
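Putting the pieces together, a complete block might look like the following sketch. Treat it as a sketch under assumptions: the ModularPipelineBlocks import path and the property names (expected_components, inputs, intermediate_outputs) follow the patterns shown in this guide rather than a verified API, and the VAE encoding logic is only illustrative.

import torch
from diffusers import AutoencoderKL, ComponentSpec
from diffusers.modular_pipelines import ModularPipelineBlocks, InputParam, OutputParam

# Hypothetical example block; property names are assumptions based on this guide
class EncodeImageBlock(ModularPipelineBlocks):
    @property
    def expected_components(self):
        return [ComponentSpec(name="vae", type_hint=AutoencoderKL)]

    @property
    def inputs(self):
        # could also arrive as an intermediate value from a preceding block
        return [InputParam(name="processed_image", type_hint="torch.Tensor", description="preprocessed image tensor")]

    @property
    def intermediate_outputs(self):
        return [OutputParam(name="image_latents", description="latents representing the image")]

    def __call__(self, components, state):
        block_state = self.get_block_state(state)

        # Encode the preprocessed image with the VAE and scale the latents
        with torch.no_grad():
            latents = components.vae.encode(block_state.processed_image).latent_dist.sample()
        block_state.image_latents = latents * components.vae.config.scaling_factor

        self.set_block_state(state, block_state)
        return components, state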