Spaces:
Running
on
Zero
Running
on
Zero
File size: 6,196 Bytes
62bb9d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
from typing_extensions import override
from comfy_api.latest import ComfyExtension, io
def attention_multiply(attn, model, q, k, v, out):
m = model.clone()
sd = model.model_state_dict()
for key in sd:
if key.endswith("{}.to_q.bias".format(attn)) or key.endswith("{}.to_q.weight".format(attn)):
m.add_patches({key: (None,)}, 0.0, q)
if key.endswith("{}.to_k.bias".format(attn)) or key.endswith("{}.to_k.weight".format(attn)):
m.add_patches({key: (None,)}, 0.0, k)
if key.endswith("{}.to_v.bias".format(attn)) or key.endswith("{}.to_v.weight".format(attn)):
m.add_patches({key: (None,)}, 0.0, v)
if key.endswith("{}.to_out.0.bias".format(attn)) or key.endswith("{}.to_out.0.weight".format(attn)):
m.add_patches({key: (None,)}, 0.0, out)
return m
class UNetSelfAttentionMultiply(io.ComfyNode):
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="UNetSelfAttentionMultiply",
category="_for_testing/attention_experiments",
inputs=[
io.Model.Input("model"),
io.Float.Input("q", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("k", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("v", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("out", default=1.0, min=0.0, max=10.0, step=0.01),
],
outputs=[io.Model.Output()],
is_experimental=True,
)
@classmethod
def execute(cls, model, q, k, v, out) -> io.NodeOutput:
m = attention_multiply("attn1", model, q, k, v, out)
return io.NodeOutput(m)
class UNetCrossAttentionMultiply(io.ComfyNode):
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="UNetCrossAttentionMultiply",
category="_for_testing/attention_experiments",
inputs=[
io.Model.Input("model"),
io.Float.Input("q", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("k", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("v", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("out", default=1.0, min=0.0, max=10.0, step=0.01),
],
outputs=[io.Model.Output()],
is_experimental=True,
)
@classmethod
def execute(cls, model, q, k, v, out) -> io.NodeOutput:
m = attention_multiply("attn2", model, q, k, v, out)
return io.NodeOutput(m)
class CLIPAttentionMultiply(io.ComfyNode):
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="CLIPAttentionMultiply",
category="_for_testing/attention_experiments",
inputs=[
io.Clip.Input("clip"),
io.Float.Input("q", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("k", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("v", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("out", default=1.0, min=0.0, max=10.0, step=0.01),
],
outputs=[io.Clip.Output()],
is_experimental=True,
)
@classmethod
def execute(cls, clip, q, k, v, out) -> io.NodeOutput:
m = clip.clone()
sd = m.patcher.model_state_dict()
for key in sd:
if key.endswith("self_attn.q_proj.weight") or key.endswith("self_attn.q_proj.bias"):
m.add_patches({key: (None,)}, 0.0, q)
if key.endswith("self_attn.k_proj.weight") or key.endswith("self_attn.k_proj.bias"):
m.add_patches({key: (None,)}, 0.0, k)
if key.endswith("self_attn.v_proj.weight") or key.endswith("self_attn.v_proj.bias"):
m.add_patches({key: (None,)}, 0.0, v)
if key.endswith("self_attn.out_proj.weight") or key.endswith("self_attn.out_proj.bias"):
m.add_patches({key: (None,)}, 0.0, out)
return io.NodeOutput(m)
class UNetTemporalAttentionMultiply(io.ComfyNode):
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="UNetTemporalAttentionMultiply",
category="_for_testing/attention_experiments",
inputs=[
io.Model.Input("model"),
io.Float.Input("self_structural", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("self_temporal", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("cross_structural", default=1.0, min=0.0, max=10.0, step=0.01),
io.Float.Input("cross_temporal", default=1.0, min=0.0, max=10.0, step=0.01),
],
outputs=[io.Model.Output()],
is_experimental=True,
)
@classmethod
def execute(cls, model, self_structural, self_temporal, cross_structural, cross_temporal) -> io.NodeOutput:
m = model.clone()
sd = model.model_state_dict()
for k in sd:
if (k.endswith("attn1.to_out.0.bias") or k.endswith("attn1.to_out.0.weight")):
if '.time_stack.' in k:
m.add_patches({k: (None,)}, 0.0, self_temporal)
else:
m.add_patches({k: (None,)}, 0.0, self_structural)
elif (k.endswith("attn2.to_out.0.bias") or k.endswith("attn2.to_out.0.weight")):
if '.time_stack.' in k:
m.add_patches({k: (None,)}, 0.0, cross_temporal)
else:
m.add_patches({k: (None,)}, 0.0, cross_structural)
return io.NodeOutput(m)
class AttentionMultiplyExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[io.ComfyNode]]:
return [
UNetSelfAttentionMultiply,
UNetCrossAttentionMultiply,
CLIPAttentionMultiply,
UNetTemporalAttentionMultiply,
]
async def comfy_entrypoint() -> AttentionMultiplyExtension:
return AttentionMultiplyExtension()
|