Coverage for pyTooling/CLIAbstraction/__init__.py: 77%
212 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-25 22:22 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-25 22:22 +0000
1# ==================================================================================================================== #
2# _____ _ _ ____ _ ___ _ _ _ _ _ #
3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ #
4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ #
5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | #
6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| #
7# |_| |___/ |___/ #
8# ==================================================================================================================== #
9# Authors: #
10# Patrick Lehmann #
11# #
12# License: #
13# ==================================================================================================================== #
14# Copyright 2017-2025 Patrick Lehmann - Bötzingen, Germany #
15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture #
16# #
17# Licensed under the Apache License, Version 2.0 (the "License"); #
18# you may not use this file except in compliance with the License. #
19# You may obtain a copy of the License at #
20# #
21# http://www.apache.org/licenses/LICENSE-2.0 #
22# #
23# Unless required by applicable law or agreed to in writing, software #
24# distributed under the License is distributed on an "AS IS" BASIS, #
25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
26# See the License for the specific language governing permissions and #
27# limitations under the License. #
28# #
29# SPDX-License-Identifier: Apache-2.0 #
30# ==================================================================================================================== #
31#
32"""Basic abstraction layer for executables."""
34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"]
36from os import environ as os_environ
37from pathlib import Path
38from platform import system
39from shutil import which as shutil_which
40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut
41from sys import version_info # needed for versions before Python 3.11
42from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable
44try:
45 from pyTooling.Decorators import export, readonly
46 from pyTooling.MetaClasses import ExtendedType
47 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException
48 from pyTooling.Common import getFullyQualifiedName
49 from pyTooling.Attributes import Attribute
50 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
51 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
52 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag
53except (ImportError, ModuleNotFoundError): # pragma: no cover
54 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!")
56 try:
57 from Decorators import export, readonly
58 from MetaClasses import ExtendedType
59 from Exceptions import ToolingException, PlatformNotSupportedException
60 from Common import getFullyQualifiedName
61 from Attributes import Attribute
62 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument
63 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument
64 from CLIAbstraction.ValuedFlag import ValuedFlag
65 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover
66 print("[pyTooling.CLIAbstraction] Could not import directly!")
67 raise ex
@export
class CLIAbstractionException(ToolingException):
    """Exception raised by :class:`Program` and :class:`Executable`; base-class of :class:`DryRunException`."""
@export
class DryRunException(CLIAbstractionException):
    """Raised when an executable is asked to run while *dry-run mode* is active."""
@export
class CLIArgument(Attribute):
    """Attribute used to mark a nested class as a CLI argument."""
@export
class Environment(metaclass=ExtendedType, slots=True):
    """
    A mutable collection of environment variables (name → value).

    Instances support ``len()``, the ``in`` operator as well as reading, writing and deleting variables via item
    access (``env["NAME"]``).
    """

    _variables: Dict[str, str]  #: Variable names mapped to their values.

    def __init__(
        self,
        newVariables: Nullable[Mapping[str, str]] = None,
        addVariables: Nullable[Mapping[str, str]] = None,
        delVariables: Nullable[Iterable[str]] = None
    ) -> None:
        """
        Initialize the set of environment variables.

        The variables are assembled in three steps: copy the base set, remove variables, add/overwrite variables.

        :param newVariables: Base set of variables. If ``None``, the current process' ``os.environ`` is copied.
        :param addVariables: Variables added to (or overwriting entries of) the base set.
        :param delVariables: Names of variables removed from the base set.
        :raises KeyError:    If a name listed in ``delVariables`` is not part of the base set.
        """
        if newVariables is None:
            newVariables = os_environ

        # Always work on a private copy, so neither the caller's mapping nor os.environ is modified.
        self._variables = dict(newVariables.items())

        if delVariables is not None:
            for variableName in delVariables:
                del self._variables[variableName]

        if addVariables is not None:
            self._variables.update(addVariables)

    def __len__(self) -> int:
        """Return the number of environment variables."""
        return len(self._variables)

    def __contains__(self, name: str) -> bool:
        """Return ``True`` if a variable called ``name`` exists."""
        return name in self._variables

    def __getitem__(self, name: str) -> str:
        """Return the value of variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        return self._variables[name]

    def __setitem__(self, name: str, value: str) -> None:
        """Create or overwrite variable ``name`` with ``value``."""
        self._variables[name] = value

    def __delitem__(self, name: str) -> None:
        """Remove variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        del self._variables[name]
@export
class Program(metaclass=ExtendedType, slots=True):
    """Represent a simple command line interface (CLI) executable (program or script)."""

    _platform:         str                                                                      #: Current platform the executable runs on (Linux, Windows, ...)
    _executableNames:  ClassVar[Dict[str, str]]                                                 #: Dictionary of platform specific executable names.
    _executablePath:   Path                                                                     #: The path to the executable (binary, script, ...).
    _dryRun:           bool                                                                     #: True, if program shall run in *dry-run mode*.
    __cliOptions__:    ClassVar[Dict[Type[CommandLineArgument], int]]                           #: List of all possible CLI options.
    __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]]           #: List of all CLI parameters (used CLI options).

    def __init_subclass__(cls, *args: Any, **kwargs: Any):
        """
        Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are
        marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.

        Each option is mapped to its position in the sequence returned by ``CLIArgument.GetClasses(scope=cls)``; this
        position is later used to order the arguments on the command line.
        """
        super().__init_subclass__(*args, **kwargs)

        # register all available CLI options (nested classes marked with attribute 'CLIArgument')
        cls.__cliOptions__: Dict[Type[CommandLineArgument], int] = {
            option: order for order, option in enumerate(CLIArgument.GetClasses(scope=cls))
        }

    def __init__(self, executablePath: Nullable[Path] = None, binaryDirectoryPath: Nullable[Path] = None, dryRun: bool = False) -> None:
        """
        Locate the executable and initialize an empty set of CLI parameters.

        The executable is determined by the first rule that applies:

        1. ``executablePath`` is given: use it as is.
        2. ``binaryDirectoryPath`` is given: append the platform specific executable name to that directory.
        3. Otherwise: search the platform specific executable name in ``PATH``.

        In dry-run mode, failing existence checks don't raise an exception.

        :param executablePath:           Path to the executable.
        :param binaryDirectoryPath:      Directory containing the executable.
        :param dryRun:                   If true, missing files/directories are tolerated.
        :raises TypeError:               If ``executablePath`` or ``binaryDirectoryPath`` is not a :class:`~pathlib.Path`.
        :raises CLIAbstractionException: If the executable or directory doesn't exist, the executable isn't found in
                                         ``PATH``, or no executable name is registered for the current platform.
        """
        self._platform = system()
        self._dryRun = dryRun

        # NOTE(review): 'LogDryRun' is not defined in the visible part of this class - presumably it is expected from
        #               a logging mixin (see the commented 'ILogable' on Executable). Confirm before relying on dry-run.
        if executablePath is not None:
            if not isinstance(executablePath, Path):
                ex = TypeError("Parameter 'executablePath' is not of type 'Path'.")
                if version_info >= (3, 11):  # pragma: no cover
                    ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.")
                raise ex

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        elif binaryDirectoryPath is not None:
            if not isinstance(binaryDirectoryPath, Path):
                ex = TypeError("Parameter 'binaryDirectoryPath' is not of type 'Path'.")
                if version_info >= (3, 11):  # pragma: no cover
                    ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.")
                raise ex

            if not binaryDirectoryPath.exists():
                if dryRun:
                    self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)

            try:
                executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform]
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        else:
            try:
                executablePath = Path(self._executableNames[self._platform])
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            # In dry-run mode a failing PATH lookup is tolerated.
            # XXX: log executable not found in PATH / executable file not found when running in dry-run mode
            if not dryRun:
                resolvedExecutable = shutil_which(str(executablePath))
                if resolvedExecutable is None:
                    raise CLIAbstractionException("Program could not be found in PATH.") from FileNotFoundError(executablePath)

                fullExecutablePath = Path(resolvedExecutable)
                if not fullExecutablePath.exists():
                    raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)

            # TODO: log found executable in PATH
            # TODO: check if found executable has execute permissions

        # The platform specific name (not the path resolved via PATH) is stored in the third case.
        self._executablePath = executablePath
        self.__cliParameters__ = {}

    @staticmethod
    def _NeedsParameterInitialization(key):
        """Return ``True`` if the CLI option class ``key`` must be instantiated with a value."""
        return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))

    def __getitem__(self, key):
        """
        Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the option wasn't set before.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            if version_info >= (3, 11):  # pragma: no cover
                ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex

        # TODO: is nested check
        return self.__cliParameters__[key]

    def __setitem__(self, key, value):
        """
        Use a CLI option: instantiate option class ``key`` and store the instance as CLI parameter.

        Options that carry a value are instantiated with ``value``; for all other options ``value`` is ignored.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the option isn't registered for this program or was already set.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            if version_info >= (3, 11):  # pragma: no cover
                ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex
        elif key not in self.__cliOptions__:
            raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
        elif key in self.__cliParameters__:
            raise KeyError(f"Option '{key}' is already set to a value.")

        if self._NeedsParameterInitialization(key):
            self.__cliParameters__[key] = key(value)
        else:
            self.__cliParameters__[key] = key()

    @readonly
    def Path(self) -> Path:
        """Read-only property returning the path of the executable."""
        return self._executablePath

    def ToArgumentList(self) -> List[str]:
        """
        Convert the executable and all used CLI parameters into a list of strings.

        The first element is the executable's path, followed by the parameters in the order their options were
        registered in ``__cliOptions__``.

        :raises TypeError: If a parameter's ``AsArgument()`` returns neither a string, a tuple nor a list.
        """
        result: List[str] = [str(self._executablePath)]

        def byRegistrationOrder(item: Tuple[Type[CommandLineArgument], int]) -> int:
            return self.__cliOptions__[item[0]]

        for key, value in sorted(self.__cliParameters__.items(), key=byRegistrationOrder):
            param = value.AsArgument()
            if isinstance(param, str):
                result.append(param)
            elif isinstance(param, (tuple, list)):
                result.extend(param)
            else:
                ex = TypeError(f"Method 'AsArgument' of option '{key}' returned neither a 'str', a 'tuple' nor a 'list'.")
                if version_info >= (3, 11):  # pragma: no cover
                    ex.add_note(f"Got type '{getFullyQualifiedName(param)}'.")
                raise ex

        return result

    def __repr__(self):
        """Return all arguments double-quoted and comma separated, enclosed in square brackets."""
        return "[" + ", ".join(f"\"{item}\"" for item in self.ToArgumentList()) + "]"

    def __str__(self):
        """Return all arguments double-quoted and separated by spaces."""
        return " ".join(f"\"{item}\"" for item in self.ToArgumentList())
@export
class Executable(Program):  # (ILogable):
    """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""

    _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"

    _workingDirectory: Nullable[Path]              #: Working directory of the child process (``None`` = inherit).
    _environment:      Nullable[Environment]       #: Environment variables handed over to the child process.
    _process:          Nullable[Subprocess_Popen]  #: The child process, once started.
    _iterator:         Nullable[Iterator]

    def __init__(
        self,
        executablePath: Path = None,
        binaryDirectoryPath: Path = None,
        workingDirectory: Path = None,
        environment: Nullable[Environment] = None,
        dryRun: bool = False
    ):
        """
        Initialize the executable; no process is started yet.

        :param executablePath:      Path to the executable (see :class:`Program`).
        :param binaryDirectoryPath: Directory containing the executable (see :class:`Program`).
        :param workingDirectory:    Working directory used when the process is started.
        :param environment:         Environment variables used when the process is started.
        :param dryRun:              If true, no process is launched by :meth:`StartProcess`.
        """
        super().__init__(executablePath, binaryDirectoryPath, dryRun)

        # Fix: the parameter was previously discarded and the working directory always set to None.
        self._workingDirectory = workingDirectory
        self._environment = environment
        self._process = None
        self._iterator = None

    def StartProcess(
        self,
        environment: Nullable[Environment] = None
    ):
        """
        Launch the executable as a child process with piped STDIN and STDOUT; STDERR is merged into STDOUT.

        In dry-run mode, no process is started.

        :param environment:              Environment variables for the child process. Only used if no environment was
                                         given at construction time; if neither is set, the parent's environment is
                                         inherited.
        :raises CLIAbstractionException: If the operating system fails to launch the process.
        """
        if self._dryRun:
            self.LogDryRun(f"Start process: {self!r}")
            return

        # The environment set at construction time takes precedence over the method's parameter.
        if self._environment is not None:
            envVariables = self._environment._variables
        elif environment is not None:
            envVariables = environment._variables
        else:
            envVariables = None

        # FIXME: verbose log start process
        # FIXME: debug log - parameter list
        try:
            self._process = Subprocess_Popen(
                self.ToArgumentList(),
                stdin=Subprocess_Pipe,
                stdout=Subprocess_Pipe,
                stderr=Subprocess_StdOut,
                cwd=self._workingDirectory,
                env=envVariables,
                universal_newlines=True,
                bufsize=256
            )
        except OSError as ex:
            raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex

    def Send(self, line: str, end: str = "\n") -> None:
        """
        Write ``line`` followed by ``end`` to the process' STDIN and flush it.

        :raises CLIAbstractionException: If writing fails, e.g. because the process wasn't started.
        """
        try:
            self._process.stdin.write(line + end)
            self._process.stdin.flush()
        except Exception as ex:
            raise CLIAbstractionException(f"Error while sending data to the process of '{self._executablePath}'.") from ex

    # This is TCL specific ...
    # def SendBoundary(self):
    #   self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))

    def GetLineReader(self) -> Generator[str, None, None]:
        """
        Return a generator yielding the process' output line by line, without the trailing newline character.

        :raises DryRunException:         If called in dry-run mode.
        :raises CLIAbstractionException: If reading fails, e.g. because the process wasn't started.
        """
        if self._dryRun:
            raise DryRunException(f"Can't read output of '{self._executablePath}' in dry-run mode.")

        try:
            for line in iter(self._process.stdout.readline, ""):  # FIXME: can it be improved?
                # Only strip the line break; a final line without one must not lose its last character.
                yield line[:-1] if line.endswith("\n") else line
        except Exception as ex:
            raise CLIAbstractionException(f"Error while reading output from the process of '{self._executablePath}'.") from ex
        # finally:
        #   self._process.terminate()

    def Terminate(self):
        """
        Terminate the child process.

        :raises CLIAbstractionException: If the process wasn't started.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus it can't be terminated.")

        self._process.terminate()

    @readonly
    def ExitCode(self) -> int:
        """
        Read-only property returning the ``returncode`` of the child process.

        :raises CLIAbstractionException: If the process wasn't started.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus no exit code.")

        # TODO: check if process is still running

        return self._process.returncode

    # This is TCL specific
    # def ReadUntilBoundary(self, indent=0):
    #   __indent = "  " * indent
    #   if self._iterator is None:
    #     self._iterator = iter(self.GetReader())
    #
    #   for line in self._iterator:
    #     print(__indent + line)
    #     if self._pyIPCMI_BOUNDARY in line:
    #       break
    #   self.LogDebug("Quartus II is ready")
@export
class OutputFilteredExecutable(Executable):
    """An :class:`Executable` that additionally tracks if output, warnings, errors or fatals were seen."""

    _hasOutput:   bool  #: True, if the executable produced output.
    _hasWarnings: bool  #: True, if warnings were found.
    _hasErrors:   bool  #: True, if errors were found.
    _hasFatals:   bool  #: True, if fatals were found.

    def __init__(self, platform, dryrun, executablePath):  #, environment=None, logger=None) -> None:
        """
        Initialize the executable with all status flags cleared.

        :param platform:       Unused; kept for interface compatibility (the platform is detected by :class:`Program`).
        :param dryrun:         If true, run in *dry-run mode*.
        :param executablePath: Path to the executable.
        """
        # Fix: pass by keyword. The former positional call handed 'platform' over as executable path, 'dryrun' as
        # binary directory and 'executablePath' as working directory.
        super().__init__(executablePath=executablePath, dryRun=dryrun)  #, environment=environment, logger=logger)

        self._hasOutput = False
        self._hasWarnings = False
        self._hasErrors = False
        self._hasFatals = False

    @readonly
    def HasWarnings(self):
        """True if warnings were found while processing the output stream."""
        return self._hasWarnings

    @readonly
    def HasErrors(self):
        """True if errors were found while processing the output stream."""
        return self._hasErrors

    @readonly
    def HasFatals(self):
        """True if fatals were found while processing the output stream."""
        # Fix: formerly returned '_hasErrors'.
        return self._hasFatals