Coverage for pyTooling / CLIAbstraction / __init__.py: 76%
215 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-21 22:22 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-21 22:22 +0000
1# ==================================================================================================================== #
2# _____ _ _ ____ _ ___ _ _ _ _ _ #
3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ #
4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ #
5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | #
6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| #
7# |_| |___/ |___/ #
8# ==================================================================================================================== #
9# Authors: #
10# Patrick Lehmann #
11# #
12# License: #
13# ==================================================================================================================== #
14# Copyright 2017-2025 Patrick Lehmann - Bötzingen, Germany #
15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture #
16# #
17# Licensed under the Apache License, Version 2.0 (the "License"); #
18# you may not use this file except in compliance with the License. #
19# You may obtain a copy of the License at #
20# #
21# http://www.apache.org/licenses/LICENSE-2.0 #
22# #
23# Unless required by applicable law or agreed to in writing, software #
24# distributed under the License is distributed on an "AS IS" BASIS, #
25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
26# See the License for the specific language governing permissions and #
27# limitations under the License. #
28# #
29# SPDX-License-Identifier: Apache-2.0 #
30# ==================================================================================================================== #
31#
32"""Basic abstraction layer for executables."""
34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"]
36from os import environ as os_environ
37from pathlib import Path
38from platform import system
39from shutil import which as shutil_which
40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut
41from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable
43try:
44 from pyTooling.Decorators import export, readonly
45 from pyTooling.MetaClasses import ExtendedType
46 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException
47 from pyTooling.Common import getFullyQualifiedName
48 from pyTooling.Attributes import Attribute
49 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
50 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
51 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag
52except (ImportError, ModuleNotFoundError): # pragma: no cover
53 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!")
55 try:
56 from Decorators import export, readonly
57 from MetaClasses import ExtendedType
58 from Exceptions import ToolingException, PlatformNotSupportedException
59 from Common import getFullyQualifiedName
60 from Attributes import Attribute
61 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
62 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
63 from CLIAbstraction.ValuedFlag import ValuedFlag
64 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover
65 print("[pyTooling.CLIAbstraction] Could not import directly!")
66 raise ex
@export
class CLIAbstractionException(ToolingException):
    """Base-class of all exceptions raised by the CLI abstraction layer."""
@export
class DryRunException(CLIAbstractionException):
    """Raised when an operation requiring a running executable is requested while in dry-run mode."""
@export
class CLIArgument(Attribute):
    """Attribute used to mark a nested class as a CLI argument of the enclosing program class."""
@export
class Environment(metaclass=ExtendedType, slots=True):
    """
    A mutable set of environment variables stored as a name-to-value dictionary.

    The object supports ``len()``, ``in``, item access, item assignment and item deletion, all of which operate
    directly on the internal dictionary.
    """

    _variables: Dict[str, str]  #: Dictionary of environment variable names and their values.

    def __init__(
        self,
        newVariables: Nullable[Mapping[str, str]] = None,
        addVariables: Nullable[Mapping[str, str]] = None,
        delVariables: Nullable[Iterable[str]] = None
    ) -> None:
        """
        Initialize the environment from a base mapping, then remove and add variables.

        :param newVariables: Base mapping to copy. If ``None``, the current process environment (``os.environ``) is copied.
        :param addVariables: Optional variables merged into the copy after deletions were applied.
        :param delVariables: Optional names of variables to remove from the copy.
        :raises KeyError:    If a name listed in ``delVariables`` is not present in the copied variables.
        """
        if newVariables is None:
            newVariables = os_environ

        # Always work on a private copy, so the given mapping (or os.environ) is never modified.
        self._variables = dict(newVariables.items())

        # Deletions are applied before additions, so a variable can be removed and re-added in one call.
        if delVariables is not None:
            for variableName in delVariables:
                del self._variables[variableName]

        if addVariables is not None:
            self._variables.update(addVariables)

    def __len__(self) -> int:
        """Return the number of stored environment variables."""
        return len(self._variables)

    def __contains__(self, name: str) -> bool:
        """Return ``True`` if an environment variable called ``name`` is stored."""
        return name in self._variables

    def __getitem__(self, name: str) -> str:
        """Return the value of environment variable ``name``; raises :exc:`KeyError` if it is not stored."""
        return self._variables[name]

    def __setitem__(self, name: str, value: str) -> None:
        """Set (add or overwrite) environment variable ``name`` to ``value``."""
        self._variables[name] = value

    def __delitem__(self, name: str) -> None:
        """Remove environment variable ``name``; raises :exc:`KeyError` if it is not stored."""
        del self._variables[name]
@export
class Program(metaclass=ExtendedType, slots=True):
    """Represent a simple command line interface (CLI) executable (program or script)."""

    _platform:         str                                                                    #: Current platform the executable runs on (Linux, Windows, ...)
    _executableNames:  ClassVar[Dict[str, str]]                                               #: Dictionary of platform specific executable names.
    _executablePath:   Path                                                                   #: The path to the executable (binary, script, ...).
    _dryRun:           bool                                                                   #: True, if program shall run in *dry-run mode*.
    __cliOptions__:    ClassVar[Dict[Type[CommandLineArgument], int]]                         #: List of all possible CLI options.
    __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]]         #: List of all CLI parameters (used CLI options).

    def __init_subclass__(cls, *args: Any, **kwargs: Any):
        """
        Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are
        marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.

        The dictionary value is the position in which the option was reported by ``CLIArgument.GetClasses``; it is used
        by :meth:`ToArgumentList` to order the arguments.
        """
        super().__init_subclass__(*args, **kwargs)

        # register all available CLI options (nested classes marked with attribute 'CLIArgument')
        cls.__cliOptions__ = {option: order for order, option in enumerate(CLIArgument.GetClasses(scope=cls))}

    def __init__(self, executablePath: Nullable[Path] = None, binaryDirectoryPath: Nullable[Path] = None, dryRun: bool = False) -> None:
        """
        Locate the executable and initialize an empty set of CLI parameters.

        The executable is located by the first applicable rule:

        1. ``executablePath`` is given: that path is used.
        2. ``binaryDirectoryPath`` is given: the platform specific name from ``_executableNames`` is appended to it.
        3. Otherwise: the platform specific name from ``_executableNames`` is searched in ``PATH`` using ``shutil.which``.

        :param executablePath:           Path to the executable.
        :param binaryDirectoryPath:      Path to the directory containing the executable.
        :param dryRun:                   If true, failed existence checks don't raise an exception.
        :raises TypeError:               If ``executablePath`` or ``binaryDirectoryPath`` is not of type ``Path``.
        :raises CLIAbstractionException: If the executable or binary directory doesn't exist (and not in dry-run mode), or
                                         if ``_executableNames`` has no entry for the current platform.
        """
        self._platform = system()
        self._dryRun = dryRun

        # NOTE(review): 'LogDryRun' is not defined by this class within this file; presumably it is provided by a
        #               logging mixin of a derived class - confirm, otherwise dry-run mode fails with AttributeError.
        if executablePath is not None:
            if not isinstance(executablePath, Path):
                ex = TypeError("Parameter 'executablePath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.")
                raise ex

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        elif binaryDirectoryPath is not None:
            if not isinstance(binaryDirectoryPath, Path):
                ex = TypeError("Parameter 'binaryDirectoryPath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.")
                raise ex

            if not binaryDirectoryPath.exists():
                if dryRun:
                    self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)

            try:
                executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform]
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        else:
            try:
                executablePath = Path(self._executableNames[self._platform])
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            resolvedExecutable = shutil_which(str(executablePath))
            if not dryRun:
                if resolvedExecutable is None:
                    raise CLIAbstractionException("Program could not be found in PATH.") from FileNotFoundError(executablePath)

                fullExecutablePath = Path(resolvedExecutable)
                if not fullExecutablePath.exists():
                    raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)
            # XXX: in dry-run mode, a failed PATH lookup or file check is currently ignored silently; it should be logged:
            #      self.LogDryRun(f"Which '{executablePath}' failed. [SKIPPING]")
            #      self.LogDryRun(f"File check for '{fullExecutablePath}' failed. [SKIPPING]")

            # TODO: log found executable in PATH
            # TODO: check if found executable has execute permissions

        # In the PATH lookup case, the platform specific name (not the resolved full path) is stored.
        self._executablePath = executablePath
        self.__cliParameters__ = {}

    @staticmethod
    def _NeedsParameterInitialization(key):
        """Return ``True`` if the CLI option class ``key`` is one of the argument kinds that are constructed with a value."""
        return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))

    def __getitem__(self, key):
        """
        Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the CLI option wasn't set before.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex

        # TODO: is nested check
        return self.__cliParameters__[key]

    def __setitem__(self, key, value):
        """
        Use a CLI option by creating an instance of the option class ``key`` and storing it as a CLI parameter.

        Option classes accepted by :meth:`_NeedsParameterInitialization` are instantiated with ``value``; all other
        option classes are instantiated without arguments and ``value`` is ignored.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the option is not registered for this program or if it was already set.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex
        elif key not in self.__cliOptions__:
            raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
        elif key in self.__cliParameters__:
            raise KeyError(f"Option '{key}' is already set to a value.")

        if self._NeedsParameterInitialization(key):
            self.__cliParameters__[key] = key(value)
        else:
            self.__cliParameters__[key] = key()

    @readonly
    def Path(self) -> Path:
        """Read-only property returning the stored path of the executable."""
        return self._executablePath

    def ToArgumentList(self) -> List[str]:
        """
        Convert the executable path and all used CLI parameters into a list of strings.

        The first element is the executable path. The parameters follow in the order their option classes were
        registered in ``__cliOptions__``.

        :returns:          List of command line arguments.
        :raises TypeError: If a parameter's ``AsArgument()`` returns neither a string nor a tuple or list.
        """
        result: List[str] = [str(self._executablePath)]

        def predicate(item: Tuple[Type[CommandLineArgument], int]) -> int:
            # Sort by the registration order of the option class.
            return self.__cliOptions__[item[0]]

        for key, value in sorted(self.__cliParameters__.items(), key=predicate):
            param = value.AsArgument()
            if isinstance(param, str):
                result.append(param)
            elif isinstance(param, (tuple, list)):
                result += param
            else:
                ex = TypeError(f"Method 'AsArgument' of option '{key}' returned neither a string nor a tuple or list.")
                ex.add_note(f"Got type '{getFullyQualifiedName(param)}'.")
                raise ex

        return result

    def __repr__(self):
        """Return the argument list formatted as a bracketed, comma separated list of double-quoted strings."""
        return "[" + ", ".join([f"\"{item}\"" for item in self.ToArgumentList()]) + "]"

    def __str__(self):
        """Return the argument list formatted as space separated, double-quoted strings."""
        return " ".join([f"\"{item}\"" for item in self.ToArgumentList()])
@export
class Executable(Program):  # (ILogable):
    """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""

    _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"

    _workingDirectory: Nullable[Path]              #: Working directory passed as ``cwd`` to the child process.
    _environment:      Nullable[Environment]       #: Environment passed to the child process.
    _process:          Nullable[Subprocess_Popen]  #: The child process, or ``None`` if not yet started.
    _iterator:         Nullable[Iterator]          #: Initialized to ``None``; not used by the active code of this class.

    def __init__(
        self,
        executablePath: Nullable[Path] = None,
        binaryDirectoryPath: Nullable[Path] = None,
        workingDirectory: Nullable[Path] = None,
        environment: Nullable[Environment] = None,
        dryRun: bool = False
    ):
        """
        Initialize the executable; see :class:`Program` for how the executable is located.

        :param executablePath:      Path to the executable.
        :param binaryDirectoryPath: Path to the directory containing the executable.
        :param workingDirectory:    Working directory used when the child process is started.
        :param environment:         Environment used when the child process is started.
        :param dryRun:              If true, no child process is started.
        """
        super().__init__(executablePath, binaryDirectoryPath, dryRun)

        # The given working directory is stored, so StartProcess can hand it to Popen as 'cwd'.
        self._workingDirectory = workingDirectory
        self._environment = environment
        self._process = None
        self._iterator = None

    def StartProcess(
        self,
        environment: Nullable[Environment] = None
    ):
        """
        Start the executable as a child process with stdin and stdout connected to pipes and stderr merged into stdout.

        In dry-run mode, the call is only logged and no process is started.

        :param environment:              Environment for the child process. It is only used, if no environment was given
                                         at construction time.
        :raises CLIAbstractionException: If launching the process raised an :exc:`OSError`.
        """
        # start child process

        # NOTE(review): 'LogDryRun' is not defined in this class hierarchy within this file - confirm it is provided
        #               elsewhere (e.g. by a logging mixin).
        if self._dryRun:
            self.LogDryRun(f"Start process: {self!r}")
            return

        # The environment stored on the instance takes precedence over the one given to this method.
        if self._environment is not None:
            envVariables = self._environment._variables
        elif environment is not None:
            envVariables = environment._variables
        else:
            envVariables = None

        # FIXME: verbose log start process
        # FIXME: debug log - parameter list
        try:
            self._process = Subprocess_Popen(
                self.ToArgumentList(),
                stdin=Subprocess_Pipe,
                stdout=Subprocess_Pipe,
                stderr=Subprocess_StdOut,
                cwd=self._workingDirectory,
                env=envVariables,
                universal_newlines=True,
                bufsize=256
            )

        except OSError as ex:
            raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex

    def Send(self, line: str, end: str = "\n") -> None:
        """
        Write ``line`` followed by ``end`` to the child process' stdin and flush it.

        :param line:                     Text to send.
        :param end:                      Text appended to ``line`` (default: newline).
        :raises CLIAbstractionException: If writing or flushing fails (e.g. the process wasn't started).
        """
        try:
            self._process.stdin.write(line + end)
            self._process.stdin.flush()
        except Exception as ex:
            raise CLIAbstractionException(f"Error while sending data to the process of '{self._executablePath}'.") from ex

    # This is TCL specific ...
    # def SendBoundary(self):
    #   self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))

    def GetLineReader(self) -> Generator[str, None, None]:
        """
        Return a generator yielding the child process' output line by line without the trailing newline.

        Iteration ends when reading from stdout returns an empty string.

        :raises DryRunException:         If the executable is in dry-run mode (raised on first iteration).
        :raises CLIAbstractionException: If reading from the process fails.
        """
        if self._dryRun:
            raise DryRunException(f"Can't read output of '{self._executablePath}', because no process is started in dry-run mode.")

        try:
            for line in iter(self._process.stdout.readline, ""):  # FIXME: can it be improved?
                # Only strip a line terminator; a final line without newline must keep its last character.
                yield line.removesuffix("\n")
        except Exception as ex:
            raise CLIAbstractionException(f"Error while reading output from the process of '{self._executablePath}'.") from ex
        # finally:
        #   self._process.terminate()

    def Terminate(self):
        """Terminate the child process by calling :meth:`subprocess.Popen.terminate`."""
        self._process.terminate()

    @readonly
    def ExitCode(self) -> int:
        """
        Read-only property returning the child process' ``returncode`` attribute.

        :raises CLIAbstractionException: If the process wasn't started yet.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus no exit code.")

        # TODO: check if process is still running

        return self._process.returncode

    # This is TCL specific
    # def ReadUntilBoundary(self, indent=0):
    #   __indent = "  " * indent
    #   if self._iterator is None:
    #     self._iterator = iter(self.GetReader())
    #
    #   for line in self._iterator:
    #     print(__indent + line)
    #     if self._pyIPCMI_BOUNDARY in line:
    #       break
    #   self.LogDebug("Quartus II is ready")
@export
class OutputFilteredExecutable(Executable):
    """An :class:`Executable` that carries flags describing what was found in the processed output stream."""

    _hasOutput:   bool  #: Initialized to ``False``; not modified by this class.
    _hasWarnings: bool  #: True, if warnings were found in the output stream.
    _hasErrors:   bool  #: True, if errors were found in the output stream.
    _hasFatals:   bool  #: True, if fatals were found in the output stream.

    def __init__(self, platform, dryrun, executablePath):  #, environment=None, logger=None) -> None:
        """
        Initialize the executable and reset all output flags to ``False``.

        :param platform:       Kept for interface compatibility and not used; the base-class determines the platform itself.
        :param dryrun:         If true, the executable runs in dry-run mode.
        :param executablePath: Path to the executable.
        """
        # Forward by keyword: the positional order of the base-class is
        # (executablePath, binaryDirectoryPath, workingDirectory, environment, dryRun).
        super().__init__(executablePath=executablePath, dryRun=dryrun)  #, environment=environment, logger=logger)

        self._hasOutput = False
        self._hasWarnings = False
        self._hasErrors = False
        self._hasFatals = False

    @readonly
    def HasWarnings(self):
        """True if warnings were found while processing the output stream."""
        return self._hasWarnings

    @readonly
    def HasErrors(self):
        """True if errors were found while processing the output stream."""
        return self._hasErrors

    @readonly
    def HasFatals(self):
        """True if fatals were found while processing the output stream."""
        return self._hasFatals