"""
A implementation of RobotInterface for a Precise Flex 400.
The current communication is designed nor non-verbose mode.
Manual movement follows carthesian coordianates while automatic movements follow joint coordinates.
"""
from __future__ import annotations
import time
import traceback
from typing import List, Tuple, Optional, Dict
import socket
import logging
from genericroboticarm.control.graph_manager import JointState
from genericroboticarm.robo_APIs import InteractiveRobot
from .pf_simulation import PFSimulation
from threading import Lock
from datetime import datetime
from dataclasses import dataclass
HOST = "10.10.0.98"
PORT = 10100
[docs]
@dataclass
class MoveProfile:
Speed: int
Speed2: int = 0
Accel: int = 50
AccRamp: float = .1
Decel: int = 50
DecRamp: float = .1
InRange: int = -1 # -1 is the smoothest and quickest movement possible
Straight: int = 0 # -1 means straight line, 0 means joint-based path. Settings to -1 can cause Error 1042
# profile_number: Tuple (speed and speed2)
movement_profiles: Dict[int, MoveProfile] = {
1: MoveProfile(Speed = 10),
2: MoveProfile(Speed = 50),
}
[docs]
class PFSocket:
socket: Optional[socket.socket]
BYTES_BUFFER = 1024
def __init__(self, host: str, port: int):
self.com_lock = Lock()
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.socket = socket.socket(af, socktype, proto)
print("socket: ", self.socket)
except OSError as msg:
self.socket = None
logging.debug(f"{msg} ... {traceback.print_exc()}")
print(f"{msg} ... {traceback.print_exc()}")
continue
try:
print("sa:", sa)
self.socket.connect(sa)
except OSError as msg:
print("OSError: ", msg)
self.socket.close()
self.socket = None
continue
break
if self.socket is None:
print('Could not open socket')
else:
print('Socket connected. Connecting to robot...')
self.connect()
self.configure_movement_profiles()
[docs]
def connect(self):
"""
Enables the robot high power. Returns an error if power does not come on within the specified
timeout.
Also attaches the robot. The robot must be attached to allow motion commands.
"""
power_timeout = 60
# 0 or omitted = do not wait for power to come on.
# > 0 = wait this many seconds for power to come on.
# -1 = wait indefinitely for power to come on.
self.send_command(cmd=f'hp 1 {power_timeout}')
# self.send_command(cmd="home")
self.send_command(cmd='attach 1')
self.send_command("mode 0") # set non-verbose mode
print('Connected device successfully!')
[docs]
def disconnect(self):
"""
Also attaches the robot. The robot must be attached to allow motion commands and disables the robot high power.
"""
self.send_command(cmd='attach 0')
self.send_command(cmd='hp 0')
self.send_command(cmd='exit')
print('Disconnected device successfully!')
[docs]
def send_command(self, cmd: str, try_number: int = 0) -> tuple[int, str]:
with self.com_lock:
start_time = time.time()
self.socket.send(str.encode(cmd + "\n"))
time.sleep(.01)
answer = self.receive_data()
str_answer = answer.decode().strip()
if "\n" in str_answer:
# this is probably some leftovers from the last communication
logging.debug(f"old answer to cmd {cmd}: |{str_answer}|")
str_answer = str_answer.split("\n")[-1]
if "where" not in cmd: # this would spam too much
logging.debug(f"{datetime.now()}: Command: {cmd}, Answer: {str_answer} after {time.time() - start_time} seconds")
try:
# errors are defined by negative numbers while 0 means OK
parts = str_answer.split(maxsplit=1)
error_code = int(parts[0])
# the raw answer might just consis of the return code
str_answer = parts[1].strip() if len(parts) > 1 else ""
if error_code < 0:
logging.error(f"Error in cmd: {cmd}\nanswer: {str_answer}")
if try_number < 3:
handling_successful = self.error_handling(error_code, try_number=try_number)
if handling_successful:
# try again
error_code, str_answer = self.send_command(cmd, try_number=try_number + 1)
return error_code, str_answer
except ValueError:
logging.warning(f"answer {str_answer.split()[0]} to cmd: {cmd} is no int")
return -666, str_answer
[docs]
def error_handling(self, error_code, try_number: int = 0) -> bool:
""" Tries to fix the error and returns whether progress was made. """
answer = -1
if error_code == -1021: # robot not homed
logging.info("Robot no homed. trying homing.")
answer = self.send_command("home", try_number=try_number + 1)
success = answer == 0
return success
[docs]
def receive_data(self):
return self.socket.recv(self.BYTES_BUFFER)
def __del__(self):
self.socket.shutdown(0)
self.socket.close()
[docs]
class PFImplementation(InteractiveRobot):
socket: PFSocket | PFSimulation
GRIPPER_CLOSED: float = 74
GRIPPER_OPEN: float = 85
MAX_Z_COORDINATE: float = 750.0
def __init__(self, start_simulating: bool = True, host: str = HOST, port: int = PORT):
super().__init__(start_simulating)
if start_simulating:
self.socket = PFSimulation()
else:
self.init_connection(host, port)
self.move_profile = 2
[docs]
@classmethod
def get_name(cls) -> str:
return "PF400"
[docs]
@classmethod
def get_used_cli_args(cls) -> dict[str, str]:
return {"start_simulating": "simulation", "host": "robot_ip", "port": "robot_port"}
@property
def name(self)-> str:
if self.is_in_simulation_mode:
# this ensures separate saved positions and interactive device list fir simulation and real mode
return self.get_name() + "_sim"
else:
return self.get_name()
# RoboInterface
[docs]
def init_connection(self, host: str, port: int):
self.socket = PFSocket(host, port)
@property
def joint_names(self) -> list[str]:
return ["x", "y", "z", "dir", "joint2", "joint3"]
# TODO: the computation of distances should be customized
[docs]
def send_command(self, cmd: str, error_sensitive: bool = True) -> str:
_code, answer = self.socket.send_command(cmd)
if _code < 0 and error_sensitive:
raise RuntimeError(f"Error {_code}: {answer} in command {cmd}")
return answer
[docs]
def stop_moving(self):
self.send_command("halt")
super().stop_moving()
[docs]
def set_speed(self, new_speed: float):
super().set_speed(new_speed)
self.send_command(f"Speed {self.move_profile} {new_speed}")
[docs]
def set_acceleration(self, new_acceleration: float, **kwargs):
raise NotImplementedError
[docs]
def move_relative(self, movements: List[Tuple[str, float]], **kwargs):
carth_coords = ["x", "y", "z", "dir"]
move_dict = dict(movements)
if any(coord in move_dict for coord in carth_coords):
# use carthesian coordinates together with destC for manual movements without rail
destination_str = self.send_command("DestC")
dest_coordinates = destination_str.split(" ")[0:6]
# manipulate the destination coordinates
for idx, coord in enumerate(carth_coords):
diff = move_dict.get(coord, 0)
curr_dest = float(dest_coordinates[idx])
new_dest = curr_dest + diff * self.speed/100
# there is a maximum height
if coord == "z":
new_dest = max(0., min(self.MAX_Z_COORDINATE, new_dest))
dest_coordinates[idx] = str(new_dest)
position_str = " ".join(dest_coordinates)
cmd = f"MoveC {self.move_profile} {position_str}"
else:
destj_str = self.send_command("DestJ")
destj_positions = [float(pos) for pos in destj_str.split(" ")[0:5]]
destj_positions[1] += move_dict.get("joint2", 0) * self.speed/100
destj_positions[2] += move_dict.get("joint3", 0) * self.speed/100
position_str = " ".join(str(pos) for pos in destj_positions)
cmd = f"MoveJ {self.move_profile} {position_str}"
self.send_command(cmd)
[docs]
def move_joint(self, joint_index: int, target_angle: float) -> str:
"""
Moves a specific joint to a specific angle.
:param joint_index:
:param target_angle:
:return:
"""
jmove_command = f"MoveOneAxis {joint_index+1} {target_angle} {self.move_profile}"
return self.send_command(jmove_command)
[docs]
def finish_movement(self):
self.send_command("waitForEom")
[docs]
def grip_close(self):
self.finish_movement()
self.move_joint(4, self.GRIPPER_CLOSED)
[docs]
def grip_open(self):
self.finish_movement()
self.move_joint(4, self.GRIPPER_OPEN)
@property
def gripper_fully_closed(self) -> bool:
"""Tests whether the gripper is closed further """
answer = self.send_command("wherej")
gripper_pos = answer.split(" ")[4]
tolerance = .2
return abs(float(gripper_pos) - self.GRIPPER_CLOSED) < tolerance
[docs]
def move_to_position(self, target_position, offset = None):
super().move_to_position(target_position, offset)
# for the pf, the move-command answeres as soon as is start the movement, so we wait until it finished before returning from this method
self.finish_movement()
def move_to_coordinates(self, coords: JointState, **kwargs) -> None:
position_str = " ".join(str(s) for s in coords.values())
cmd = f'MoveC {self.move_profile} {position_str}'
self.send_command(cmd)
@property
def current_coordinates(self) -> JointState:
# internally work in joint space when on normal operation
joint_position_str = self.send_command("wherej")
joint_positions = joint_position_str.split(' ')[0:6]
carth_position_str = self.send_command("wherec")
carth_positions = carth_position_str.split(' ')[0:6]
# the gripper is the 5th joint but should not be included in movements
joint_state = dict(
x=float(carth_positions[0]),
y=float(carth_positions[1]),
z=float(joint_positions[0]),
dir=float(joint_positions[3]),
joint2=float(joint_positions[1]),
joint3=float(joint_positions[2]),
)
return joint_state
[docs]
def move_to_coordinates(self, coords: JointState, **kwargs) -> None:
destj_str = self.send_command("DestJ")
gripper_pos = destj_str.split()[4] # 5th joint
# we move in the joint space
target_pos = [coords["z"], coords["joint2"], coords["joint3"], coords["dir"], gripper_pos]
position_str = " ".join(str(s) for s in target_pos)
cmd = f'MoveJ {self.move_profile} {position_str}'
self.send_command(cmd)
@property
def number_internal_positions(self) -> int:
return 1
[docs]
def put_labware(self, intermediate_actions: List[str], device: str, position: int):
position_identifier = self.site_to_position_identifier(device, position)
self.place_at_position(position_identifier)
[docs]
def get_labware(self, intermediate_actions: List[str], device: str, position: int):
position_identifier = self.site_to_position_identifier(device, position)
self.pick_at_position(position_identifier)
[docs]
def prepare_for_output(self, internal_pos: int, device: str, position: int) -> bool:
# just wait until no movement is executing
with self.move_lock:
return True
[docs]
def cancel_transfer(self):
pass
[docs]
def run_custom_command(self, cmd: str) -> str:
return self.send_command(cmd, error_sensitive=False)