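# NOTE(review): the lines "Spaces:", "Runtime error", "Runtime error" preceded this header in the
# copy under review. They are not Python and look like residue from the page the file was copied
# from -- confirm against the original file and drop this note if so.
#
# This app explores integrating `nrtk` with `Gradio` to create an interactive interface for applying perturbations.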
import numpy as np | |
import yaml | |
from pathlib import Path | |
from nrtk.impls.perturb_image.pybsm.scenario import PybsmScenario | |
from nrtk.impls.perturb_image.pybsm.sensor import PybsmSensor | |
from nrtk.impls.perturb_image.pybsm.perturber import PybsmPerturber | |
import gradio as gr # type: ignore | |
# Directory (next to this file) holding the sample images, the GitHub badge and the generated configuration
asset_dir = Path(__file__).parent / "assets"

# Define default values for fields for initialization and button clicks
#
# Note, for this application we'll be focusing on the use case of UAS imagery, so we'll try to pick default
# values that work with images similar to those in the VisDrone dataset. Perturbing images from other datasets or other
# operational tasks may not be successful without modification to these values; defining broad defaults is extremely
# difficult, if not impossible due to the physics-based nature of these perturbations.
#
# The inline notes below repeat the labels the UI shows for each field.
default_values = {
    "gsd": 0.105,  # image ground sample distance (m)
    "scenario": {
        "name": "",
        "ihaze": 2,  # MODTRAN code for visibility
        "altitude": 75,  # (m)
        "groundRange": 0,  # (m)
        "aircraftSpeed": 0,  # (m/s)
        "targetReflectance": 0.15,
        "targetTemperature": 295.0,  # (K)
        "backgroundReflectance": 0.07,
        "backgroundTemperature": 293.0,  # (K)
        "haWindspeed": 21.0,  # high altitude windspeed (m/s)
        "cn2at1m": 0,  # refractive index structure parameter
    },
    "sensor": {
        "name": "",
        "D": 0.004,  # effective aperture diameter (m)
        "f": 0.01429,  # focal length (m)
        "px": 2.0e-05,  # detector pitch (m)
        "optTransWavelengths": [3.8e-07, 7.0e-07],  # spectral bandpass (m)
        "opticsTransmission": [],
        "eta": 0.4,  # relative linear obscuration
        "intTime": 0.03,  # integration time (s)
        "darkCurrent": 0.0,  # (e-/s)
        "readNoise": 25.0,  # (RMS e-)
        "maxN": 96000,  # maximum ADC level (e-)
        "bitdepth": 11.9,  # (bits)
        "maxWellFill": 0.005,
        "sx": 0.0,  # RMS jitter amplitude, X (rad)
        "sy": 0.0,  # RMS jitter amplitude, Y (rad)
        "dax": 0.0001,  # line of sight angular drift rate, X (rad/s)
        "day": 0.0001,  # line of sight angular drift rate, Y (rad/s)
        "qe": [0.05, 0.6, 0.75, 0.85, 0.85, 0.75, 0.5, 0.2, 0],
        "qewavelengths": [3.0e-7, 4.0e-7, 5.0e-7, 6.0e-7, 7.0e-7, 8.0e-7, 9.0e-7, 1.0e-6, 1.1e-6],
    },
}
def load_config(config):
    """
    Loads configuration from config dictionary to UI elements

    Scalar settings that are absent from ``config`` fall back to the matching entry in
    ``default_values``. The list-valued sensor settings are rendered as comma separated text and
    become an empty string when they are absent or empty. A missing (or empty) ``"scenario"`` or
    ``"sensor"`` section is treated as an empty section instead of raising ``KeyError``, so a
    partial configuration loads with defaults.

    :param config: Dictionary with optional ``"gsd"``, ``"scenario"`` and ``"sensor"`` entries.
    :return: Dictionary mapping each UI component to its new value; ``input_group`` is mapped to
        a visible ``gr.Group``.
    """
    config = config or {}
    scenario_config = config.get("scenario") or {}
    sensor_config = config.get("sensor") or {}
    default_scenario = default_values["scenario"]
    default_sensor = default_values["sensor"]

    # UI component -> key of the scalar setting it displays
    scenario_fields = {
        scenario_name: "name",
        ihaze: "ihaze",
        altitude_m: "altitude",
        ground_range_m: "groundRange",
        aircraft_speed_m_Per_s: "aircraftSpeed",
        target_reflectance: "targetReflectance",
        target_temperature_K: "targetTemperature",
        bkgd_reflectance: "backgroundReflectance",
        bkgd_temperature_K: "backgroundTemperature",
        ha_windspeed_m_Per_s: "haWindspeed",
        cn2at1m: "cn2at1m",
    }
    sensor_fields = {
        sensor_name: "name",
        D_m: "D",
        f_m: "f",
        px_m: "px",
        eta: "eta",
        int_time_s: "intTime",
        dark_current: "darkCurrent",
        read_noise: "readNoise",
        max_N: "maxN",
        bit_depth: "bitdepth",
        max_well_fill: "maxWellFill",
        sx: "sx",
        sy: "sy",
        dax: "dax",
        day: "day",
    }
    # UI text box -> key of the list-valued sensor setting it displays
    sensor_list_fields = {
        optTransWavelengths_str: "optTransWavelengths",
        opticsTransmission_str: "opticsTransmission",
        qe_str: "qe",
        qewavelengths_str: "qewavelengths",
    }

    def as_text(key):
        # Convert lists to comma separated text; missing, None and empty all become ""
        values = sensor_config.get(key)
        return ", ".join(str(x) for x in values) if values else ""

    updates = {
        input_group: gr.Group(visible=True),
        img_gsd: config.get("gsd", default_values["gsd"]),
    }
    updates.update(
        (component, scenario_config.get(key, default_scenario[key]))
        for component, key in scenario_fields.items()
    )
    updates.update(
        (component, sensor_config.get(key, default_sensor[key]))
        for component, key in sensor_fields.items()
    )
    updates.update((component, as_text(key)) for component, key in sensor_list_fields.items())
    return updates
def _parse_float_list(text, label):
    """
    Convert comma separated text into a list of floats, skipping blank entries

    :param text: Raw text box content (``None`` is treated like an empty string).
    :param label: Field name used in the error message.
    :return: List of floats (possibly empty).
    :raises gr.Error: If an entry cannot be converted to a float.
    """
    values = []
    for token in (text or "").split(","):
        token = token.strip()
        if not token:
            continue
        try:
            values.append(float(token))
        except ValueError:
            # Surface a message that names the offending field rather than a bare ValueError
            raise gr.Error(f"{label} must be a comma separated list of numbers (got '{token}')!") from None
    return values


def generate_config(data):
    """
    Generate dictionary that can easily be transformed into sensor and scenario objects

    :param data: Dictionary mapping UI components to their current values.
    :return: Dictionary with ``"scenario"`` and ``"sensor"`` sub-dictionaries and the image ``"gsd"``.
        List-valued sensor settings are plain lists of floats; empty ``opticsTransmission``,
        ``qe`` and ``qewavelengths`` are stored as ``None``.
    :raises gr.Error: If a value fails validation or a list field contains a non-numeric entry.
    """
    # Input validation
    if data[ihaze] not in PybsmScenario.ihaze_values:
        raise gr.Error("Invalid ihaze value!")
    if data[altitude_m] not in PybsmScenario.altitude_values:
        raise gr.Error("Invalid altitude value!")
    if data[ground_range_m] not in PybsmScenario.groundRange_values:
        raise gr.Error("Invalid ground range value!")

    scenario_config = {
        "name": data[scenario_name],
        "ihaze": data[ihaze],
        "altitude": data[altitude_m],
        "groundRange": data[ground_range_m],
        "aircraftSpeed": data[aircraft_speed_m_Per_s],
        "targetReflectance": data[target_reflectance],
        "targetTemperature": data[target_temperature_K],
        "backgroundReflectance": data[bkgd_reflectance],
        "backgroundTemperature": data[bkgd_temperature_K],
        "haWindspeed": data[ha_windspeed_m_Per_s],
        "cn2at1m": data[cn2at1m],
    }

    # Convert text fields into lists of floats; the optional ones collapse to None when empty
    optTransWavelengths = _parse_float_list(data[optTransWavelengths_str], "Spectral Bandpass")
    opticsTransmission = _parse_float_list(data[opticsTransmission_str], "Optical Transmission") or None
    qe = _parse_float_list(data[qe_str], "Quantum Efficiency") or None
    qewavelengths = _parse_float_list(data[qewavelengths_str], "Quantum Efficiency Wavelengths") or None

    # More input validation
    if len(optTransWavelengths) < 2:
        raise gr.Error("At least 2 optical transmission wavelengths required!")
    if optTransWavelengths[0] >= optTransWavelengths[-1]:
        raise gr.Error("Optical transmission wavelengths should be entered least to greatest!")
    if opticsTransmission is not None and len(opticsTransmission) != len(optTransWavelengths):
        raise gr.Error("If provided, Optical Transmission must have the same number of values as Spectral Bandpass!")

    sensor_config = {
        "name": data[sensor_name],
        "D": data[D_m],
        "f": data[f_m],
        "px": data[px_m],
        "optTransWavelengths": optTransWavelengths,
        "opticsTransmission": opticsTransmission,
        "eta": data[eta],
        "intTime": data[int_time_s],
        "darkCurrent": data[dark_current],
        "readNoise": data[read_noise],
        "maxN": data[max_N],
        "bitdepth": data[bit_depth],
        "maxWellFill": data[max_well_fill],
        "sx": data[sx],
        "sy": data[sy],
        "dax": data[dax],
        "day": data[day],
        "qe": qe,
        "qewavelengths": qewavelengths,
    }

    return {
        "scenario": scenario_config,
        "sensor": sensor_config,
        "gsd": data[img_gsd],
    }
def gen_new_config():
    """
    Resets all fields to default values

    :return: The UI updates produced by ``load_config`` for ``default_values``.
    """
    defaults = default_values
    return load_config(defaults)
def load_config_from_file(data):
    """
    Loads configuration from given file to UI elements

    :param data: Dictionary mapping UI components to their current values; only the entry for
        ``config_file`` is read.
    :return: The UI updates produced by ``load_config`` for the parsed file.
    :raises gr.Error: If no file was uploaded, the file is not valid YAML, or its top level is
        not a mapping (e.g. an empty file).
    """
    if not data[config_file]:
        raise gr.Error("A file must be uploaded to load existing configuration!")

    try:
        with open(data[config_file], encoding="utf-8") as file:
            config = yaml.safe_load(file)
    except yaml.YAMLError as err:
        raise gr.Error("The uploaded file is not valid YAML!") from err

    # safe_load returns None for an empty file and may return a list/scalar for other documents
    if not isinstance(config, dict):
        raise gr.Error("The uploaded configuration must be a YAML mapping!")

    return load_config(config)
def _stretch_contrast_convert_8bit(img, perc=(0.1, 99.9)):
    """
    Linearly stretch an image between two percentiles and convert it to 8-bit

    :param img: Array-like image (anything with ``astype``).
    :param perc: ``(low, high)`` percentiles; ``low`` is mapped to 0 and ``high`` to 255.
    :return: ``np.uint8`` array with the same shape as ``img``.
    """
    img = img.astype(float)
    img = img - np.percentile(img.ravel(), perc[0])
    scale = np.percentile(img.ravel(), perc[1]) / 255
    # A flat image has nothing to stretch; skip the division rather than produce nan/inf
    if scale > 0:
        img = img / scale
    img = np.clip(img, 0, 255)
    return np.round(img).astype(np.uint8)


def submit(data):
    """
    Apply the perturbation and hide/show relevant UI elements as needed

    :param data: Dictionary mapping UI components to their current values.
    :return: Dictionary mapping ``out_img`` to the contrast-stretched perturbed image and
        ``out_config`` to the path of the YAML file the configuration was written to.
    :raises gr.Error: If no input image is present or ``generate_config`` rejects the values.
    """
    if data[input_img] is None:
        raise gr.Error("An input image is required!")

    config = generate_config(data)

    # Sensor expects numpy arrays, but plain lists serialize better, so convert on a shallow copy
    # and leave `config` untouched for the YAML dump below
    sensor_kwargs = dict(config["sensor"])
    for key in ("optTransWavelengths", "opticsTransmission", "qe", "qewavelengths"):
        if sensor_kwargs[key]:
            sensor_kwargs[key] = np.asarray(sensor_kwargs[key])

    sensor = PybsmSensor(**sensor_kwargs)
    scenario = PybsmScenario(**config["scenario"])
    perturber = PybsmPerturber(sensor=sensor, scenario=scenario)

    # Named config_path so it does not shadow the module-level `config_file` upload component
    config_path = str(asset_dir / "generated_config.yml")
    with open(config_path, "w") as f:
        yaml.dump(config, f)

    # Apply the perturbation and display
    perturbed_img = perturber(image=data[input_img], additional_params={"img_gsd": config["gsd"]})
    # The stretched result used to be assigned to a misspelled name and silently discarded.
    # NOTE(review): the upper percentile 90.9 differs from the helper's 99.9 default -- confirm it is intended.
    perturbed_img = _stretch_contrast_convert_8bit(perturbed_img, (0.1, 90.9))

    return {
        out_img: perturbed_img,
        out_config: config_path,
    }
# Lastly, we define the layout of the application and register the button click listener functions:
#
# NOTE(review): the copy under review had lost its indentation, so the nesting of the `with`
# blocks below is reconstructed from their order -- confirm it against the intended layout
# (in particular that "Perturb Image" belongs inside the initially hidden input group).


def _default_list_text(key):
    """Return the default list-valued sensor setting ``key`` as comma separated text ("" when empty)."""
    values = default_values["sensor"][key]
    return ", ".join(map(str, values)) if values else ""


with gr.Blocks() as demo:
    gr.Markdown(
        "\n"
        "# Apply pyBSM Perturbations with NRTK\n"
        "Note: Default configuration options are tailored to UAS imagery (specifically VisDrone).\n"
        "Perturbing other datasets or other operational tasks may not be successful without modification\n"
        "to some of these configuration options; defining broad defaults is extremely difficult, if not\n"
        "impossible due to the physics-based nature of these perturbations.\n"
    )

    with gr.Row():
        with gr.Column():
            sample_img_path = str(asset_dir / "0000161_01584_d_0000158.jpg")
            input_img = gr.Image(
                label="Input Image",
                value=sample_img_path,
            )
            gr.Examples(
                examples=[
                    sample_img_path,
                    str(asset_dir / "0000006_01111_d_0000003.jpg"),
                    str(asset_dir / "0000006_01659_d_0000004.jpg"),
                    str(asset_dir / "0000006_04309_d_0000011.jpg"),
                ],
                inputs=input_img,
            )

        with gr.Column() as output_col:
            out_img = gr.Image(label="Perturbed Image")
            out_config = gr.File(label="Generated Configuration")

    with gr.Row():
        with gr.Column() as input_col:
            with gr.Row():
                gen_config_btn = gr.Button("Generate New Configuration")

                with gr.Column():
                    # Accept ".yml" as well: the configuration this app generates is written as
                    # "generated_config.yml" and could otherwise not be uploaded again
                    config_file = gr.File(label="Configuration File", file_types=[".yaml", ".yml"])
                    load_config_btn = gr.Button("Load Configuration from File")

            # Hidden until a configuration is generated or loaded (load_config makes it visible)
            with gr.Group(visible=False) as input_group:
                with gr.Accordion("Image Parameters", open=False):
                    img_gsd = gr.Number(
                        label="Image Ground Sample Distance (GSD) (m)",
                        info="The size of one pixel on the ground",
                        value=default_values["gsd"],
                    )

                with gr.Accordion("Scenario Parameters") as scenario_params:
                    altitude_m = gr.Number(
                        label="Altitude (m)",
                        info=(
                            "Sensor height above ground level in meters. The database includes the following "
                            "altitude options: 2m 32.55m 75m 150m 225m 500m, 1000m to 12000m in 1000m steps, "
                            "14000m to 20000m in 2000m steps, and 24500m"
                        ),
                        value=default_values["scenario"]["altitude"],
                    )
                    ground_range_m = gr.Number(
                        label="Ground Range (m)",
                        info=(
                            "Distance on the ground between the target and sensor in meters. The following "
                            "ground ranges are included in the database at each altitude until the ground "
                            "range exceeds the distance to the spherical earth horizon: 0m 100m 500m, 1000m to "
                            "20000m in 1000m steps, 22000m to 80000m in 2000m steps, and 85000m to "
                            "300000m in 5000m steps."
                        ),
                        value=default_values["scenario"]["groundRange"],
                    )

                    with gr.Accordion("Additional Scenario Parameters", open=False) as opt_scenario_params:
                        scenario_name = gr.Textbox(label="Scenario Name", value=default_values["scenario"]["name"])
                        ihaze = gr.Dropdown(
                            label="IHAZE",
                            info="MODTRAN code for visibility",
                            choices=[1, 2],
                            value=default_values["scenario"]["ihaze"],
                        )
                        aircraft_speed_m_Per_s = gr.Number(
                            label="Aircraft Speed (m/s)",
                            info="Ground speed of the aircraft",
                            value=default_values["scenario"]["aircraftSpeed"],
                        )

                        with gr.Row():
                            target_reflectance = gr.Number(
                                label="Target Reflectance",
                                info="Object reflectance",
                                value=default_values["scenario"]["targetReflectance"],
                            )
                            target_temperature_K = gr.Number(
                                label="Target Temperature (K)",
                                info="Object temperature (Kelvin)",
                                value=default_values["scenario"]["targetTemperature"],
                            )

                        with gr.Row():
                            bkgd_reflectance = gr.Number(
                                label="Background Reflectance",
                                info="Background reflectance",
                                value=default_values["scenario"]["backgroundReflectance"],
                            )
                            bkgd_temperature_K = gr.Number(
                                label="Background Temperature (K)",
                                info="Background temperature (Kelvin)",
                                value=default_values["scenario"]["backgroundTemperature"],
                            )

                        ha_windspeed_m_Per_s = gr.Number(
                            label="High Altitude Windspeed (m/s)",
                            info="Used to calculate the turbulence profile",
                            value=default_values["scenario"]["haWindspeed"],
                        )
                        cn2at1m = gr.Number(
                            label="Refractive Index Structure Parameter",
                            info=(
                                'The refractive index structure parameter "near the ground" (e.g. '
                                'at h = 1m). Used to calculate the turbulence profile'
                            ),
                            value=default_values["scenario"]["cn2at1m"],
                        )

                with gr.Accordion("Sensor Parameters") as sensor_params:
                    D_m = gr.Number(label="Effective Aperture Diameter (m)", value=default_values["sensor"]["D"])
                    f_m = gr.Number(label="Focal Length (m)", value=default_values["sensor"]["f"])

                    with gr.Accordion("Additional Sensor Parameters", open=False) as opt_sensor_parameters:
                        sensor_name = gr.Textbox(label="Sensor Name", value=default_values["sensor"]["name"])
                        px_m = gr.Number(
                            label="Detector Center-to-Center Spacing (Pitch) (m)",
                            value=default_values["sensor"]["px"],
                        )
                        optTransWavelengths_str = gr.Textbox(
                            label="Spectral Bandpass of the Camera (m)",
                            info="Enter a comma separated list. At minimum, a start and end wavelength should be specified",
                            value=_default_list_text("optTransWavelengths"),
                        )
                        opticsTransmission_str = gr.Textbox(
                            label="Full System In-Band Optical Transmission",
                            info="Enter a comma separated list. Loss due to any telescope obscuration should not be included",
                            value=_default_list_text("opticsTransmission"),
                        )
                        eta = gr.Number(label="Relative Linear Obscuration", value=default_values["sensor"]["eta"])
                        int_time_s = gr.Number(
                            label="Integration Time (s)",
                            info="Maximum integration time",
                            value=default_values["sensor"]["intTime"],
                        )
                        dark_current = gr.Number(
                            label="Detector Dark Current (e-/s)",
                            value=default_values["sensor"]["darkCurrent"],
                        )
                        read_noise = gr.Number(
                            label="RMS Read Noise (RMS e-)",
                            value=default_values["sensor"]["readNoise"],
                        )
                        max_N = gr.Number(label="Maximum ADC Level (e-)", value=default_values["sensor"]["maxN"])
                        bit_depth = gr.Number(
                            label="Bit Depth (bits)",
                            info="Resolution of the detector ADC",
                            value=default_values["sensor"]["bitdepth"],
                        )
                        max_well_fill = gr.Number(
                            label="Max Well Fill",
                            info="Desired well fill. i.e. maximum well size x desired fill fraction",
                            value=default_values["sensor"]["maxWellFill"],
                        )

                        with gr.Row():
                            sx = gr.Number(
                                label="RMS Jitter Amplitude, X Direction (rad)",
                                value=default_values["sensor"]["sx"],
                            )
                            sy = gr.Number(
                                label="RMS Jitter Amplitude, Y Direction (rad)",
                                value=default_values["sensor"]["sy"],
                            )

                        with gr.Row():
                            dax = gr.Number(
                                label="Line of Sight Angular Drift Rate, X Direction (rad/s)",
                                info="Drift rate during one integration time",
                                value=default_values["sensor"]["dax"],
                            )
                            day = gr.Number(
                                label="Line of Sight Angular Drift Rate, Y Direction (rad/s)",
                                info="Drift rate during one integration time",
                                value=default_values["sensor"]["day"],
                            )

                        qe_str = gr.Textbox(
                            label="Quantum Efficiency as a function of Wavelength (e-/photon)",
                            info="Enter a comma separated list",
                            value=_default_list_text("qe"),
                        )
                        qewavelengths_str = gr.Textbox(
                            label="Wavelengths Corresponding to the Quantum Efficiency Array (microns)",
                            info="Enter a comma separated list",
                            value=_default_list_text("qewavelengths"),
                        )

                submit_btn = gr.Button("Perturb Image")

    github_btn = gr.Button(
        "Check out NRTK on GitHub!",
        icon=str(asset_dir / "github-badge.png"),
        link="https://github.com/Kitware/nrtk",
    )

    # Button listeners
    #
    # Every configuration field, listed once: both "load" listeners write these (plus the group
    # visibility) and the "submit" listener reads them (plus the input image).
    config_fields = [
        img_gsd, scenario_name, ihaze, altitude_m, ground_range_m, aircraft_speed_m_Per_s, target_reflectance,
        target_temperature_K, bkgd_reflectance, bkgd_temperature_K, ha_windspeed_m_Per_s, cn2at1m, sensor_name,
        D_m, f_m, px_m, optTransWavelengths_str, opticsTransmission_str, eta, int_time_s, dark_current, read_noise,
        max_N, bit_depth, max_well_fill, sx, sy, dax, day, qe_str, qewavelengths_str,
    ]

    gen_config_btn.click(
        fn=gen_new_config,
        inputs=None,
        outputs=[input_group, *config_fields],
    )
    # Inputs given as a set: the listener function indexes its single argument by component
    load_config_btn.click(
        fn=load_config_from_file,
        inputs={config_file},
        outputs=[input_group, *config_fields],
    )
    submit_btn.click(
        fn=submit,
        inputs={input_img, *config_fields},
        outputs=[out_img, out_config],
    )

demo.launch(show_error=True)