Coverage for pyTooling / Filesystem / __init__.py: 49%

598 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 10:51 +0000

1# ==================================================================================================================== # 

2# _____ _ _ _____ _ _ _ # 

3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ | ___(_) | ___ ___ _ _ ___| |_ ___ _ __ ___ # 

4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` | | |_ | | |/ _ \/ __| | | / __| __/ _ \ '_ ` _ \ # 

5# | |_) | |_| || | (_) | (_) | | | | | | (_| |_| _| | | | __/\__ \ |_| \__ \ || __/ | | | | | # 

6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)_| |_|_|\___||___/\__, |___/\__\___|_| |_| |_| # 

7# |_| |___/ |___/ |___/ # 

8# ==================================================================================================================== # 

9# Authors: # 

10# Patrick Lehmann # 

11# # 

12# License: # 

13# ==================================================================================================================== # 

14# Copyright 2025-2026 Patrick Lehmann - Bötzingen, Germany # 

15# # 

16# Licensed under the Apache License, Version 2.0 (the "License"); # 

17# you may not use this file except in compliance with the License. # 

18# You may obtain a copy of the License at # 

19# # 

20# http://www.apache.org/licenses/LICENSE-2.0 # 

21# # 

22# Unless required by applicable law or agreed to in writing, software # 

23# distributed under the License is distributed on an "AS IS" BASIS, # 

24# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

25# See the License for the specific language governing permissions and # 

26# limitations under the License. # 

27# # 

28# SPDX-License-Identifier: Apache-2.0 # 

29# ==================================================================================================================== # 

30# 

31""" 

32An object-oriented file system abstraction for directory, file, symbolic link, ... statistics collection. 

33 

34.. important:: 

35 

36 This isn't a replacement of :mod:`pathlib` introduced with Python 3.4. 

37""" 

38from os import scandir, readlink 

39 

40from enum import Enum 

41from itertools import chain 

42from pathlib import Path 

43from typing import Optional as Nullable, Dict, Generic, Generator, TypeVar, List, Any, Callable, Union, Iterator, Set 

44 

45from pyTooling.Decorators import readonly, export 

46from pyTooling.Exceptions import ToolingException 

47from pyTooling.MetaClasses import ExtendedType 

48from pyTooling.Common import getFullyQualifiedName, zipdicts 

49from pyTooling.Warning import WarningCollector, Warning 

50from pyTooling.Stopwatch import Stopwatch 

51from pyTooling.Tree import Node 

52 

53 

54__all__ = ["_ParentType"] 

55 

56 

57_ParentType = TypeVar("_ParentType", bound="Element") 

58"""The type variable for a parent reference.""" 

59 

60 

61@export 

62class FilesystemException(ToolingException): 

63 """Base-exception of all exceptions raised by :mod:`pyTooling.Filesystem`.""" 

64 

65 

66@export 

67class PermissionWarning(Warning): 

68 _path: Path 

69 

70 def __init__(self, path: Path, *args) -> None: 

71 super().__init__(*args) 

72 self._path = path 

73 

74 @readonly 

75 def Path(self) -> Path: 

76 return self._path 

77 

78 

79@export 

80class NodeKind(Enum): 

81 """ 

82 Node kind for filesystem elements in a :ref:`tree <STRUCT/Tree>`. 

83 

84 This enumeration is used when converting the filesystem statistics tree to an instance of :mod:`pyTooling.Tree`. 

85 """ 

86 Directory = 0 #: Node represents a directory. 

87 File = 1 #: Node represents a regular file. 

88 SymbolicLink = 2 #: Node represents a symbolic link. 

89 

90 

91@export 

92class Base(metaclass=ExtendedType, slots=True): 

93 """ 

94 Base-class for all filesystem elements in :mod:`pyTooling.Filesystem`. 

95 

96 It implements a size and a reference to the root element of the filesystem. 

97 """ 

98 _root: Nullable["Root"] #: Reference to the root of the filesystem statistics scope. 

99 _size: Nullable[int] #: Actual or aggregated size of the filesystem element. 

100 

101 def __init__( 

102 self, 

103 size: Nullable[int], 

104 root: Nullable["Root"] 

105 ) -> None: 

106 """ 

107 Initialize the base-class with filesystem element size and root reference. 

108 

109 :param size: Optional size of the element. 

110 :param root: Optional reference to the filesystem root element. 

111 """ 

112 if size is not None and not isinstance(size, int): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 ex = TypeError("Parameter 'size' is not of type 'int'.") 

114 ex.add_note(f"Got type '{getFullyQualifiedName(size)}'.") 

115 raise ex 

116 

117 if root is not None and not isinstance(root, Root): 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 ex = TypeError("Parameter 'root' is not of type 'Root'.") 

119 ex.add_note(f"Got type '{getFullyQualifiedName(root)}'.") 

120 raise ex 

121 

122 self._size = size 

123 self._root = root 

124 

125 @property 

126 def Root(self) -> Nullable["Root"]: 

127 """ 

128 Property to access the root of the filesystem statistics scope. 

129 

130 :returns: Root of the filesystem statistics scope. 

131 """ 

132 return self._root 

133 

134 @Root.setter 

135 def Root(self, value: "Root") -> None: 

136 if value is None: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 raise ValueError(f"Parameter 'value' is None.") 

138 elif not isinstance(value, Root): 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true

139 ex = TypeError("Parameter 'value' is not of type 'Root'.") 

140 ex.add_note(f"Got type '{getFullyQualifiedName(value)}'.") 

141 raise ex 

142 

143 self._root = value 

144 

145 @readonly 

146 def Size(self) -> int: 

147 """ 

148 Read-only property to access the element's size in Bytes. 

149 

150 :returns: Size in Bytes. 

151 :raises FilesystemException: If size is not computed, yet. 

152 """ 

153 if self._size is None: 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true

154 raise FilesystemException("Size is not computed, yet.") 

155 

156 return self._size 

157 

158 # FIXME: @abstractmethod 

159 def ToTree(self) -> Node: 

160 """ 

161 Convert a filesystem element to a node in :mod:`pyTooling.Tree`. 

162 

163 The node's :attr:`~pyTooling.Tree.Node.Value` field contains a reference to the filesystem element. Additional data 

164 will be stored in the node's key-value store. 

165 

166 :returns: A tree's node referencing this filesystem element. 

167 """ 

168 raise NotImplementedError() 

169 

170 

171@export 

172class Element(Base, Generic[_ParentType]): 

173 """ 

174 Base-class for all named elements within a filesystem. 

175 

176 It adds a name, parent reference and list of symbolic-link sources. 

177 

178 .. hint:: 

179 

180 Symbolic link sources are reverse references describing which symbolic links point to this element. 

181 """ 

182 _name: str #: Name of the filesystem element. 

183 _parent: _ParentType #: Reference to the filesystem element's parent (:class:`Directory`) 

184 _linkSources: List["SymbolicLink"] #: A list of symbolic links pointing to this filesystem element. 

185 

186 def __init__( 

187 self, 

188 name: str, 

189 size: Nullable[int] = None, 

190 parent: Nullable[_ParentType] = None 

191 ) -> None: 

192 """ 

193 Initialize the element base-class with name, size and parent reference. 

194 

195 :param name: Name of the element. 

196 :param size: Optional size of the element. 

197 :param parent: Optional parent reference. 

198 """ 

199 if name is None: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 raise ValueError(f"Parameter 'name' is None.") 

201 elif not isinstance(name, str): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true

202 ex = TypeError("Parameter 'name' is not of type 'str'.") 

203 ex.add_note(f"Got type '{getFullyQualifiedName(name)}'.") 

204 raise ex 

205 

206 self._name = name 

207 

208 if parent is None: 

209 super().__init__(size, None) 

210 self._parent = None 

211 else: 

212 if not isinstance(parent, Directory): 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true

213 ex = TypeError("Parameter 'parent' is not of type 'Directory'.") 

214 ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.") 

215 raise ex 

216 

217 super().__init__(size, parent._root) 

218 self._parent = parent 

219 

220 self._linkSources = [] 

221 

222 @property 

223 def Parent(self) -> _ParentType: 

224 """ 

225 Property to access the element's parent. 

226 

227 :returns: Parent element. 

228 """ 

229 return self._parent 

230 

231 @Parent.setter 

232 def Parent(self, value: _ParentType) -> None: 

233 if value is None: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 raise ValueError(f"Parameter 'value' is None.") 

235 elif not isinstance(value, Directory): 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 ex = TypeError("Parameter 'value' is not of type 'Directory'.") 

237 ex.add_note(f"Got type '{getFullyQualifiedName(value)}'.") 

238 raise ex 

239 

240 self._parent = value 

241 

242 if value._root is not None: 

243 self._root = value._root 

244 

245 @readonly 

246 def Name(self) -> str: 

247 """ 

248 Read-only property to access the element's name. 

249 

250 :returns: Element name. 

251 """ 

252 return self._name 

253 

254 @readonly 

255 def Path(self) -> Path: 

256 raise NotImplemented(f"Property 'Path' is abstract.") 

257 

258 @readonly 

259 def LinkSources(self) -> List["SymbolicLink"]: 

260 return self._linkSources 

261 

262 def AddLinkSources(self, source: "SymbolicLink") -> None: 

263 """ 

264 Add a link source of a symbolic link to the named element (reverse reference). 

265 

266 :param source: The referenced symbolic link. 

267 """ 

268 if not isinstance(source, SymbolicLink): 

269 ex = TypeError("Parameter 'source' is not of type 'SymbolicLink'.") 

270 ex.add_note(f"Got type '{getFullyQualifiedName(source)}'.") 

271 raise ex 

272 

273 source._isConnected = True 

274 source._isBroken = False 

275 source._isOutOfRange = False 

276 self._linkSources.append(source) 

277 

278 

279@export 

280class Directory(Element["Directory"]): 

281 """ 

282 A **directory** represents a directory in the filesystem, which contains subdirectories, regular files and symbolic links. 

283 

284 While scanning for subelements, the directory is populated with elements. Every file object added, gets registered in 

285 the filesystems :class:`Root` for deduplication. In case a file identifier already exists, the found filename will 

286 reference the same file objects. In turn, the file objects has then references to multiple filenames (parents). This 

287 allows to detect :term:`hardlinks <hardlink>`. 

288 

289 The time needed for scanning the directory and its subelements is provided via :data:`ScanDuration`. 

290 

291 After scnaning the directory for subelements, certain directory properties get aggregated. The time needed for 

292 aggregation is provided via :data:`AggregateDuration`. 

293 """ 

294 

295 _path: Nullable[Path] #: Cached :class:`~pathlib.Path` object of this directory. 

296 _subdirectories: Dict[str, "Directory"] #: Dictionary containing name-:class:`Directory` pairs. 

297 _files: Dict[str, "Filename"] #: Dictionary containing name-:class:`Filename` pairs. 

298 _symbolicLinks: Dict[str, "SymbolicLink"] #: Dictionary containing name-:class:`SymbolicLink` pairs. 

299 _filesSize: int #: Aggregated size of all direct files. 

300 _collapsed: bool #: True, if this directory was collapsed. It contains no subelements. 

301 _scanDuration: Nullable[float] #: Duration for scanning the directory and all its subelements. 

302 _aggregateDuration: Nullable[float] #: Duration for aggregating all subelements. 

303 

304 def __init__( 

305 self, 

306 name: str, 

307 collectSubdirectories: bool = False, 

308 parent: Nullable["Directory"] = None 

309 ) -> None: 

310 """ 

311 Initialize the directory with name and parent reference. 

312 

313 :param name: Name of the element. 

314 :param collectSubdirectories: If true, collect subdirectory statistics. 

315 :param parent: Optional parent reference. 

316 """ 

317 super().__init__(name, None, parent) 

318 

319 self._path = None 

320 self._subdirectories = {} 

321 self._files = {} 

322 self._symbolicLinks = {} 

323 self._filesSize = 0 

324 self._collapsed = False 

325 self._scanDuration = None 

326 self._aggregateDuration = None 

327 

328 if parent is not None: 

329 parent._subdirectories[name] = self 

330 

331 if parent._root is not None: 331 ↛ 334line 331 didn't jump to line 334 because the condition on line 331 was always true

332 self._root = parent._root 

333 

334 if collectSubdirectories: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true

335 self.CollectSubdirectories() 

336 

337 def CollectSubdirectories(self) -> None: 

338 """ 

339 Helper method for scanning subdirectories and aggregating found element sizes therein. 

340 """ 

341 self.ScanSubdirectories() 

342 self.AggregateSizes() 

343 

344 def ScanSubdirectories(self) -> None: 

345 """ 

346 Helper method for scanning subdirectories (recursively) and building a 

347 :class:`Directory`-:class:`Filename`-:class:`File` object tree. 

348 

349 If a file refers to the same filesystem internal unique ID, a hardlink (two or more filenames) to the same file 

350 storage object is assumed. 

351 """ 

352 with Stopwatch() as sw1: 

353 try: 

354 items = scandir(directoryPath := self.Path) 

355 except PermissionError as ex: 

356 return WarningCollector.Raise(PermissionWarning(self.Path), ex) 

357 

358 for dirEntry in items: 

359 if dirEntry.is_dir(follow_symlinks=False): 

360 _ = Directory(dirEntry.name, collectSubdirectories=True, parent=self) 

361 elif dirEntry.is_file(follow_symlinks=False): 

362 id = dirEntry.inode() 

363 if id in self._root._ids: 

364 file = self._root._ids[id] 

365 

366 _ = Filename(dirEntry.name, file=file, parent=self) 

367 else: 

368 s = dirEntry.stat(follow_symlinks=False) 

369 filename = Filename(dirEntry.name, parent=self) 

370 file = File(id, s.st_size, parent=filename) 

371 

372 self._root._ids[id] = file 

373 elif dirEntry.is_symlink(): 

374 target = Path(readlink(directoryPath / dirEntry.name)) 

375 _ = SymbolicLink(dirEntry.name, target, parent=self) 

376 else: 

377 raise FilesystemException(f"Unknown directory element.") 

378 

379 self._scanDuration = sw1.Duration 

380 

381 def ResolveSymbolicLinks(self) -> None: 

382 for dir in self._subdirectories.values(): 

383 dir.ResolveSymbolicLinks() 

384 

385 for link in self._symbolicLinks.values(): 385 ↛ 386line 385 didn't jump to line 386 because the loop on line 385 never started

386 if link._target.is_absolute(): 

387 # todo: resolve path and check if target is in range, otherwise add to out-of-range list 

388 pass 

389 else: 

390 target = self 

391 for elem in link._target.parts: 

392 if elem == ".": 

393 continue 

394 elif elem == "..": 

395 if (target := target._parent) is None: 

396 self._root.RegisterUnconnectedSymbolicLink(link) 

397 break 

398 

399 continue 

400 

401 try: 

402 target = target._subdirectories[elem] 

403 continue 

404 except KeyError: 

405 pass 

406 

407 try: 

408 target = target._files[elem] 

409 continue 

410 except KeyError: 

411 pass 

412 

413 try: 

414 target = target._symbolicLinks[elem] 

415 continue 

416 except KeyError: 

417 self._root.RegisterBrokenSymbolicLink(link) 

418 break 

419 else: 

420 target.AddLinkSources(link) 

421 

422 def AggregateSizes(self) -> Set["File"]: 

423 with Stopwatch() as sw2: 

424 aggregatedFiles = set() 

425 

426 self._size = 0 

427 self._filesSize = 0 

428 for dir in self._subdirectories.values(): 

429 aggregatedFiles |= dir.AggregateSizes() 

430 self._size += dir._size 

431 

432 for filename in self._files.values(): 

433 if (file := filename._file) not in aggregatedFiles: 433 ↛ 432line 433 didn't jump to line 432 because the condition on line 433 was always true

434 self._filesSize += file._size 

435 aggregatedFiles.add(file) 

436 

437 self._size += self._filesSize 

438 

439 self._aggregateDuration = sw2.Duration 

440 

441 return aggregatedFiles 

442 

443 @Element.Root.setter 

444 def Root(self, value: "Root") -> None: 

445 Element.Root.fset(self, value) 

446 

447 for subdir in self._subdirectories.values(): 447 ↛ 448line 447 didn't jump to line 448 because the loop on line 447 never started

448 subdir.Root = value 

449 

450 for file in self._files.values(): 

451 file.Root = value 

452 

453 for link in self._symbolicLinks.values(): 453 ↛ 454line 453 didn't jump to line 454 because the loop on line 453 never started

454 link.Root = value 

455 

456 @Element.Parent.setter 

457 def Parent(self, value: _ParentType) -> None: 

458 Element.Parent.fset(self, value) 

459 

460 value._subdirectories[self._name] = self 

461 

462 if isinstance(value, Root): 462 ↛ exitline 462 didn't return from function 'Parent' because the condition on line 462 was always true

463 self.Root = value 

464 

465 @readonly 

466 def Count(self) -> int: 

467 """ 

468 Read-only property to access the number of elements in a directory. 

469 

470 :returns: Number of files plus subdirectories. 

471 """ 

472 return len(self._subdirectories) + len(self._files) + len(self._symbolicLinks) 

473 

474 @readonly 

475 def FileCount(self) -> int: 

476 """ 

477 Read-only property to access the number of files in a directory. 

478 

479 .. hint:: 

480 

481 Files include regular files and symbolic links. 

482 

483 :returns: Number of files. 

484 """ 

485 return len(self._files) + len(self._symbolicLinks) 

486 

487 @readonly 

488 def RegularFileCount(self) -> int: 

489 """ 

490 Read-only property to access the number of regular files in a directory. 

491 

492 :returns: Number of regular files. 

493 """ 

494 return len(self._files) 

495 

496 @readonly 

497 def SymbolicLinkCount(self) -> int: 

498 """ 

499 Read-only property to access the number of symbolic links in a directory. 

500 

501 :returns: Number of symbolic links. 

502 """ 

503 return len(self._symbolicLinks) 

504 

505 @readonly 

506 def SubdirectoryCount(self) -> int: 

507 """ 

508 Read-only property to access the number of subdirectories in a directory. 

509 

510 :returns: Number of subdirectories. 

511 """ 

512 return len(self._subdirectories) 

513 

514 @readonly 

515 def TotalFileCount(self) -> int: 

516 """ 

517 Read-only property to access the total number of files in all child hierarchy levels (recursively). 

518 

519 .. hint:: 

520 

521 Files include regular files and symbolic links. 

522 

523 :returns: Total number of files. 

524 """ 

525 return sum(d.TotalFileCount for d in self._subdirectories.values()) + len(self._files) + len(self._symbolicLinks) 

526 

527 @readonly 

528 def TotalRegularFileCount(self) -> int: 

529 """ 

530 Read-only property to access the total number of regular files in all child hierarchy levels (recursively). 

531 

532 :returns: Total number of regular files. 

533 """ 

534 return sum(d.TotalRegularFileCount for d in self._subdirectories.values()) + len(self._files) 

535 

536 @readonly 

537 def TotalSymbolicLinkCount(self) -> int: 

538 """ 

539 Read-only property to access the total number of symbolic links in all child hierarchy levels (recursively). 

540 

541 :returns: Total number of symbolic links. 

542 """ 

543 return sum(d.TotalSymbolicLinkCount for d in self._subdirectories.values()) + len(self._symbolicLinks) 

544 

545 @readonly 

546 def TotalSubdirectoryCount(self) -> int: 

547 """ 

548 Read-only property to access the total number of subdirectories in all child hierarchy levels (recursively). 

549 

550 :returns: Total number of subdirectories. 

551 """ 

552 return len(self._subdirectories) + sum(d.TotalSubdirectoryCount for d in self._subdirectories.values()) 

553 

554 @readonly 

555 def Subdirectories(self) -> Generator["Directory", None, None]: 

556 """ 

557 Iterate all direct subdirectories of the directory. 

558 

559 :returns: A generator to iterate all direct subdirectories. 

560 """ 

561 return (d for d in self._subdirectories.values()) 

562 

563 @readonly 

564 def Files(self) -> Generator[Union["Filename", "SymbolicLink"], None, None]: 

565 """ 

566 Iterate all direct files of the directory. 

567 

568 .. hint:: 

569 

570 Files include regular files and symbolic links. 

571 

572 :returns: A generator to iterate all direct files. 

573 """ 

574 return (f for f in chain(self._files.values(), self._symbolicLinks.values())) 

575 

576 @readonly 

577 def RegularFiles(self) -> Generator["Filename", None, None]: 

578 """ 

579 Iterate all direct regular files of the directory. 

580 

581 :returns: A generator to iterate all direct regular files. 

582 """ 

583 return (f for f in self._files.values()) 

584 

585 @readonly 

586 def SymbolicLinks(self) -> Generator["SymbolicLink", None, None]: 

587 """ 

588 Iterate all direct symbolic links of the directory. 

589 

590 :returns: A generator to iterate all direct symbolic links. 

591 """ 

592 return (l for l in self._symbolicLinks.values()) 

593 

594 @readonly 

595 def Path(self) -> Path: 

596 """ 

597 Read-only property to access the equivalent Path instance for accessing the represented directory. 

598 

599 :returns: Path to the directory. 

600 :raises FilesystemException: If no parent is set. 

601 """ 

602 if self._path is not None: 

603 return self._path 

604 

605 if self._parent is None: 

606 raise FilesystemException(f"No parent or root set for directory.") 

607 

608 self._path = self._parent.Path / self._name 

609 return self._path 

610 

611 @readonly 

612 def ScanDuration(self) -> float: 

613 """ 

614 Read-only property to access the time needed to scan a directory structure including all subelements (recursively). 

615 

616 :returns: The scan duration in seconds. 

617 :raises FilesystemException: If the directory was not scanned. 

618 """ 

619 if self._scanDuration is None: 

620 raise FilesystemException(f"Directory was not scanned, yet.") 

621 

622 return self._scanDuration 

623 

624 @readonly 

625 def AggregateDuration(self) -> float: 

626 """ 

627 Read-only property to access the time needed to aggregate the directory's and subelement's properties (recursively). 

628 

629 :returns: The aggregation duration in seconds. 

630 :raises FilesystemException: If the directory properties were not aggregated. 

631 """ 

632 if self._scanDuration is None: 

633 raise FilesystemException(f"Directory properties were not aggregated, yet.") 

634 

635 return self._aggregateDuration 

636 

637 def __hash__(self) -> int: 

638 return hash(id(self)) 

639 

640 def IterateFiles(self) -> Iterator[Element]: 

641 for directory in self._subdirectories.values(): 

642 yield from directory.IterateFiles() 

643 

644 yield from self._files.values() 

645 yield from self._symbolicLinks.values() 

646 

647 def Copy(self, parent: Nullable["Directory"] = None) -> "Directory": 

648 """ 

649 Copy the directory structure including all subelements and link it to the given parent. 

650 

651 .. hint:: 

652 

653 Statistics like aggregated directory size are copied too. |br| 

654 There is no rescan or repeated aggregation needed. 

655 

656 :param parent: The parent element of the copied directory. 

657 :returns: A deep copy of the directory structure. 

658 """ 

659 dir = Directory(self._name, parent=parent) 

660 dir._size = self._size 

661 

662 for subdir in self._subdirectories.values(): 

663 subdir.Copy(dir) 

664 

665 for file in self._files.values(): 

666 file.Copy(dir) 

667 

668 for link in self._symbolicLinks.values(): 

669 link.Copy(dir) 

670 

671 return dir 

672 

673 def Collapse(self, func: Callable[["Directory"], bool]) -> bool: 

674 # if len(self._subdirectories) == 0 or all(subdir.Collapse(func) for subdir in self._subdirectories.values()): 

675 if len(self._subdirectories) == 0: 

676 if func(self): 

677 # print(f"collapse 1 {self.Path}") 

678 self._collapsed = True 

679 self._subdirectories.clear() 

680 self._files.clear() 

681 self._symbolicLinks.clear() 

682 

683 return True 

684 else: 

685 return False 

686 

687 # if all(subdir.Collapse(func) for subdir in self._subdirectories.values()) 

688 collapsible = True 

689 for subdir in self._subdirectories.values(): 

690 result = subdir.Collapse(func) 

691 collapsible = collapsible and result 

692 

693 if collapsible: 

694 # print(f"collapse 2 {self.Path}") 

695 self._collapsed = True 

696 self._subdirectories.clear() 

697 self._files.clear() 

698 self._symbolicLinks.clear() 

699 

700 return True 

701 else: 

702 return False 

703 

704 def ToTree(self, format: Nullable[Callable[[Node], str]] = None) -> Node: 

705 """ 

706 Convert the directory to a :class:`~pyTooling.Tree.Node`. 

707 

708 The node's :attr:`~pyTooling.Tree.Node.Value` field contains a reference to the directory. Additional data is 

709 attached to the node's key-value store: 

710 

711 ``kind`` 

712 The node's kind. See :class:`NodeKind`. 

713 ``size`` 

714 The directory's aggregated size. 

715 

716 :param format: A user defined formatting function for tree nodes. 

717 :returns: A tree node representing this directory. 

718 """ 

719 if format is None: 

720 def format(node: Node) -> str: 

721 return f"{node['size'] * 1e-6:7.1f} MiB {node._value.Name}" 

722 

723 directoryNode = Node( 

724 value=self, 

725 keyValuePairs={ 

726 "kind": NodeKind.File, 

727 "size": self._size 

728 }, 

729 format=format 

730 ) 

731 directoryNode.AddChildren( 

732 e.ToTree(format) for e in chain(self._subdirectories.values()) #, self._files.values(), self._symbolicLinks.values()) 

733 ) 

734 

735 return directoryNode 

736 

737 def __eq__(self, other) -> bool: 

738 """ 

739 Compare two Directory instances for equality. 

740 

741 :param other: Parameter to compare against. 

742 :returns: ``True``, if both directories and all its subelements are equal. 

743 :raises TypeError: If parameter ``other`` is not of type :class:`Directory`. 

744 """ 

745 if not isinstance(other, Directory): 

746 ex = TypeError("Parameter 'other' is not of type Directory.") 

747 ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.") 

748 raise ex 

749 

750 if not all(dir1 == dir2 for _, dir1, dir2 in zipdicts(self._subdirectories, other._subdirectories)): 

751 return False 

752 

753 if not all(file1 == file2 for _, file1, file2 in zipdicts(self._files, other._files)): 

754 return False 

755 

756 if not all(link1 == link2 for _, link1, link2 in zipdicts(self._symbolicLinks, other._symbolicLinks)): 

757 return False 

758 

759 return True 

760 

761 def __ne__(self, other: Any) -> bool: 

762 """ 

763 Compare two Directory instances for inequality. 

764 

765 :param other: Parameter to compare against. 

766 :returns: ``True``, if both directories and all its subelements are unequal. 

767 :raises TypeError: If parameter ``other`` is not of type :class:`Directory`. 

768 """ 

769 return not self.__eq__(other) 

770 

771 def __repr__(self) -> str: 

772 return f"Directory: {self.Path}" 

773 

774 def __str__(self) -> str: 

775 return self._name 

776 

777 

778@export 

779class Filename(Element[Directory]): 

780 """ 

781 Represents a filename in the filesystem, but not the file storage object (:class:`File`). 

782 

783 .. hint:: 

784 

785 Filename and file storage are represented by two classes, which allows multiple names (hard links) per file storage 

786 object. 

787 """ 

788 _file: Nullable["File"] 

789 

790 def __init__( 

791 self, 

792 name: str, 

793 file: Nullable["File"] = None, 

794 parent: Nullable[Directory] = None 

795 ) -> None: 

796 """ 

797 Initialize the filename with name, file (storage) object and parent reference. 

798 

799 :param name: Name of the file. 

800 :param size: Optional file (storage) object. 

801 :param parent: Optional parent reference. 

802 """ 

803 super().__init__(name, None, parent) 

804 

805 if file is None: 805 ↛ 808line 805 didn't jump to line 808 because the condition on line 805 was always true

806 self._file = None 

807 else: 

808 if not isinstance(file, File): 

809 ex = TypeError("Parameter 'file' is not of type 'File'.") 

810 ex.add_note(f"Got type '{getFullyQualifiedName(file)}'.") 

811 raise ex 

812 

813 self._file = file 

814 file._parents.append(self) 

815 

816 if parent is not None: 

817 parent._files[name] = self 

818 

819 if parent._root is not None: 

820 self._root = parent._root 

821 

822 @Element.Root.setter 

823 def Root(self, value: "Root") -> None: 

824 Element.Root.fset(self, value) 

825 

826 if self._file is not None: 826 ↛ exitline 826 didn't return from function 'Root' because the condition on line 826 was always true

827 self._file.Root = value 

828 

829 @Element.Parent.setter 

830 def Parent(self, value: _ParentType) -> None: 

831 Element.Parent.fset(self, value) 

832 

833 value._files[self._name] = self 

834 

835 if isinstance(value, Root): 835 ↛ 836line 835 didn't jump to line 836 because the condition on line 835 was never true

836 self.Root = value 

837 

838 @readonly 

839 def File(self) -> Nullable["File"]: 

840 return self._file 

841 

842 @readonly 

843 def Size(self) -> int: 

844 if self._file is None: 

845 raise ToolingException(f"Filename isn't linked to a File object.") 

846 

847 return self._file._size 

848 

849 @readonly 

850 def Path(self) -> Path: 

851 if self._parent is None: 

852 raise ToolingException(f"Filename has no parent object.") 

853 

854 return self._parent.Path / self._name 

855 

856 def __hash__(self) -> int: 

857 return hash(id(self)) 

858 

859 def Copy(self, parent: Directory) -> "Filename": 

860 fileID = self._file._id 

861 

862 if fileID in parent._root._ids: 

863 file = parent._root._ids[fileID] 

864 else: 

865 fileSize = self._file._size 

866 file = File(fileID, fileSize) 

867 

868 parent._root._ids[fileID] = file 

869 

870 return Filename(self._name, file, parent=parent) 

871 

872 def ToTree(self) -> Node: 

873 def format(node: Node) -> str: 

874 return f"{node['size'] * 1e-6:7.1f} MiB {node._value.Name}" 

875 

876 fileNode = Node( 

877 value=self, 

878 keyValuePairs={ 

879 "kind": NodeKind.File, 

880 "size": self._size 

881 }, 

882 format=format 

883 ) 

884 

885 return fileNode 

886 

887 def __eq__(self, other) -> bool: 

888 """ 

889 Compare two Filename instances for equality. 

890 

891 :param other: Parameter to compare against. 

892 :returns: ``True``, if both filenames are equal. 

893 :raises TypeError: If parameter ``other`` is not of type :class:`Filename`. 

894 """ 

895 if not isinstance(other, Filename): 

896 ex = TypeError("Parameter 'other' is not of type 'Filename'.") 

897 ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.") 

898 raise ex 

899 

900 return self._name == other._name and self.Size == other.Size 

901 

902 def __ne__(self, other: Any) -> bool: 

903 """ 

904 Compare two Filename instances for inequality. 

905 

906 :param other: Parameter to compare against. 

907 :returns: ``True``, if both filenames are unequal. 

908 :raises TypeError: If parameter ``other`` is not of type :class:`Filename`. 

909 """ 

910 if not isinstance(other, Filename): 

911 ex = TypeError("Parameter 'other' is not of type 'Filename'.") 

912 ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.") 

913 raise ex 

914 

915 return self._name != other._name or self.Size != other.Size 

916 

917 def __repr__(self) -> str: 

918 return f"File: {self.Path}" 

919 

920 def __str__(self) -> str: 

921 return self._name 

922 

923 

924@export 

925class SymbolicLink(Element[Directory]): 

926 _target: Path 

927 _isConnected: bool 

928 _isBroken: Nullable[bool] 

929 _isOutOfRange: Nullable[bool] 

930 

931 def __init__( 

932 self, 

933 name: str, 

934 target: Path, 

935 parent: Nullable[Directory] 

936 ) -> None: 

937 super().__init__(name, None, parent) 

938 

939 if target is None: 

940 raise ValueError(f"Parameter 'target' is None.") 

941 elif not isinstance(target, Path): 

942 ex = TypeError("Parameter 'target' is not of type 'Path'.") 

943 ex.add_note(f"Got type '{getFullyQualifiedName(target)}'.") 

944 raise ex 

945 

946 self._target = target 

947 self._isConnected = False 

948 self._isBroken = None 

949 self._isOutOfRange = None 

950 

951 if parent is not None: 

952 parent._symbolicLinks[name] = self 

953 

954 if parent._root is not None: 

955 self._root = parent._root 

956 

957 @readonly 

958 def Path(self) -> Path: 

959 return self._parent.Path / self._name 

960 

961 @readonly 

962 def Target(self) -> Path: 

963 return self._target 

964 

965 @readonly 

966 def IsConnected(self) -> bool: 

967 return self._isConnected 

968 

969 @readonly 

970 def IsBroken(self) -> Nullable[bool]: 

971 return self._isBroken 

972 

973 @readonly 

974 def IsOutOfRange(self) -> Nullable[bool]: 

975 return self._isOutOfRange 

976 

977 def __hash__(self) -> int: 

978 return hash(id(self)) 

979 

980 def Copy(self, parent: Directory) -> "SymbolicLink": 

981 return SymbolicLink(self._name, self._target, parent=parent) 

982 

983 def ToTree(self) -> Node: 

984 def format(node: Node) -> str: 

985 return f"{node['size'] * 1e-6:7.1f} MiB {node._value.Name}" 

986 

987 symbolicLinkNode = Node( 

988 value=self, 

989 keyValuePairs={ 

990 "kind": NodeKind.SymbolicLink, 

991 "size": self._size 

992 }, 

993 format=format 

994 ) 

995 

996 return symbolicLinkNode 

997 

998 def __eq__(self, other) -> bool: 

999 """ 

1000 Compare two SymbolicLink instances for equality. 

1001 

1002 :param other: Parameter to compare against. 

1003 :returns: ``True``, if both symbolic links are equal. 

1004 :raises TypeError: If parameter ``other`` is not of type :class:`SymbolicLink`. 

1005 """ 

1006 if not isinstance(other, SymbolicLink): 

1007 ex = TypeError("Parameter 'other' is not of type 'SymbolicLink'.") 

1008 ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.") 

1009 raise ex 

1010 

1011 return self._name == other._name and self._target == other._target 

1012 

1013 def __ne__(self, other: Any) -> bool: 

1014 """ 

1015 Compare two SymbolicLink instances for inequality. 

1016 

1017 :param other: Parameter to compare against. 

1018 :returns: ``True``, if both symbolic links are unequal. 

1019 :raises TypeError: If parameter ``other`` is not of type :class:`SymbolicLink`. 

1020 """ 

1021 if not isinstance(other, SymbolicLink): 

1022 ex = TypeError("Parameter 'other' is not of type 'SymbolicLink'.") 

1023 ex.add_note(f"Got type '{getFullyQualifiedName(other)}'.") 

1024 raise ex 

1025 

1026 return self._name != other._name or self._target != other._target 

1027 

1028 def __repr__(self) -> str: 

1029 return f"SymLink: {self.Path} -> {self._target}" 

1030 

1031 def __str__(self) -> str: 

1032 return self._name 

1033 

1034 

1035@export 

1036class Root(Directory): 

1037 """ 

1038 A **Root** represents the root-directory in the filesystem, which contains subdirectories, regular files and symbolic links. 

1039 """ 

1040 _ids: Dict[int, "File"] #: Dictionary of file identifier - file objects pairs found while scanning the directory structure. 

1041 _brokenSymbolicLinks: List[SymbolicLink] #: Broken symbolic links (target doesn't exist). 

1042 _unconnectedSymbolicLinks: List[SymbolicLink] #: Symbolic links which couldn't be connected to their target (out of scope). 

1043 

1044 def __init__( 

1045 self, 

1046 rootDirectory: Path, 

1047 collectSubdirectories: bool = True 

1048 ) -> None: 

1049 if rootDirectory is None: 1049 ↛ 1050line 1049 didn't jump to line 1050 because the condition on line 1049 was never true

1050 raise ValueError(f"Parameter 'rootDirectory' is None.") 

1051 elif not isinstance(rootDirectory, Path): 1051 ↛ 1052line 1051 didn't jump to line 1052 because the condition on line 1051 was never true

1052 raise TypeError(f"Parameter 'rootDirectory' is not of type 'Path'.") 

1053 elif not rootDirectory.exists(): 1053 ↛ 1054line 1053 didn't jump to line 1054 because the condition on line 1053 was never true

1054 raise ToolingException(f"Path '{rootDirectory}' doesn't exist.") from FileNotFoundError(rootDirectory) 

1055 

1056 self._ids = {} 

1057 self._brokenSymbolicLinks = [] 

1058 self._unconnectedSymbolicLinks = [] 

1059 

1060 super().__init__(rootDirectory.name) 

1061 self._root = self 

1062 self._path = rootDirectory 

1063 

1064 if collectSubdirectories: 1064 ↛ 1065line 1064 didn't jump to line 1065 because the condition on line 1064 was never true

1065 self.CollectSubdirectories() 

1066 self.ResolveSymbolicLinks() 

1067 

1068 @readonly 

1069 def Path(self) -> Path: 

1070 """ 

1071 Read-only property to access the path of the filesystem statistics root. 

1072 

1073 :returns: Path to the root of the filesystem statistics root directory. 

1074 """ 

1075 return self._path 

1076 

1077 @readonly 

1078 def BrokenSymbolicLinks(self) -> List[SymbolicLink]: 

1079 return self._brokenSymbolicLinks 

1080 

1081 @readonly 

1082 def UnconnectedSymbolicLinks(self) -> List[SymbolicLink]: 

1083 return self._unconnectedSymbolicLinks 

1084 

1085 @readonly 

1086 def TotalHardLinkCount(self) -> int: 

1087 return sum(l for f in self._ids.values() if (l := len(f._parents)) > 1) 

1088 

1089 @readonly 

1090 def TotalHardLinkCount2(self) -> int: 

1091 return sum(1 for f in self._ids.values() if len(f._parents) > 1) 

1092 

1093 @readonly 

1094 def TotalHardLinkCount3(self) -> int: 

1095 return sum(1 for f in self._ids.values() if len(f._parents) == 1) 

1096 

1097 @readonly 

1098 def Size2(self) -> int: 

1099 return sum(f._size for f in self._ids.values() if len(f._parents) > 1) 

1100 

1101 @readonly 

1102 def Size3(self) -> int: 

1103 return sum(f._size * len(f._parents) for f in self._ids.values() if len(f._parents) > 1) 

1104 

1105 @readonly 

1106 def TotalUniqueFileCount(self) -> int: 

1107 return len(self._ids) 

1108 

1109 def RegisterBrokenSymbolicLink(self, symLink: SymbolicLink) -> None: 

1110 symLink._isBroken = True 

1111 self._brokenSymbolicLinks.append(symLink) 

1112 

1113 def RegisterUnconnectedSymbolicLink(self, symLink: SymbolicLink) -> None: 

1114 symLink._isOutOfRange = True 

1115 self._unconnectedSymbolicLinks.append(symLink) 

1116 

1117 def Copy(self) -> "Root": 

1118 """ 

1119 Copy the directory structure including all subelements and link it to the given parent. 

1120 

1121 The duration for the deep copy process is provided in :attr:`ScanDuration` 

1122 

1123 .. hint:: 

1124 

1125 Statistics like aggregated directory size are copied too. |br| 

1126 There is no rescan or repeated aggregation needed. 

1127 

1128 :returns: A deep copy of the directory structure. 

1129 """ 

1130 with Stopwatch() as sw: 

1131 root = Root(self._path, False) 

1132 root._size = self._size 

1133 

1134 for subdir in self._subdirectories.values(): 

1135 subdir.Copy(root) 

1136 

1137 for file in self._files.values(): 

1138 file.Copy(root) 

1139 

1140 for link in self._symbolicLinks.values(): 

1141 link.Copy(root) 

1142 

1143 root._scanDuration = sw.Duration 

1144 root._aggregateDuration = 0.0 

1145 

1146 return root 

1147 

1148 def __repr__(self) -> str: 

1149 return f"Root: {self.Path} (dirs: {self.TotalSubdirectoryCount}, files: {self.TotalRegularFileCount}, symlinks: {self.TotalSymbolicLinkCount})" 

1150 

1151 def __str__(self) -> str: 

1152 return self._name 

1153 

1154 

1155@export 

1156class File(Base): 

1157 """ 

1158 A **File** represents a file storage object in the filesystem, which is accessible by one or more :class:`Filename` objects. 

1159 

1160 Each file has an internal id, which is associated to a unique ID within the host's filesystem. 

1161 """ 

1162 _id: int #: Unique (host internal) file object ID) 

1163 _parents: List[Filename] #: List of reverse references to :class:`Filename` objects. 

1164 

1165 def __init__( 

1166 self, 

1167 id: int, 

1168 size: int, 

1169 parent: Nullable[Filename] = None 

1170 ) -> None: 

1171 """ 

1172 Initialize the File storage object with an ID, size and parent reference. 

1173 

1174 :param id: Unique ID of the file object. 

1175 :param size: Size of the file object. 

1176 :param parent: Optional parent reference. 

1177 """ 

1178 if id is None: 1178 ↛ 1179line 1178 didn't jump to line 1179 because the condition on line 1178 was never true

1179 raise ValueError(f"Parameter 'id' is None.") 

1180 elif not isinstance(id, int): 1180 ↛ 1181line 1180 didn't jump to line 1181 because the condition on line 1180 was never true

1181 ex = TypeError("Parameter 'id' is not of type 'int'.") 

1182 ex.add_note(f"Got type '{getFullyQualifiedName(id)}'.") 

1183 raise ex 

1184 

1185 self._id = id 

1186 

1187 if parent is None: 

1188 super().__init__(size, None) 

1189 self._parents = [] 

1190 elif isinstance(parent, Filename): 1190 ↛ 1195line 1190 didn't jump to line 1195 because the condition on line 1190 was always true

1191 super().__init__(size, parent._root) 

1192 self._parents = [parent] 

1193 parent._file = self 

1194 else: 

1195 ex = TypeError("Parameter 'parent' is not of type 'Filename'.") 

1196 ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.") 

1197 raise ex 

1198 

1199 @readonly 

1200 def ID(self) -> int: 

1201 """ 

1202 Read-only property to access the file object's unique identifier. 

1203 

1204 :returns: Unique file object identifier. 

1205 """ 

1206 return self._id 

1207 

1208 @readonly 

1209 def Parents(self) -> List[Filename]: 

1210 """ 

1211 Read-only property to access the list of filenames using the same file storage object. 

1212 

1213 .. hint:: 

1214 

1215 This allows to check if a file object has multiple filenames a.k.a hardlinks. 

1216 

1217 :returns: List of filenames for the file storage object. 

1218 """ 

1219 return self._parents 

1220 

1221 def AddParent(self, filename: Filename) -> None: 

1222 """ 

1223 Add another parent reference to a :class:`Filename`. 

1224 

1225 :param filename: Reference to a filename object. 

1226 """ 

1227 if filename is None: 1227 ↛ 1228line 1227 didn't jump to line 1228 because the condition on line 1227 was never true

1228 raise ValueError(f"Parameter 'filename' is None.") 

1229 elif not isinstance(filename, Filename): 1229 ↛ 1230line 1229 didn't jump to line 1230 because the condition on line 1229 was never true

1230 ex = TypeError("Parameter 'filename' is not of type 'Filename'.") 

1231 ex.add_note(f"Got type '{getFullyQualifiedName(filename)}'.") 

1232 raise ex 

1233 elif filename._file is not None: 1233 ↛ 1234line 1233 didn't jump to line 1234 because the condition on line 1233 was never true

1234 raise ToolingException(f"Filename is already referencing an other file object ({filename._file._id}).") 

1235 

1236 self._parents.append(filename) 

1237 filename._file = self 

1238 

1239 if filename._root is not None: 

1240 self._root = filename._root