Coverage for pyTooling/CLIAbstraction/__init__.py: 76%

216 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-24 22:21 +0000

1# ==================================================================================================================== # 

2# _____ _ _ ____ _ ___ _ _ _ _ _ # 

3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ # 

4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ # 

5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | # 

6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| # 

7# |_| |___/ |___/ # 

8# ==================================================================================================================== # 

9# Authors: # 

10# Patrick Lehmann # 

11# # 

12# License: # 

13# ==================================================================================================================== # 

14# Copyright 2017-2025 Patrick Lehmann - Bötzingen, Germany # 

15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture # 

16# # 

17# Licensed under the Apache License, Version 2.0 (the "License"); # 

18# you may not use this file except in compliance with the License. # 

19# You may obtain a copy of the License at # 

20# # 

21# http://www.apache.org/licenses/LICENSE-2.0 # 

22# # 

23# Unless required by applicable law or agreed to in writing, software # 

24# distributed under the License is distributed on an "AS IS" BASIS, # 

25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

26# See the License for the specific language governing permissions and # 

27# limitations under the License. # 

28# # 

29# SPDX-License-Identifier: Apache-2.0 # 

30# ==================================================================================================================== # 

31# 

32"""Basic abstraction layer for executables.""" 

33 

34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"] 

35 

36from os import environ as os_environ 

37from pathlib import Path 

38from platform import system 

39from shutil import which as shutil_which 

40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut 

41from sys import version_info # needed for versions before Python 3.11 

42from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable 

43 

44try: 

45 from pyTooling.Decorators import export, readonly 

46 from pyTooling.MetaClasses import ExtendedType 

47 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException 

48 from pyTooling.Common import getFullyQualifiedName 

49 from pyTooling.Attributes import Attribute 

50 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument 

51 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument 

52 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag 

53except (ImportError, ModuleNotFoundError): # pragma: no cover 

54 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!") 

55 

56 try: 

57 from Decorators import export, readonly 

58 from MetaClasses import ExtendedType 

59 from Exceptions import ToolingException, PlatformNotSupportedException 

60 from Common import getFullyQualifiedName 

61 from Attributes import Attribute 

62 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument 

63 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument 

64 from CLIAbstraction.ValuedFlag import ValuedFlag 

65 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover 

66 print("[pyTooling.CLIAbstraction] Could not import directly!") 

67 raise ex 

68 

69 

@export
class CLIAbstractionException(ToolingException):
    """Base-class of the exceptions raised by the CLI abstraction layer in this module."""

73 

74 

@export
class DryRunException(CLIAbstractionException):
    """Raised when an executable is launched although dry-run mode is active."""

78 

79 

@export
class CLIArgument(Attribute):
    """Attribute used to mark a nested class as a CLI argument."""

83 

84 

@export
class Environment(metaclass=ExtendedType, slots=True):
    """
    A set of environment variables, accessible like a dictionary (``len``, ``in``, ``[]``, ``del``).

    The variables are kept in a private dictionary, which is created as a copy of the given mapping (or of
    ``os.environ``). Thus, modifying an instance doesn't modify the mapping it was created from.
    """

    _variables: Dict[str, str]  #: Dictionary of environment variable names and their values.

    def __init__(
        self,
        newVariables: Nullable[Mapping[str, str]] = None,
        addVariables: Nullable[Mapping[str, str]] = None,
        delVariables: Nullable[Iterable[str]] = None
    ) -> None:
        """
        Create a set of environment variables.

        :param newVariables: Variables to start with. If ``None``, the variables of ``os.environ`` are copied.
        :param addVariables: Optional variables to add (or overwrite) after the deletions were applied.
        :param delVariables: Optional variable names to remove from the initial set of variables.
        :raises KeyError:    If a name listed in ``delVariables`` isn't part of the initial set of variables.
        """
        baseVariables = os_environ if newVariables is None else newVariables

        # Copy the variables, so the source mapping stays untouched.
        self._variables = dict(baseVariables.items())

        # Deletions are applied before additions, so a variable can be removed and re-added with a new value.
        if delVariables is not None:
            for variableName in delVariables:
                del self._variables[variableName]

        if addVariables is not None:
            self._variables.update(addVariables)

    def __len__(self) -> int:
        """Return the number of environment variables."""
        return len(self._variables)

    def __contains__(self, name: str) -> bool:
        """Return true, if a variable called ``name`` exists."""
        return name in self._variables

    def __getitem__(self, name: str) -> str:
        """Return the value of variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        return self._variables[name]

    def __setitem__(self, name: str, value: str) -> None:
        """Create or overwrite variable ``name`` with ``value``."""
        self._variables[name] = value

    def __delitem__(self, name: str) -> None:
        """Remove variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        del self._variables[name]

121 

122 

@export
class Program(metaclass=ExtendedType, slots=True):
    """Represent a simple command line interface (CLI) executable (program or script)."""

    _platform:         str                                                                   #: Current platform the executable runs on (Linux, Windows, ...)
    _executableNames:  ClassVar[Dict[str, str]]                                              #: Dictionary of platform specific executable names.
    _executablePath:   Path                                                                  #: The path to the executable (binary, script, ...).
    _dryRun:           bool                                                                  #: True, if program shall run in *dry-run mode*.
    __cliOptions__:    ClassVar[Dict[Type[CommandLineArgument], int]]                        #: List of all possible CLI options.
    __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]]        #: List of all CLI parameters (used CLI options).

    def __init_subclass__(cls, *args: Any, **kwargs: Any):
        """
        Whenever a subclass is derived from :class:`Program`, all nested classes declared within ``Program`` and which are
        marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.

        The dictionary's value is the position at which the option was returned by ``CLIArgument.GetClasses``; this
        position is used by :meth:`ToArgumentList` to order the arguments.
        """
        super().__init_subclass__(*args, **kwargs)

        # register all available CLI options (nested classes marked with attribute 'CLIArgument')
        cls.__cliOptions__ = {
            option: order for order, option in enumerate(CLIArgument.GetClasses(scope=cls))
        }

    def __init__(self, executablePath: Nullable[Path] = None, binaryDirectoryPath: Nullable[Path] = None, dryRun: bool = False) -> None:
        """
        Locate the executable and initialize an empty set of CLI parameters.

        The executable is located by the first matching rule:

        1. ``executablePath`` is given: this path is used as is.
        2. ``binaryDirectoryPath`` is given: the platform specific name from ``_executableNames`` is appended to it.
        3. Otherwise: the platform specific name from ``_executableNames`` is searched in ``PATH``.

        :param executablePath:           Path to the executable.
        :param binaryDirectoryPath:      Path to the directory containing the executable.
        :param dryRun:                   If true, failed existence checks don't raise an exception.
        :raises TypeError:               If ``executablePath`` or ``binaryDirectoryPath`` is not of type :class:`~pathlib.Path`.
        :raises CLIAbstractionException: If the executable or binary directory doesn't exist (and not in dry-run mode).
        :raises CLIAbstractionException: If ``_executableNames`` has no entry for the current platform.
        """
        # NOTE(review): 'LogDryRun' is called below but isn't defined in the visible part of this file; presumably it
        #               comes from a logging mixin (see the commented 'ILogable' on class 'Executable') - TODO confirm.
        self._platform = system()
        self._dryRun = dryRun

        if executablePath is not None:
            if not isinstance(executablePath, Path):
                ex = TypeError("Parameter 'executablePath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.")
                raise ex

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        elif binaryDirectoryPath is not None:
            if not isinstance(binaryDirectoryPath, Path):
                ex = TypeError("Parameter 'binaryDirectoryPath' is not of type 'Path'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.")
                raise ex

            if not binaryDirectoryPath.exists():
                if dryRun:
                    self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)

            try:
                executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform]
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            if not executablePath.exists():
                if dryRun:
                    self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath)
        else:
            try:
                executablePath = Path(self._executableNames[self._platform])
            except KeyError:
                raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

            resolvedExecutable = shutil_which(str(executablePath))
            if not dryRun:
                if resolvedExecutable is None:
                    raise CLIAbstractionException("Program could not be found in PATH.") from FileNotFoundError(executablePath)

                fullExecutablePath = Path(resolvedExecutable)
                if not fullExecutablePath.exists():
                    raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)
            # In dry-run mode, a failed lookup in PATH is tolerated.
            # XXX: log executable not found in PATH / executable not found

            # TODO: log found executable in PATH
            # TODO: check if found executable has execute permissions

        # When searched in PATH, the plain executable name is stored - not the resolved absolute path.
        self._executablePath = executablePath
        self.__cliParameters__ = {}

    @staticmethod
    def _NeedsParameterInitialization(key):
        """Return true, if CLI option ``key`` is a class, which takes a value when instantiated."""
        return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))

    def __getitem__(self, key):
        """
        Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If the CLI option wasn't set before.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex

        # TODO: is nested check
        return self.__cliParameters__[key]

    def __setitem__(self, key, value):
        """
        Use a CLI option: instantiate ``key`` and store the instance as CLI parameter.

        ``value`` is only handed over to the CLI option, if the option takes a value; otherwise it's ignored.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If ``key`` is not an option of this executable or if it was already set.
        """
        if not issubclass(key, CommandLineArgument):
            ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.")
            raise ex
        if key not in self.__cliOptions__:
            raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
        if key in self.__cliParameters__:
            raise KeyError(f"Option '{key}' is already set to a value.")

        self.__cliParameters__[key] = key(value) if self._NeedsParameterInitialization(key) else key()

    # NOTE: This property shadows 'pathlib.Path' within the class body. It must be declared after all methods using
    #       'Path' in their signature.
    @readonly
    def Path(self) -> Path:
        """Read-only property returning the path to the executable."""
        return self._executablePath

    def ToArgumentList(self) -> List[str]:
        """
        Convert the executable and all used CLI parameters into a list of strings.

        The first element is the executable's path, followed by the arguments of all used CLI parameters, ordered by
        the position of their CLI option in ``__cliOptions__``.

        :raises TypeError: If a CLI parameter's ``AsArgument()`` returns neither a string nor a tuple or list.
        """
        def declarationOrder(item: Tuple[Type[CommandLineArgument], int]) -> int:
            return self.__cliOptions__[item[0]]

        result: List[str] = [str(self._executablePath)]

        for key, value in sorted(self.__cliParameters__.items(), key=declarationOrder):
            param = value.AsArgument()
            if isinstance(param, str):
                result.append(param)
            elif isinstance(param, (tuple, list)):
                result.extend(param)
            else:
                ex = TypeError(f"Option '{key}' returned an unsupported type from 'AsArgument()'. Expected 'str', 'tuple' or 'list'.")
                ex.add_note(f"Got type '{getFullyQualifiedName(param)}'.")
                raise ex

        return result

    def __repr__(self):
        """Return the argument list formatted like a list of double-quoted strings."""
        return "[" + ", ".join([f"\"{item}\"" for item in self.ToArgumentList()]) + "]"

    def __str__(self):
        """Return the argument list as space separated, double-quoted strings."""
        return " ".join([f"\"{item}\"" for item in self.ToArgumentList()])

275 

276 

@export
class Executable(Program):  # (ILogable):
    """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""

    _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"

    _workingDirectory: Nullable[Path]              #: Working directory of the child process (``None``: inherit).
    _environment:      Nullable[Environment]       #: Environment variables of the child process (``None``: see :meth:`StartProcess`).
    _process:          Nullable[Subprocess_Popen]  #: The child process, once started.
    _iterator:         Nullable[Iterator]

    def __init__(
        self,
        executablePath: Path = None,
        binaryDirectoryPath: Path = None,
        workingDirectory: Path = None,
        environment: Nullable[Environment] = None,
        dryRun: bool = False
    ):
        """
        Locate the executable (see :class:`Program`) and prepare launching it as a child process.

        :param executablePath:      Path to the executable.
        :param binaryDirectoryPath: Path to the directory containing the executable.
        :param workingDirectory:    Working directory used for the child process; ``None`` inherits the current one.
        :param environment:         Environment variables used for the child process.
        :param dryRun:              If true, no child process is started.
        """
        super().__init__(executablePath, binaryDirectoryPath, dryRun)

        # The parameter was formerly discarded (always None), so 'cwd' was never handed over to the child process.
        self._workingDirectory = workingDirectory
        self._environment = environment
        self._process = None
        self._iterator = None

    def StartProcess(
        self,
        environment: Nullable[Environment] = None
    ):
        """
        Start the executable as a child process with piped STDIN and STDOUT; STDERR is merged into STDOUT.

        In dry-run mode, the command is only logged and no process is started.

        :param environment:              Environment variables for the child process. It's only used, if no environment
                                         was handed over at object creation time.
        :raises CLIAbstractionException: If the process can't be launched.
        """
        if self._dryRun:
            self.LogDryRun(f"Start process: {self!r}")
            return

        # The environment given at object creation time takes precedence over this method's parameter.
        if self._environment is not None:
            envVariables = self._environment._variables
        elif environment is not None:
            envVariables = environment._variables
        else:
            envVariables = None  # child process inherits the current environment

        # FIXME: verbose log start process
        # FIXME: debug log - parameter list
        try:
            self._process = Subprocess_Popen(
                self.ToArgumentList(),
                stdin=Subprocess_Pipe,
                stdout=Subprocess_Pipe,
                stderr=Subprocess_StdOut,
                cwd=self._workingDirectory,
                env=envVariables,
                universal_newlines=True,
                bufsize=256
            )
        except OSError as ex:
            raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex

    def Send(self, line: str, end: str = "\n") -> None:
        """
        Write ``line`` followed by ``end`` to the child process' STDIN and flush it.

        :raises CLIAbstractionException: If writing fails, e.g. because the process wasn't started.
        """
        try:
            self._process.stdin.write(line + end)
            self._process.stdin.flush()
        except Exception as ex:
            raise CLIAbstractionException(f"Error while sending input to the process of '{self._executablePath}'.") from ex

    # This is TCL specific ...
    # def SendBoundary(self):
    #   self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))

    def GetLineReader(self) -> Generator[str, None, None]:
        """
        Return a generator yielding the child process' output line by line, without the trailing newline character.

        As this is a generator, exceptions are raised when the first line is requested, not when this method is called.

        :raises DryRunException:         If in dry-run mode, as no process was started.
        :raises CLIAbstractionException: If reading fails, e.g. because the process wasn't started.
        """
        if self._dryRun:
            raise DryRunException("Can't read output lines, because no process is started in dry-run mode.")

        try:
            for line in iter(self._process.stdout.readline, ""):  # FIXME: can it be improved?
                # Only strip a newline: the last line before EOF might not end with one.
                yield line[:-1] if line.endswith("\n") else line
        except Exception as ex:
            raise CLIAbstractionException(f"Error while reading output from the process of '{self._executablePath}'.") from ex
        # finally:
        #   self._process.terminate()

    def Terminate(self):
        """Terminate the child process."""
        self._process.terminate()

    @readonly
    def ExitCode(self) -> Nullable[int]:
        """
        Read-only property returning the child process' exit code.

        :returns:                        The exit code, or ``None`` if the process is still running.
        :raises CLIAbstractionException: If the process wasn't started.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus no exit code.")

        # 'returncode' is only updated by poll()/wait(); poll() doesn't block.
        return self._process.poll()

    # This is TCL specific
    # def ReadUntilBoundary(self, indent=0):
    #   __indent = "  " * indent
    #   if self._iterator is None:
    #     self._iterator = iter(self.GetReader())
    #
    #   for line in self._iterator:
    #     print(__indent + line)
    #     if self._pyIPCMI_BOUNDARY in line:
    #       break
    #   self.LogDebug("Quartus II is ready")

383 

384 

@export
class OutputFilteredExecutable(Executable):
    """
    An :class:`Executable` with flags indicating output, warnings, errors and fatals found in the output stream.

    All flags are initialized to false. Setting them isn't part of this class; presumably derived classes update
    them while processing the output - TODO confirm.
    """

    _hasOutput:   bool
    _hasWarnings: bool
    _hasErrors:   bool
    _hasFatals:   bool

    def __init__(self, platform, dryrun, executablePath):  #, environment=None, logger=None) -> None:
        """
        Initialize the executable and reset all flags.

        :param platform:       Accepted for backward compatibility, but not used: the base-class determines the
                               current platform itself.
        :param dryrun:         If true, the executable runs in dry-run mode.
        :param executablePath: Path to the executable.
        """
        # Keyword arguments are required here: the base-class' positional order is
        # (executablePath, binaryDirectoryPath, workingDirectory, ...), which doesn't match this signature.
        super().__init__(executablePath=executablePath, dryRun=dryrun)  #, environment=environment, logger=logger)

        self._hasOutput = False
        self._hasWarnings = False
        self._hasErrors = False
        self._hasFatals = False

    @readonly
    def HasWarnings(self) -> bool:
        """True if warnings were found while processing the output stream."""
        return self._hasWarnings

    @readonly
    def HasErrors(self) -> bool:
        """True if errors were found while processing the output stream."""
        return self._hasErrors

    @readonly
    def HasFatals(self) -> bool:
        """True if fatals were found while processing the output stream."""
        return self._hasFatals