# **********************************************************************************
# * Copyright (C) 2024-present Bert Van Acker (B.MKR) <bert.vanacker@uantwerpen.be>
# *
# * This file is part of the roboarch R&D project.
# *
# * RAP R&D concepts can not be copied and/or distributed without the express
# * permission of Bert Van Acker
# **********************************************************************************
import os
from pathlib import Path
[docs]class PackageManager(object):
def __init__(self, name='PackageManager',description='Built-in package manager',verbose=False):
"""Initialize a Package Manager component.
Parameters
----------
name : string
name of the property components
description : string
description of the property components
verbose : bool
component verbose execution
See Also
--------
..
Examples
--------
>> pm = PackageManager(name="PM1",description="Default package manager",verbose=False)
"""
self._name = name
self._description = description
self._verbose = verbose
#package info
self._packageName = "rpio_pkg"
self.standalonePath = ""
# get current directory
self._directory = os.getcwd()
@property
def name(self):
"""The name property (read-only)."""
return self._name
@property
def description(self):
"""The description property (read-only)."""
return self._description
[docs] def create(self,name,force=False,standalone=False,path=None):
"""Create rpIO package.
Parameters
----------
name : string
name of the rpIO package
force : bool
force the creation of a rpIO package
standalone : bool
create rpIO package in standalone mode, not in current directory
path : string
path where the new hybridIO package needs to be generated
See Also
--------
..
Examples
--------
>> pm.create(force=False,standalone=True)
"""
isEmpty=self._checkEmptyDir()
if not standalone:
if isEmpty or (not isEmpty and force):
if self._verbose:print("DEBUG: directory is empty, creating "+self._packageName+" package...")
# --- rpio package creation ---
try:
self._populatePackage(name=name, standalone=standalone)
except:
raise Exception("ERROR: "+self._packageName+" package could not be created!")
if self._verbose: print("DEBUG: "+self._packageName+" package created...")
else:
if self._verbose: print("DEBUG: directory is not empty, no "+self._packageName+" package created!")
else:
if path is not None:
self.standalonePath = path
try:
self._populatePackage(name=name, standalone=standalone)
if self._verbose: print("DEBUG: " + self._packageName + " package created...")
except:
raise Exception("ERROR: "+self._packageName+" package could not be created!")
else:
self.standalonePath = os.getcwd()
try:
self._populatePackage(name=name, standalone=standalone)
if self._verbose: print("DEBUG: " + self._packageName + " package created...")
except:
raise Exception("ERROR: "+self._packageName+" package could not be created!")
[docs] def check(self, path: str | Path = None) -> bool:
"""Check rpIO package.
Parameters
----------
path : string
path to the standalone hybridIO package
See Also
--------
..
Examples
--------
>> isHybridIOPackage = pm.check(path="path/to/rpio/package")
"""
path = path if path else Path.cwd()
package_path = path if isinstance(path, Path) else Path(path)
if self._verbose: print(f"DEBUG: checking {self._packageName} package in {package_path}...")
is_valid_package = (package_path / "robosapiensIO.ini").is_file()
# TODO: add other checks
return is_valid_package
def _checkEmptyDir(self):
"""Function to determine if directory is empty."""
if self._verbose: print("DEBUG: Checking if directory is empty...")
dir = os.listdir(self._directory)
return len(dir) == 0
def _populatePackage(self, name="rpio_pkg", standalone=False):
"""Function to populate the empty package."""
self._packageName = name
if standalone:
# generate in standalone package instead of in current directory
Path(self.standalonePath+"/"+self._packageName).mkdir(parents=True, exist_ok=True)
prefix = self.standalonePath+"/"+self._packageName+'/'
self._addFile(file="robosapiensIO.ini",name=self._packageName, path=prefix)
self._addFile(file="__init__.py", name=self._packageName, path=prefix)
logfilepath = prefix+"/Resources"
else:
prefix=""
self._addFile(file="robosapiensIO.ini",name=self._packageName)
self._addFile(file="__init__.py", name=self._packageName)
logfilepath = self._directory + "/Resources"
self._mkdir_custom(prefix + "Documentation")
self._mkdir_custom(prefix + "Concept")
self._mkdir_custom(prefix + "Design")
self._mkdir_custom(prefix + "Realization")
#managing system
self._mkdir_custom(prefix + "Realization/ManagingSystem/Documentation")
self._mkdir_custom(prefix + "Realization/ManagingSystem/Binaries")
self._mkdir_custom(prefix + "Realization/ManagingSystem/Nodes")
self._mkdir_custom(prefix + "Realization/ManagingSystem/Messages")
self._mkdir_custom(prefix + "Realization/ManagingSystem/Platform")
self._mkdir_custom(prefix + "Realization/ManagingSystem/Actions")
self._mkdir_custom(prefix + "Realization/ManagingSystem/Workflows")
#managed system
self._mkdir_custom(prefix + "Realization/ManagedSystem/Documentation")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Binaries")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Nodes/Probes")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Nodes/Effector")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Messages")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Platform")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Actions")
self._mkdir_custom(prefix + "Realization/ManagedSystem/Workflows")
# top-level workflows
self._mkdir_custom(prefix + "Workflows")
self._addFile(file="AADL2CODE.py", path=prefix + "Workflows/")
self._addFile(file="ROBOCHART2AADL.py", path=prefix + "Workflows/")
# temporary folder and resources
self._mkdir_custom(prefix + "Resources")
# add system log file
self._addFile(file="sys.log", path=logfilepath)
# add run, build and deploy actions (placeholders) for managing system
self._addFile(file="run.py", path=prefix +"Realization/ManagingSystem/Actions/")
self._addFile(file="build.py", path=prefix + "Realization/ManagingSystem/Actions/")
self._addFile(file="deploy.py", path=prefix + "Realization/ManagingSystem/Actions/")
# add run, build and deploy actions (placeholders) for managed system
self._addFile(file="run.py", path=prefix + "Realization/ManagedSystem/Actions/")
self._addFile(file="build.py", path=prefix + "Realization/ManagedSystem/Actions/")
self._addFile(file="deploy.py", path=prefix + "Realization/ManagedSystem/Actions/")
def _mkdir_custom(self, folder="empty", file='readme.md'):
"""CUSTOM mkdir function to initialize git-pushable directories"""
#create directory
Path(folder).mkdir(parents=True, exist_ok=True)
#add file
self._addFile(file=file, path=folder)
def _addFile(self, file="requirements.txt",name="",description="", path=None):
"""Function to add a file to the provided path."""
if path==None:
path = self._directory
# --- open file ---
f = open(path + "/" + file, "a")
# --- custom file content ---
if "__init__.py" in file:
f.write("")
if "readme" in file:
f.write("Placeholder")
if "sys.log" in file:
f.write("---------------------- RoboSAPIENS Adaptive Platform system logs ----------------------\n")
if "robosapiensIO.ini" in file:
f.write("[RoboSAPIENSIO]\n")
f.write('name = '+name+'\n')
f.write('description = " Add project description"\n')
f.write('\n')
f.write("[PACKAGE]\n")
f.write("name = "+name+"\n")
f.write('prefix = \n')
if "run.py" in file:
f.write("print('WARNING: Run action not implemented yet!')")
if "build.py" in file:
f.write("print('WARNING: Build action not implemented yet!')")
if "deploy.py" in file:
f.write("print('WARNING: Deploy action not implemented yet!')")
if "AADL2CODE.py" in file:
f.write("# **********************************************************************************\n")
f.write("# * Copyright (C) 2024-present Bert Van Acker (B.MKR) <bert.vanacker@uantwerpen.be>\n")
f.write("# *\n")
f.write("# * This file is part of the roboarch R&D project.\n")
f.write("# *\n")
f.write("# * RAP R&D concepts can not be copied and/or distributed without the express\n")
f.write("# * permission of Bert Van Acker\n")
f.write("# **********************************************************************************\n")
f.write("from rpio.workflow.tasks import *\n")
f.write("from rpio.workflow.executer import Executer_GUI\n")
f.write("\n")
f.write("# 1 . define the tasks\n")
f.write("tasks = {\n")
f.write(' "Generate custom messages": t_generate_messages,\n')
f.write(' "Generate swc code skeletons": t_generate_swc_skeletons,\n')
f.write(' "Generate swc launch files": t_generate_swc_launch,\n')
f.write(' "Generate main file": t_generate_main,\n')
f.write(' "Generate docker compose files": t_generate_docker,\n')
f.write(' "Update robosapiensIO.ini file": t_update_robosapiensIO_ini\n')
f.write("}\n")
f.write("\n")
f.write("# 2. Launch the graphical executer\n")
f.write('app = Executer_GUI(tasks=tasks,name="AADL2CODE")\n')
f.write("app.root.mainloop()\n")
if "ROBOCHART2AADL.py" in file:
f.write("# **********************************************************************************\n")
f.write("# * Copyright (C) 2024-present Bert Van Acker (B.MKR) <bert.vanacker@uantwerpen.be>\n")
f.write("# *\n")
f.write("# * This file is part of the roboarch R&D project.\n")
f.write("# *\n")
f.write("# * RAP R&D concepts can not be copied and/or distributed without the express\n")
f.write("# * permission of Bert Van Acker\n")
f.write("# **********************************************************************************\n")
f.write("from rpio.workflow.tasks import *\n")
f.write("from rpio.workflow.executer import Executer_GUI\n")
f.write("\n")
f.write("\n")
f.write("# 1 . define the tasks\n")
f.write("tasks = {\n")
f.write(' "Generate AADL messages": t_robochart_to_messages,\n')
f.write(' "Generate AADL Logical Architecture": t_robochart_to_logical\n')
f.write("}\n")
f.write("\n")
f.write("# 2. Launch the graphical executer\n")
f.write('app = Executer_GUI(tasks=tasks,name="ROBOCHART2AADL")\n')
f.write("app.root.mainloop()\n")
# --- close file ---
f.close()