Coverage for pyTooling / CLIAbstraction / __init__.py: 73%
226 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 22:21 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 22:21 +0000
1# ==================================================================================================================== #
2# _____ _ _ ____ _ ___ _ _ _ _ _ #
3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ #
4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ #
5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | #
6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| #
7# |_| |___/ |___/ #
8# ==================================================================================================================== #
9# Authors: #
10# Patrick Lehmann #
11# #
12# License: #
13# ==================================================================================================================== #
14# Copyright 2017-2026 Patrick Lehmann - Bötzingen, Germany #
15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture #
16# #
17# Licensed under the Apache License, Version 2.0 (the "License"); #
18# you may not use this file except in compliance with the License. #
19# You may obtain a copy of the License at #
20# #
21# http://www.apache.org/licenses/LICENSE-2.0 #
22# #
23# Unless required by applicable law or agreed to in writing, software #
24# distributed under the License is distributed on an "AS IS" BASIS, #
25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
26# See the License for the specific language governing permissions and #
27# limitations under the License. #
28# #
29# SPDX-License-Identifier: Apache-2.0 #
30# ==================================================================================================================== #
31#
32"""Basic abstraction layer for executables."""
34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"]
36from os import environ as os_environ
37from pathlib import Path
38from platform import system
39from shutil import which as shutil_which
40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut, TimeoutExpired
41from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable
44try:
45 from pyTooling.Decorators import export, readonly
46 from pyTooling.MetaClasses import ExtendedType
47 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException
48 from pyTooling.Common import getFullyQualifiedName
49 from pyTooling.Attributes import Attribute
50 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
51 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
52 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag
53 from pyTooling.Platform import Platform
54except (ImportError, ModuleNotFoundError): # pragma: no cover
55 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!")
57 try:
58 from Decorators import export, readonly
59 from MetaClasses import ExtendedType
60 from Exceptions import ToolingException, PlatformNotSupportedException
61 from Common import getFullyQualifiedName
62 from Attributes import Attribute
63 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
64 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
65 from CLIAbstraction.ValuedFlag import ValuedFlag
66 from Platform import Platform
67 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover
68 print("[pyTooling.CLIAbstraction] Could not import directly!")
69 raise ex
72@export
73class CLIAbstractionException(ToolingException):
74 """Base-exception of all exceptions raised by :mod:`pyTooling.CLIAbstraction`."""
77@export
78class DryRunException(CLIAbstractionException):
79 """This exception is raised if an executable is launched while in dry-run mode."""
82@export
83class CLIArgument(Attribute):
84 """An attribute to annotate nested classes as an CLI argument."""
87@export
88class Environment(metaclass=ExtendedType, slots=True):
89 """
90 A class describing the environment of an executable.
92 .. topic:: Content of the environment
94 * Environment variables
95 """
96 _variables: Dict[str, str] #: Dictionary of active environment variables.
98 # TODO: derive environment from existing environment object.
99 def __init__(
100 self, *,
101 environment: Nullable["Environment"] = None,
102 newVariables: Nullable[Mapping[str, str]] = None,
103 addVariables: Nullable[Mapping[str, str]] = None,
104 delVariables: Nullable[Iterable[str]] = None
105 ) -> None:
106 """
107 Initializes an environment class managing.
109 .. topic:: Algorithm
111 1. Create a new dictionary of environment variables (name-value pairs) from either:
113 * an existing :class:`Environment` instance.
114 * current executable's environment by reading environment variables from :func:`os.environ`.
115 * a dictionary of name-value pairs.
117 2. Remove variables from environment.
118 3. Add new or update existing variables.
120 :param environment: Optional existing Environment instance to derive a new environment.
121 :param newVariables: Optional dictionary of new environment variables. |br|
122 If ``None``, read current environment variables from :func:`os.environ`.
123 :param addVariables: Optional dictionary of variables to be added or modified in the environment.
124 :param delVariables: Optional list of variable names to be removed from the environment.
125 """
126 if environment is not None: 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true
127 newVariables = environment._variables
128 elif newVariables is None:
129 newVariables = os_environ
131 self._variables = {name: value for name, value in newVariables.items()}
133 if delVariables is not None: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true
134 for variableName in delVariables:
135 del self._variables[variableName]
137 if addVariables is not None:
138 self._variables.update(addVariables)
140 def __len__(self) -> len:
141 """
142 Returns the number of set environment variables.
144 :returns: Number of environment variables.
145 """
146 return len(self._variables)
148 def __contains__(self, name: str) -> bool:
149 """
150 Checks if the variable is set in the environment.
152 :param key: The variable name to check.
153 :returns: ``True``, if the variable is set in the environment.
154 """
155 return name in self._variables
157 def __getitem__(self, name: str) -> str:
158 """
159 Access an environment variable in the environment by name.
161 :param name: Name of the environment variable.
162 :returns: The environment variable's value.
163 :raises KeyError: If Variable name is not set in the environment.
164 """
165 return self._variables[name]
167 def __setitem__(self, name: str, value: str) -> None:
168 """
169 Add or set an environment variable in the environment by name.
171 :param name: Name of the environment variable.
172 :param value: Value of the environment variable to be set.
173 """
174 self._variables[name] = value
176 def __delitem__(self, name: str) -> None:
177 """
178 Remove an environment variable from the environment by name.
180 :param name: The name of the environment variable to remove.
181 :raises KeyError: If name doesn't exist in the environment.
182 """
183 del self._variables[name]
186@export
187class Program(metaclass=ExtendedType, slots=True):
188 """
189 Represent a simple command line interface (CLI) executable (program or script).
191 CLI options are collected in a ``__cliOptions__`` dictionary.
192 """
194 _platform: str #: Current platform the executable runs on (Linux, Windows, ...)
195 _executableNames: ClassVar[Dict[str, str]] #: Dictionary of platform specific executable names.
196 _executablePath: Path #: The path to the executable (binary, script, ...).
197 _dryRun: bool #: True, if program shall run in *dry-run mode*.
198 __cliOptions__: ClassVar[Dict[Type[CommandLineArgument], int]] #: List of all possible CLI options.
199 __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]] #: List of all CLI parameters (used CLI options).
201 def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
202 """
203 Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are
204 marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.
206 :param args: Any positional arguments.
207 :param kwargs: Any keyword arguments.
208 """
209 super().__init_subclass__(*args, **kwargs)
211 # register all available CLI options (nested classes marked with attribute 'CLIArgument')
212 cls.__cliOptions__ = {option: order for order, option in enumerate(CLIArgument.GetClasses(scope=cls))}
214 def __init__(
215 self,
216 executablePath: Nullable[Path] = None,
217 binaryDirectoryPath: Nullable[Path] = None,
218 dryRun: bool = False
219 ) -> None:
220 """
221 Initializes a program instance.
223 .. todo:: Document algorithm
225 :param executablePath: Path to the executable.
226 :param binaryDirectoryPath: Path to the executable's directory.
227 :param dryRun: True, when the program should run in dryrun mode.
228 """
229 self._platform = system()
230 self._dryRun = dryRun
232 if executablePath is not None:
233 if isinstance(executablePath, Path):
234 if not executablePath.exists():
235 if dryRun: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true
236 self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
237 else:
238 raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
239 else:
240 ex = TypeError(f"Parameter 'executablePath' is not of type 'Path'.")
241 ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.")
242 raise ex
243 elif binaryDirectoryPath is not None:
244 if isinstance(binaryDirectoryPath, Path):
245 if not binaryDirectoryPath.exists():
246 if dryRun: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true
247 self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
248 else:
249 raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)
251 try:
252 executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform]
253 except KeyError:
254 raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)
256 if not executablePath.exists():
257 if dryRun: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true
258 self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
259 else:
260 raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
261 else:
262 ex = TypeError(f"Parameter 'binaryDirectoryPath' is not of type 'Path'.")
263 ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.")
264 raise ex
265 else:
266 try:
267 executablePath = Path(self._executableNames[self._platform])
268 except KeyError:
269 raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)
271 resolvedExecutable = shutil_which(str(executablePath))
272 if dryRun: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 if resolvedExecutable is None:
274 pass
275 # XXX: log executable not found in PATH
276 # self.LogDryRun(f"Which '{executablePath}' failed. [SKIPPING]")
277 else:
278 fullExecutablePath = Path(resolvedExecutable)
279 if not fullExecutablePath.exists():
280 pass
281 # XXX: log executable not found
282 # self.LogDryRun(f"File check for '{fullExecutablePath}' failed. [SKIPPING]")
283 else:
284 if resolvedExecutable is None:
285 raise CLIAbstractionException(f"Program could not be found in PATH.") from FileNotFoundError(executablePath)
287 fullExecutablePath = Path(resolvedExecutable)
288 if not fullExecutablePath.exists(): 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true
289 raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)
291 # TODO: log found executable in PATH
292 # TODO: check if found executable has execute permissions
293 # raise ValueError(f"Neither parameter 'executablePath' nor 'binaryDirectoryPath' was set.")
295 self._executablePath = executablePath
296 self.__cliParameters__ = {}
298 @staticmethod
299 def _NeedsParameterInitialization(key) -> bool:
300 return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))
302 def __getitem__(self, key: Type[CommandLineArgument]) -> CommandLineArgument:
303 """Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used."""
304 if not issubclass(key, CommandLineArgument): 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true
305 ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
306 ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
307 raise ex
309 # TODO: is nested check
310 return self.__cliParameters__[key]
312 def __setitem__(self, key: Type[CommandLineArgument], value: CommandLineArgument) -> None:
313 if not issubclass(key, CommandLineArgument): 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
315 ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
316 raise ex
317 elif key not in self.__cliOptions__:
318 raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
319 elif key in self.__cliParameters__:
320 raise KeyError(f"Option '{key}' is already set to a value.")
322 if self._NeedsParameterInitialization(key):
323 self.__cliParameters__[key] = key(value)
324 else:
325 self.__cliParameters__[key] = key()
327 @readonly
328 def Path(self) -> Path:
329 """
330 Read-only property to access the program's path.
332 :returns: The program's path.
333 """
334 return self._executablePath
336 def ToArgumentList(self) -> List[str]:
337 """
338 Convert a program and used CLI options to a list of CLI argument strings in correct order and with escaping.
340 :returns: List of CLI arguments
341 """
342 result: List[str] = []
344 result.append(str(self._executablePath))
346 def predicate(item: Tuple[Type[CommandLineArgument], int]) -> int:
347 return self.__cliOptions__[item[0]]
349 for key, value in sorted(self.__cliParameters__.items(), key=predicate):
350 param = value.AsArgument()
351 if isinstance(param, str):
352 result.append(param)
353 elif isinstance(param, (Tuple, List)): 353 ↛ 356line 353 didn't jump to line 356 because the condition on line 353 was always true
354 result += param
355 else:
356 raise TypeError(f"") # XXX: needs error message
358 return result
360 def __repr__(self) -> str:
361 """
362 Returns the string representation as coma-separated list of double-quoted CLI argument strings within square brackets.
364 Example: :pycode:`["arg1", "arg2"]`
366 :returns: Coma-separated list of CLI arguments with double-quotes.
367 """
368 return "[" + ", ".join([f"\"{item}\"" for item in self.ToArgumentList()]) + "]" # WORKAROUND: Python <3.12
369 # return f"[{", ".join([f"\"{item}\"" for item in self.ToArgumentList()])}]"
371 def __str__(self) -> str:
372 """
373 Returns the string representation as space-separated list of double-quoted CLI argument strings.
375 Example: :pycode:`"arg1" "arg2"`
377 :returns: Space-separated list of CLI arguments with double-quotes.
378 """
379 return " ".join([f"\"{item}\"" for item in self.ToArgumentList()])
382@export
383class Executable(Program): # (ILogable):
384 """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""
386 _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"
388 _workingDirectory: Nullable[Path] #: Path to the working directory
389 _environment: Nullable[Environment] #: Environment to use when executing.
390 _process: Nullable[Subprocess_Popen] #: Reference to the running process.
391 _exitCode: Nullable[int] #: The child's process exit code.
392 _killed: Nullable[bool] #: True, if the child-process got killed (e.g. by a timeout).
393 _iterator: Nullable[Iterator] #: Iterator for reading STDOUT.
395 def __init__(
396 self,
397 executablePath: Nullable[Path] = None,
398 binaryDirectoryPath: Nullable[Path] = None,
399 workingDirectory: Nullable[Path] = None,
400 environment: Nullable[Environment] = None,
401 dryRun: bool = False
402 ) -> None:
403 """
404 Initializes an executable instance.
406 :param executablePath: Path to the executable.
407 :param binaryDirectoryPath: Path to the executable's directory.
408 :param workingDirectory: Path to the working directory.
409 :param environment: Optional environment that should be setup when launching the executable.
410 :param dryRun: True, when the program should run in dryrun mode.
411 """
412 super().__init__(executablePath, binaryDirectoryPath, dryRun)
414 self._workingDirectory = None
415 self._environment = environment
416 self._process = None
417 self._exitCode = None
418 self._killed = None
419 self._iterator = None
421 def StartProcess(self, environment: Nullable[Environment] = None) -> None:
422 """
423 Start the executable as a child-process.
425 :param environment: Optional environment that should be setup when launching the executable. |br|
426 If ``None``, the :attr:`_environment` is used.
427 :raises CLIAbstractionException: When an :exc:`OSError` occurs while launching the child-process.
428 """
429 if self._dryRun: 429 ↛ 430line 429 didn't jump to line 430 because the condition on line 429 was never true
430 self.LogDryRun(f"Start process: {self!r}")
431 return
433 if environment is not None: 433 ↛ 435line 433 didn't jump to line 435 because the condition on line 433 was always true
434 envVariables = environment._variables
435 elif self._environment is not None:
436 envVariables = self._environment._variables
437 else:
438 envVariables = None
440 # FIXME: verbose log start process
441 # FIXME: debug log - parameter list
442 try:
443 self._process = Subprocess_Popen(
444 self.ToArgumentList(),
445 stdin=Subprocess_Pipe,
446 stdout=Subprocess_Pipe,
447 stderr=Subprocess_StdOut,
448 cwd=self._workingDirectory,
449 env=envVariables,
450 universal_newlines=True,
451 bufsize=256
452 )
454 except OSError as ex:
455 raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex
457 def Send(self, line: str, end: str = "\n") -> None:
458 """
459 Send a string to STDIN of the running child-process.
461 :param line: Line to send.
462 :param end: Line end character.
463 :raises CLIAbstractionException: When any error occurs while sending data to the child-process.
464 """
465 try:
466 self._process.stdin.write(line + end)
467 self._process.stdin.flush()
468 except Exception as ex:
469 raise CLIAbstractionException(f"") from ex # XXX: need error message
471 # This is TCL specific ...
472 # def SendBoundary(self):
473 # self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))
475 def GetLineReader(self) -> Generator[str, None, None]:
476 """
477 Return a line-reader for STDOUT.
479 :returns: A generator object to read from STDOUT line-by-line.
480 :raises DryRunException: In case dryrun mode is active.
481 :raises CLIAbstractionException: When any error occurs while reading outputs from the child-process.
482 """
483 if self._dryRun: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 raise DryRunException() # XXX: needs a message
486 try:
487 for line in iter(self._process.stdout.readline, ""): # FIXME: can it be improved?
488 yield line[:-1]
489 except Exception as ex:
490 raise CLIAbstractionException() from ex # XXX: need error message
491 # finally:
492 # self._process.terminate()
494 def Wait(self, timeout: Nullable[float] = None, kill: bool = False) -> Nullable[int]:
495 """
496 Wait on the child-process with an optional timeout.
498 When the timeout period exceeds, the child-process can be forcefully terminated.
500 :param timeout: Optional, timeout in seconds. |br|
501 Default: infinitely wait on the child-process.
502 :param kill: If true, terminate (kill) the child-process if it didn't terminate by itself within
503 the timeout period.
504 :returns: ``None`` when the child-process is still running, otherwise the exit code.
505 :raises CLIAbstractionException: When the child-process is not started yet.
507 .. topic:: Usecases
509 :pycode:`executable.Wait()`
510 Infinitely wait on the child-process. When the child-process terminates by itself, the exit code is returned.
512 This is a blocking call.
514 :pycode:`executable.Wait(timeout=5.4)`
515 Wait for a specified time on the child-process' termination. If it terminated by itself within the specified
516 timeout period, the exit code is returned; otherwise ``None``.
518 Thus :pycode:`.Wait(timeout=0.0)` returning ``None`` indicates a running process.
520 :pycode:`executable.Wait(timeout=20.0, kill=True)`
521 Wait for a specified time on the child-process' termination. If it terminated by itself within the specified
522 timeout period, the exit code is returned; otherwise the child-process gets killed and it's exit code is returned.
524 :pycode:`executable.Wait(timeout=0.0, kill=True)`
525 Kill immediately.
527 .. seealso::
529 :meth:`Terminate` - Terminate the child-process.
530 """
531 if self._process is None:
532 raise CLIAbstractionException(f"Process not yet started.")
534 try:
535 self._exitCode = self._process.wait(timeout=timeout)
536 except TimeoutExpired:
537 # when timed out, the process isn't terminated/killed automatically
538 if kill:
539 self._killed = True
540 self._process.terminate()
541 # After killing, wait to clean up the "zombie" process
542 self._exitCode = self._process.wait()
544 return self._exitCode
546 def Terminate(self) -> Nullable[int]:
547 """
548 Terminate the child-process.
550 :returns: The child-process' exit code.
551 :raises CLIAbstractionException: When the child-process is not started yet.
553 .. seealso::
555 :meth:`Wait` - Wait on the child-process with an optional timeout.
556 """
557 return self.Wait(timeout=0.0, kill=True)
559 @readonly
560 def ExitCode(self) -> int:
561 """
562 Read-only property accessing the child-process' exit code.
564 :returns: Child-process' exit code or ``None`` if it's still running.
566 .. seealso::
568 * :meth:`Wait` - Wait on the child-process with an optional timeout.
569 * :meth:`Terminate` - Terminate the child-process.
570 """
571 return self._exitCode
573 # This is TCL specific
574 # def ReadUntilBoundary(self, indent=0):
575 # __indent = " " * indent
576 # if self._iterator is None:
577 # self._iterator = iter(self.GetReader())
578 #
579 # for line in self._iterator:
580 # print(__indent + line)
581 # if self._pyIPCMI_BOUNDARY in line:
582 # break
583 # self.LogDebug("Quartus II is ready")
586@export
587class OutputFilteredExecutable(Executable):
588 """Represent a CLI executable derived from :class:`Executable`, whose outputs are filtered."""
589 _hasOutput: bool
590 _hasWarnings: bool
591 _hasErrors: bool
592 _hasFatals: bool
594 def __init__(self, platform: Platform, dryrun: bool, executablePath: Path) -> None: #, environment=None, logger=None) -> None:
595 super().__init__(platform, dryrun, executablePath) #, environment=environment, logger=logger)
597 self._hasOutput = False
598 self._hasWarnings = False
599 self._hasErrors = False
600 self._hasFatals = False
602 @readonly
603 def HasWarnings(self) -> bool:
604 # TODO: update doc-string
605 """True if warnings were found while processing the output stream."""
606 return self._hasWarnings
608 @readonly
609 def HasErrors(self) -> bool:
610 # TODO: update doc-string
611 """True if errors were found while processing the output stream."""
612 return self._hasErrors
614 @readonly
615 def HasFatals(self) -> bool:
616 # TODO: update doc-string
617 """True if fatals were found while processing the output stream."""
618 return self._hasErrors