civet.memoization

Manual control of memoization features.

  1"""
  2Manual control of memoization features.
  3"""
  4
  5from dataclasses import dataclass, field
  6from os import PathLike
  7from pathlib import Path
  8from tempfile import NamedTemporaryFile, TemporaryDirectory
  9from typing import ContextManager, Sequence, Callable, NewType
 10
 11from civet.abstract_data import AbstractDataCommand
 12from civet.shells import Shell, subprocess_run
 13
 14_IntermediatePath = NewType('IntermediatePath', Path)
 15
 16
 17@dataclass(frozen=True)
 18class Memoizer:
 19    """
 20    A `Memoizer` executes `civet.abstract_data.AbstractDataCommand` and
 21    writes their outputs to temporary paths. These outputs are cached,
 22    so that when given the same `civet.abstract_data.AbstractDataCommand`
 23    to run again, its cached output is returned.
 24
 25    ### Dependency Tree
 26
 27    `civet.abstract_data.AbstractDataCommand.command` produces a sequence
 28    which usually contain other `AbstractDataCommand`.
 29    These nested objects are dependencies which need to be computed first.
 30    Hence, `AbstractDataCommand` can be thought of as
 31    nodes of a *dependency tree* where the root is the desired output and the
 32    leaves are input files. `Memoizer` performs DFS on the tree, executing
 33    the commands represented by each node, to produce the intermediate outputs
 34    necessary to compute the root.
 35    """
 36
 37    temp_dir: Path
 38    shell: Shell
 39    require_output: bool = True
 40    _cache: dict[AbstractDataCommand, _IntermediatePath] = field(init=False, default_factory=dict)
 41
 42    def save(self, d: AbstractDataCommand, output: str | PathLike) -> None:
 43        """
 44        If `d` was previously computed, copy the cached result to `output`.
 45        Else, compute `d`, cache the result, and copy to `output`.
 46        """
 47        if d not in self._cache:
 48            self._force_save(d)
 49        self.shell(('cp', '-r', self._cache[d], output))
 50
 51    def _force_save(self, d: AbstractDataCommand) -> _IntermediatePath:
 52        """
 53        Compute `d` and cache the result.
 54        """
 55        output = self.__temp(d.preferred_suffix)
 56        cmd = self._resolve_command(d.command(output))
 57        self.shell(cmd)
 58        if self.require_output and not output.exists():
 59            print(f'output is: {output}')
 60            self.shell(('ls', self.temp_dir))
 61            raise NoOutputError(d)
 62        self._cache[d] = output
 63        return output
 64
 65    def _cache_hit(self, d: AbstractDataCommand) -> _IntermediatePath:
 66        """
 67        If `d` was previously computed, return the path to its cached result.
 68        Else, compute `d` first and then return the path to its cached result.
 69        """
 70        if d in self._cache:
 71            return self._cache[d]
 72        return self._force_save(d)
 73
 74    def _resolve_command(self, cmd: Sequence[str | PathLike | AbstractDataCommand]) -> Sequence[str | PathLike]:
 75        """
 76        Replace every `AbstractDataCommand` in `cmd` with a path to their cached output.
 77        The `AbstractDataCommand` will be computed if it was not computed before.
 78        """
 79        return tuple(self._resolve_component(c) for c in cmd)
 80
 81    def _resolve_component(self, c: str | PathLike | AbstractDataCommand) -> str | PathLike:
 82        if isinstance(c, (str, PathLike)):
 83            return c
 84        elif isinstance(c, AbstractDataCommand):
 85            return self._cache_hit(c)
 86        # TODO subshell support
 87        raise TypeError(f'{c} is not a [str | PathLike | AbstractDataCommand]')
 88
 89    def __temp(self, suffix='') -> _IntermediatePath:
 90        """
 91        Create a temporary path name.
 92        """
 93        with NamedTemporaryFile(suffix=suffix, dir=self.temp_dir) as t:
 94            pass
 95        return _IntermediatePath(Path(t.name))
 96
 97
 98@dataclass(frozen=True)
 99class Session(ContextManager[Memoizer]):
100    """
101    A `Session` manages the temporary directory of a `Memoizer`.
102    """
103
104    require_output: bool = True
105    """
106    If True, raise `NoOutputError` if a command fails to produce output to its given path.
107    """
108    shell: Shell = subprocess_run
109    """
110    A function which executes its parameters as a subprocess.
111    """
112    temp_dir: ContextManager[str] = field(default_factory=TemporaryDirectory)
113
114    def __enter__(self) -> Memoizer:
115        temp_dir_name = self.temp_dir.__enter__()
116        return Memoizer(Path(temp_dir_name), require_output=self.require_output, shell=self.shell)
117
118    def __exit__(self, exc_type, exc_val, exc_tb):
119        self.temp_dir.__exit__(exc_type, exc_val, exc_tb)
120
121
class NoOutputError(Exception):
    """
    Raised when a subprocess run by `Memoizer` does not create output at its given path.
    """
@dataclass(frozen=True)
class Memoizer:
@dataclass(frozen=True)
class Memoizer:
    """
    A `Memoizer` executes `civet.abstract_data.AbstractDataCommand` objects and
    writes their outputs to temporary paths. These outputs are cached,
    so that when the same `civet.abstract_data.AbstractDataCommand` is given
    to run again, its cached output is returned.

    ### Dependency Tree

    `civet.abstract_data.AbstractDataCommand.command` produces a sequence
    which usually contains other `AbstractDataCommand` objects.
    These nested objects are dependencies which need to be computed first.
    Hence, `AbstractDataCommand` can be thought of as
    nodes of a *dependency tree* where the root is the desired output and the
    leaves are input files. `Memoizer` performs DFS on the tree, executing
    the commands represented by each node, to produce the intermediate outputs
    necessary to compute the root.
    """

    temp_dir: Path
    shell: Shell
    require_output: bool = True
    # `frozen=True` only forbids rebinding the attribute; the dict itself is
    # mutated in place as results are computed.
    _cache: dict[AbstractDataCommand, _IntermediatePath] = field(init=False, default_factory=dict)

    def save(self, d: AbstractDataCommand, output: str | PathLike) -> None:
        """
        If `d` was previously computed, copy the cached result to `output`.
        Else, compute `d`, cache the result, and copy to `output`.

        The copy is performed by running `cp -r` through `self.shell`.
        """
        self.shell(('cp', '-r', self._cache_hit(d), output))

    def _force_save(self, d: AbstractDataCommand) -> _IntermediatePath:
        """
        Compute `d` and cache the result, regardless of any existing cache entry.

        Dependencies found in the command of `d` are resolved (and computed
        if necessary) before the command itself is run.

        Returns the temporary path the command was asked to write to.

        Raises `NoOutputError` if `require_output` is set and the command
        did not create that path.
        """
        output = self.__temp(d.preferred_suffix)
        cmd = self._resolve_command(d.command(output))
        self.shell(cmd)
        if self.require_output and not output.exists():
            # Raise immediately: running further diagnostics through
            # `self.shell` here could fail and mask this error.
            raise NoOutputError(d)
        self._cache[d] = output
        return output

    def _cache_hit(self, d: AbstractDataCommand) -> _IntermediatePath:
        """
        If `d` was previously computed, return the path to its cached result.
        Else, compute `d` first and then return the path to its cached result.
        """
        if d in self._cache:
            return self._cache[d]
        return self._force_save(d)

    def _resolve_command(self, cmd: Sequence[str | PathLike | AbstractDataCommand]) -> Sequence[str | PathLike]:
        """
        Replace every `AbstractDataCommand` in `cmd` with the path to its cached output.
        An `AbstractDataCommand` is computed if it was not computed before.

        Returns a tuple with the same length and order as `cmd`.
        """
        return tuple(self._resolve_component(c) for c in cmd)

    def _resolve_component(self, c: str | PathLike | AbstractDataCommand) -> str | PathLike:
        """
        Resolve a single element of a command.

        Strings and path-like objects are returned unchanged; an
        `AbstractDataCommand` is replaced by the path to its cached output.

        Raises `TypeError` for any other kind of object.
        """
        if isinstance(c, (str, PathLike)):
            return c
        elif isinstance(c, AbstractDataCommand):
            return self._cache_hit(c)
        # TODO subshell support
        raise TypeError(f'{c} is not a [str | PathLike | AbstractDataCommand]')

    def __temp(self, suffix='') -> _IntermediatePath:
        """
        Create a temporary path name inside `temp_dir`.

        The temporary file is deleted again as soon as the `with` block
        closes it, so only its unique name is returned: the path does not
        exist until a command writes to it.
        """
        with NamedTemporaryFile(suffix=suffix, dir=self.temp_dir) as t:
            pass
        return _IntermediatePath(Path(t.name))

A Memoizer executes civet.abstract_data.AbstractDataCommand objects and writes their outputs to temporary paths. These outputs are cached, so that when the same civet.abstract_data.AbstractDataCommand is given to run again, its cached output is returned.

Dependency Tree

civet.abstract_data.AbstractDataCommand.command produces a sequence which usually contains other AbstractDataCommand objects. These nested objects are dependencies which need to be computed first. Hence, AbstractDataCommand can be thought of as nodes of a dependency tree where the root is the desired output and the leaves are input files. Memoizer performs DFS on the tree, executing the commands represented by each node, to produce the intermediate outputs necessary to compute the root.

Memoizer( temp_dir: pathlib.Path, shell: Callable[[Sequence[Union[str, os.PathLike]]], NoneType], require_output: bool = True)
def save( self, d: civet.abstract_data.AbstractDataCommand, output: str | os.PathLike) -> None:
43    def save(self, d: AbstractDataCommand, output: str | PathLike) -> None:
44        """
45        If `d` was previously computed, copy the cached result to `output`.
46        Else, compute `d`, cache the result, and copy to `output`.
47        """
48        if d not in self._cache:
49            self._force_save(d)
50        self.shell(('cp', '-r', self._cache[d], output))

If d was previously computed, copy the cached result to output. Else, compute d, cache the result, and copy to output.

@dataclass(frozen=True)
class Session(typing.ContextManager[civet.memoization.Memoizer]):
 99@dataclass(frozen=True)
100class Session(ContextManager[Memoizer]):
101    """
102    A `Session` manages the temporary directory of a `Memoizer`.
103    """
104
105    require_output: bool = True
106    """
107    If True, raise `NoOutputError` if a command fails to produce output to its given path.
108    """
109    shell: Shell = subprocess_run
110    """
111    A function which executes its parameters as a subprocess.
112    """
113    temp_dir: ContextManager[str] = field(default_factory=TemporaryDirectory)
114
115    def __enter__(self) -> Memoizer:
116        temp_dir_name = self.temp_dir.__enter__()
117        return Memoizer(Path(temp_dir_name), require_output=self.require_output, shell=self.shell)
118
119    def __exit__(self, exc_type, exc_val, exc_tb):
120        self.temp_dir.__exit__(exc_type, exc_val, exc_tb)

A Session manages the temporary directory of a Memoizer.

Session( require_output: bool = True, shell: Callable[[Sequence[Union[str, os.PathLike]]], NoneType] = <function subprocess_run>, temp_dir: ContextManager[str] = <factory>)
require_output: bool = True

If True, raise NoOutputError if a command fails to produce output to its given path.

def shell(cmd: Sequence[Union[str, os.PathLike]]) -> None:
14def subprocess_run(cmd: Sequence[str | PathLike]) -> None:
15    """
16    Alias for `subprocess.run(cmd, check=True)`
17    """
18    sp.run(cmd, check=True)

A function which executes its parameters as a subprocess.

class NoOutputError(builtins.Exception):
class NoOutputError(Exception):
    """
    Raised when a subprocess run by `Memoizer` does not create output at its given path.
    """

Raised when a subprocess run by Memoizer does not create output at its given path.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback