"""_____________________________________________________________________
:PROJECT: genericroboticarm
* Main module command line interface *
:details: Main module command line interface.
!!! Warning: it should have a diffent name than the package name.
.. note:: -
________________________________________________________________________
"""
import asyncio
import argparse
import logging
import sys
from genericroboticarm import __version__
from genericroboticarm.robo_APIs import collect_robo_implementations
from genericroboticarm.control.robo_dash import RoboDashApp
logging.basicConfig(
format="%(levelname)-4s| %(module)s.%(funcName)s: %(message)s",
level=logging.WARNING,
)
[docs]
def parse_command_line():
"""Console script for genericroboticarm."""
description = "genericroboticarm"
parser = argparse.ArgumentParser(description=description)
parser.add_argument("_", nargs="*")
parser.add_argument(
"-r",
"--robot",
action="store",
default="Dummy",
help="Name of the robot implementation (Dummy by default). Options are: Dummy, Dorna, XArm, "
"PF400, UppsalaXArm, JoyIt",
)
parser.add_argument("-a", "--address", action="store", default="127.0.0.1", help="IP address of the SiLA server")
parser.add_argument("-p", "--port", action="store", default=50051, help="Port of the SiLA server")
parser.add_argument(
"-s",
"--simulation",
action=argparse.BooleanOptionalAction,
default=False,
help="Set to True to start the robot in simulation mode",
)
parser.add_argument(
"-u",
"--unitelabs",
action=argparse.BooleanOptionalAction,
default=False,
help="Starts the sila interface as a unitelabs connector instead of sila_python server.",
)
parser.add_argument("--com-port", action="store", default=None,
help="Com port for the hardware. Only required for JoyIt.")
parser.add_argument("--robot-ip", action="store", default=None,
help="The ip for the robot hardware. Only required for XArm and, PF400 and Dorna")
parser.add_argument("--robot-port", action="store", default=None,
help="The port for the robot hardware. Only required for PF400")
parser.add_argument("-v", "--version", action="version", version="%(prog)s " + __version__)
return parser.parse_args()
[docs]
def main():
"""Creates and starts the connector application"""
logging.basicConfig(format="%(levelname)-4s| %(module)s.%(funcName)s: %(message)s", level=logging.DEBUG)
args = parse_command_line()
# select the robot implementation
robot_name = args.robot
implementations = collect_robo_implementations()
names = [implementation.get_name() for implementation in implementations]
print(f"Found RoboInterface implementations: {names}")
for implementation in implementations:
if implementation.get_name() == robot_name:
robot_type = implementation
break
else:
logging.error(f"Unknown robot {robot_name}. Available: {', '.join(names)}")
return
required_kwargs = robot_type.get_used_cli_args()
kwargs = {}
for key, cl_attr in required_kwargs.items():
try:
kwargs[key] = getattr(args, cl_attr)
except AttributeError:
logging.error(f"You must specify {cl_attr} to use {robot_name}.")
return
robo_impl = robot_type(**kwargs)
dash_app = RoboDashApp(robo_interface=robo_impl, port=8055)
dash_app.run()
if args.unitelabs:
from .connector import create_app
asyncio.run(create_app(robo_impl))
else:
from .sila_server import Server
# start the sila sever
server = Server(robo_interface=robo_impl)
server.start_insecure(args.address, int(args.port))
if sys.stdin.isatty():
print("\nPress 'q' -> enter to exit.")
while not input() == 'q':
pass
server.stop(grace_period=.5)
else:
# Non-interactive (Docker) - wait for interrupt signal
import signal
import threading
logging.info("Waiting for ctrl-c to stop server...")
stop_event = threading.Event()
def quit(signum, frame):
logging.info(f"Received signal {signum}, stopping server...")
stop_event.set()
signal.signal(signal.SIGINT, quit)
signal.signal(signal.SIGTERM, quit)
stop_event.wait()
if __name__ == "__main__":
main()