Source code for genericroboticarm.control.robo_dash

"""
Dash visualization of what the position graph of a robotic arm looks like and what its current movements are.
"""
import traceback

from dash import dcc, html, no_update
from dash.dependencies import State, Input, Output
from dash_extensions.enrich import MultiplexerTransform, DashProxy
import dash_interactive_graphviz
from genericroboticarm.robo_APIs.robo_interface import RoboInterface
from threading import Thread
import logging
from graphviz import Graph
from dash_extensions import EventListener

# CAPTURE ONLY KEY PRESSES
events = [{'event': 'keydown', 'props': ['key']}]
input_listener = EventListener(
    dcc.Textarea(id="delay", value='0', style={'width': 100, 'height': 20}),
    #html.Button("Manual Movement", id='manual', n_clicks=0,
    #            style={'background-color': 'red', 'font-size': '55px', 'width': '300px', 'height': '150px',
    #                   'vertical-align': 'top', 'display': 'inline-block'}),
    id='input_listener', events=events
)


[docs] class RoboDashApp: app: DashProxy robot: RoboInterface def __init__(self, robo_interface: RoboInterface, port=8055): self.last_pos = robo_interface.current_coordinates self.dot_string = "digraph {\n\thello -> world\n}" self.robot = robo_interface self.app = DashProxy("Robot Movement Visualization", prevent_initial_callbacks=True, transforms=[MultiplexerTransform()]) self.p = Thread(target=self.app.run, daemon=True, kwargs=dict(debug=False, port=port)) self.app.layout = html.Div(children=[ input_listener, html.Div(id='curr', children=f'Current Coordinates: {self.robot.current_coordinates}'), html.H1(children='Current Graph:'), dash_interactive_graphviz.DashInteractiveGraphviz( id="network", engine='dot', dot_source=self.dot_string ), html.Div(id='node', children='No node selected'), dcc.Interval(id='interval-component', interval=500, n_intervals=0), dcc.Interval(id='interval_50', interval=50, n_intervals=0), ]) @self.app.callback( Output(component_id='curr', component_property='children'), Input(component_id='interval_50', component_property='n_intervals') ) def update_coords(n_intervals): new_coord = self.robot.current_coordinates if not list(new_coord.values()) == list(self.last_pos.values()): self.last_pos = new_coord return f"Coordinates: {new_coord}" else: return no_update @self.app.callback( Output(component_id='network', component_property='dot_source'), Input(component_id='interval-component', component_property='n_intervals') ) def refresh_graph(n_intervals): try: nx_graph = self.robot.graph_manager.g dot = Graph(comment="Position Network") for name, data in nx_graph.nodes(data=True): color = 'grey' if name in self.robot.next_positions: color = 'orange' if name == self.robot.next_positions[-1]: color = 'red' if name == self.robot.current_position: color = 'yellow' dot.node(name, label=name, color=color, style='filled') for u, v , data in nx_graph.edges(data=True): dot.edge(u, v, label=f"d = {round(data['dist'], 2)}") dot.format = "png" fig_str = str(dot) # only refresh if there was a change if self.dot_string == fig_str: return no_update else: return fig_str except Exception as ex: print(ex, traceback.print_exc()) return no_update @self.app.callback( Output(component_id='node', component_property='children'), Input(component_id='network', component_property='selected_node'), ) def select_operation(selected_node): return f"Coordinates: {self.robot.graph_manager.get_coordinates(selected_node)}" @self.app.callback( Output(component_id="delay", component_property="value"), Input(component_id="input_listener", component_property="n_events"), State(component_id="input_listener", component_property="event"), ) def some_random_name(n_events, event): """ Used to move the robotic arm manually. I.e. control its joints with keys :param n_events: :param event: :return: """ # in case the robot is moving, it has to stop immediately if self.robot.next_positions: self.robot.stop_moving() # after manual movement we have no idea, where the robot is if self.robot.current_position: self.robot.current_position = None self.robot.manual_mover.receive_key(event['key']) return "piep"
[docs] def run(self): if self.p.is_alive(): logging.warning('Server is already running. Restarting server') self.stop() logging.getLogger('werkzeug').setLevel(logging.ERROR) self.p.start()
[docs] def stop(self): print("Sorry, I don't know, how to stop")