Source code for genericroboticarm.robo_APIs.precise_flex.pf_implementation

"""
A implementation of RobotInterface for a Precise Flex 400. 
The current communication is designed nor non-verbose mode.
Manual movement follows carthesian coordianates while automatic movements follow joint coordinates.
"""
from __future__ import annotations

import time
import traceback
from typing import List, Tuple, Optional, Dict
import socket
import logging
from genericroboticarm.control.graph_manager import JointState
from genericroboticarm.robo_APIs import InteractiveRobot
from .pf_simulation import PFSimulation
from threading import Lock
from datetime import datetime
from dataclasses import dataclass


HOST = "10.10.0.98"
PORT = 10100


[docs] @dataclass class MoveProfile: Speed: int Speed2: int = 0 Accel: int = 50 AccRamp: float = .1 Decel: int = 50 DecRamp: float = .1 InRange: int = -1 # -1 is the smoothest and quickest movement possible Straight: int = 0 # -1 means straight line, 0 means joint-based path. Settings to -1 can cause Error 1042
# profile_number: Tuple (speed and speed2) movement_profiles: Dict[int, MoveProfile] = { 1: MoveProfile(Speed = 10), 2: MoveProfile(Speed = 50), }
[docs] class PFSocket: socket: Optional[socket.socket] BYTES_BUFFER = 1024 def __init__(self, host: str, port: int): self.com_lock = Lock() for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res try: self.socket = socket.socket(af, socktype, proto) print("socket: ", self.socket) except OSError as msg: self.socket = None logging.debug(f"{msg} ... {traceback.print_exc()}") print(f"{msg} ... {traceback.print_exc()}") continue try: print("sa:", sa) self.socket.connect(sa) except OSError as msg: print("OSError: ", msg) self.socket.close() self.socket = None continue break if self.socket is None: print('Could not open socket') else: print('Socket connected. Connecting to robot...') self.connect() self.configure_movement_profiles()
[docs] def connect(self): """ Enables the robot high power. Returns an error if power does not come on within the specified timeout. Also attaches the robot. The robot must be attached to allow motion commands. """ power_timeout = 60 # 0 or omitted = do not wait for power to come on. # > 0 = wait this many seconds for power to come on. # -1 = wait indefinitely for power to come on. self.send_command(cmd=f'hp 1 {power_timeout}') # self.send_command(cmd="home") self.send_command(cmd='attach 1') self.send_command("mode 0") # set non-verbose mode print('Connected device successfully!')
[docs] def disconnect(self): """ Also attaches the robot. The robot must be attached to allow motion commands and disables the robot high power. """ self.send_command(cmd='attach 0') self.send_command(cmd='hp 0') self.send_command(cmd='exit') print('Disconnected device successfully!')
[docs] def configure_movement_profiles(self): for index, profile in movement_profiles.items(): self.send_command( f"Profile {index} {profile.Speed} {profile.Speed2} {profile.Accel} {profile.Decel} " f"{profile.AccRamp} {profile.DecRamp} {profile.InRange} {profile.Straight}" )
[docs] def send_command(self, cmd: str, try_number: int = 0) -> tuple[int, str]: with self.com_lock: start_time = time.time() self.socket.send(str.encode(cmd + "\n")) time.sleep(.01) answer = self.receive_data() str_answer = answer.decode().strip() if "\n" in str_answer: # this is probably some leftovers from the last communication logging.debug(f"old answer to cmd {cmd}: |{str_answer}|") str_answer = str_answer.split("\n")[-1] if "where" not in cmd: # this would spam too much logging.debug(f"{datetime.now()}: Command: {cmd}, Answer: {str_answer} after {time.time() - start_time} seconds") try: # errors are defined by negative numbers while 0 means OK parts = str_answer.split(maxsplit=1) error_code = int(parts[0]) # the raw answer might just consis of the return code str_answer = parts[1].strip() if len(parts) > 1 else "" if error_code < 0: logging.error(f"Error in cmd: {cmd}\nanswer: {str_answer}") if try_number < 3: handling_successful = self.error_handling(error_code, try_number=try_number) if handling_successful: # try again error_code, str_answer = self.send_command(cmd, try_number=try_number + 1) return error_code, str_answer except ValueError: logging.warning(f"answer {str_answer.split()[0]} to cmd: {cmd} is no int") return -666, str_answer
[docs] def error_handling(self, error_code, try_number: int = 0) -> bool: """ Tries to fix the error and returns whether progress was made. """ answer = -1 if error_code == -1021: # robot not homed logging.info("Robot no homed. trying homing.") answer = self.send_command("home", try_number=try_number + 1) success = answer == 0 return success
[docs] def receive_data(self): return self.socket.recv(self.BYTES_BUFFER)
def __del__(self): self.socket.shutdown(0) self.socket.close()
[docs] class PFImplementation(InteractiveRobot): socket: PFSocket | PFSimulation GRIPPER_CLOSED: float = 74 GRIPPER_OPEN: float = 85 MAX_Z_COORDINATE: float = 750.0 def __init__(self, start_simulating: bool = True, host: str = HOST, port: int = PORT): super().__init__(start_simulating) if start_simulating: self.socket = PFSimulation() else: self.init_connection(host, port) self.move_profile = 2
[docs] @classmethod def get_name(cls) -> str: return "PF400"
[docs] @classmethod def get_used_cli_args(cls) -> dict[str, str]: return {"start_simulating": "simulation", "host": "robot_ip", "port": "robot_port"}
@property def name(self)-> str: if self.is_in_simulation_mode: # this ensures separate saved positions and interactive device list fir simulation and real mode return self.get_name() + "_sim" else: return self.get_name() # RoboInterface
[docs] def init_connection(self, host: str, port: int): self.socket = PFSocket(host, port)
@property def joint_names(self) -> list[str]: return ["x", "y", "z", "dir", "joint2", "joint3"] # TODO: the computation of distances should be customized
[docs] def send_command(self, cmd: str, error_sensitive: bool = True) -> str: _code, answer = self.socket.send_command(cmd) if _code < 0 and error_sensitive: raise RuntimeError(f"Error {_code}: {answer} in command {cmd}") return answer
[docs] def stop_moving(self): self.send_command("halt") super().stop_moving()
[docs] def set_speed(self, new_speed: float): super().set_speed(new_speed) self.send_command(f"Speed {self.move_profile} {new_speed}")
[docs] def set_acceleration(self, new_acceleration: float, **kwargs): raise NotImplementedError
[docs] def move_relative(self, movements: List[Tuple[str, float]], **kwargs): carth_coords = ["x", "y", "z", "dir"] move_dict = dict(movements) if any(coord in move_dict for coord in carth_coords): # use carthesian coordinates together with destC for manual movements without rail destination_str = self.send_command("DestC") dest_coordinates = destination_str.split(" ")[0:6] # manipulate the destination coordinates for idx, coord in enumerate(carth_coords): diff = move_dict.get(coord, 0) curr_dest = float(dest_coordinates[idx]) new_dest = curr_dest + diff * self.speed/100 # there is a maximum height if coord == "z": new_dest = max(0., min(self.MAX_Z_COORDINATE, new_dest)) dest_coordinates[idx] = str(new_dest) position_str = " ".join(dest_coordinates) cmd = f"MoveC {self.move_profile} {position_str}" else: destj_str = self.send_command("DestJ") destj_positions = [float(pos) for pos in destj_str.split(" ")[0:5]] destj_positions[1] += move_dict.get("joint2", 0) * self.speed/100 destj_positions[2] += move_dict.get("joint3", 0) * self.speed/100 position_str = " ".join(str(pos) for pos in destj_positions) cmd = f"MoveJ {self.move_profile} {position_str}" self.send_command(cmd)
[docs] def move_joint(self, joint_index: int, target_angle: float) -> str: """ Moves a specific joint to a specific angle. :param joint_index: :param target_angle: :return: """ jmove_command = f"MoveOneAxis {joint_index+1} {target_angle} {self.move_profile}" return self.send_command(jmove_command)
[docs] def finish_movement(self): self.send_command("waitForEom")
[docs] def grip_close(self): self.finish_movement() self.move_joint(4, self.GRIPPER_CLOSED)
[docs] def grip_open(self): self.finish_movement() self.move_joint(4, self.GRIPPER_OPEN)
@property def gripper_fully_closed(self) -> bool: """Tests whether the gripper is closed further """ answer = self.send_command("wherej") gripper_pos = answer.split(" ")[4] tolerance = .2 return abs(float(gripper_pos) - self.GRIPPER_CLOSED) < tolerance
[docs] def move_to_position(self, target_position, offset = None): super().move_to_position(target_position, offset) # for the pf, the move-command answeres as soon as is start the movement, so we wait until it finished before returning from this method self.finish_movement()
def move_to_coordinates(self, coords: JointState, **kwargs) -> None: position_str = " ".join(str(s) for s in coords.values()) cmd = f'MoveC {self.move_profile} {position_str}' self.send_command(cmd) @property def current_coordinates(self) -> JointState: # internally work in joint space when on normal operation joint_position_str = self.send_command("wherej") joint_positions = joint_position_str.split(' ')[0:6] carth_position_str = self.send_command("wherec") carth_positions = carth_position_str.split(' ')[0:6] # the gripper is the 5th joint but should not be included in movements joint_state = dict( x=float(carth_positions[0]), y=float(carth_positions[1]), z=float(joint_positions[0]), dir=float(joint_positions[3]), joint2=float(joint_positions[1]), joint3=float(joint_positions[2]), ) return joint_state
[docs] def move_to_coordinates(self, coords: JointState, **kwargs) -> None: destj_str = self.send_command("DestJ") gripper_pos = destj_str.split()[4] # 5th joint # we move in the joint space target_pos = [coords["z"], coords["joint2"], coords["joint3"], coords["dir"], gripper_pos] position_str = " ".join(str(s) for s in target_pos) cmd = f'MoveJ {self.move_profile} {position_str}' self.send_command(cmd)
@property def number_internal_positions(self) -> int: return 1
[docs] def put_labware(self, intermediate_actions: List[str], device: str, position: int): position_identifier = self.site_to_position_identifier(device, position) self.place_at_position(position_identifier)
[docs] def get_labware(self, intermediate_actions: List[str], device: str, position: int): position_identifier = self.site_to_position_identifier(device, position) self.pick_at_position(position_identifier)
[docs] def prepare_for_output(self, internal_pos: int, device: str, position: int) -> bool: # just wait until no movement is executing with self.move_lock: return True
[docs] def prepare_for_input(self, internal_pos: int, device: str, position: int, plate_type: str) -> bool: # just wait until no movement is executing with self.move_lock: return True
[docs] def cancel_transfer(self): pass
[docs] def run_custom_command(self, cmd: str) -> str: return self.send_command(cmd, error_sensitive=False)