#!/usr/bin/env python
##############################################################################
#
# (c) 2025 The Trustees of Columbia University in the City of New York.
# All rights reserved.
#
# File coded by: Tieqiong Zhang and members of the Billinge Group.
#
# See GitHub contributions for a more detailed list of contributors.
# https://github.com/diffpy/diffpy.cmi/graphs/contributors
#
# See LICENSE.rst for license information.
#
##############################################################################
import shutil
from importlib.resources import as_file
from pathlib import Path
from typing import List, Union
from diffpy.cmi.installer import (
ParsedReq,
install_requirements,
parse_requirement_line,
presence_check,
)
from diffpy.cmi.log import plog
__all__ = ["PacksManager", "get_package_dir"]
class Styles:
    """ANSI escape sequences used to style terminal output.

    Each attribute is the raw escape string; write ``RESET`` after styled
    text to switch the styling off again.
    """

    # Clears every style and color that is currently active.
    RESET = "\033[0m"

    # Text attributes.
    BOLD = "\033[1m"
    UNDER = "\033[4m"

    # Foreground colors.
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"
[docs]
def get_package_dir(root_path=None):
    """Get the package directory as a context manager.

    Parameters
    ----------
    root_path : str or pathlib.Path, optional
        Used for testing; overrides the default location, which is the
        directory containing this module.

    Returns
    -------
    context manager
        A context manager that yields a pathlib.Path to the package
        directory.
    """
    if root_path is None:
        resource = Path(__file__).parent
    elif isinstance(root_path, str):
        # ``as_file`` expects a path-like resource object, not a bare
        # string, so convert the documented ``str`` input up front.
        resource = Path(root_path)
    else:
        resource = root_path
    return as_file(resource)
def _installed_packs_dir(root_path=None) -> Path:
    """Locate requirements/packs/ for the installed package.

    Two locations are tried in order: ``requirements/packs`` directly
    under the package directory, then ``requirements/packs`` three
    directory levels above it.

    Parameters
    ----------
    root_path : optional
        Forwarded unchanged to :func:`get_package_dir`.

    Returns
    -------
    pathlib.Path
        The first candidate that is an existing directory.

    Raises
    ------
    FileNotFoundError
        If neither candidate is a directory.
    """
    with get_package_dir(root_path) as pkgdir:
        pkg = Path(pkgdir).resolve()
        candidates = [pkg / "requirements" / "packs"]
        # ``parents[2]`` only exists when the package directory is at least
        # three levels deep; indexing it unconditionally raised IndexError
        # for shallow paths even when the first candidate was valid.
        if len(pkg.parents) > 2:
            candidates.append(pkg.parents[2] / "requirements" / "packs")
        for candidate in candidates:
            if candidate.is_dir():
                return candidate
    raise FileNotFoundError(
        "Could not locate requirements/packs. Check your installation."
    )
[docs]
class PacksManager:
    """Discover, parse, and install pack files.

    Parameters
    ----------
    root_path : optional
        Passed through to the packs-directory lookup. ``None`` starts the
        lookup from the directory containing this module.

    Attributes
    ----------
    packs_dir : pathlib.Path
        Path to the installed packs directory, i.e. the
        ``requirements/packs`` directory found for the installed package.
    examples_dir : pathlib.Path
        Absolute path to the installed examples directory, resolved as
        ``docs/examples`` two levels above ``packs_dir``.
    """

    def __init__(self, root_path=None) -> None:
        packs_location = _installed_packs_dir(root_path)
        self.packs_dir = packs_location
        # Derived from ``packs_dir``, so it has to be set afterwards.
        self.examples_dir = self._get_examples_dir()
def _get_examples_dir(self) -> Path:
    """Return the absolute path to the installed examples directory.

    The location is ``docs/examples`` two directory levels above
    :attr:`packs_dir`; the directory itself is not required to exist.

    Returns
    -------
    pathlib.Path
        Directory containing shipped examples.
    """
    two_levels_up = self.packs_dir.joinpath("..", "..")
    examples = two_levels_up / "docs" / "examples"
    return examples.resolve()
[docs]
def available_packs(self) -> List[str]:
    """List all available packs.

    A pack is any regular ``*.txt`` file directly inside
    :attr:`packs_dir`; its name is the file name without the extension.

    Returns
    -------
    list of str
        Sorted pack basenames available under :attr:`packs_dir`.
    """
    names = []
    for entry in self.packs_dir.glob("*.txt"):
        if entry.is_file():
            names.append(entry.stem)
    names.sort()
    return names
[docs]
def available_examples(self) -> dict[str, List[tuple[str, Path]]]:
    """Find the examples shipped for each pack.

    Every sub-directory of :attr:`examples_dir` is treated as a pack, and
    every sub-directory of a pack is treated as one example. Plain files
    at either level are ignored.

    Returns
    -------
    dict
        Maps each pack name to a list of ``(example_name, example_path)``
        tuples. Packs and examples are ordered by sorted path; a pack
        without example directories maps to an empty list.

    Raises
    ------
    FileNotFoundError
        If :attr:`examples_dir` does not exist.
    """
    examples_dict = {}
    for pack_path in sorted(self.examples_dir.iterdir()):
        if not pack_path.is_dir():
            continue
        # Use ``name`` rather than ``stem``: ``stem`` drops everything
        # after the last dot, which truncates directory names such as
        # "v1.0" and lets distinct directories collide on one key.
        examples_dict[pack_path.name] = [
            (example_path.name, example_path)
            for example_path in sorted(pack_path.iterdir())
            if example_path.is_dir()
        ]
    return examples_dict
[docs]
def copy_examples(
    self,
    examples_to_copy: List[str],
    target_dir: Union[Path, str, None] = None,
    force: bool = False,
) -> None:
    """Copy examples or packs into the target or current working
    directory.

    Parameters
    ----------
    examples_to_copy : list of str
        User-specified pack(s), example(s), or "all" to copy all.
    target_dir : pathlib.Path or str, optional
        Target directory to copy examples into. Defaults to current
        working directory.
    force : bool, optional
        Defaults to ``False``. If ``True``, existing files are
        overwritten and directories are merged
        (extra files in the target are preserved).

    Raises
    ------
    FileNotFoundError
        If an entry of ``examples_to_copy`` is neither a known pack nor a
        known example. Entries listed before it have already been copied.
    """
    if isinstance(target_dir, str):
        target_dir = Path(target_dir)
    # The copy helpers read these two attributes; they only live for the
    # duration of this call.
    self._target_dir = target_dir.resolve() if target_dir else Path.cwd()
    self._force = force
    try:
        if "all" in examples_to_copy:
            self._copy_all()
            return
        # Scan the examples directory once instead of once per item.
        available = self.available_examples()
        for item in examples_to_copy:
            if item in available:
                self._copy_pack(item)
            elif self._is_example_name(item):
                self._copy_example(item)
            else:
                raise FileNotFoundError(
                    f"No examples or packs found for input: '{item}'"
                )
    finally:
        # Always drop the per-call state, including on the "all" early
        # return and when a copy fails part-way through.
        del self._target_dir
        del self._force
def _copy_all(self):
    """Copy every example of every available pack."""
    packs = self.available_examples()
    for name in packs.keys():
        self._copy_pack(name)
def _copy_pack(self, pack_name):
    """Copy all examples that belong to one pack.

    A pack name that is not among the available examples copies nothing.
    """
    catalog = self.available_examples()
    if pack_name not in catalog:
        return
    for entry in catalog[pack_name]:
        name, origin = entry
        self._copy_tree_to_target(pack_name, name, origin)
def _copy_example(self, example_name):
    """Copy a single example, looked up by name across all packs.

    If several packs contain an example with this name, each of them is
    copied.

    Raises
    ------
    FileNotFoundError
        If no pack contains an example with the given name.
    """
    matches = [
        (pack_name, ex_name, ex_path)
        for pack_name, examples in self.available_examples().items()
        for ex_name, ex_path in examples
        if ex_name == example_name
    ]
    if not matches:
        raise FileNotFoundError(
            f"No examples or packs found for input: '{example_name}'"
        )
    for match in matches:
        self._copy_tree_to_target(*match)
def _is_example_name(self, name):
    """Return True if any pack contains an example called ``name``."""
    catalog = self.available_examples()
    return any(
        candidate == name
        for examples in catalog.values()
        for candidate, _ in examples
    )
def _copy_tree_to_target(self, pack_name, example_name, example_origin):
    """Copy one example folder to ``<target>/<pack>/<example>``.

    The destination's parent directory is created when needed. What
    happens next depends on whether the destination already exists:

    * it does not exist: the example is copied fresh;
    * it exists and ``force`` is set: the overwrite helper is used;
    * it exists and ``force`` is not set: only missing files are added
      and a warning is printed.
    """
    destination = self._target_dir / pack_name / example_name
    destination.parent.mkdir(parents=True, exist_ok=True)
    if not destination.exists():
        self._copy_new_example(
            example_origin, destination, pack_name, example_name
        )
    elif self._force:
        self._overwrite_example(
            example_origin, destination, pack_name, example_name
        )
    else:
        self._copy_missing_files(example_origin, destination)
        print(
            f"WARNING: Example '{pack_name}/{example_name}'"
            " already exists at the specified target directory. "
            "Existing files were left unchanged; "
            "new or missing files were copied. To overwrite everything, "
            "rerun with --force."
        )
def _overwrite_example(
    self, example_origin, target, pack_name, example_name
):
    """Overwrite an existing example in place.

    The example is merged into ``target``: files that exist in both
    places are replaced by the shipped version, while files that exist
    only in ``target`` are kept. This is the behavior ``copy_examples``
    documents for ``force=True``; deleting ``target`` first would discard
    the user's extra files.
    """
    # ``dirs_exist_ok=True`` lets copytree write into the existing tree.
    shutil.copytree(example_origin, target, dirs_exist_ok=True)
    print(f"Overwriting example '{pack_name}/{example_name}'.")
def _copy_missing_files(self, example_origin, target):
    """Add to ``target`` whatever the shipped example has and it lacks.

    Directories are created as needed; a file is copied only when nothing
    exists at its destination, so existing files are never modified.
    """
    for source_item in example_origin.rglob("*"):
        destination = target / source_item.relative_to(example_origin)
        if source_item.is_dir():
            destination.mkdir(parents=True, exist_ok=True)
            continue
        if not source_item.is_file() or destination.exists():
            continue
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source_item, destination)
def _copy_new_example(
    self, example_origin, target, pack_name, example_name
):
    """Copy an example to a destination that does not exist yet and
    report it on standard output."""
    shutil.copytree(example_origin, target)
    label = f"{pack_name}/{example_name}"
    print(f"Copied example '{label}'.")
def _resolve_pack_file(self, identifier: Union[str, Path]) -> Path:
    """Resolve a pack identifier to an absolute .txt path.

    Rules
    -----
    1) Absolute path to a ``.txt`` file is NOT accepted.
    2) The identifier is treated as a basename that must exist
       under :attr:`packs_dir`. Only the final path component of the
       identifier is used, with ``.txt`` appended.

    Parameters
    ----------
    identifier : str or path-like
        Basename to resolve.

    Returns
    -------
    pathlib.Path
        Absolute path to the pack file.

    Raises
    ------
    FileNotFoundError
        If the pack cannot be found per the above rules.
    """
    p = Path(identifier)
    if p.is_absolute():
        # Adjacent literals instead of backslash continuations, which
        # copied the source line layout into the message text.
        raise FileNotFoundError(
            f"Absolute pack paths are not supported: {p}. "
            "Use a provided pack or "
            "define extra requirements using a profile."
        )
    cand = self.packs_dir / f"{p.name}.txt"
    if cand.is_file():
        return cand.resolve()
    raise FileNotFoundError(f"Pack not found: {identifier} ({cand})")
[docs]
def pack_requirements(
    self, identifier: Union[str, Path]
) -> List[ParsedReq]:
    """Return parsed requirements for a pack.

    The pack file is read as UTF-8. Blank lines and lines whose first
    non-whitespace character is ``#`` are skipped; every remaining line
    is stripped and handed to ``parse_requirement_line``.

    Parameters
    ----------
    identifier : str or path-like
        Installed pack name.

    Returns
    -------
    list of ParsedReq
        Parsed requirements from the pack file, in file order.
    """
    pack_file = self._resolve_pack_file(identifier)
    text = pack_file.read_text(encoding="utf-8")
    entries = [
        stripped
        for stripped in (raw.strip() for raw in text.splitlines())
        if stripped and not stripped.startswith("#")
    ]
    return [parse_requirement_line(entry) for entry in entries]
[docs]
def check_pack(self, identifier: Union[str, Path]) -> bool:
    """Return whether a pack is installed.

    The pack's requirements are parsed and passed to ``presence_check``;
    the first element of its result is returned.

    Parameters
    ----------
    identifier : str or path-like
        Basename to the pack file.

    Returns
    -------
    bool
        ``True`` if the pack is installed, ``False`` otherwise.
    """
    return presence_check(self.pack_requirements(identifier))[0]
[docs]
def install_pack(self, identifier: str | Path) -> None:
    """Install a pack and log the outcome.

    The result is reported through the package logger only: a failed
    installation is logged as an error and does not raise here.

    Parameters
    ----------
    identifier : str
        Basename to the pack file.
    """
    pack_name = self._resolve_pack_file(identifier).stem
    reqs = self.pack_requirements(pack_name)
    plog.info("Installing pack: %s", pack_name)
    status = install_requirements(
        reqs, scripts_root=self.packs_dir / "scripts"
    )
    # A return value of 0 is treated as success.
    if status == 0:
        plog.info("Pack '%s' installation complete.", pack_name)
    else:
        plog.error("Pack '%s' installation failed.", pack_name)
[docs]
def print_packs(self) -> None:
    """Print the available packs, split into installed and not
    installed.

    Each section prints a placeholder line when it has no entries.
    """
    s = Styles()
    installed_packs, uninstalled_packs = [], []
    for pack in self.available_packs():
        if self.check_pack(pack):
            installed_packs.append(pack)
        else:
            uninstalled_packs.append(pack)

    print(f"{s.BOLD}{s.UNDER}{s.BLUE}Installed Packs:{s.RESET}")
    # The emptiness check must sit outside the loop: inside it, the
    # placeholder could never be printed for an empty list.
    if not installed_packs:
        print(" (none)")
    else:
        for pack in installed_packs:
            print(f" {pack}")

    print(f"\n{s.BOLD}{s.UNDER}{s.BLUE}Available Packs:{s.RESET}")
    if not uninstalled_packs:
        print(" (all packs installed)")
    else:
        for pack in uninstalled_packs:
            print(f" {pack}")
[docs]
def print_examples(self) -> None:
    """Print every available example, grouped under its pack name."""
    s = Styles()
    heading = f"\n{s.BOLD}{s.UNDER}{s.CYAN}Examples:{s.RESET}"
    print(heading)
    for pack, examples in self.available_examples().items():
        print(f" {s.BOLD}{pack}:{s.RESET}")
        for entry in examples:
            # Each entry is a (name, path) pair; only the name is shown.
            ex_name = entry[0]
            print(f" - {ex_name}")
[docs]
def print_info(self) -> None:
    """Print information about available packs, profiles, and
    examples, in that order."""
    self.print_packs()

    # Imported inside the method rather than at module level, presumably
    # to avoid a circular import between the two managers (confirm).
    from diffpy.cmi.profilesmanager import ProfilesManager

    ProfilesManager().print_profiles()

    self.print_examples()