File size: 6,283 Bytes
d8c02ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a46979d
d8c02ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import io
import os
import sys

sys.path.append(os.path.join(os.path.dirname(__file__)))

import blended_tiling
import numpy
import onnxruntime
import streamlit.file_util
import torch
import torch.cuda
from PIL import Image
from streamlit.runtime.uploaded_file_manager import UploadedFile
from streamlit_image_comparison import image_comparison
from torchvision.transforms import functional as TVTF

from tools import image_tools

# * Cached/loaded model
onnx_session = None  # type: onnxruntime.InferenceSession

# * Streamlit UI / Config
streamlit.set_page_config(page_title="🐲 PXDN Line Extractor v1", layout="wide")
streamlit.title("🐲 PXDN Line Extractor v1")

# * Streamlit Containers / Base Layout
# Row 1
ui_section_status = streamlit.container()

# Row 2
ui_col1, ui_col2 = streamlit.columns(2, gap="medium")
streamlit.html("<hr>")

# Row 3
ui_section_compare = streamlit.container()

# * Streamlit Session
# Nothing yet

with ui_section_status:
	# Forward declared UI elements
	ui_status_text = streamlit.empty()
	ui_progress_bar = streamlit.empty()

with ui_col1:
	# Input Area
	streamlit.markdown("### Input Image")
	ui_image_input = streamlit.file_uploader("Upload an image", key="fileupload_image", type=[".png", ".jpg", ".jpeg", ".webp"])  # type: UploadedFile

with ui_col2:
	# Output Area
	streamlit.markdown("### Output Image")
	# Preallocate image spot and download button
	ui_image_output = streamlit.empty()
	ui_image_download = streamlit.empty()

def fetch_model_to_cache(huggingface_repo: str, file_path: str, access_token: str) -> str:
	import huggingface_hub
	return huggingface_hub.hf_hub_download(huggingface_repo, file_path, token=access_token)

def bootstrap_model():
	global onnx_session
	if onnx_session is None:
		
		# Environment-level configuration
		huggingface_repo = os.getenv("HF_REPO_NAME", "")
		file_path = os.getenv("HF_FILE_PATH", "")
		access_token = os.getenv("HF_TOKEN", "")
		allow_cuda = os.getenv("ALLOW_CUDA", "false").lower() in {'true', 'yes', '1', 'y'}
		
		model_file_path = fetch_model_to_cache(huggingface_repo, file_path, access_token)
		
		# * Enable CUDA if available and allowed
		model_providers = ['CPUExecutionProvider']
		if torch.cuda.is_available() and allow_cuda:
			model_providers.insert(0, 'CUDAExecutionProvider')
		
		onnx_session = onnxruntime.InferenceSession(model_file_path, sess_options=None, providers=model_providers)

def evaluate_tiled(image_pt: torch.Tensor, tile_size: int = 128, batch_size: int = 1) -> Image.Image:
	image_pt_orig = image_pt
	orig_h, orig_w = image_pt_orig.shape[1], image_pt_orig.shape[2]
	
	# ? Padding
	image_pt_padded, place_x, place_y = image_tools.pad_to_divisible(image_pt_orig, tile_size)
	
	_, im_h_padded, im_w_padded = image_pt_padded.shape
	
	# ? Tiling
	image_tiler = blended_tiling.TilingModule(tile_size=tile_size, tile_overlap=[0.18, 0.18], base_size=(im_w_padded, im_h_padded)).eval()
	# * Add batch dim for the tiler which expects (1, C, H, W)
	image_tiles = image_tiler.split_into_tiles(image_pt_padded.unsqueeze(0))
	
	# ? Pull the input and output names from the model so we're not hardcoding them.
	onnx_session.get_modelmeta()
	input_name = onnx_session.get_inputs()[0].name
	output_name = onnx_session.get_outputs()[0].name
	
	# ? Inference ==================================================================================================
	complete_tiles = []
	
	max_evals = image_tiles.size(0) // batch_size
	image_tiles = image_tiles.numpy()
	
	ui_status_text.markdown("### Processing...")
	active_progress = ui_progress_bar.progress(0, "Progress")
	
	for i in range(max_evals):
		tile_batch = image_tiles[i * batch_size:(i + 1) * batch_size]
		if len(tile_batch) == 0:
			break
		
		pct_complete = round((i + 1) / max_evals, 2)
		active_progress.progress(pct_complete)
		
		eval_output = onnx_session.run([output_name], {input_name: tile_batch})
		output_batch = eval_output[0]
		
		complete_tiles.extend(output_batch)
	
	# ? /Inference
	ui_status_text.empty()
	ui_progress_bar.empty()
	
	# ? Rehydrate the tiles into a full image.
	complete_tiles_tensor = torch.from_numpy(numpy.stack(complete_tiles))
	complete_image = image_tiler.rebuild_with_masks(complete_tiles_tensor)
	
	# ? Unpad the image, a simple crop.
	if place_x > 0 or place_y > 0:
		complete_image = complete_image[:, :, place_y:place_y + orig_h, place_x:place_x + orig_w]
	
	# ? Clamp and convert to PIL.
	complete_image = complete_image.squeeze(0)
	complete_image = complete_image.clamp(0, 1.0)
	final_image_pil = TVTF.to_pil_image(complete_image)
	
	return final_image_pil

def streamlit_to_pil_image(streamlit_file: UploadedFile):
	image = Image.open(io.BytesIO(streamlit_file.read()))
	return image

def pil_to_buffered_png(image: Image.Image) -> io.BytesIO:
	buffer = io.BytesIO()
	image.save(buffer, format="PNG", compression=3)
	buffer.seek(0)
	return buffer

# ! Image Inference
if ui_image_input is not None and ui_image_input.name is not None:
	bootstrap_model()
	ui_status_text.empty()
	ui_progress_bar.empty()
	
	onnx_session.get_modelmeta()
	onnx_input_metadata = onnx_session.get_inputs()[0]
	b, c, h, w = onnx_input_metadata.shape
	
	target_batch_size = b
	# This is always square, if H and W are different for ONNX input you screwed up, so I don't want to hear it.
	target_tile_size = h
	
	input_image = streamlit_to_pil_image(ui_image_input)
	loaded_image_pt = image_tools.prepare_image_for_inference(input_image)
	finished_image = evaluate_tiled(loaded_image_pt, tile_size=target_tile_size, batch_size=target_batch_size)
	
	with ui_col2:
		ui_image_output.image(finished_image, use_container_width=True, caption="Output Image")
		complete_file_name = f"{ui_image_input.name.rsplit('.', 1)[0]}_output.png"
		
		@streamlit.fragment
		def download_button():
			# ui_image_download.download_button("Download Image", image_to_bytesio(finished_image), complete_file_name, type="primary", on_click=lambda: setattr(streamlit.session_state, 'download_click', True))
			streamlit.download_button("Download Image", pil_to_buffered_png(finished_image), complete_file_name, type="primary")
		
		download_button()
	
	with ui_section_compare:
		image_comparison(img1=input_image, img2=finished_image, make_responsive=True, label1="Input Image", label2="Output Image", width=1024)