Coverage for pyTooling/CLIAbstraction/__init__.py: 76%
216 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-24 22:21 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-24 22:21 +0000
1# ==================================================================================================================== #
2# _____ _ _ ____ _ ___ _ _ _ _ _ #
3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ #
4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ #
5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | #
6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| #
7# |_| |___/ |___/ #
8# ==================================================================================================================== #
9# Authors: #
10# Patrick Lehmann #
11# #
12# License: #
13# ==================================================================================================================== #
14# Copyright 2017-2025 Patrick Lehmann - Bötzingen, Germany #
15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture #
16# #
17# Licensed under the Apache License, Version 2.0 (the "License"); #
18# you may not use this file except in compliance with the License. #
19# You may obtain a copy of the License at #
20# #
21# http://www.apache.org/licenses/LICENSE-2.0 #
22# #
23# Unless required by applicable law or agreed to in writing, software #
24# distributed under the License is distributed on an "AS IS" BASIS, #
25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
26# See the License for the specific language governing permissions and #
27# limitations under the License. #
28# #
29# SPDX-License-Identifier: Apache-2.0 #
30# ==================================================================================================================== #
31#
32"""Basic abstraction layer for executables."""
34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"]
36from os import environ as os_environ
37from pathlib import Path
38from platform import system
39from shutil import which as shutil_which
40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut
41from sys import version_info # needed for versions before Python 3.11
42from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable
44try:
45 from pyTooling.Decorators import export, readonly
46 from pyTooling.MetaClasses import ExtendedType
47 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException
48 from pyTooling.Common import getFullyQualifiedName
49 from pyTooling.Attributes import Attribute
50 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
51 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
52 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag
53except (ImportError, ModuleNotFoundError): # pragma: no cover
54 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!")
56 try:
57 from Decorators import export, readonly
58 from MetaClasses import ExtendedType
59 from Exceptions import ToolingException, PlatformNotSupportedException
60 from Common import getFullyQualifiedName
61 from Attributes import Attribute
62 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
63 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
64 from CLIAbstraction.ValuedFlag import ValuedFlag
65 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover
66 print("[pyTooling.CLIAbstraction] Could not import directly!")
67 raise ex
@export
class CLIAbstractionException(ToolingException):
    """Base-class of the exceptions raised by the CLI abstraction layer."""
@export
class DryRunException(CLIAbstractionException):
    """Raised when an executable is used like a launched process, although it operates in dry-run mode."""
@export
class CLIArgument(Attribute):
    """Attribute used to mark a nested class as a CLI argument of the enclosing program class."""
@export
class Environment(metaclass=ExtendedType, slots=True):
    """
    A mutable collection of environment variables (name → value).

    The collection offers dictionary-like access (``len()``, ``in``, ``[]`` for reading, writing and deleting).
    """

    _variables: Dict[str, str]  #: Internal copy of the environment variables.

    def __init__(
        self,
        newVariables: Nullable[Mapping[str, str]] = None,
        addVariables: Nullable[Mapping[str, str]] = None,
        delVariables: Nullable[Iterable[str]] = None
    ) -> None:
        """
        Initialize the environment from a base mapping, then remove and add variables.

        :param newVariables: Base set of variables. If ``None``, the current process' environment (``os.environ``) is used.
        :param addVariables: Optional variables added to (or overriding) the base set. Applied after the deletions.
        :param delVariables: Optional names of variables to remove from the base set.
        :raises KeyError:    If a name listed in ``delVariables`` isn't part of the base set.
        """
        if newVariables is None:
            newVariables = os_environ

        # Work on a private copy, so the given mapping (or os.environ) is never modified.
        self._variables = dict(newVariables.items())

        if delVariables is not None:
            for variableName in delVariables:
                del self._variables[variableName]

        if addVariables is not None:
            self._variables.update(addVariables)

    def __len__(self) -> int:
        """Return the number of environment variables."""
        return len(self._variables)

    def __contains__(self, name: str) -> bool:
        """Return ``True``, if a variable called ``name`` exists."""
        return name in self._variables

    def __getitem__(self, name: str) -> str:
        """Return the value of variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        return self._variables[name]

    def __setitem__(self, name: str, value: str) -> None:
        """Create or overwrite variable ``name`` with ``value``."""
        self._variables[name] = value

    def __delitem__(self, name: str) -> None:
        """Remove variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        del self._variables[name]
@export
class Program(metaclass=ExtendedType, slots=True):
    """
    Represent a simple command line interface (CLI) executable (program or script).

    The executable is located in one of three ways (see :meth:`__init__`): by an explicit path, by a directory
    containing the platform specific executable, or by a lookup of the platform specific executable name in ``PATH``.

    CLI options are nested classes marked with :class:`CLIArgument`. Options in use are stored via ``program[Option] = value``
    and are converted to an argument list by :meth:`ToArgumentList`.
    """

    # NOTE(review): 'self.LogDryRun(...)' is called in dry-run mode, but no 'LogDryRun' method is defined in this file.
    #               Presumably it is expected from a logging mixin (see the '(ILogable)' remark at class 'Executable') - confirm.

    _platform:         str                                                                 #: Current platform the executable runs on (Linux, Windows, ...)
    _executableNames:  ClassVar[Dict[str, str]]                                            #: Dictionary of platform specific executable names.
    _executablePath:   Path                                                                #: The path to the executable (binary, script, ...).
    _dryRun:           bool                                                                #: True, if program shall run in *dry-run mode*.
    __cliOptions__:    ClassVar[Dict[Type[CommandLineArgument], int]]                      #: List of all possible CLI options.
    __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]]      #: List of all CLI parameters (used CLI options).

    def __init_subclass__(cls, *args: Any, **kwargs: Any):
        """
        Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are
        marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.

        The dictionary maps each option class to its position in the collected sequence. This position defines the order
        of arguments generated by :meth:`ToArgumentList`.
        """
        super().__init_subclass__(*args, **kwargs)

        # register all available CLI options (nested classes marked with attribute 'CLIArgument')
        cls.__cliOptions__: Dict[Type[CommandLineArgument], int] = {
            option: order for order, option in enumerate(CLIArgument.GetClasses(scope=cls))
        }

    def __init__(self, executablePath: Nullable[Path] = None, binaryDirectoryPath: Nullable[Path] = None, dryRun: bool = False) -> None:
        """
        Locate the executable and initialize an empty set of CLI parameters.

        :param executablePath:           Path to the executable. Takes precedence over ``binaryDirectoryPath``.
        :param binaryDirectoryPath:      Directory containing the executable. The platform specific executable name is appended.
        :param dryRun:                   If true, failed existence checks don't raise an exception.
        :raises CLIAbstractionException: If the executable or binary directory doesn't exist (and not in dry-run mode).
        :raises CLIAbstractionException: If no executable name is registered for the current platform.
        :raises CLIAbstractionException: If the executable can't be found in ``PATH`` (and not in dry-run mode).
        :raises TypeError:               If ``executablePath`` or ``binaryDirectoryPath`` is not of type :class:`~pathlib.Path`.
        """
        self._platform = system()
        self._dryRun = dryRun

        if executablePath is not None:
            # Variant 1: an explicit path to the executable was given.
            if not isinstance(executablePath, Path):
                ex = TypeError("Parameter 'executablePath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.")
                raise ex

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        elif binaryDirectoryPath is not None:
            # Variant 2: a directory was given; append the platform specific executable name.
            if not isinstance(binaryDirectoryPath, Path):
                ex = TypeError("Parameter 'binaryDirectoryPath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.")
                raise ex

            if not binaryDirectoryPath.exists():
                if dryRun:
                    self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)

            try:
                executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform]
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        else:
            # Variant 3: nothing was given; search the platform specific executable name in PATH.
            try:
                executablePath = Path(self._executableNames[self._platform])
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            resolvedExecutable = shutil_which(str(executablePath))
            if dryRun:
                # In dry-run mode, a failed lookup is silently ignored.
                # XXX: log executable not found in PATH
                #      self.LogDryRun(f"Which '{executablePath}' failed. [SKIPPING]")
                # XXX: log executable not found
                #      self.LogDryRun(f"File check for '{fullExecutablePath}' failed. [SKIPPING]")
                pass
            else:
                if resolvedExecutable is None:
                    raise CLIAbstractionException("Program could not be found in PATH.") from FileNotFoundError(executablePath)

                fullExecutablePath = Path(resolvedExecutable)
                if not fullExecutablePath.exists():
                    raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)

            # TODO: log found executable in PATH
            # TODO: check if found executable has execute permissions
            # raise ValueError(f"Neither parameter 'executablePath' nor 'binaryDirectoryPath' was set.")

        # Note: for variant 3, the (unresolved) executable name is stored, not the path resolved from PATH.
        self._executablePath = executablePath
        self.__cliParameters__ = {}

    @staticmethod
    def _NeedsParameterInitialization(key):
        """Return ``True``, if option class ``key`` is one of the argument kinds constructed with a value."""
        return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))

    def __getitem__(self, key):
        """
        Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the option hasn't been set on this program.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex

        # TODO: is nested check
        return self.__cliParameters__[key]

    def __setitem__(self, key, value):
        """
        Use a CLI option by creating a CLI parameter from it.

        Option classes needing a value are instantiated as ``key(value)``; all others as ``key()`` and ``value`` is ignored.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the option isn't registered for this program or is already set.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex
        if key not in self.__cliOptions__:
            raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
        if key in self.__cliParameters__:
            raise KeyError(f"Option '{key}' is already set to a value.")

        if self._NeedsParameterInitialization(key):
            self.__cliParameters__[key] = key(value)
        else:
            self.__cliParameters__[key] = key()

    @readonly
    def Path(self) -> Path:
        """Read-only property returning the path of the executable."""
        return self._executablePath

    def ToArgumentList(self) -> List[str]:
        """
        Convert the executable path and all used CLI parameters into a list of strings.

        The executable path is the first element. Parameters follow in the order their options were registered.

        :raises TypeError: If a parameter's ``AsArgument()`` returns neither a string nor a tuple or list.
        """
        result: List[str] = [str(self._executablePath)]

        def registrationOrder(item: Tuple[Type[CommandLineArgument], Nullable[CommandLineArgument]]) -> int:
            return self.__cliOptions__[item[0]]

        for key, value in sorted(self.__cliParameters__.items(), key=registrationOrder):
            param = value.AsArgument()
            if isinstance(param, str):
                result.append(param)
            elif isinstance(param, (tuple, list)):
                result += param
            else:
                ex = TypeError(f"Argument '{key}' returned an unsupported type from 'AsArgument()'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(param)}'. Expected 'str', 'tuple' or 'list'.")
                raise ex

        return result

    def __repr__(self):
        """Return the argument list formatted as a bracketed, comma separated list of double-quoted items."""
        return "[" + ", ".join(f'"{item}"' for item in self.ToArgumentList()) + "]"

    def __str__(self):
        """Return the argument list as space separated, double-quoted items."""
        return " ".join(f'"{item}"' for item in self.ToArgumentList())
@export
class Executable(Program):  # (ILogable):
    """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""

    _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"

    _workingDirectory: Nullable[Path]              #: Working directory of the child process (``None``: inherit).
    _environment:      Nullable[Environment]       #: Environment handed over at construction time.
    _process:          Nullable[Subprocess_Popen]  #: The child process; ``None`` until :meth:`StartProcess` succeeded.
    _iterator:         Nullable[Iterator]

    def __init__(
        self,
        executablePath: Nullable[Path] = None,
        binaryDirectoryPath: Nullable[Path] = None,
        workingDirectory: Nullable[Path] = None,
        environment: Nullable[Environment] = None,
        dryRun: bool = False
    ):
        """
        Locate the executable (see :class:`Program`) and remember the settings used to launch it later.

        :param executablePath:      Path to the executable.
        :param binaryDirectoryPath: Directory containing the executable.
        :param workingDirectory:    Working directory for the child process. ``None`` inherits the current one.
        :param environment:         Environment variables for the child process.
        :param dryRun:              If true, no child process is started.
        """
        super().__init__(executablePath, binaryDirectoryPath, dryRun)

        # Store the given working directory, so it is used as 'cwd' when the process is started.
        self._workingDirectory = workingDirectory
        self._environment = environment
        self._process = None
        self._iterator = None

    def StartProcess(
        self,
        environment: Nullable[Environment] = None
    ):
        """
        Start the child process with piped STDIN and STDOUT; STDERR is redirected to STDOUT.

        In dry-run mode, no process is started.

        :param environment:              Environment for the child process. Only used, if no environment was given at construction time.
        :raises CLIAbstractionException: If the process couldn't be launched (:exc:`OSError`).
        """
        # start child process

        if self._dryRun:
            self.LogDryRun(f"Start process: {self!r}")
            return

        # NOTE(review): the environment given at construction time wins over the parameter - confirm this precedence is intended.
        if self._environment is not None:
            envVariables = self._environment._variables
        elif environment is not None:
            envVariables = environment._variables
        else:
            envVariables = None

        # FIXME: verbose log start process
        # FIXME: debug log - parameter list
        try:
            self._process = Subprocess_Popen(
                self.ToArgumentList(),
                stdin=Subprocess_Pipe,
                stdout=Subprocess_Pipe,
                stderr=Subprocess_StdOut,
                cwd=self._workingDirectory,
                env=envVariables,
                universal_newlines=True,
                bufsize=256
            )
        except OSError as ex:
            raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex

    def Send(self, line: str, end: str = "\n") -> None:
        """
        Write ``line`` followed by ``end`` to the child process' STDIN and flush it.

        :raises CLIAbstractionException: If writing fails for any reason (e.g. process not started or pipe closed).
        """
        try:
            self._process.stdin.write(line + end)
            self._process.stdin.flush()
        except Exception as ex:
            raise CLIAbstractionException(f"Error while sending a line to the process of '{self._executablePath}'.") from ex

    # This is TCL specific ...
    # def SendBoundary(self):
    #   self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))

    def GetLineReader(self) -> Generator[str, None, None]:
        """
        Return a generator yielding the child process' output line by line, without the trailing newline.

        :raises DryRunException:         If the executable is in dry-run mode.
        :raises CLIAbstractionException: If reading from the process fails.
        """
        if self._dryRun:
            raise DryRunException(f"Can't read output of '{self._executablePath}' in dry-run mode, because no process was started.")

        try:
            for line in iter(self._process.stdout.readline, ""):  # FIXME: can it be improved?
                # Only strip a newline that is really there: the last line might end without one.
                yield line[:-1] if line.endswith("\n") else line
        except Exception as ex:
            raise CLIAbstractionException(f"Error while reading output from the process of '{self._executablePath}'.") from ex
        # finally:
        #   self._process.terminate()

    def Terminate(self):
        """Terminate the child process."""
        self._process.terminate()

    @readonly
    def ExitCode(self) -> int:
        """
        Read-only property returning the child process' return code.

        :raises CLIAbstractionException: If the process wasn't started yet.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus no exit code.")

        # TODO: check if process is still running

        return self._process.returncode

    # This is TCL specific
    # def ReadUntilBoundary(self, indent=0):
    #   __indent = "  " * indent
    #   if self._iterator is None:
    #     self._iterator = iter(self.GetReader())
    #
    #   for line in self._iterator:
    #     print(__indent + line)
    #     if self._pyIPCMI_BOUNDARY in line:
    #       break
    #   self.LogDebug("Quartus II is ready")
@export
class OutputFilteredExecutable(Executable):
    """An :class:`Executable` that additionally tracks whether output, warnings, errors or fatals were seen."""

    _hasOutput:   bool
    _hasWarnings: bool
    _hasErrors:   bool
    _hasFatals:   bool

    def __init__(self, platform, dryrun, executablePath):  #, environment=None, logger=None) -> None:
        """
        Initialize the executable and reset all status flags to ``False``.

        :param platform:       Unused; kept for backward compatibility. The base-class determines the platform itself.
        :param dryrun:         If true, the executable operates in dry-run mode.
        :param executablePath: Path to the executable.
        """
        # Pass by keyword: the base-class' positional order is (executablePath, binaryDirectoryPath, workingDirectory, ...).
        super().__init__(executablePath=executablePath, dryRun=dryrun)  #, environment=environment, logger=logger)

        self._hasOutput = False
        self._hasWarnings = False
        self._hasErrors = False
        self._hasFatals = False

    @readonly
    def HasWarnings(self):
        """True if warnings were found while processing the output stream."""
        return self._hasWarnings

    @readonly
    def HasErrors(self):
        """True if errors were found while processing the output stream."""
        return self._hasErrors

    @readonly
    def HasFatals(self):
        """True if fatals were found while processing the output stream."""
        return self._hasFatals