"""
Main class to represent a dorna robotic arm
"""
from __future__ import annotations
import time
from typing import Optional, List, Tuple, Dict, Callable
from abc import ABC, abstractmethod
from genericroboticarm.control.device_interaction import DeviceList
from genericroboticarm.control.graph_manager import GraphManager, JointState
from genericroboticarm.control.manual_movement import ManualMovementManager
from sila2.client import ClientObservableCommandInstance
from sila2.framework import CommandExecutionStatus
from threading import Lock
import logging
[docs]
def finish_successfully(cmd_infos: list[ClientObservableCommandInstance]) -> bool:
while not all(cmd_info.done for cmd_info in cmd_infos):
time.sleep(.05)
success = all(cmd_info.status == CommandExecutionStatus.finishedSuccessfully for cmd_info in cmd_infos)
return success
[docs]
class RoboInterface(ABC):
current_position: Optional[str] = None
next_positions: List[str]
def __init__(self, start_simulating: bool = True, **kwargs):
self.is_in_simulation_mode = start_simulating
self.precision = .3 # 1 is normal, higher means more intermediate steps of movements
self.speed = 100 # in percent
self.halt = False
self.next_positions = []
graph_name = self.name
self.graph_manager = GraphManager(name=self.name)
self.manual_mover = ManualMovementManager(self, joint_names=self.joint_names)
self.move_lock = Lock()
self.interacting_devices = DeviceList(self.name)
@property
def name(self)-> str:
return self.get_name()
[docs]
@classmethod
@abstractmethod
def get_name(cls) -> str:
"""
The name will be used as parameter in command line startup (-r),
as sila server name and to save the position graph.
:return: the name
"""
[docs]
@classmethod
@abstractmethod
def get_used_cli_args(cls) -> dict[str, str]:
"""
Defines the arguments that are forwarded from the command line to the robots constructor.
:return: a dictionary of type [constructor parameter name, command line argument name]
"""
[docs]
@abstractmethod
def init_connection(self, **kwargs):
"""
Sets up the hardware connection
:return:
"""
@property
@abstractmethod
def joint_names(self) -> list[str]:
"""
Returns a list of names of the individual coordinates used by the robot.
For example [x, y, z, roll, pitch, grip] or [j1, j2, j3, j4, j5]
:return:
"""
[docs]
def set_metrik(self, metrik: Callable[[JointState, JointState], float]):
"""
Sets the way
:param metrik:
:return:
"""
self.graph_manager.dist = metrik
self.graph_manager.update_distances()
[docs]
def stop_moving(self):
"""
Immediately(Or as quickly as possible) stops all movements
:return:
"""
print("Immediately stopped moving")
self.next_positions = []
# wait for movement command to finish and nullify the current position (we do not know where we stopped)
with self.move_lock:
self.current_position = None
[docs]
def set_speed(self, new_speed: float):
"""
Set the speed
:param new_speed:
:return:
"""
self.speed = new_speed
[docs]
def set_acceleration(self, new_acceleration: float, **kwargs):
"""
:param new_acceleration:
:return:
"""
[docs]
def move_to_nearest_known_position(self, offset: Optional[Dict[str, float]]=None):
"""
:return:
"""
nearest = self.graph_manager.get_nearest_position(self.current_coordinates)
self.move_straight_to_position(nearest, offset=offset)
self.current_position = nearest
[docs]
@abstractmethod
def move_relative(self, movements: List[Tuple[str, float]], **kwargs):
"""
:param movements:
:return:
"""
[docs]
def move_to_position(self, target_position: str, offset: Optional[Dict[str, float]]=None):
"""
:param offset:
:param target_position:
:return:
"""
print(f"moving to {target_position}.")
# check whether the position exists
if not self.graph_manager.position_known(target_position):
logging.error(f"Position {target_position} is unknown.")
raise ValueError(f"Position {target_position} is unknown.")
with self.move_lock:
# move to nearest known position if not currently at a known position
if not self.current_position:
start = self.graph_manager.get_nearest_position(self.current_coordinates)
self.next_positions = [start]
else:
start = self.current_position
self.next_positions = []
# find the shortest path to target
path = self.graph_manager.get_shortest_path(start, target_position)
# add the path to next_positions or move directly to the target if no path exists
if path:
self.next_positions.extend(path)
else:
self.next_positions.append(target_position)
# move to target
while self.next_positions:
self.move_straight_to_position(self.next_positions[0], offset=offset)
self.current_position = self.next_positions[0]
self.next_positions.pop(0)
[docs]
@abstractmethod
def grip_close(self, **kwargs):
"""
:return:
"""
[docs]
@abstractmethod
def grip_open(self, **kwargs):
"""
:return:
"""
[docs]
def pick_from_device(self, device: str, pos: int, offset: Optional[Dict[str, float]]=None):
"""
:param device:
:param pos:
:param offset:
:return:
"""
source_interactive = device in self.interacting_devices.devices
cmd_to_finish = []
if source_interactive:
source_client = self.interacting_devices.get_client(device)
if not source_client:
logging.error(f"Could not connect to source device {device}")
return
cmd_to_finish.append(source_client.LabwareTransferSiteController.PrepareForOutput((device, 1), pos+1))
# do this while the source device is preparing as well
if hasattr(self, "prepare_for_input"):
self.prepare_for_input(0, device, pos+1, "plate_type")
if not finish_successfully(cmd_to_finish):
logging.error("Preparations for labware transfer failed")
if hasattr(self, "cancel_transfer"):
self.cancel_transfer()
raise Exception("Preparations for labware transfer failed")
position_identifier = self.site_to_position_identifier(device, pos)
self.pick_at_position(position_identifier, offset)
if source_interactive:
source_client.LabwareTransferSiteController.LabwareRemoved((device, pos+1))
[docs]
def pick_at_position(self, identifier: str, offset: Optional[Dict[str, float]]=None):
"""
Opens the gripper, moves to a specified location and closes the gripper there
:param identifier: Position identifier
:param offset:
:return:
"""
if not self.graph_manager.position_known(identifier):
raise ValueError(f"Position {identifier} is unknown.")
self.grip_open()
self.move_to_position(identifier, offset=offset)
self.grip_close()
[docs]
def place_at_device(self, device: str, pos: int, offset: Optional[Dict[str, float]]=None,
plate_type: str = "plate_type"):
"""
:param plate_type:
:param device:
:param pos:
:param offset:
:return:
"""
target_interactive = device in self.interacting_devices.devices
cmd_to_finish = []
if target_interactive:
target_client = self.interacting_devices.get_client(device)
if not target_client:
logging.error(f"Could not connect to target device {device}")
return
cmd_to_finish.append(target_client.LabwareTransferSiteController.PrepareForInput(
(device, 1), pos+1, plate_type, "uuid"))
# do this while the target device is preparing as well
if hasattr(self, "prepare_for_output"):
self.prepare_for_output(0, device, pos+1)
if not finish_successfully(cmd_to_finish):
logging.error("Preparations for labware transfer failed")
if hasattr(self, "cancel_transfer"):
self.cancel_transfer()
raise Exception("Preparations for labware transfer failed")
position_identifier = self.site_to_position_identifier(device, pos)
self.place_at_position(position_identifier, offset)
if target_interactive:
target_client.LabwareTransferSiteController.LabwareDelivered((device, pos+1))
[docs]
def place_at_position(self, identifier: str, offset: Optional[Dict[str, float]]=None):
"""
Opens the gripper, moves to a specified location and opens the gripper there
:param identifier: Position identifier
:param offset:
:return:
"""
if not self.graph_manager.position_known(identifier):
logging.error(f"Position {identifier} is unknown.")
return
self.move_to_position(identifier, offset=offset)
self.grip_open()
[docs]
def site_to_position_identifier(self, device: str, slot: int) -> str:
logging.warning("Using the default translation of simply concatenating device_name and slot number")
return f"{device}{slot}"
[docs]
def move_straight_to_position(self, identifier: str, offset: Optional[Dict[str, float]]=None):
"""
Moves the robot straight to the specified position, i.e. queries its coordinates, adds the offset and call
move_to_coordinates() on the result.
:param offset:
:param identifier:
:return:
"""
if identifier not in self.graph_manager.g.nodes:
logging.warning(f"Unknown position: {identifier}")
if not offset:
offset = dict()
if not self.halt:
target_coord = self.graph_manager.get_coordinates(identifier)
for joint, diff in offset.items():
if joint not in target_coord:
logging.warning(f"{joint} is an unknown joint name in offset")
continue
target_coord[joint] += diff
self.move_to_coordinates(target_coord)
[docs]
@abstractmethod
def move_to_coordinates(self, coords: JointState, **kwargs) -> None:
"""
:param coords:
:return:
"""
@property
@abstractmethod
def current_coordinates(self) -> JointState:
"""
:return:
"""
[docs]
def end_connection(self, **kwargs):
"""
Is executed when the server stops
:return:
"""
[docs]
def run_custom_command(self, cmd: str) -> str:
"""
Runs a custom command on the hardware. The implementation for a specific hardware might look very different
and the response must be a string. Be default returns 'Not Implemented'. Use on your own risk.
:param cmd:
:return:
"""
return "Not Implemented"