import gradio as gr from gradio_rangeslider import RangeSlider import numpy as np from scipy.spatial import KDTree import pandas as pd from dysts.base import DynSys from dysts.flows import Lorenz, Chua import io import matplotlib.pyplot as plt from PIL import Image def fig2img(fig): """Convert a Matplotlib figure to a PIL Image and return it""" buf = io.BytesIO() fig.savefig(buf) buf.seek(0) img = Image.open(buf) return img """ ----------------------------------------------------------------------------------------- ROBOTIC ARM SYSTEM ----------------------------------------------------------------------------------------- """ class RoboticArm(object): """A simple robotic arm. Receives an array of angles as a motor commands. Return the position of the tip of the arm as a 2-tuple. """ def __init__(self, dim=7, limit=150, D=0.05): """`dim` is the number of joints, which are able to move in (-limit, +limit)""" self.dim, self.limit, self.D = dim, limit, D self.trajectory = np.zeros((dim+1, 2)) # hold the last executed posture (x, y position of all joints) def execute(self, angles, return_img=True): """Return the position of the end effector. Accepts values in degrees.""" u, v, sum_a, length = 0, 0, 0, 1.0/len(angles) self.trajectory[0] = np.array([u, v]) for i, a in enumerate(angles): sum_a += np.radians(a) u, v = u + length*np.sin(sum_a), v + length*np.cos(sum_a) # at zero pose, the tip is at x=0,y=1. self.trajectory[i+1] = np.array([u, v]) if return_img: fig = plt.figure() # save discovery as image for display plt.plot(self.trajectory[:, 0], self.trajectory[..., 1].T, '-o', color="gray") plt.xlim([-1.0, 1.0]) plt.ylim([-1.0, 1.0]) img = fig2img(fig) plt.close() else: img = None return np.array([u, v]), img def execute_random(self, N): """Random trajectories in parallel""" def random_params(self): """Return a random (and legal) motor command.""" return np.random.uniform(-self.limit, self.limit, size=(self.dim)) def perturb_params(self, params): """With a range of ±150°, creates a perturbation of ±15° (5% of 300° in each direction).""" min_perturb = np.minimum(np.repeat(self.limit, self.dim), params + 2*self.D*self.limit) # create a random perturbation inside the legal values. max_perturb = np.maximum(np.repeat(-self.limit, self.dim), params - 2*self.D*self.limit) new_params = np.random.uniform(min_perturb, max_perturb) return new_params def random_goal(self, reached_goals=None): """Goal babbling strategy""" goal = np.random.uniform(-1, 1, size=(2, )) # goal as random point in [-1, 1]x[-1, 1]. return goal """ ----------------------------------------------------------------------------------------- LORENZ ATTRACTOR SYSTEM ----------------------------------------------------------------------------------------- """ class ChaoticAttractor(DynSys): """The Lorenz Attractor. """ def __init__(self, name, params_range, T, D=0.05): super().__init__() if name == "Lorenz": self._rhs = Lorenz._rhs self._jac = Lorenz._jac self.pts_per_period = 100 elif name == "Chua": self._rhs = Chua._rhs self._jac = Chua._jac self.pts_per_period = 100 else: raise NotImplementedError params_range["ic"] = [np.repeat(params_range["ic"][0], self.embedding_dimension), np.repeat(params_range["ic"][1], self.embedding_dimension)] self.params_range = params_range self.T, self.D = T, D self.trajectory = np.zeros((T, self.embedding_dimension)) # hold the executed trajectory (x, y, z positions in time) def execute(self, params, return_img=True): """Return the position in 3D space at time T""" self.params = {} for k, v in params.items(): setattr(self, k, v) if k != "ic": self.params[k] = v self.trajectory = self.make_trajectory(self.T, pts_per_period=self.pts_per_period) if return_img: fig = plt.figure() # save discovery as image for display plt.plot(self.trajectory[:, 0], self.trajectory[..., 1].T) img = fig2img(fig) plt.close() else: img = None return self.trajectory[-1], img def random_params(self): """Return a random (and legal) motor command.""" params = {} for k, v in self.params_range.items(): params[k] = np.random.uniform(v[0], v[1]) return params def perturb_params(self, params): """Creates a perturbation of parameters within allowed range.""" new_params = {} for k, v in params.items(): param_range = self.params_range[k][1] - self.params_range[k][0] min_perturb = np.minimum(self.params_range[k][1], v + self.D*param_range) max_perturb = np.maximum(self.params_range[k][0], v - self.D*param_range) new_params[k] = np.random.uniform(min_perturb, max_perturb) return new_params def random_goal(self, reached_goals=None): """Goal babbling strategy""" goal = np.random.uniform(np.stack(reached_goals).min(0), np.stack(reached_goals).max(0)) # goal as random point in hypercube of currently reached goals. return goal """ ----------------------------------------------------------------------------------------- RANDOM EXPLORATION IN INPUT PARAMETER SPACE ----------------------------------------------------------------------------------------- """ def explore_rip(system, n): """Explore the system using random input parameters (RIP) during n steps.""" history = {"params": [], "effects": []} for t in range(n): params = system.random_params() # choosing a random parameter command; does not depend on history. effect, _ = system.execute(params, return_img=False) # executing the parameter command and observing its effect. history["params"].append(params) # updating history. history["effects"].append(effect) return history """ ----------------------------------------------------------------------------------------- RANDOM EXPLORATION IN EFFECT (GOAL) SPACE ----------------------------------------------------------------------------------------- """ def inverse(system, goal, history): """Transform a goal into a parameter command""" tree = KDTree(history["effects"]) # find the nearest neighbor of the goal. d, i = tree.query(goal, k=1) nn_params = history["params"][i] return system.perturb_params(nn_params) def goal_babbling(system, history): goal = system.random_goal(history["effects"]) return inverse(system, goal, history) def explore_rge(system, n): """Explore the arm using random goal exploration (RGE) during n steps.""" history = {"params": [], "effects": [], "images": []} for t in range(n): if t < 0.2*n: # random parameter exploration for the first 20% steps. params = system.random_params() else: # then random goal exploration. params = goal_babbling(system, history) # goal exploration depends on history. effect, img = system.execute(params) # executing the motor command and observing its effect. history["params"].append(params) # updating history. history["effects"].append(effect) history["images"].append((img, f"Run {t}")) return history """ ----------------------------------------------------------------------------------------- MAIN ----------------------------------------------------------------------------------------- """ def explore_robotic_arm(seed, N, dim, limit): system = RoboticArm(dim, limit) return explore(system, seed, N) def explore_chaotic_attractor(seed, N, T, name, lorenz_beta_range, lorenz_rho_range, lorenz_sigma_range, lorenz_ic_range, chua_alpha_range, chua_beta_range, chua_m0_range, chua_m1_range, chua_ic_range): if name == "Lorenz": params_range = {"beta": lorenz_beta_range, "rho": lorenz_rho_range, "sigma": lorenz_sigma_range, "ic": lorenz_ic_range} elif name == "Chua": params_range = {"alpha": chua_alpha_range, "beta": chua_beta_range, "m0": chua_m0_range, "m1": chua_m1_range, "ic": chua_ic_range} else: raise NotImplementedError system = ChaoticAttractor(name, params_range, T) return explore(system, seed, N) def explore(system, seed, N): # Random Exploration in Input Space np.random.seed(seed) history_rip = explore_rip(system, N) effects_rip = np.stack(history_rip["effects"]) # Goal Exploration in Output Space np.random.seed(seed) history_rge = explore_rge(system, N) effects_rge = np.stack(history_rge["effects"]) df_rip = pd.DataFrame({"x": effects_rip[:, 0], "y": effects_rip[:, 1]}) df_rge = pd.DataFrame({"x": effects_rge[:, 0], "y": effects_rge[:, 1]}) gallery_rge = history_rge["images"] return df_rip , df_rge, gallery_rge def change_attractor(name): if name == "Chua": return gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) elif name == "Lorenz": return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True) else: raise NotImplementedError with gr.Blocks() as demo: with gr.Tab("Robotic Arm Exploration"): with gr.Row(): gr.Markdown("# Goal-based Exploration of a Robotic Arm") with gr.Row(): gr.Markdown( """This example is based on Fabien Benureau goal babbling tutorial, which can also be run in [this notebook](https://fabien.benureau.com/recode/benureau2015_gb/benureau2015_gb.html) """ ) with gr.Row(): seed = gr.Number(value=0, minimum=0, maximum=1000, precision=0, label="Random seed") N = gr.Number(value=300, minimum=10, maximum=5000, precision=0, label="Number of exploration steps") dim = gr.Number(value=7, minimum=2, maximum=100, precision=0, label="Number of joints") limit = gr.Slider(value=150, minimum=20, maximum=180,step=10, label="Joint Angle Limit") with gr.Row(): examples = gr.Examples([[300, 2, 150], [300, 7, 150], [300, 20, 150], [300, 100, 150]], inputs=[N, dim, limit]) with gr.Row(): explo_btn = gr.Button("Launch Exploration! ", variant="primary") with gr.Row(): rip_plot = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Input Parameters", x_lim=[-1.1, 1.1], y_lim=[-1.1, 1.1], width=400, height=400, label="Random Input Parameters") rge_plot = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Goal Exploration", x_lim=[-1.1, 1.1], y_lim=[-1.1, 1.1], width=400, height=400, label="Random Goal Exploration") with gr.Row(): gallery = gr.Gallery(object_fit="contain", height="600px", columns=8, label="IMGEP Discoveries") explo_btn.click(explore_robotic_arm, inputs=[seed, N, dim, limit], outputs=[rip_plot, rge_plot, gallery]) with gr.Tab("Chaotic Attractors Exploration"): with gr.Row(): gr.Markdown("# Goal-based Exploration of a Chaotic System") with gr.Row(): gr.Markdown( """This example uses William Gilpin's `dysts` library, which contains many examples of chaotic systems (see [github repo](https://github.com/williamgilpin/dysts/)) """ ) with gr.Row(): seed2 = gr.Number(value=0, minimum=0, maximum=1000, precision=0, label="Random seed") N2 = gr.Number(value=100, minimum=10, maximum=1000, precision=0, label="Number of exploration steps") T = gr.Number(value=100, minimum=10, maximum=200, precision=0, label="Number of trajectory timesteps") with gr.Row(): gr.Markdown("Select Chaotic Attractor and Parameter Range") with gr.Row(): name = gr.Dropdown(value="Chua", choices=["Chua", "Lorenz"], label="Chaotic Attractor") with gr.Row(): # Chua Attractor chua_alpha_range = RangeSlider(value=(13, 18), minimum=10, maximum=20, step=0.1, label="Alpha Range", visible=True) chua_beta_range = RangeSlider(value=(25, 30), minimum=20, maximum=40, step=0.1, label="Beta Range", visible=True) chua_m0_range = RangeSlider(value=(-1.3, -1), minimum=-1.5, maximum=-1, step=0.001, label="m0 Range", visible=True) chua_m1_range = RangeSlider(value=(-0.85, -0.55), minimum=-0.9, maximum=-0.5, step=0.001, label="m1 Range", visible=True) chua_ic_range = RangeSlider(value=(-0.5, 0.5), minimum=-1, maximum=1, step=1, label="Initial Conditions Range", visible=True) # Lorenz Attractor lorenz_beta_range = RangeSlider(value=(2, 6), minimum=2, maximum=6, step=0.001, label="Beta Range", visible=False) lorenz_rho_range = RangeSlider(value=(20, 50), minimum=20, maximum=50, step=10, label="Rho Range", visible=False) lorenz_sigma_range = RangeSlider(value=(6, 35), minimum=6, maximum=35, step=1, label="Sigma Range", visible=False) lorenz_ic_range = RangeSlider(value=(-20, 20), minimum=-20, maximum=20, step=1, label="Initial Conditions Range", visible=False) # Rossler Attractor with gr.Row(): explo_btn2 = gr.Button("Launch Exploration! ", variant="primary") with gr.Row(): rip_plot2 = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Input Parameters", width=400, height=400, label="Random Input Parameters") rge_plot2 = gr.ScatterPlot(pd.DataFrame({"x": [], "y": []}), x="x", y="y", title="Reached Positions with Random Goal Exploration", width=400, height=400, label="Random Goal Exploration") with gr.Row(): gallery2 = gr.Gallery(object_fit="contain", height="600px", columns=8, label="IMGEP Discoveries") name.input(change_attractor, inputs=name, queue=False, show_progress="hidden", outputs=[lorenz_beta_range, lorenz_rho_range, lorenz_sigma_range, lorenz_ic_range, chua_alpha_range, chua_beta_range, chua_m0_range, chua_m1_range, chua_ic_range]) explo_btn2.click(explore_chaotic_attractor, inputs=[seed2, N2, T, name, lorenz_beta_range, lorenz_rho_range, lorenz_sigma_range, lorenz_ic_range, chua_alpha_range, chua_beta_range, chua_m0_range, chua_m1_range, chua_ic_range], outputs=[rip_plot2, rge_plot2, gallery2]) demo.launch()