Coverage for pyTooling / Filesystem / __init__.py: 50%

602 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 22:41 +0000

1# ==================================================================================================================== # 

2# _____ _ _ _____ _ _ _ # 

3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ | ___(_) | ___ ___ _ _ ___| |_ ___ _ __ ___ # 

4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` | | |_ | | |/ _ \/ __| | | / __| __/ _ \ '_ ` _ \ # 

5# | |_) | |_| || | (_) | (_) | | | | | | (_| |_| _| | | | __/\__ \ |_| \__ \ || __/ | | | | | # 

6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)_| |_|_|\___||___/\__, |___/\__\___|_| |_| |_| # 

7# |_| |___/ |___/ |___/ # 

8# ==================================================================================================================== # 

9# Authors: # 

10# Patrick Lehmann # 

11# # 

12# License: # 

13# ==================================================================================================================== # 

14# Copyright 2025-2026 Patrick Lehmann - Bötzingen, Germany # 

15# # 

16# Licensed under the Apache License, Version 2.0 (the "License"); # 

17# you may not use this file except in compliance with the License. # 

18# You may obtain a copy of the License at # 

19# # 

20# http://www.apache.org/licenses/LICENSE-2.0 # 

21# # 

22# Unless required by applicable law or agreed to in writing, software # 

23# distributed under the License is distributed on an "AS IS" BASIS, # 

24# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

25# See the License for the specific language governing permissions and # 

26# limitations under the License. # 

27# # 

28# SPDX-License-Identifier: Apache-2.0 # 

29# ==================================================================================================================== # 

30# 

31""" 

32An object-oriented file system abstraction for directory, file, symbolic link, ... statistics collection. 

33 

34.. important:: 

35 

36 This isn't a replacement of :mod:`pathlib` introduced with Python 3.4. 

37""" 

38from os import scandir, readlink 

39 

40from enum import Enum 

41from itertools import chain 

42from pathlib import Path 

43from typing import Optional as Nullable, Dict, Generic, Generator, TypeVar, List, Any, Callable, Union, Iterator, Set 

44 

45from pyTooling.Decorators import readonly, export 

46from pyTooling.Exceptions import ToolingException 

47from pyTooling.MetaClasses import ExtendedType 

48from pyTooling.Common import getFullyQualifiedName, zipdicts 

49from pyTooling.Warning import WarningCollector, Warning 

50from pyTooling.Stopwatch import Stopwatch 

51from pyTooling.Tree import Node 

52 

53 

# NOTE(review): only '_ParentType' is listed here; the '@export'-decorated classes below are presumably
# appended to '__all__' by the decorator - confirm against pyTooling.Decorators.
__all__ = ["_ParentType"]

_ParentType = TypeVar("_ParentType", bound="Element")
"""The type variable for a parent reference."""

59 

60 

@export
class FilesystemException(ToolingException):
    """Common base-class of every exception raised by :mod:`pyTooling.Filesystem`."""

64 

65 

@export
class PermissionWarning(Warning):
    """
    A warning that carries the path of the filesystem element it refers to.

    Within this module it is handed to the warning collector when scanning a directory fails with a
    :exc:`PermissionError`.
    """
    _path: Path  #: Path of the filesystem element this warning is about.

    def __init__(self, path: Path, *args) -> None:
        """
        Initialize the warning with the affected path.

        :param path: Path of the filesystem element this warning is about.
        :param args: Further positional arguments, forwarded unchanged to the base-class.
        """
        super().__init__(*args)
        self._path = path

    @readonly
    def Path(self) -> Path:
        """
        Read-only property to access the path this warning is about.

        :returns: The path handed over at construction time.
        """
        return self._path

77 

78 

@export
class NodeKind(Enum):
    """
    Classification of filesystem elements when represented as nodes in a :ref:`tree <STRUCT/Tree>`.

    The kind is attached to each node created while converting the filesystem statistics tree to an instance of
    :mod:`pyTooling.Tree`.
    """
    Directory = 0     #: The node stands for a directory.
    File = 1          #: The node stands for a regular file.
    SymbolicLink = 2  #: The node stands for a symbolic link.

89 

90 

@export
class Base(metaclass=ExtendedType, slots=True):
    """
    Common ancestor of all filesystem elements in :mod:`pyTooling.Filesystem`.

    It stores the element's size and a reference to the root element of the filesystem.
    """
    _root: Nullable["Root"]  #: Reference to the root of the filesystem statistics scope.
    _size: Nullable[int]     #: Actual or aggregated size of the filesystem element.

    def __init__(
        self,
        size: Nullable[int],
        root: Nullable["Root"]
    ) -> None:
        """
        Initialize the base-class with the element's size and its root reference.

        :param size:       Optional size of the element.
        :param root:       Optional reference to the filesystem root element.
        :raises TypeError: If ``size`` is neither ``None`` nor an :class:`int`.
        :raises TypeError: If ``root`` is neither ``None`` nor a :class:`Root`.
        """
        # Both parameters are optional, but when given they must have the expected type.
        if not (size is None or isinstance(size, int)):
            error = TypeError("Parameter 'size' is not of type 'int'.")
            error.add_note(f"Got type '{getFullyQualifiedName(size)}'.")
            raise error

        if not (root is None or isinstance(root, Root)):
            error = TypeError("Parameter 'root' is not of type 'Root'.")
            error.add_note(f"Got type '{getFullyQualifiedName(root)}'.")
            raise error

        self._root = root
        self._size = size

    @property
    def Root(self) -> Nullable["Root"]:
        """
        Property to access the root of the filesystem statistics scope.

        :returns: Root of the filesystem statistics scope.
        """
        return self._root

    @Root.setter
    def Root(self, value: "Root") -> None:
        # Unlike the constructor, the setter doesn't accept None.
        if value is None:
            raise ValueError("Parameter 'value' is None.")

        if not isinstance(value, Root):
            error = TypeError("Parameter 'value' is not of type 'Root'.")
            error.add_note(f"Got type '{getFullyQualifiedName(value)}'.")
            raise error

        self._root = value

    @readonly
    def Size(self) -> int:
        """
        Read-only property to access the element's size in Bytes.

        :returns:                    Size in Bytes.
        :raises FilesystemException: If the size is not computed, yet.
        """
        size = self._size
        if size is None:
            raise FilesystemException("Size is not computed, yet.")

        return size

    # FIXME: @abstractmethod
    def ToTree(self) -> Node:
        """
        Convert a filesystem element to a node in :mod:`pyTooling.Tree`.

        The node's :attr:`~pyTooling.Tree.Node.Value` field contains a reference to the filesystem element. Additional
        data will be stored in the node's key-value store.

        This base implementation is a placeholder and always raises; derived classes provide the conversion.

        :returns:                    A tree's node referencing this filesystem element.
        :raises NotImplementedError: Always, as the method isn't implemented in the base-class.
        """
        raise NotImplementedError()

169 

170 

@export
class Element(Base, Generic[_ParentType]):
    """
    Base-class for all named elements within a filesystem.

    It adds a name, parent reference and list of symbolic-link sources.

    .. hint::

       Symbolic link sources are reverse references describing which symbolic links point to this element.
    """
    _name: str                          #: Name of the filesystem element.
    _parent: _ParentType                #: Reference to the filesystem element's parent (:class:`Directory`)
    _linkSources: List["SymbolicLink"]  #: A list of symbolic links pointing to this filesystem element.

    def __init__(
        self,
        name: str,
        size: Nullable[int] = None,
        parent: Nullable[_ParentType] = None
    ) -> None:
        """
        Initialize the element base-class with name, size and parent reference.

        If a parent is given, the element inherits the parent's root reference.

        :param name:        Name of the element.
        :param size:        Optional size of the element.
        :param parent:      Optional parent reference.
        :raises ValueError: If ``name`` is ``None``.
        :raises TypeError:  If ``name`` is not a :class:`str`.
        :raises TypeError:  If ``parent`` is neither ``None`` nor a :class:`Directory`.
        """
        if name is None:
            raise ValueError("Parameter 'name' is None.")
        if not isinstance(name, str):
            ex = TypeError("Parameter 'name' is not of type 'str'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(name)}'.")
            raise ex

        if parent is not None and not isinstance(parent, Directory):
            ex = TypeError("Parameter 'parent' is not of type 'Directory'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.")
            raise ex

        # An element without parent has no root, otherwise it shares the root of its parent.
        super().__init__(size, None if parent is None else parent._root)

        self._name = name
        self._parent = parent
        self._linkSources = []

    @property
    def Parent(self) -> _ParentType:
        """
        Property to access the element's parent.

        :returns: Parent element.
        """
        return self._parent

    @Parent.setter
    def Parent(self, value: _ParentType) -> None:
        if value is None:
            raise ValueError("Parameter 'value' is None.")
        if not isinstance(value, Directory):
            ex = TypeError("Parameter 'value' is not of type 'Directory'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(value)}'.")
            raise ex

        self._parent = value

        # Adopt the new parent's root, but keep the current root if the parent has none.
        if value._root is not None:
            self._root = value._root

    @readonly
    def Name(self) -> str:
        """
        Read-only property to access the element's name.

        :returns: Element name.
        """
        return self._name

    @readonly
    def Path(self) -> Path:
        """
        Read-only property to access the element's path.

        This property is abstract and must be overridden by derived classes.

        :returns:                    Path to the element.
        :raises NotImplementedError: Always, as the property is abstract in this class.
        """
        # 'NotImplemented' is a constant for binary operators and not callable; the exception type is
        # 'NotImplementedError'.
        raise NotImplementedError(f"Property 'Path' is abstract.")

    @readonly
    def LinkSources(self) -> List["SymbolicLink"]:
        """
        Read-only property to access the symbolic links pointing to this element (reverse references).

        :returns: The internal list of symbolic links pointing to this element.
        """
        return self._linkSources

    def AddLinkSources(self, source: "SymbolicLink") -> None:
        """
        Add a link source of a symbolic link to the named element (reverse reference).

        The symbolic link is marked as connected, not broken and not out-of-range.

        :param source:     The referenced symbolic link.
        :raises TypeError: If ``source`` is not a :class:`SymbolicLink`.
        """
        if not isinstance(source, SymbolicLink):
            ex = TypeError("Parameter 'source' is not of type 'SymbolicLink'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(source)}'.")
            raise ex

        source._isConnected = True
        source._isBroken = False
        source._isOutOfRange = False
        self._linkSources.append(source)

277 

278 

@export
class Directory(Element["Directory"]):
    """
    A **directory** represents a directory in the filesystem, which contains subdirectories, regular files and
    symbolic links.

    While scanning for subelements, the directory is populated with elements. Every file object added gets registered
    in the filesystem's :class:`Root` for deduplication. In case a file identifier already exists, the found filename
    will reference the same file object. In turn, the file object then has references to multiple filenames (parents).
    This allows detecting :term:`hardlinks <hardlink>`.

    The time needed for scanning the directory and its subelements is provided via :data:`ScanDuration`.

    After scanning the directory for subelements, certain directory properties get aggregated. The time needed for
    aggregation is provided via :data:`AggregateDuration`.
    """

    _path: Nullable[Path]                    #: Cached :class:`~pathlib.Path` object of this directory.
    _subdirectories: Dict[str, "Directory"]  #: Dictionary containing name-:class:`Directory` pairs.
    _files: Dict[str, "Filename"]            #: Dictionary containing name-:class:`Filename` pairs.
    _symbolicLinks: Dict[str, "SymbolicLink"]  #: Dictionary containing name-:class:`SymbolicLink` pairs.
    _filesSize: int                          #: Aggregated size of all direct files.
    _collapsed: bool                         #: True, if this directory was collapsed. It contains no subelements.
    _scanDuration: Nullable[float]           #: Duration for scanning the directory and all its subelements.
    _aggregateDuration: Nullable[float]      #: Duration for aggregating all subelements.

    def __init__(
        self,
        name: str,
        collectSubdirectories: bool = False,
        parent: Nullable["Directory"] = None
    ) -> None:
        """
        Initialize the directory with name and parent reference.

        If a parent is given, the directory registers itself as a subdirectory of that parent.

        :param name:                  Name of the element.
        :param collectSubdirectories: If true, collect subdirectory statistics.
        :param parent:                Optional parent reference.
        """
        # The base-class already inherits the root reference from the parent.
        super().__init__(name, None, parent)

        self._path = None
        self._subdirectories = {}
        self._files = {}
        self._symbolicLinks = {}
        self._filesSize = 0
        self._collapsed = False
        self._scanDuration = None
        self._aggregateDuration = None

        if parent is not None:
            parent._subdirectories[name] = self

        if collectSubdirectories:
            self.CollectSubdirectories()

    def CollectSubdirectories(self) -> None:
        """
        Helper method for scanning subdirectories and aggregating found element sizes therein.
        """
        self.ScanSubdirectories()
        self.AggregateSizes()

    def ScanSubdirectories(self) -> None:
        """
        Helper method for scanning subdirectories (recursively) and building a
        :class:`Directory`-:class:`Filename`-:class:`File` object tree.

        If a file refers to the same filesystem internal unique ID, a hardlink (two or more filenames) to the same file
        storage object is assumed.

        If the directory can't be listed due to missing permissions, a :class:`PermissionWarning` is handed to the
        warning collector and the scan of this directory ends without recording a scan duration.

        :raises FilesystemException: If a directory entry is neither a directory, regular file nor symbolic link.
        """
        with Stopwatch() as sw1:
            directoryPath = self.Path
            try:
                items = scandir(directoryPath)
            except PermissionError as ex:
                return WarningCollector.Raise(PermissionWarning(directoryPath), ex)

            # The scandir iterator holds an OS directory handle; the context manager releases it deterministically,
            # even if processing an entry raises.
            with items:
                for dirEntry in items:
                    if dirEntry.is_dir(follow_symlinks=False):
                        # The new directory registers itself at this directory and scans itself recursively.
                        _ = Directory(dirEntry.name, collectSubdirectories=True, parent=self)
                    elif dirEntry.is_file(follow_symlinks=False):
                        fileID = dirEntry.inode()
                        if fileID in self._root._ids:
                            # Known file storage object: this is a further name (hardlink) for the same file.
                            file = self._root._ids[fileID]

                            _ = Filename(dirEntry.name, file=file, parent=self)
                        else:
                            s = dirEntry.stat(follow_symlinks=False)
                            filename = Filename(dirEntry.name, parent=self)
                            file = File(fileID, s.st_size, parent=filename)

                            self._root._ids[fileID] = file
                    elif dirEntry.is_symlink():
                        target = Path(readlink(directoryPath / dirEntry.name))
                        _ = SymbolicLink(dirEntry.name, target, parent=self)
                    else:
                        raise FilesystemException(f"Unknown directory element.")

        self._scanDuration = sw1.Duration

    def ResolveSymbolicLinks(self) -> None:
        """
        Resolve the targets of all symbolic links in this directory and its subdirectories (recursively).

        Relative targets are walked part by part through the scanned object tree. A resolved target gets the link
        added as link source. A link whose target leaves the tree via ``..`` is registered at the root as unconnected;
        a link whose target part can't be found is registered at the root as broken.

        Absolute targets are currently not resolved.
        """
        for subdir in self._subdirectories.values():
            subdir.ResolveSymbolicLinks()

        for link in self._symbolicLinks.values():
            if link._target.is_absolute():
                # todo: resolve path and check if target is in range, otherwise add to out-of-range list
                continue

            # NOTE(review): if an intermediate path part resolves to a file or symbolic link, the next lookup accesses
            #               '_subdirectories' on a non-directory element - confirm how such targets should be classified.
            target = self
            for elem in link._target.parts:
                if elem == ".":
                    continue

                if elem == "..":
                    target = target._parent
                    if target is None:
                        self._root.RegisterUnconnectedSymbolicLink(link)
                        break

                    continue

                # Lookup order: subdirectories, then regular files, then symbolic links.
                if elem in target._subdirectories:
                    target = target._subdirectories[elem]
                elif elem in target._files:
                    target = target._files[elem]
                elif elem in target._symbolicLinks:
                    target = target._symbolicLinks[elem]
                else:
                    self._root.RegisterBrokenSymbolicLink(link)
                    break
            else:
                # All path parts were resolved without leaving the tree or hitting an unknown name.
                target.AddLinkSources(link)

    def AggregateSizes(self) -> Set["File"]:
        """
        Aggregate the sizes of all subelements (recursively) into this directory.

        A file storage object which was already aggregated by a subdirectory or via another name in this directory is
        not added to this directory's direct file size again.

        :returns: The set of file storage objects aggregated by this directory and its subdirectories.
        """
        with Stopwatch() as sw2:
            aggregatedFiles = set()

            self._size = 0
            self._filesSize = 0
            for subdir in self._subdirectories.values():
                aggregatedFiles |= subdir.AggregateSizes()
                self._size += subdir._size

            for filename in self._files.values():
                file = filename._file
                if file not in aggregatedFiles:
                    self._filesSize += file._size
                    aggregatedFiles.add(file)

            self._size += self._filesSize

        self._aggregateDuration = sw2.Duration

        return aggregatedFiles

    @Element.Root.setter
    def Root(self, value: "Root") -> None:
        Element.Root.fset(self, value)

        # Propagate the new root to all subelements.
        for subdir in self._subdirectories.values():
            subdir.Root = value

        for file in self._files.values():
            file.Root = value

        for link in self._symbolicLinks.values():
            link.Root = value

    @Element.Parent.setter
    def Parent(self, value: _ParentType) -> None:
        Element.Parent.fset(self, value)

        value._subdirectories[self._name] = self

        # When attached directly to a root, that root becomes the root of this directory and its subelements.
        if isinstance(value, Root):
            self.Root = value

    @readonly
    def Count(self) -> int:
        """
        Read-only property to access the number of elements in a directory.

        :returns: Number of subdirectories plus regular files plus symbolic links.
        """
        return len(self._subdirectories) + len(self._files) + len(self._symbolicLinks)

    @readonly
    def FileCount(self) -> int:
        """
        Read-only property to access the number of files in a directory.

        .. hint::

           Files include regular files and symbolic links.

        :returns: Number of files.
        """
        return len(self._files) + len(self._symbolicLinks)

    @readonly
    def RegularFileCount(self) -> int:
        """
        Read-only property to access the number of regular files in a directory.

        :returns: Number of regular files.
        """
        return len(self._files)

    @readonly
    def SymbolicLinkCount(self) -> int:
        """
        Read-only property to access the number of symbolic links in a directory.

        :returns: Number of symbolic links.
        """
        return len(self._symbolicLinks)

    @readonly
    def SubdirectoryCount(self) -> int:
        """
        Read-only property to access the number of subdirectories in a directory.

        :returns: Number of subdirectories.
        """
        return len(self._subdirectories)

    @readonly
    def TotalFileCount(self) -> int:
        """
        Read-only property to access the total number of files in all child hierarchy levels (recursively).

        .. hint::

           Files include regular files and symbolic links.

        :returns: Total number of files.
        """
        return sum(d.TotalFileCount for d in self._subdirectories.values()) + len(self._files) + len(self._symbolicLinks)

    @readonly
    def TotalRegularFileCount(self) -> int:
        """
        Read-only property to access the total number of regular files in all child hierarchy levels (recursively).

        :returns: Total number of regular files.
        """
        return sum(d.TotalRegularFileCount for d in self._subdirectories.values()) + len(self._files)

    @readonly
    def TotalSymbolicLinkCount(self) -> int:
        """
        Read-only property to access the total number of symbolic links in all child hierarchy levels (recursively).

        :returns: Total number of symbolic links.
        """
        return sum(d.TotalSymbolicLinkCount for d in self._subdirectories.values()) + len(self._symbolicLinks)

    @readonly
    def TotalSubdirectoryCount(self) -> int:
        """
        Read-only property to access the total number of subdirectories in all child hierarchy levels (recursively).

        :returns: Total number of subdirectories.
        """
        return len(self._subdirectories) + sum(d.TotalSubdirectoryCount for d in self._subdirectories.values())

    @readonly
    def Subdirectories(self) -> Generator["Directory", None, None]:
        """
        Iterate all direct subdirectories of the directory.

        :returns: A generator to iterate all direct subdirectories.
        """
        return (d for d in self._subdirectories.values())

    @readonly
    def Files(self) -> Generator[Union["Filename", "SymbolicLink"], None, None]:
        """
        Iterate all direct files of the directory.

        .. hint::

           Files include regular files and symbolic links.

        :returns: A generator to iterate all direct files.
        """
        return (f for f in chain(self._files.values(), self._symbolicLinks.values()))

    @readonly
    def RegularFiles(self) -> Generator["Filename", None, None]:
        """
        Iterate all direct regular files of the directory.

        :returns: A generator to iterate all direct regular files.
        """
        return (f for f in self._files.values())

    @readonly
    def SymbolicLinks(self) -> Generator["SymbolicLink", None, None]:
        """
        Iterate all direct symbolic links of the directory.

        :returns: A generator to iterate all direct symbolic links.
        """
        return (l for l in self._symbolicLinks.values())

    @readonly
    def Path(self) -> Path:
        """
        Read-only property to access the equivalent Path instance for accessing the represented directory.

        The path is computed from the parent's path on first access and cached afterwards.

        :returns:                    Path to the directory.
        :raises FilesystemException: If no parent is set.
        """
        if self._path is not None:
            return self._path

        if self._parent is None:
            raise FilesystemException(f"No parent or root set for directory.")

        self._path = self._parent.Path / self._name
        return self._path

    @readonly
    def ScanDuration(self) -> float:
        """
        Read-only property to access the time needed to scan a directory structure including all subelements
        (recursively).

        :returns:                    The scan duration in seconds.
        :raises FilesystemException: If the directory was not scanned.
        """
        if self._scanDuration is None:
            raise FilesystemException(f"Directory was not scanned, yet.")

        return self._scanDuration

    @readonly
    def AggregateDuration(self) -> float:
        """
        Read-only property to access the time needed to aggregate the directory's and subelement's properties
        (recursively).

        :returns:                    The aggregation duration in seconds.
        :raises FilesystemException: If the directory properties were not aggregated.
        """
        # Check the aggregation duration itself: a directory can be scanned without being aggregated.
        if self._aggregateDuration is None:
            raise FilesystemException(f"Directory properties were not aggregated, yet.")

        return self._aggregateDuration

    def __hash__(self) -> int:
        """
        Compute a hash based on the object's identity.

        :returns: Identity based hash value.
        """
        return hash(id(self))

    def IterateDirectories(self) -> Generator["Directory", None, None]:
        """
        Iterate all subdirectories in all child hierarchy levels (recursively) in pre-order.

        :returns: A generator to iterate all subdirectories.
        """
        # pre-order
        for directory in self._subdirectories.values():
            yield directory
            yield from directory.IterateDirectories()

    def IterateFiles(self) -> Generator[Element, None, None]:
        """
        Iterate all regular files and symbolic links in all child hierarchy levels (recursively) in post-order.

        :returns: A generator to iterate all files; files of subdirectories come first.
        """
        # post-order
        for directory in self._subdirectories.values():
            yield from directory.IterateFiles()

        yield from self._files.values()
        yield from self._symbolicLinks.values()

    def Copy(self, parent: Nullable["Directory"] = None) -> "Directory":
        """
        Copy the directory structure including all subelements and link it to the given parent.

        .. hint::

           Statistics like aggregated directory size are copied too. |br|
           There is no rescan or repeated aggregation needed.

        :param parent: The parent element of the copied directory.
        :returns:      A deep copy of the directory structure.
        """
        duplicate = Directory(self._name, parent=parent)

        # Carry over the aggregated statistics, so the copy doesn't need to be aggregated again.
        duplicate._size = self._size
        duplicate._filesSize = self._filesSize
        duplicate._collapsed = self._collapsed

        # Each subelement copy registers itself at the new directory.
        for element in chain(self._subdirectories.values(), self._files.values(), self._symbolicLinks.values()):
            element.Copy(duplicate)

        return duplicate

    def Collapse(self, func: Callable[["Directory"], bool]) -> bool:
        """
        Collapse this directory by removing all its subelements, if it's collapsible.

        A directory without subdirectories is collapsible, if ``func`` returns true for it. A directory with
        subdirectories is collapsible, if all its subdirectories were collapsed; ``func`` is not evaluated for such a
        directory itself. All subdirectories are visited, even if one of them isn't collapsible.

        :param func: Predicate deciding if a directory without subdirectories gets collapsed.
        :returns:    ``True``, if this directory was collapsed.
        """
        if self._subdirectories:
            collapsible = True
            for subdir in self._subdirectories.values():
                # No short-circuit: every subdirectory gets its chance to collapse.
                result = subdir.Collapse(func)
                collapsible = collapsible and result
        else:
            collapsible = bool(func(self))

        if not collapsible:
            return False

        self._collapsed = True
        self._subdirectories.clear()
        self._files.clear()
        self._symbolicLinks.clear()

        return True

    def ToTree(self, format: Nullable[Callable[[Node], str]] = None) -> Node:
        """
        Convert the directory to a :class:`~pyTooling.Tree.Node`.

        The node's :attr:`~pyTooling.Tree.Node.Value` field contains a reference to the directory. Additional data is
        attached to the node's key-value store:

        ``kind``
          The node's kind. See :class:`NodeKind`.
        ``size``
          The directory's aggregated size.

        Only subdirectories are added as child nodes; files and symbolic links are not converted.

        :param format: A user defined formatting function for tree nodes.
        :returns:      A tree node representing this directory.
        """
        if format is None:
            # NOTE(review): the factor 1e-6 yields megabytes (MB), while the label says MiB - confirm intended unit.
            def format(node: Node) -> str:
                return f"{node['size'] * 1e-6:7.1f} MiB {node._value.Name}"

        directoryNode = Node(
            value=self,
            keyValuePairs={
                "kind": NodeKind.Directory,
                "size": self._size
            },
            format=format
        )
        directoryNode.AddChildren(
            subdir.ToTree(format) for subdir in self._subdirectories.values()
        )

        return directoryNode

    def __eq__(self, other) -> bool:
        """
        Compare two Directory instances for equality.

        :param other:      Parameter to compare against.
        :returns:          ``True``, if both directories and all its subelements are equal.
        :raises TypeError: If parameter ``other`` is not of type :class:`Directory`.
        """
        if not isinstance(other, Directory):
            ex = TypeError("Parameter 'other' is not of type Directory.")
            ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.")
            raise ex

        if not all(dir1 == dir2 for _, dir1, dir2 in zipdicts(self._subdirectories, other._subdirectories)):
            return False

        if not all(file1 == file2 for _, file1, file2 in zipdicts(self._files, other._files)):
            return False

        if not all(link1 == link2 for _, link1, link2 in zipdicts(self._symbolicLinks, other._symbolicLinks)):
            return False

        return True

    def __ne__(self, other: Any) -> bool:
        """
        Compare two Directory instances for inequality.

        :param other:      Parameter to compare against.
        :returns:          ``True``, if both directories and all its subelements are unequal.
        :raises TypeError: If parameter ``other`` is not of type :class:`Directory`.
        """
        return not self.__eq__(other)

    def __repr__(self) -> str:
        return f"Directory: {self.Path}"

    def __str__(self) -> str:
        return self._name

783 

784 

785@export 

786class Filename(Element[Directory]): 

787 """ 

788 Represents a filename in the filesystem, but not the file storage object (:class:`File`). 

789 

790 .. hint:: 

791 

792 Filename and file storage are represented by two classes, which allows multiple names (hard links) per file storage 

793 object. 

794 """ 

795 _file: Nullable["File"] 

796 

797 def __init__( 

798 self, 

799 name: str, 

800 file: Nullable["File"] = None, 

801 parent: Nullable[Directory] = None 

802 ) -> None: 

803 """ 

804 Initialize the filename with name, file (storage) object and parent reference. 

805 

806 :param name: Name of the file. 

807 :param size: Optional file (storage) object. 

808 :param parent: Optional parent reference. 

809 """ 

810 super().__init__(name, None, parent) 

811 

812 if file is None: 812 ↛ 815line 812 didn't jump to line 815 because the condition on line 812 was always true

813 self._file = None 

814 else: 

815 if not isinstance(file, File): 

816 ex = TypeError("Parameter 'file' is not of type 'File'.") 

817 ex.add_note(f"Got type '{getFullyQualifiedName(file)}'.") 

818 raise ex 

819 

820 self._file = file 

821 file._parents.append(self) 

822 

823 if parent is not None: 

824 parent._files[name] = self 

825 

826 if parent._root is not None: 

827 self._root = parent._root 

828 

829 @Element.Root.setter 

830 def Root(self, value: "Root") -> None: 

831 Element.Root.fset(self, value) 

832 

833 if self._file is not None: 833 ↛ exitline 833 didn't return from function 'Root' because the condition on line 833 was always true

834 self._file.Root = value 

835 

836 @Element.Parent.setter 

837 def Parent(self, value: _ParentType) -> None: 

838 Element.Parent.fset(self, value) 

839 

840 value._files[self._name] = self 

841 

842 if isinstance(value, Root): 842 ↛ 843line 842 didn't jump to line 843 because the condition on line 842 was never true

843 self.Root = value 

844 

845 @readonly 

846 def File(self) -> Nullable["File"]: 

847 return self._file 

848 

849 @readonly 

850 def Size(self) -> int: 

851 if self._file is None: 

852 raise ToolingException(f"Filename isn't linked to a File object.") 

853 

854 return self._file._size 

855 

856 @readonly 

857 def Path(self) -> Path: 

858 if self._parent is None: 

859 raise ToolingException(f"Filename has no parent object.") 

860 

861 return self._parent.Path / self._name 

862 

863 def __hash__(self) -> int: 

864 return hash(id(self)) 

865 

866 def Copy(self, parent: Directory) -> "Filename": 

867 fileID = self._file._id 

868 

869 if fileID in parent._root._ids: 

870 file = parent._root._ids[fileID] 

871 else: 

872 fileSize = self._file._size 

873 file = File(fileID, fileSize) 

874 

875 parent._root._ids[fileID] = file 

876 

877 return Filename(self._name, file, parent=parent) 

878 

879 def ToTree(self) -> Node: 

880 def format(node: Node) -> str: 

881 return f"{node['size'] * 1e-6:7.1f} MiB {node._value.Name}" 

882 

883 fileNode = Node( 

884 value=self, 

885 keyValuePairs={ 

886 "kind": NodeKind.File, 

887 "size": self._size 

888 }, 

889 format=format 

890 ) 

891 

892 return fileNode 

893 

894 def __eq__(self, other) -> bool: 

895 """ 

896 Compare two Filename instances for equality. 

897 

898 :param other: Parameter to compare against. 

899 :returns: ``True``, if both filenames are equal. 

900 :raises TypeError: If parameter ``other`` is not of type :class:`Filename`. 

901 """ 

902 if not isinstance(other, Filename): 

903 ex = TypeError("Parameter 'other' is not of type 'Filename'.") 

904 ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.") 

905 raise ex 

906 

907 return self._name == other._name and self.Size == other.Size 

908 

def __ne__(self, other: Any) -> bool:
    """
    Compare two Filename instances for inequality.

    Two filenames are considered unequal if their names or their sizes differ.

    :param other:      Parameter to compare against.
    :returns:          ``True``, if both filenames are unequal.
    :raises TypeError: If parameter ``other`` is not of type :class:`Filename`.
    """
    if isinstance(other, Filename):
        # The size is only compared when the names are identical (short-circuit).
        return self._name != other._name or self.Size != other.Size

    ex = TypeError("Parameter 'other' is not of type 'Filename'.")
    ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.")
    raise ex

923 

def __repr__(self) -> str:
    """
    Return a debug representation consisting of the prefix ``File:`` and the filename's path.

    :returns: Debug representation of this filename.
    """
    location = self.Path
    return f"File: {location}"

926 

def __str__(self) -> str:
    """
    Return the filename's name.

    :returns: Name of this filename.
    """
    name = self._name
    return name

929 

930 

@export
class SymbolicLink(Element[Directory]):
    """
    A **SymbolicLink** represents a symbolic link, which is located in a directory and refers to a target path.
    """
    _target:       Path            #: Path the symbolic link refers to.
    _isConnected:  bool            #: Initialized to ``False``.
    _isBroken:     Nullable[bool]  #: ``None`` after initialization; set to ``True`` when registered as broken at the root.
    _isOutOfRange: Nullable[bool]  #: ``None`` after initialization; set to ``True`` when registered as unconnected at the root.

    def __init__(
        self,
        name:   str,
        target: Path,
        parent: Nullable[Directory]
    ) -> None:
        """
        Initialize the symbolic link with a name, a target path and a parent directory.

        If a parent is given, the symbolic link registers itself by name in the parent's symbolic links and takes over
        the parent's root reference, if the parent has one.

        :param name:        Name of the symbolic link.
        :param target:      Path the symbolic link refers to.
        :param parent:      Optional parent directory.
        :raises ValueError: If parameter ``target`` is None.
        :raises TypeError:  If parameter ``target`` is not of type :class:`~pathlib.Path`.
        """
        super().__init__(name, None, parent)

        if target is None:
            raise ValueError("Parameter 'target' is None.")
        elif not isinstance(target, Path):
            ex = TypeError("Parameter 'target' is not of type 'Path'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(target)}'.")
            raise ex

        self._target = target
        self._isConnected = False
        self._isBroken = None
        self._isOutOfRange = None

        if parent is not None:
            parent._symbolicLinks[name] = self

            if parent._root is not None:
                self._root = parent._root

    @readonly
    def Path(self) -> Path:
        """
        Read-only property to compute the path of this symbolic link.

        :returns:                 Path of the parent extended by the symbolic link's name.
        :raises ToolingException: If this symbolic link has no parent object.
        """
        # A parent is optional in the constructor, so guard like Filename.Path does instead of failing on None.
        if self._parent is None:
            raise ToolingException("Symbolic link has no parent object.")

        return self._parent.Path / self._name

    @readonly
    def Target(self) -> Path:
        """
        Read-only property to access the target path of this symbolic link.

        :returns: Path the symbolic link refers to.
        """
        return self._target

    @readonly
    def IsConnected(self) -> bool:
        """
        Read-only property to access the connected flag of this symbolic link.

        :returns: The connected flag (``False`` after initialization).
        """
        return self._isConnected

    @readonly
    def IsBroken(self) -> Nullable[bool]:
        """
        Read-only property to access the broken flag of this symbolic link.

        :returns: ``None`` after initialization; ``True`` once the link was registered as broken.
        """
        return self._isBroken

    @readonly
    def IsOutOfRange(self) -> Nullable[bool]:
        """
        Read-only property to access the out-of-range flag of this symbolic link.

        :returns: ``None`` after initialization; ``True`` once the link was registered as unconnected.
        """
        return self._isOutOfRange

    def __hash__(self) -> int:
        """
        Compute an identity-based hash value.

        :returns: Hash of the object's identity (:func:`id`).
        """
        return hash(id(self))

    def Copy(self, parent: Directory) -> "SymbolicLink":
        """
        Copy this symbolic link and link the copy to the given parent directory.

        Only name and target are taken over; the flags of the copy start with their initial values.

        :param parent: Directory the copied symbolic link is linked to.
        :returns:      The copied symbolic link.
        """
        return SymbolicLink(self._name, self._target, parent=parent)

    def ToTree(self) -> Node:
        """
        Convert this symbolic link into a tree node.

        The node references this symbolic link as its value, is tagged with ``NodeKind.SymbolicLink`` and carries the
        size under the key ``size``. A formatter rendering the scaled size and the name of the node's value is attached.

        :returns: A tree node representing this symbolic link.
        """
        def format(node: Node) -> str:
            # NOTE(review): __init__ passes None as second base-class argument - presumably the size. Render a
            # missing size as 0 instead of failing on ``None * float``.
            size = node["size"]
            if size is None:
                size = 0

            # NOTE(review): the factor 1e-6 scales by 10^6, while the label reads "MiB" - confirm the intended unit.
            return f"{size * 1e-6:7.1f} MiB {node._value.Name}"

        symbolicLinkNode = Node(
            value=self,
            keyValuePairs={
                "kind": NodeKind.SymbolicLink,
                "size": self._size
            },
            format=format
        )

        return symbolicLinkNode

    def __eq__(self, other) -> bool:
        """
        Compare two SymbolicLink instances for equality.

        Two symbolic links are considered equal if their names and their targets match.

        :param other:      Parameter to compare against.
        :returns:          ``True``, if both symbolic links are equal.
        :raises TypeError: If parameter ``other`` is not of type :class:`SymbolicLink`.
        """
        if not isinstance(other, SymbolicLink):
            ex = TypeError("Parameter 'other' is not of type 'SymbolicLink'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.")
            raise ex

        return self._name == other._name and self._target == other._target

    def __ne__(self, other: Any) -> bool:
        """
        Compare two SymbolicLink instances for inequality.

        Two symbolic links are considered unequal if their names or their targets differ.

        :param other:      Parameter to compare against.
        :returns:          ``True``, if both symbolic links are unequal.
        :raises TypeError: If parameter ``other`` is not of type :class:`SymbolicLink`.
        """
        if not isinstance(other, SymbolicLink):
            ex = TypeError("Parameter 'other' is not of type 'SymbolicLink'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.")
            raise ex

        return self._name != other._name or self._target != other._target

    def __repr__(self) -> str:
        """
        Return a debug representation consisting of the symbolic link's path and its target.

        :returns: Debug representation of this symbolic link.
        """
        return f"SymLink: {self.Path} -> {self._target}"

    def __str__(self) -> str:
        """
        Return the symbolic link's name.

        :returns: Name of this symbolic link.
        """
        return self._name

1040 

1041 

@export
class Root(Directory):
    """
    A **Root** represents the root-directory in the filesystem, which contains subdirectories, regular files and symbolic links.
    """
    _ids:                      Dict[int, "File"]   #: Dictionary of file identifier - file object pairs found while scanning the directory structure.
    _brokenSymbolicLinks:      List[SymbolicLink]  #: Broken symbolic links (target doesn't exist).
    _unconnectedSymbolicLinks: List[SymbolicLink]  #: Symbolic links which couldn't be connected to their target (out of scope).

    def __init__(
        self,
        rootDirectory:         Path,
        collectSubdirectories: bool = True
    ) -> None:
        """
        Initialize the root with the path of the root directory.

        :param rootDirectory:         Path to the root directory.
        :param collectSubdirectories: If true, subdirectories are collected and symbolic links are resolved during
                                      initialization.
        :raises ValueError:           If parameter ``rootDirectory`` is None.
        :raises TypeError:            If parameter ``rootDirectory`` is not of type :class:`~pathlib.Path`.
        :raises ToolingException:     If the path ``rootDirectory`` doesn't exist.
        """
        if rootDirectory is None:
            raise ValueError("Parameter 'rootDirectory' is None.")
        elif not isinstance(rootDirectory, Path):
            # Attach the received type as a note, like the other type checks in this module do.
            ex = TypeError("Parameter 'rootDirectory' is not of type 'Path'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(rootDirectory)}'.")
            raise ex
        elif not rootDirectory.exists():
            raise ToolingException(f"Path '{rootDirectory}' doesn't exist.") from FileNotFoundError(rootDirectory)

        # The registries are created before the base class is initialized.
        self._ids = {}
        self._brokenSymbolicLinks = []
        self._unconnectedSymbolicLinks = []

        super().__init__(rootDirectory.name)
        self._root = self
        self._path = rootDirectory

        if collectSubdirectories:
            self.CollectSubdirectories()
            self.ResolveSymbolicLinks()

    @readonly
    def Path(self) -> Path:
        """
        Read-only property to access the path of the filesystem statistics root.

        :returns: Path to the root of the filesystem statistics root directory.
        """
        return self._path

    @readonly
    def BrokenSymbolicLinks(self) -> List[SymbolicLink]:
        """
        Read-only property to access the list of symbolic links registered as broken.

        :returns: List of broken symbolic links.
        """
        return self._brokenSymbolicLinks

    @readonly
    def UnconnectedSymbolicLinks(self) -> List[SymbolicLink]:
        """
        Read-only property to access the list of symbolic links registered as unconnected.

        :returns: List of unconnected symbolic links.
        """
        return self._unconnectedSymbolicLinks

    @readonly
    def TotalHardLinkCount(self) -> int:
        """
        Read-only property to compute the number of filenames referencing file objects with more than one filename.

        :returns: Sum of the filename counts of all registered file objects having more than one filename.
        """
        return sum(count for f in self._ids.values() if (count := len(f._parents)) > 1)

    @readonly
    def TotalHardLinkCount2(self) -> int:
        """
        Read-only property to compute the number of file objects with more than one filename.

        :returns: Number of registered file objects having more than one filename.
        """
        return sum(1 for f in self._ids.values() if len(f._parents) > 1)

    @readonly
    def TotalHardLinkCount3(self) -> int:
        """
        Read-only property to compute the number of file objects with exactly one filename.

        :returns: Number of registered file objects having exactly one filename.
        """
        return sum(1 for f in self._ids.values() if len(f._parents) == 1)

    @readonly
    def Size2(self) -> int:
        """
        Read-only property to compute the size of all file objects with more than one filename.

        Each file object is counted once.

        :returns: Sum of the sizes of all registered file objects having more than one filename.
        """
        return sum(f._size for f in self._ids.values() if len(f._parents) > 1)

    @readonly
    def Size3(self) -> int:
        """
        Read-only property to compute the size of all file objects with more than one filename, counted per filename.

        Each file object's size is multiplied by its number of filenames.

        :returns: Sum of size times filename count of all registered file objects having more than one filename.
        """
        return sum(f._size * len(f._parents) for f in self._ids.values() if len(f._parents) > 1)

    @readonly
    def TotalUniqueFileCount(self) -> int:
        """
        Read-only property to access the number of registered file objects.

        :returns: Number of entries in the file identifier registry.
        """
        return len(self._ids)

    def RegisterBrokenSymbolicLink(self, symLink: SymbolicLink) -> None:
        """
        Mark a symbolic link as broken and add it to the list of broken symbolic links.

        :param symLink: Symbolic link to register.
        """
        symLink._isBroken = True
        self._brokenSymbolicLinks.append(symLink)

    def RegisterUnconnectedSymbolicLink(self, symLink: SymbolicLink) -> None:
        """
        Mark a symbolic link as out of range and add it to the list of unconnected symbolic links.

        :param symLink: Symbolic link to register.
        """
        symLink._isOutOfRange = True
        self._unconnectedSymbolicLinks.append(symLink)

    def Copy(self) -> "Root":
        """
        Copy the directory structure including all subelements into a new root.

        A new root is created for the same path without collecting subdirectories. Afterwards every subdirectory, file
        and symbolic link of this root is copied into the new root.

        The duration for the deep copy process is provided in :attr:`ScanDuration`

        .. hint::

           The aggregated size of this root is copied too. |br|
           No rescan or repeated aggregation is triggered by this method.

        :returns: A deep copy of the directory structure.
        """
        with Stopwatch() as sw:
            root = Root(self._path, False)
            root._size = self._size

            for subdir in self._subdirectories.values():
                subdir.Copy(root)

            for file in self._files.values():
                file.Copy(root)

            for link in self._symbolicLinks.values():
                link.Copy(root)

        # The copy duration is reported as scan duration; no aggregation took place.
        root._scanDuration = sw.Duration
        root._aggregateDuration = 0.0

        return root

    def __repr__(self) -> str:
        """
        Return a debug representation consisting of the root's path and its total element counts.

        :returns: Debug representation of this root.
        """
        return f"Root: {self.Path} (dirs: {self.TotalSubdirectoryCount}, files: {self.TotalRegularFileCount}, symlinks: {self.TotalSymbolicLinkCount})"

    def __str__(self) -> str:
        """
        Return the root's name.

        :returns: Name of this root.
        """
        return self._name

1160 

1161 

1162@export 

1163class File(Base): 

1164 """ 

1165 A **File** represents a file storage object in the filesystem, which is accessible by one or more :class:`Filename` objects. 

1166 

1167 Each file has an internal id, which is associated to a unique ID within the host's filesystem. 

1168 """ 

1169 _id: int #: Unique (host internal) file object ID) 

1170 _parents: List[Filename] #: List of reverse references to :class:`Filename` objects. 

1171 

def __init__(
    self,
    id:     int,
    size:   int,
    parent: Nullable[Filename] = None
) -> None:
    """
    Initialize the File storage object with an ID, size and parent reference.

    If a parent filename is given, the root reference is taken over from it, the filename becomes the first entry in
    the list of parents, and the filename is linked back to this file object.

    :param id:          Unique ID of the file object.
    :param size:        Size of the file object.
    :param parent:      Optional parent reference.
    :raises ValueError: If parameter ``id`` is None.
    :raises TypeError:  If parameter ``id`` is not of type :class:`int`.
    :raises TypeError:  If parameter ``parent`` is neither None nor of type :class:`Filename`.
    """
    if id is None:
        raise ValueError(f"Parameter 'id' is None.")
    if not isinstance(id, int):
        ex = TypeError("Parameter 'id' is not of type 'int'.")
        ex.add_note(f"Got type '{getFullyQualifiedName(id)}'.")
        raise ex

    self._id = id

    # Derive root reference and initial parent list from the optional parent.
    if parent is None:
        root, parents = None, []
    elif isinstance(parent, Filename):
        root, parents = parent._root, [parent]
    else:
        ex = TypeError("Parameter 'parent' is not of type 'Filename'.")
        ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.")
        raise ex

    super().__init__(size, root)
    self._parents = parents

    # Link the filename back to this file object.
    if parent is not None:
        parent._file = self

1205 

@readonly
def ID(self) -> int:
    """
    Read-only property to access the unique identifier of this file object.

    :returns: Unique file object identifier.
    """
    identifier = self._id
    return identifier

1214 

@readonly
def Parents(self) -> List[Filename]:
    """
    Read-only property to access the filenames referencing this file storage object.

    .. hint::

       A file object with multiple filenames has hardlinks, which can be checked via the length of this list.

    :returns: List of filenames for the file storage object.
    """
    filenames = self._parents
    return filenames

1227 

def AddParent(self, filename: Filename) -> None:
    """
    Register a further :class:`Filename` as parent of this file object.

    The filename is appended to the list of parents and linked back to this file object. If the filename has a root
    reference, it is taken over by this file object.

    :param filename:          Reference to a filename object.
    :raises ValueError:       If parameter ``filename`` is None.
    :raises TypeError:        If parameter ``filename`` is not of type :class:`Filename`.
    :raises ToolingException: If the filename already references a file object.
    """
    if filename is None:
        raise ValueError(f"Parameter 'filename' is None.")
    if not isinstance(filename, Filename):
        ex = TypeError("Parameter 'filename' is not of type 'Filename'.")
        ex.add_note(f"Got type '{getFullyQualifiedName(filename)}'.")
        raise ex

    linkedFile = filename._file
    if linkedFile is not None:
        raise ToolingException(f"Filename is already referencing an other file object ({linkedFile._id}).")

    # Establish the bidirectional link between file object and filename.
    self._parents.append(filename)
    filename._file = self

    root = filename._root
    if root is not None:
        self._root = root