# Generated by sila2.code_generator; sila2.__version__: 0.12.2
from __future__ import annotations
from datetime import timedelta, datetime
from typing import TYPE_CHECKING, List
from sila2.server import MetadataDict, ObservableCommandInstance
from ..generated.labwaretransfermanipulatorcontroller import (
GetLabware_Responses,
HandoverPosition,
LabwareTransferManipulatorControllerBase,
PositionIndex,
PrepareForInput_Responses,
PrepareForOutput_Responses,
PutLabware_Responses,
InvalidCommandSequence,
)
from genericroboticarm.robo_APIs import InteractiveTransfer
if TYPE_CHECKING:
from ..server import Server
[docs]
class LabwareTransferManipulatorControllerImpl(LabwareTransferManipulatorControllerBase):
robot : InteractiveTransfer
def __init__(self, parent_server: Server) -> None:
super().__init__(parent_server=parent_server)
assert isinstance(parent_server.robot, InteractiveTransfer)
self.robot = parent_server.robot
# Default lifetime of observable command instances. Possible values:
# None: Command instance is valid and stored in memory until server shutdown
# datetime.timedelta: Command instance is deleted after this duration, can be increased during command runtime
self.PrepareForInput_default_lifetime_of_execution = timedelta(minutes=30)
self.PrepareForOutput_default_lifetime_of_execution = timedelta(minutes=30)
self.PutLabware_default_lifetime_of_execution = timedelta(minutes=30)
self.GetLabware_default_lifetime_of_execution = timedelta(minutes=30)
[docs]
def get_AvailableHandoverPositions(self, *, metadata: MetadataDict) -> List[HandoverPosition]:
return self.robot.available_handover_positions
[docs]
def get_NumberOfInternalPositions(self, *, metadata: MetadataDict) -> int:
return self.robot.number_internal_positions
[docs]
def PrepareForOutput(
self,
HandoverPosition: HandoverPosition,
InternalPosition: PositionIndex,
*,
metadata: MetadataDict,
instance: ObservableCommandInstance,
) -> PrepareForOutput_Responses:
# set execution status from `waiting` to `running`
instance.begin_execution()
print(f"{datetime.now()}: Preparing to place plate")
preparation_succeeded = self.robot.prepare_for_output(
internal_pos=InternalPosition,
device=HandoverPosition.Position,
position=HandoverPosition.SubPosition,
)
if not preparation_succeeded:
self.robot.cancel_transfer()
raise InvalidCommandSequence("Robot failed to prepare for output.")
[docs]
def PutLabware(
self,
HandoverPosition: HandoverPosition,
IntermediateActions: List[str],
*,
metadata: MetadataDict,
instance: ObservableCommandInstance,
) -> PutLabware_Responses:
# set execution status from `waiting` to `running`
instance.begin_execution()
print(f"{datetime.now()}: Delivering plate")
self.robot.put_labware(
intermediate_actions=IntermediateActions,
device=HandoverPosition.Position,
position=HandoverPosition.SubPosition - 1,
)
[docs]
def GetLabware(
self,
HandoverPosition: HandoverPosition,
IntermediateActions: List[str],
*,
metadata: MetadataDict,
instance: ObservableCommandInstance,
) -> GetLabware_Responses:
# set execution status from `waiting` to `running`
instance.begin_execution()
print(f"{datetime.now()}: Taking plate")
self.robot.get_labware(
intermediate_actions=IntermediateActions,
device=HandoverPosition.Position,
position=HandoverPosition.SubPosition - 1,
)