Source code for genericroboticarm.robo_APIs.robo_interface

"""
Main class to represent a dorna robotic arm
"""
from __future__ import annotations

import time
from typing import Optional, List, Tuple, Dict, Callable
from abc import ABC, abstractmethod

from genericroboticarm.control.device_interaction import DeviceList
from genericroboticarm.control.graph_manager import GraphManager, JointState
from genericroboticarm.control.manual_movement import ManualMovementManager
from sila2.client import ClientObservableCommandInstance
from sila2.framework import CommandExecutionStatus
from threading import Lock
import logging


[docs] def finish_successfully(cmd_infos: list[ClientObservableCommandInstance]) -> bool: while not all(cmd_info.done for cmd_info in cmd_infos): time.sleep(.05) success = all(cmd_info.status == CommandExecutionStatus.finishedSuccessfully for cmd_info in cmd_infos) return success
[docs] class RoboInterface(ABC): current_position: Optional[str] = None next_positions: List[str] def __init__(self, start_simulating: bool = True, **kwargs): self.is_in_simulation_mode = start_simulating self.precision = .3 # 1 is normal, higher means more intermediate steps of movements self.speed = 100 # in percent self.halt = False self.next_positions = [] graph_name = self.name self.graph_manager = GraphManager(name=self.name) self.manual_mover = ManualMovementManager(self, joint_names=self.joint_names) self.move_lock = Lock() self.interacting_devices = DeviceList(self.name) @property def name(self)-> str: return self.get_name()
[docs] @classmethod @abstractmethod def get_name(cls) -> str: """ The name will be used as parameter in command line startup (-r), as sila server name and to save the position graph. :return: the name """
[docs] @classmethod @abstractmethod def get_used_cli_args(cls) -> dict[str, str]: """ Defines the arguments that are forwarded from the command line to the robots constructor. :return: a dictionary of type [constructor parameter name, command line argument name] """
[docs] @abstractmethod def init_connection(self, **kwargs): """ Sets up the hardware connection :return: """
@property @abstractmethod def joint_names(self) -> list[str]: """ Returns a list of names of the individual coordinates used by the robot. For example [x, y, z, roll, pitch, grip] or [j1, j2, j3, j4, j5] :return: """
[docs] def set_metrik(self, metrik: Callable[[JointState, JointState], float]): """ Sets the way :param metrik: :return: """ self.graph_manager.dist = metrik self.graph_manager.update_distances()
[docs] def stop_moving(self): """ Immediately(Or as quickly as possible) stops all movements :return: """ print("Immediately stopped moving") self.next_positions = [] # wait for movement command to finish and nullify the current position (we do not know where we stopped) with self.move_lock: self.current_position = None
[docs] def set_speed(self, new_speed: float): """ Set the speed :param new_speed: :return: """ self.speed = new_speed
[docs] def set_acceleration(self, new_acceleration: float, **kwargs): """ :param new_acceleration: :return: """
[docs] def move_to_nearest_known_position(self, offset: Optional[Dict[str, float]]=None): """ :return: """ nearest = self.graph_manager.get_nearest_position(self.current_coordinates) self.move_straight_to_position(nearest, offset=offset) self.current_position = nearest
[docs] @abstractmethod def move_relative(self, movements: List[Tuple[str, float]], **kwargs): """ :param movements: :return: """
[docs] def move_to_position(self, target_position: str, offset: Optional[Dict[str, float]]=None): """ :param offset: :param target_position: :return: """ print(f"moving to {target_position}.") # check whether the position exists if not self.graph_manager.position_known(target_position): logging.error(f"Position {target_position} is unknown.") raise ValueError(f"Position {target_position} is unknown.") with self.move_lock: # move to nearest known position if not currently at a known position if not self.current_position: start = self.graph_manager.get_nearest_position(self.current_coordinates) self.next_positions = [start] else: start = self.current_position self.next_positions = [] # find the shortest path to target path = self.graph_manager.get_shortest_path(start, target_position) # add the path to next_positions or move directly to the target if no path exists if path: self.next_positions.extend(path) else: self.next_positions.append(target_position) # move to target while self.next_positions: self.move_straight_to_position(self.next_positions[0], offset=offset) self.current_position = self.next_positions[0] self.next_positions.pop(0)
[docs] @abstractmethod def grip_close(self, **kwargs): """ :return: """
[docs] @abstractmethod def grip_open(self, **kwargs): """ :return: """
[docs] def pick_from_device(self, device: str, pos: int, offset: Optional[Dict[str, float]]=None): """ :param device: :param pos: :param offset: :return: """ source_interactive = device in self.interacting_devices.devices cmd_to_finish = [] if source_interactive: source_client = self.interacting_devices.get_client(device) if not source_client: logging.error(f"Could not connect to source device {device}") return cmd_to_finish.append(source_client.LabwareTransferSiteController.PrepareForOutput((device, 1), pos+1)) # do this while the source device is preparing as well if hasattr(self, "prepare_for_input"): self.prepare_for_input(0, device, pos+1, "plate_type") if not finish_successfully(cmd_to_finish): logging.error("Preparations for labware transfer failed") if hasattr(self, "cancel_transfer"): self.cancel_transfer() raise Exception("Preparations for labware transfer failed") position_identifier = self.site_to_position_identifier(device, pos) self.pick_at_position(position_identifier, offset) if source_interactive: source_client.LabwareTransferSiteController.LabwareRemoved((device, pos+1))
[docs] def pick_at_position(self, identifier: str, offset: Optional[Dict[str, float]]=None): """ Opens the gripper, moves to a specified location and closes the gripper there :param identifier: Position identifier :param offset: :return: """ if not self.graph_manager.position_known(identifier): raise ValueError(f"Position {identifier} is unknown.") self.grip_open() self.move_to_position(identifier, offset=offset) self.grip_close()
[docs] def place_at_device(self, device: str, pos: int, offset: Optional[Dict[str, float]]=None, plate_type: str = "plate_type"): """ :param plate_type: :param device: :param pos: :param offset: :return: """ target_interactive = device in self.interacting_devices.devices cmd_to_finish = [] if target_interactive: target_client = self.interacting_devices.get_client(device) if not target_client: logging.error(f"Could not connect to target device {device}") return cmd_to_finish.append(target_client.LabwareTransferSiteController.PrepareForInput( (device, 1), pos+1, plate_type, "uuid")) # do this while the target device is preparing as well if hasattr(self, "prepare_for_output"): self.prepare_for_output(0, device, pos+1) if not finish_successfully(cmd_to_finish): logging.error("Preparations for labware transfer failed") if hasattr(self, "cancel_transfer"): self.cancel_transfer() raise Exception("Preparations for labware transfer failed") position_identifier = self.site_to_position_identifier(device, pos) self.place_at_position(position_identifier, offset) if target_interactive: target_client.LabwareTransferSiteController.LabwareDelivered((device, pos+1))
[docs] def place_at_position(self, identifier: str, offset: Optional[Dict[str, float]]=None): """ Opens the gripper, moves to a specified location and opens the gripper there :param identifier: Position identifier :param offset: :return: """ if not self.graph_manager.position_known(identifier): logging.error(f"Position {identifier} is unknown.") return self.move_to_position(identifier, offset=offset) self.grip_open()
[docs] def site_to_position_identifier(self, device: str, slot: int) -> str: logging.warning("Using the default translation of simply concatenating device_name and slot number") return f"{device}{slot}"
[docs] def move_straight_to_position(self, identifier: str, offset: Optional[Dict[str, float]]=None): """ Moves the robot straight to the specified position, i.e. queries its coordinates, adds the offset and call move_to_coordinates() on the result. :param offset: :param identifier: :return: """ if identifier not in self.graph_manager.g.nodes: logging.warning(f"Unknown position: {identifier}") if not offset: offset = dict() if not self.halt: target_coord = self.graph_manager.get_coordinates(identifier) for joint, diff in offset.items(): if joint not in target_coord: logging.warning(f"{joint} is an unknown joint name in offset") continue target_coord[joint] += diff self.move_to_coordinates(target_coord)
[docs] @abstractmethod def move_to_coordinates(self, coords: JointState, **kwargs) -> None: """ :param coords: :return: """
@property @abstractmethod def current_coordinates(self) -> JointState: """ :return: """
[docs] def end_connection(self, **kwargs): """ Is executed when the server stops :return: """
[docs] def run_custom_command(self, cmd: str) -> str: """ Runs a custom command on the hardware. The implementation for a specific hardware might look very different and the response must be a string. Be default returns 'Not Implemented'. Use on your own risk. :param cmd: :return: """ return "Not Implemented"