Source code for genericroboticarm.control.device_interaction
import logging
import traceback
import simplejson
import os
from sila2.client import SilaClient
import time
from typing import Optional
[docs]
class DeviceList:
    """
    Manages the list of devices for implicit interaction for labware transfer.
    Automatically saves the list to interacting_devices/[robot_name].json

    The list is loaded from that file on construction and written back after
    every change made through :meth:`add_device` or :meth:`remove_device`.
    SiLA clients handed out by :meth:`get_client` are cached per server name.
    """

    # Annotations are quoted so they are not evaluated at import time.
    # Device entries as stored in the JSON file.
    devices: "list[str]"
    # Cache of already discovered clients, keyed by server name.
    clients: "dict[str, SilaClient]"

    def __init__(self, robot_name: str):
        """
        Set up an empty client cache and load the stored device list.

        :param robot_name: Used to derive the name of the JSON file.
        """
        self.clients = dict()
        self.file_name = self.create_file_name(robot_name)
        self.load()

    def create_file_name(self, robot_name: str):
        """
        Build the path of the JSON file belonging to the given robot.

        :param robot_name: Name of the robot; becomes the file's base name.
        :return: ``<directory of this module>/interacting_devices/<robot_name>.json``
        """
        this_dir = os.path.dirname(__file__)
        file_name = f"{robot_name}.json"
        return os.path.join(this_dir, "interacting_devices", file_name)

    def load(self):
        """
        Read the device list from ``self.file_name``.

        If the file does not exist, the list starts out empty.
        """
        if os.path.isfile(self.file_name):
            with open(self.file_name, "r", encoding="utf-8") as reader:
                self.devices = simplejson.loads(reader.read())
        else:
            self.devices = []

    def save(self):
        """
        Write the current device list to ``self.file_name`` as JSON.

        The target directory is created first if it is missing, so the very
        first save does not fail with ``FileNotFoundError``.
        """
        directory = os.path.dirname(self.file_name)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.file_name, "w", encoding="utf-8") as writer:
            writer.write(simplejson.dumps(self.devices))

    def add_device(self, device: str):
        """
        Append a device to the list and persist it; known devices are ignored.

        :param device: Entry to add.
        """
        if device not in self.devices:
            self.devices.append(device)
            self.save()

    def remove_device(self, device: str):
        """
        Remove a device from the list and persist it; unknown devices are ignored.

        :param device: Entry to remove.
        """
        if device in self.devices:
            self.devices.remove(device)
            self.save()

    def get_client(self, server_name: str, timeout: float = 5) -> "Optional[SilaClient]":
        """
        Return a client for the server with the given name.

        A cached client is reused if it still answers with the expected server
        name. Otherwise it is dropped from the cache and a new discovery is
        attempted.

        :param server_name: Name of the server to connect to.
        :param timeout: Passed on to ``SilaClient.discover``.
        :return: The client, or ``None`` if the discovery failed or the
            discovered client has no ``LabwareTransferSiteController``.
        """
        # check whether we already have a client
        if server_name in self.clients:
            cached = self.clients[server_name]
            # check whether the server is online
            try:
                reported_name = cached.SiLAService.ServerName.get()
            except Exception:
                # format_exc() returns the traceback as text; print_exc() would
                # write it to stderr and put the string "None" into the log.
                logging.warning(f"Server {server_name} seems to be down:\n{traceback.format_exc()}")
            else:
                # explicit comparison instead of assert, which `python -O` removes
                if reported_name == server_name:
                    return cached
                logging.warning(
                    f"Server {server_name} seems to be down: it reported the name {reported_name!r}"
                )
            self.clients.pop(server_name, None)

        # reaching this code means we do not have a client for that server
        try:
            start = time.time()
            client = SilaClient.discover(server_name=server_name, insecure=True, timeout=timeout)
            elapsed = time.time() - start
            logging.info(f"Found server after {elapsed} seconds.")
            print(f"Found server after {elapsed} seconds.")
        except Exception:
            logging.error(f"Failed to connect to a server named {server_name}:\n{traceback.format_exc()}")
            return None

        # explicit check instead of assert, so it also holds under `python -O`
        if not hasattr(client, "LabwareTransferSiteController"):
            logging.error(
                f"Failed to connect to a server named {server_name}: "
                f"the client has no LabwareTransferSiteController"
            )
            return None

        self.clients[server_name] = client
        return client