"""
Dash visualization of what the position graph of a robotic arm looks like and what its current movements are.
"""
import traceback
from dash import dcc, html, no_update
from dash.dependencies import State, Input, Output
from dash_extensions.enrich import MultiplexerTransform, DashProxy
import dash_interactive_graphviz
from genericroboticarm.robo_APIs.robo_interface import RoboInterface
from threading import Thread
import logging
from graphviz import Graph
from dash_extensions import EventListener
# CAPTURE ONLY KEY PRESSES
events = [{'event': 'keydown', 'props': ['key']}]
input_listener = EventListener(
dcc.Textarea(id="delay", value='0', style={'width': 100, 'height': 20}),
#html.Button("Manual Movement", id='manual', n_clicks=0,
# style={'background-color': 'red', 'font-size': '55px', 'width': '300px', 'height': '150px',
# 'vertical-align': 'top', 'display': 'inline-block'}),
id='input_listener', events=events
)
[docs]
class RoboDashApp:
app: DashProxy
robot: RoboInterface
def __init__(self, robo_interface: RoboInterface, port=8055):
self.last_pos = robo_interface.current_coordinates
self.dot_string = "digraph {\n\thello -> world\n}"
self.robot = robo_interface
self.app = DashProxy("Robot Movement Visualization", prevent_initial_callbacks=True,
transforms=[MultiplexerTransform()])
self.p = Thread(target=self.app.run, daemon=True, kwargs=dict(debug=False, port=port))
self.app.layout = html.Div(children=[
input_listener,
html.Div(id='curr', children=f'Current Coordinates: {self.robot.current_coordinates}'),
html.H1(children='Current Graph:'),
dash_interactive_graphviz.DashInteractiveGraphviz(
id="network", engine='dot',
dot_source=self.dot_string
),
html.Div(id='node', children='No node selected'),
dcc.Interval(id='interval-component', interval=500, n_intervals=0),
dcc.Interval(id='interval_50', interval=50, n_intervals=0),
])
@self.app.callback(
Output(component_id='curr', component_property='children'),
Input(component_id='interval_50', component_property='n_intervals')
)
def update_coords(n_intervals):
new_coord = self.robot.current_coordinates
if not list(new_coord.values()) == list(self.last_pos.values()):
self.last_pos = new_coord
return f"Coordinates: {new_coord}"
else:
return no_update
@self.app.callback(
Output(component_id='network', component_property='dot_source'),
Input(component_id='interval-component', component_property='n_intervals')
)
def refresh_graph(n_intervals):
try:
nx_graph = self.robot.graph_manager.g
dot = Graph(comment="Position Network")
for name, data in nx_graph.nodes(data=True):
color = 'grey'
if name in self.robot.next_positions:
color = 'orange'
if name == self.robot.next_positions[-1]:
color = 'red'
if name == self.robot.current_position:
color = 'yellow'
dot.node(name, label=name, color=color, style='filled')
for u, v , data in nx_graph.edges(data=True):
dot.edge(u, v, label=f"d = {round(data['dist'], 2)}")
dot.format = "png"
fig_str = str(dot)
# only refresh if there was a change
if self.dot_string == fig_str:
return no_update
else:
return fig_str
except Exception as ex:
print(ex, traceback.print_exc())
return no_update
@self.app.callback(
Output(component_id='node', component_property='children'),
Input(component_id='network', component_property='selected_node'),
)
def select_operation(selected_node):
return f"Coordinates: {self.robot.graph_manager.get_coordinates(selected_node)}"
@self.app.callback(
Output(component_id="delay", component_property="value"),
Input(component_id="input_listener", component_property="n_events"),
State(component_id="input_listener", component_property="event"),
)
def some_random_name(n_events, event):
"""
Used to move the robotic arm manually. I.e. control its joints with keys
:param n_events:
:param event:
:return:
"""
# in case the robot is moving, it has to stop immediately
if self.robot.next_positions:
self.robot.stop_moving()
# after manual movement we have no idea, where the robot is
if self.robot.current_position:
self.robot.current_position = None
self.robot.manual_mover.receive_key(event['key'])
return "piep"
[docs]
def run(self):
if self.p.is_alive():
logging.warning('Server is already running. Restarting server')
self.stop()
logging.getLogger('werkzeug').setLevel(logging.ERROR)
self.p.start()
[docs]
def stop(self):
print("Sorry, I don't know, how to stop")