Coverage for pyTooling / CLIAbstraction / __init__.py: 73%
225 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-07 17:18 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-07 17:18 +0000
1# ==================================================================================================================== #
2# _____ _ _ ____ _ ___ _ _ _ _ _ #
3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ #
4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ #
5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | #
6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| #
7# |_| |___/ |___/ #
8# ==================================================================================================================== #
9# Authors: #
10# Patrick Lehmann #
11# #
12# License: #
13# ==================================================================================================================== #
14# Copyright 2017-2026 Patrick Lehmann - Bötzingen, Germany #
15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture #
16# #
17# Licensed under the Apache License, Version 2.0 (the "License"); #
18# you may not use this file except in compliance with the License. #
19# You may obtain a copy of the License at #
20# #
21# http://www.apache.org/licenses/LICENSE-2.0 #
22# #
23# Unless required by applicable law or agreed to in writing, software #
24# distributed under the License is distributed on an "AS IS" BASIS, #
25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
26# See the License for the specific language governing permissions and #
27# limitations under the License. #
28# #
29# SPDX-License-Identifier: Apache-2.0 #
30# ==================================================================================================================== #
31#
32"""Basic abstraction layer for executables."""
34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"]
36from os import environ as os_environ
37from pathlib import Path
38from platform import system
39from shutil import which as shutil_which
40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut, TimeoutExpired
41from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable
44from pyTooling.Decorators import export, readonly
45from pyTooling.MetaClasses import ExtendedType
46from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException
47from pyTooling.Common import getFullyQualifiedName
48from pyTooling.Attributes import Attribute
49from pyTooling.CLIAbstraction.Argument import CommandLineArgument
50from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
51from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag
52from pyTooling.Platform import Platform
@export
class CLIAbstractionException(ToolingException):
    """Common base-class of every exception raised by :mod:`pyTooling.CLIAbstraction`."""
@export
class DryRunException(CLIAbstractionException):
    """Raised when an executable is launched while dry-run mode is active."""
@export
class CLIArgument(Attribute):
    """Attribute used to annotate nested classes as CLI arguments."""
@export
class Environment(metaclass=ExtendedType, slots=True):
    """
    A class describing the environment of an executable.

    .. topic:: Content of the environment

       * Environment variables
    """

    _variables: Dict[str, str]  #: Dictionary of active environment variables.

    def __init__(
        self, *,
        environment: Nullable["Environment"] = None,
        newVariables: Nullable[Mapping[str, str]] = None,
        addVariables: Nullable[Mapping[str, str]] = None,
        delVariables: Nullable[Iterable[str]] = None
    ) -> None:
        """
        Initializes an environment, which manages its own copy of environment variables.

        .. topic:: Algorithm

           1. Create a new dictionary of environment variables (name-value pairs) from either:

              * an existing :class:`Environment` instance.
              * a dictionary of name-value pairs.
              * current executable's environment by reading environment variables from :data:`os.environ`.

           2. Remove variables from environment.
           3. Add new or update existing variables.

        :param environment:  Optional existing Environment instance to derive a new environment. |br|
                             If given, it takes precedence over ``newVariables``.
        :param newVariables: Optional dictionary of new environment variables. |br|
                             If ``None`` and no ``environment`` is given, read current environment variables from
                             :data:`os.environ`.
        :param addVariables: Optional dictionary of variables to be added or modified in the environment.
        :param delVariables: Optional list of variable names to be removed from the environment.
        :raises KeyError:    If a name listed in ``delVariables`` is not set in the environment.
        """
        if environment is not None:
            newVariables = environment._variables
        elif newVariables is None:
            newVariables = os_environ

        # Always work on a copy, so later changes don't modify the source mapping (e.g. os.environ).
        self._variables = dict(newVariables)

        # Deletions are applied before additions, so a name can be removed and re-added in one step.
        if delVariables is not None:
            for variableName in delVariables:
                del self._variables[variableName]

        if addVariables is not None:
            self._variables.update(addVariables)

    def __len__(self) -> int:
        """
        Returns the number of set environment variables.

        :returns: Number of environment variables.
        """
        return len(self._variables)

    def __contains__(self, name: str) -> bool:
        """
        Checks if the variable is set in the environment.

        :param name: The variable name to check.
        :returns:    ``True``, if the variable is set in the environment.
        """
        return name in self._variables

    def __getitem__(self, name: str) -> str:
        """
        Access an environment variable in the environment by name.

        :param name:      Name of the environment variable.
        :returns:         The environment variable's value.
        :raises KeyError: If variable name is not set in the environment.
        """
        return self._variables[name]

    def __setitem__(self, name: str, value: str) -> None:
        """
        Add or set an environment variable in the environment by name.

        :param name:  Name of the environment variable.
        :param value: Value of the environment variable to be set.
        """
        self._variables[name] = value

    def __delitem__(self, name: str) -> None:
        """
        Remove an environment variable from the environment by name.

        :param name:      The name of the environment variable to remove.
        :raises KeyError: If name doesn't exist in the environment.
        """
        del self._variables[name]
@export
class Program(metaclass=ExtendedType, slots=True):
    """
    Represent a simple command line interface (CLI) executable (program or script).

    CLI options are collected in a ``__cliOptions__`` dictionary.
    """

    _platform:         str                                                             #: Current platform the executable runs on (Linux, Windows, ...)
    _executableNames:  ClassVar[Dict[str, str]]                                        #: Dictionary of platform specific executable names.
    _executablePath:   Path                                                            #: The path to the executable (binary, script, ...).
    _dryRun:           bool                                                            #: True, if program shall run in *dry-run mode*.
    __cliOptions__:    ClassVar[Dict[Type[CommandLineArgument], int]]                  #: List of all possible CLI options.
    __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]]  #: List of all CLI parameters (used CLI options).

    def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
        """
        Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are
        marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.

        :param args:   Any positional arguments.
        :param kwargs: Any keyword arguments.
        """
        super().__init_subclass__(*args, **kwargs)

        # Register all available CLI options (nested classes marked with attribute 'CLIArgument').
        # The enumeration index is stored as the option's sort order used by 'ToArgumentList'.
        cls.__cliOptions__ = {option: order for order, option in enumerate(CLIArgument.GetClasses(scope=cls))}

    def __init__(
        self,
        executablePath: Nullable[Path] = None,
        binaryDirectoryPath: Nullable[Path] = None,
        dryRun: bool = False
    ) -> None:
        """
        Initializes a program instance.

        .. topic:: Algorithm

           1. If ``executablePath`` is given, it must be a :class:`~pathlib.Path` to an existing file.
           2. Otherwise, if ``binaryDirectoryPath`` is given, it must be a :class:`~pathlib.Path` to an existing
              directory. The platform specific executable name (see ``_executableNames``) is appended and the resulting
              file must exist.
           3. Otherwise, the platform specific executable name is searched in ``PATH`` using :func:`shutil.which`.

           In dry-run mode, failed existence checks don't raise an exception.

        :param executablePath:               Path to the executable.
        :param binaryDirectoryPath:          Path to the executable's directory.
        :param dryRun:                       True, when the program should run in dryrun mode.
        :raises TypeError:                   If ``executablePath`` or ``binaryDirectoryPath`` is not of type ``Path``.
        :raises CLIAbstractionException:     If the executable or binary directory wasn't found, or if the program has
                                             no executable name for the current platform.
        """
        # NOTE(review): 'LogDryRun' is not defined by this class in the visible code; presumably it's provided by a
        #               logging mixin of derived classes - verify, otherwise dry-run mode raises an AttributeError.
        self._platform = system()
        self._dryRun = dryRun

        if executablePath is not None:
            if not isinstance(executablePath, Path):
                ex = TypeError("Parameter 'executablePath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.")
                raise ex

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        elif binaryDirectoryPath is not None:
            if not isinstance(binaryDirectoryPath, Path):
                ex = TypeError("Parameter 'binaryDirectoryPath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.")
                raise ex

            if not binaryDirectoryPath.exists():
                if dryRun:
                    self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)

            try:
                executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform]
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        else:
            try:
                executablePath = Path(self._executableNames[self._platform])
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            # Only the existence is verified; the bare executable name is stored, not the resolved path.
            resolvedExecutable = shutil_which(str(executablePath))
            if not dryRun:
                if resolvedExecutable is None:
                    raise CLIAbstractionException("Program could not be found in PATH.") from FileNotFoundError(executablePath)

                fullExecutablePath = Path(resolvedExecutable)
                if not fullExecutablePath.exists():
                    raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)
            # XXX: in dry-run mode, a failed PATH lookup is tolerated, but should be logged:
            #      self.LogDryRun(f"Which '{executablePath}' failed. [SKIPPING]")

            # TODO: log found executable in PATH
            # TODO: check if found executable has execute permissions

        self._executablePath = executablePath
        self.__cliParameters__ = {}

    @staticmethod
    def _NeedsParameterInitialization(key: Type[CommandLineArgument]) -> bool:
        """
        Check if a CLI option class needs a value when it's instantiated.

        :param key: The CLI option class to check.
        :returns:   ``True``, if the option is derived from one of the valued argument classes.
        """
        return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))

    def __getitem__(self, key: Type[CommandLineArgument]) -> CommandLineArgument:
        """
        Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used.

        :param key:        The CLI option class used as a key.
        :returns:          The CLI parameter (instance) stored for the given CLI option.
        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the CLI option is not in use.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex

        # TODO: is nested check
        return self.__cliParameters__[key]

    def __setitem__(self, key: Type[CommandLineArgument], value: CommandLineArgument) -> None:
        """
        Use a CLI option by instantiating it and storing the instance as a CLI parameter.

        Options requiring a value are instantiated with ``value``; for all other options ``value`` is ignored.

        :param key:        The CLI option class to use.
        :param value:      The value handed over to the CLI option's initializer.
        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the CLI option is not allowed on this executable or if it's already set.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex
        elif key not in self.__cliOptions__:
            raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
        elif key in self.__cliParameters__:
            raise KeyError(f"Option '{key}' is already set to a value.")

        if self._NeedsParameterInitialization(key):
            self.__cliParameters__[key] = key(value)
        else:
            self.__cliParameters__[key] = key()

    @readonly
    def Path(self) -> Path:
        """
        Read-only property to access the program's path.

        :returns: The program's path.
        """
        return self._executablePath

    def ToArgumentList(self) -> List[str]:
        """
        Convert a program and used CLI options to a list of CLI argument strings in correct order and with escaping.

        :returns:          List of CLI arguments
        :raises TypeError: If a CLI parameter's ``AsArgument`` method returns neither a string nor a tuple or list.
        """
        result: List[str] = [str(self._executablePath)]

        def declarationOrder(item: Tuple[Type[CommandLineArgument], Nullable[CommandLineArgument]]) -> int:
            # Sort used parameters by the order in which their options were registered on the class.
            return self.__cliOptions__[item[0]]

        for key, value in sorted(self.__cliParameters__.items(), key=declarationOrder):
            param = value.AsArgument()
            if isinstance(param, str):
                result.append(param)
            elif isinstance(param, (tuple, list)):
                result += param
            else:
                ex = TypeError(f"Method 'AsArgument' of CLI option '{key}' returned neither a string nor a tuple or list of strings.")
                ex.add_note(f"Got type '{getFullyQualifiedName(param)}'.")
                raise ex

        return result

    def __repr__(self) -> str:
        """
        Returns the string representation as comma-separated list of double-quoted CLI argument strings within square brackets.

        Example: :pycode:`["arg1", "arg2"]`

        :returns: Comma-separated list of CLI arguments with double-quotes.
        """
        return "[" + ", ".join([f"\"{item}\"" for item in self.ToArgumentList()]) + "]"  # WORKAROUND: Python <3.12
        # return f"[{", ".join([f"\"{item}\"" for item in self.ToArgumentList()])}]"

    def __str__(self) -> str:
        """
        Returns the string representation as space-separated list of double-quoted CLI argument strings.

        Example: :pycode:`"arg1" "arg2"`

        :returns: Space-separated list of CLI arguments with double-quotes.
        """
        return " ".join([f"\"{item}\"" for item in self.ToArgumentList()])
@export
class Executable(Program):  # (ILogable):
    """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""

    _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"

    _workingDirectory: Nullable[Path]              #: Path to the working directory
    _environment:      Nullable[Environment]       #: Environment to use when executing.
    _process:          Nullable[Subprocess_Popen]  #: Reference to the running process.
    _exitCode:         Nullable[int]               #: The child's process exit code.
    _killed:           Nullable[bool]              #: True, if the child-process got killed (e.g. by a timeout).
    _iterator:         Nullable[Iterator]          #: Iterator for reading STDOUT.

    def __init__(
        self,
        executablePath: Nullable[Path] = None,
        binaryDirectoryPath: Nullable[Path] = None,
        workingDirectory: Nullable[Path] = None,
        environment: Nullable[Environment] = None,
        dryRun: bool = False
    ) -> None:
        """
        Initializes an executable instance.

        :param executablePath:      Path to the executable.
        :param binaryDirectoryPath: Path to the executable's directory.
        :param workingDirectory:    Path to the working directory.
        :param environment:         Optional environment that should be setup when launching the executable.
        :param dryRun:              True, when the program should run in dryrun mode.
        """
        super().__init__(executablePath, binaryDirectoryPath, dryRun)

        # Store the requested working directory, so it's used as 'cwd' when the child-process is started.
        self._workingDirectory = workingDirectory
        self._environment = environment
        self._process = None
        self._exitCode = None
        self._killed = None
        self._iterator = None

    def StartProcess(self, environment: Nullable[Environment] = None) -> None:
        """
        Start the executable as a child-process.

        STDERR is merged into STDOUT and both STDIN and STDOUT are connected via pipes in text mode.

        :param environment:              Optional environment that should be setup when launching the executable. |br|
                                         If ``None``, the :attr:`_environment` is used.
        :raises CLIAbstractionException: When an :exc:`OSError` occurs while launching the child-process.
        """
        if self._dryRun:
            # NOTE(review): 'LogDryRun' is not defined in the visible class hierarchy; presumably provided by a
            #               logging mixin - verify.
            self.LogDryRun(f"Start process: {self!r}")
            return

        # Precedence: explicitly passed environment, then the instance's environment, then inherit from parent process.
        if environment is not None:
            envVariables = environment._variables
        elif self._environment is not None:
            envVariables = self._environment._variables
        else:
            envVariables = None

        # FIXME: verbose log start process
        # FIXME: debug log - parameter list
        try:
            self._process = Subprocess_Popen(
                self.ToArgumentList(),
                stdin=Subprocess_Pipe,
                stdout=Subprocess_Pipe,
                stderr=Subprocess_StdOut,
                cwd=self._workingDirectory,
                env=envVariables,
                universal_newlines=True,
                bufsize=256
            )
        except OSError as ex:
            raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex

    def Send(self, line: str, end: str = "\n") -> None:
        """
        Send a string to STDIN of the running child-process.

        :param line:                     Line to send.
        :param end:                      Line end character.
        :raises CLIAbstractionException: When the child-process is not started yet or when any error occurs while
                                         sending data to the child-process.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started.")

        try:
            self._process.stdin.write(line + end)
            self._process.stdin.flush()
        except Exception as ex:
            raise CLIAbstractionException(f"Error while sending data to the process of '{self._executablePath}'.") from ex

    # This is TCL specific ...
    # def SendBoundary(self):
    #     self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))

    def GetLineReader(self) -> Generator[str, None, None]:
        """
        Return a line-reader for STDOUT.

        Lines are yielded without their trailing newline character. As this is a generator, exceptions are raised
        when the first line is requested, not when this method is called.

        :returns:                        A generator object to read from STDOUT line-by-line.
        :raises DryRunException:         In case dryrun mode is active.
        :raises CLIAbstractionException: When the child-process is not started yet or when any error occurs while
                                         reading outputs from the child-process.
        """
        if self._dryRun:
            raise DryRunException("Can't read outputs from a process in dry-run mode.")

        if self._process is None:
            raise CLIAbstractionException("Process not yet started.")

        try:
            # 'readline' returns an empty string at end-of-file, which is used as the sentinel to stop iterating.
            for line in iter(self._process.stdout.readline, ""):
                # Only strip a real line ending; the last line might not be terminated by a newline character.
                yield line.removesuffix("\n")
        except Exception as ex:
            raise CLIAbstractionException(f"Error while reading outputs from the process of '{self._executablePath}'.") from ex
        # finally:
        #     self._process.terminate()

    def Wait(self, timeout: Nullable[float] = None, kill: bool = False) -> Nullable[int]:
        """
        Wait on the child-process with an optional timeout.

        When the timeout period exceeds, the child-process can be forcefully terminated.

        :param timeout:                  Optional, timeout in seconds. |br|
                                         Default: infinitely wait on the child-process.
        :param kill:                     If true, terminate (kill) the child-process if it didn't terminate by itself within
                                         the timeout period.
        :returns:                        ``None`` when the child-process is still running, otherwise the exit code.
        :raises CLIAbstractionException: When the child-process is not started yet.

        .. topic:: Usecases

           :pycode:`executable.Wait()`
             Infinitely wait on the child-process. When the child-process terminates by itself, the exit code is returned.

             This is a blocking call.

           :pycode:`executable.Wait(timeout=5.4)`
             Wait for a specified time on the child-process' termination. If it terminated by itself within the specified
             timeout period, the exit code is returned; otherwise ``None``.

             Thus :pycode:`.Wait(timeout=0.0)` returning ``None`` indicates a running process.

           :pycode:`executable.Wait(timeout=20.0, kill=True)`
             Wait for a specified time on the child-process' termination. If it terminated by itself within the specified
             timeout period, the exit code is returned; otherwise the child-process gets killed and its exit code is returned.

           :pycode:`executable.Wait(timeout=0.0, kill=True)`
             Kill immediately.

        .. seealso::

           :meth:`Terminate` - Terminate the child-process.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started.")

        try:
            self._exitCode = self._process.wait(timeout=timeout)
        except TimeoutExpired:
            # When timed out, the process isn't terminated/killed automatically.
            if kill:
                self._killed = True
                self._process.terminate()
                # After killing, wait to clean up the "zombie" process.
                self._exitCode = self._process.wait()

        return self._exitCode

    def Terminate(self) -> Nullable[int]:
        """
        Terminate the child-process.

        :returns:                        The child-process' exit code.
        :raises CLIAbstractionException: When the child-process is not started yet.

        .. seealso::

           :meth:`Wait` - Wait on the child-process with an optional timeout.
        """
        return self.Wait(timeout=0.0, kill=True)

    @readonly
    def ExitCode(self) -> Nullable[int]:
        """
        Read-only property accessing the child-process' exit code.

        :returns: Child-process' exit code or ``None`` if it's still running.

        .. seealso::

           * :meth:`Wait` - Wait on the child-process with an optional timeout.
           * :meth:`Terminate` - Terminate the child-process.
        """
        return self._exitCode

    # This is TCL specific
    # def ReadUntilBoundary(self, indent=0):
    #     __indent = "  " * indent
    #     if self._iterator is None:
    #         self._iterator = iter(self.GetReader())
    #
    #     for line in self._iterator:
    #         print(__indent + line)
    #         if self._pyIPCMI_BOUNDARY in line:
    #             break
    #     self.LogDebug("Quartus II is ready")
@export
class OutputFilteredExecutable(Executable):
    """Represent a CLI executable derived from :class:`Executable`, whose outputs are filtered."""

    _hasOutput:   bool  #: True, if output was found while processing the output stream.
    _hasWarnings: bool  #: True, if warnings were found while processing the output stream.
    _hasErrors:   bool  #: True, if errors were found while processing the output stream.
    _hasFatals:   bool  #: True, if fatals were found while processing the output stream.

    def __init__(self, platform: Platform, dryrun: bool, executablePath: Path) -> None:  #, environment=None, logger=None) -> None:
        """
        Initializes an output-filtered executable instance.

        :param platform:       Unused; kept for backward compatibility. The platform is determined by
                               :class:`Program` itself.
        :param dryrun:         True, when the program should run in dryrun mode.
        :param executablePath: Path to the executable.
        """
        # Pass by keyword: the base-class' positional order is (executablePath, binaryDirectoryPath,
        # workingDirectory, environment, dryRun) and doesn't match this initializer's parameter order.
        super().__init__(executablePath=executablePath, dryRun=dryrun)  #, environment=environment, logger=logger)

        self._hasOutput = False
        self._hasWarnings = False
        self._hasErrors = False
        self._hasFatals = False

    @readonly
    def HasWarnings(self) -> bool:
        # TODO: update doc-string
        """True if warnings were found while processing the output stream."""
        return self._hasWarnings

    @readonly
    def HasErrors(self) -> bool:
        # TODO: update doc-string
        """True if errors were found while processing the output stream."""
        return self._hasErrors

    @readonly
    def HasFatals(self) -> bool:
        # TODO: update doc-string
        """True if fatals were found while processing the output stream."""
        return self._hasFatals