Source code for genericroboticarm.control.device_interaction

import logging
import traceback

import simplejson
import os

from sila2.client import SilaClient
import time
from typing import Optional


[docs] class DeviceList: """ Manages the list of devices for implicit interaction for labware transfer. Automatically saves the list to interacting_devices/[robot_name].json """ devices: list[str] clients: dict[str, SilaClient] def __init__(self, robot_name: str): self.clients = dict() self.file_name = self.create_file_name(robot_name) self.load()
[docs] def create_file_name(self, robot_name: str): this_dir = os.path.dirname(__file__) file_name = f"{robot_name}.json" return os.path.join(this_dir, "interacting_devices", file_name)
[docs] def load(self): if os.path.isfile(self.file_name): with open(self.file_name, "r") as reader: self.devices = simplejson.loads(reader.read()) else: self.devices = []
[docs] def save(self): with open(self.file_name, "w") as writer: writer.write(simplejson.dumps(self.devices))
[docs] def add_device(self, device: str): if device not in self.devices: self.devices.append(device) self.save()
[docs] def remove_device(self, device: str): if device in self.devices: self.devices.remove(device) self.save()
[docs] def get_client(self, server_name: str, timeout: float = 5) -> Optional[SilaClient]: # check whether we already have a client if server_name in self.clients: # check whether the server is online try: name = self.clients[server_name].SiLAService.ServerName.get() assert name == server_name return self.clients[server_name] except Exception: logging.warning(f"Server {server_name} seems to be down: {traceback.print_exc()}") self.clients.pop(server_name) # reaching this code means we do not have a client for that server try: start = time.time() client = SilaClient.discover(server_name=server_name, insecure=True, timeout=timeout) logging.info(f"Found server after {time.time() - start} seconds.") print(f"Found server after {time.time() - start} seconds.") assert hasattr(client, "LabwareTransferSiteController") self.clients[server_name] = client return client except Exception: logging.error(f"Failed to connect to a server named {server_name}: {traceback.print_exc()}") return None