"""
"""
from __future__ import annotations
import logging
from pathlib import Path
import networkx as nx
from typing import List, Dict, Callable, Optional
from math import dist
import platformdirs
from importlib import resources
JointState = Dict[str, float]
[docs]
def default_dist(p: JointState, q: JointState) -> float:
return dist(p.values(), q.values())
[docs]
class GraphManager:
g: nx.Graph
dist: Callable[[JointState, JointState], float]
def __init__(self, name: str):
self.dist = default_dist
# Filename without the path
self.filename = f"position_graph_{name}.gml"
self.default_dir = "genericroboticarm.graphs"
self.custom_dir = platformdirs.user_config_path(appname="GenericRoboticArm",
ensure_exists=True)
try:
self.load(self.filename)
except FileNotFoundError:
self.g = nx.Graph()
[docs]
def load(self, file_name: Optional[str] = None):
"""
Loads a networkx graph
:param file_name:
:return:
"""
try:
path = Path(self.custom_dir) / file_name
self.g = nx.read_gml(path)
except FileNotFoundError:
try:
# Load default resource
with resources.path(self.default_dir, file_name) as path:
self.g = nx.read_gml(path)
except FileNotFoundError:
# If the file is not found in either location, raise a more descriptive error
raise FileNotFoundError(f"File {file_name} not found in {self.custom_dir} or {self.default_dir}")
[docs]
def save(self, filename: str):
"""
Saves the current networkx graph
:param filename:
:return:
"""
dest_path = self.custom_dir/filename
nx.write_gml(self.g, dest_path, stringizer=str)
[docs]
def position_known(self, identifier: str) -> bool:
known = identifier in self.g.nodes
return known
[docs]
def change_position(self, identifier: str, pos: JointState):
"""
:param identifier:
:param pos:
:return:
"""
self.g.nodes[identifier].update(pos)
self.update_distances()
self.save(self.filename)
[docs]
def add_position(self, identifier: str, pos: JointState):
"""
:param pos:
:param identifier:
:return:
"""
self.g.add_node(identifier, **pos)
self.save(self.filename)
[docs]
def add_connection(self, head: str, tail: str):
"""
:param head:
:param tail:
:return:
"""
# it is important (for this code) to consistently have a attribute called dist
tail_pos = self.g.nodes[tail]
head_pos = self.g.nodes[head]
length = self.dist(tail_pos, head_pos)
self.g.add_edge(tail, head, dist=length)
self.save(self.filename)
[docs]
def update_distances(self):
"""
Applies the distance function to all edges
:return:
"""
for u, v, data in self.g.edges(data=True):
u_coordinates = self.get_coordinates(u)
v_coordinates = self.get_coordinates(v)
data["dist"] = self.dist(u_coordinates, v_coordinates)
[docs]
def get_nearest_position(self, pos: JointState) -> str:
"""
:param pos:
:return:
"""
def criteria(n: str):
return self.dist(self.g.nodes[n], pos)
sorted_positions = sorted(self.g.nodes, key=criteria)
# the first one should be the nearest
return sorted_positions[0]
[docs]
def get_coordinates(self, identifier: str) -> JointState:
"""
:param identifier:
:return:
"""
data = self.g.nodes[identifier]
return data.copy()
[docs]
def get_shortest_path(self, start: str, target: str) -> List[str]:
"""
Returns the shortest path from start to target as a list of node identifiers
including the target but not the start. Returns an empty list if no path exists.
:param start:
:param target:
:return:
"""
if nx.has_path(self.g, start, target):
path = nx.shortest_path(self.g, start, target, weight='dist')
logging.info("shortest path is:", path)
# the first entry is the current node. we remove it.
return path[1:]
else:
logging.warning(f"There is no path between {start} and {target}.")
return []
[docs]
def remove_position(self, identifier: str):
"""
:param identifier:
:return:
"""
if identifier in self.g.nodes:
self.g.remove_node(identifier)
self.save(self.filename)
[docs]
def remove_connection(self, head: str, tail: str):
"""
:param head:
:param tail:
:return:
"""
if (tail, head) in self.g.edges:
self.g.remove_edge(tail, head)
self.save(self.filename)