civet.memoization
Manual control of memoization features.
"""
Manual control of memoization features.
"""

from dataclasses import dataclass, field
from os import PathLike
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import ContextManager, Sequence, Callable, NewType

from civet.abstract_data import AbstractDataCommand
from civet.shells import Shell, subprocess_run

# Distinct static type for paths which live inside a `Memoizer`'s temporary directory.
_IntermediatePath = NewType('IntermediatePath', Path)


@dataclass(frozen=True)
class Memoizer:
    """
    A `Memoizer` executes `civet.abstract_data.AbstractDataCommand` objects and
    writes their outputs to temporary paths. These outputs are cached,
    so that when given the same `civet.abstract_data.AbstractDataCommand`
    to run again, its cached output is returned.

    ### Dependency Tree

    `civet.abstract_data.AbstractDataCommand.command` produces a sequence
    which usually contains other `AbstractDataCommand`.
    These nested objects are dependencies which need to be computed first.
    Hence, `AbstractDataCommand` can be thought of as
    nodes of a *dependency tree* where the root is the desired output and the
    leaves are input files. `Memoizer` performs DFS on the tree, executing
    the commands represented by each node, to produce the intermediate outputs
    necessary to compute the root.
    """

    # Directory in which every intermediate output path is allocated.
    temp_dir: Path
    # Callable used to execute a fully resolved command.
    shell: Shell
    # When True, a command which leaves no file at its output path is an error.
    require_output: bool = True
    # Maps each computed command to the path of its result.
    # `hash=False` keeps the mutable dict out of the generated `__hash__`,
    # which would otherwise raise `TypeError` (dicts are unhashable).
    _cache: dict[AbstractDataCommand, _IntermediatePath] = field(
        init=False, default_factory=dict, hash=False
    )

    def save(self, d: AbstractDataCommand, output: str | PathLike) -> None:
        """
        If `d` was previously computed, copy the cached result to `output`.
        Else, compute `d`, cache the result, and copy to `output`.

        The copy is performed by running `cp -r` through `self.shell`.
        """
        self.shell(('cp', '-r', self._cache_hit(d), output))

    def _force_save(self, d: AbstractDataCommand) -> _IntermediatePath:
        """
        Compute `d` and cache the result, regardless of whether `d` is already cached.

        Returns the path which `d` was asked to write to.

        Raises `NoOutputError` (with arguments `(d, output)`) if `require_output`
        is set and nothing exists at the output path after the command ran.
        """
        output = self.__temp(d.preferred_suffix)
        # Resolving the command computes any nested dependencies first.
        cmd = self._resolve_command(d.command(output))
        self.shell(cmd)
        if self.require_output and not output.exists():
            # The expected path travels with the exception instead of being
            # printed, so the failure path has no side effects of its own.
            raise NoOutputError(d, output)
        self._cache[d] = output
        return output

    def _cache_hit(self, d: AbstractDataCommand) -> _IntermediatePath:
        """
        If `d` was previously computed, return the path to its cached result.
        Else, compute `d` first and then return the path to its cached result.
        """
        if d in self._cache:
            return self._cache[d]
        return self._force_save(d)

    def _resolve_command(self, cmd: Sequence[str | PathLike | AbstractDataCommand]) -> Sequence[str | PathLike]:
        """
        Replace every `AbstractDataCommand` in `cmd` with a path to their cached output.
        The `AbstractDataCommand` will be computed if it was not computed before.

        Returns a tuple with the same length and order as `cmd`.
        """
        return tuple(self._resolve_component(c) for c in cmd)

    def _resolve_component(self, c: str | PathLike | AbstractDataCommand) -> str | PathLike:
        """
        Return `c` unchanged if it is a `str` or `PathLike`; if it is an
        `AbstractDataCommand`, return the path to its (possibly newly computed) output.

        Raises `TypeError` for any other type.
        """
        if isinstance(c, (str, PathLike)):
            return c
        if isinstance(c, AbstractDataCommand):
            return self._cache_hit(c)
        # TODO subshell support
        raise TypeError(f'{c} is not a [str | PathLike | AbstractDataCommand]')

    def __temp(self, suffix='') -> _IntermediatePath:
        """
        Create a temporary path name inside `temp_dir`.

        Only the name is kept: the temporary file is removed again when the
        `with` block closes it, so nothing exists at the returned path until
        a command writes there.
        """
        with NamedTemporaryFile(suffix=suffix, dir=self.temp_dir) as t:
            pass
        return _IntermediatePath(Path(t.name))


@dataclass(frozen=True)
class Session(ContextManager[Memoizer]):
    """
    A `Session` manages the temporary directory of a `Memoizer`.

    Entering the session enters `temp_dir` and yields a `Memoizer` which
    writes its intermediate outputs there; leaving the session exits `temp_dir`.
    """

    require_output: bool = True
    """
    If True, raise `NoOutputError` if a command fails to produce output to its given path.
    """
    shell: Shell = subprocess_run
    """
    A function which executes its parameters as a subprocess.
    """
    temp_dir: ContextManager[str] = field(default_factory=TemporaryDirectory)
    """
    A context manager which yields the name of the directory used for intermediate outputs.
    By default, a new `tempfile.TemporaryDirectory`.
    """

    def __enter__(self) -> Memoizer:
        temp_dir_name = self.temp_dir.__enter__()
        return Memoizer(Path(temp_dir_name), require_output=self.require_output, shell=self.shell)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Hand back whatever the wrapped context manager decides, so that
        # `Session` behaves exactly like using `temp_dir` directly.
        return self.temp_dir.__exit__(exc_type, exc_val, exc_tb)


class NoOutputError(Exception):
    """
    Raised when a subprocess run by `Memoizer` does not create its given output path.
    """
@dataclass(frozen=True)
class Memoizer:
    """
    A `Memoizer` executes `civet.abstract_data.AbstractDataCommand` objects and
    writes their outputs to temporary paths. These outputs are cached,
    so that when given the same `civet.abstract_data.AbstractDataCommand`
    to run again, its cached output is returned.

    ### Dependency Tree

    `civet.abstract_data.AbstractDataCommand.command` produces a sequence
    which usually contains other `AbstractDataCommand`.
    These nested objects are dependencies which need to be computed first.
    Hence, `AbstractDataCommand` can be thought of as
    nodes of a *dependency tree* where the root is the desired output and the
    leaves are input files. `Memoizer` performs DFS on the tree, executing
    the commands represented by each node, to produce the intermediate outputs
    necessary to compute the root.
    """

    # Directory in which every intermediate output path is allocated.
    temp_dir: Path
    # Callable used to execute a fully resolved command.
    shell: Shell
    # When True, a command which leaves no file at its output path is an error.
    require_output: bool = True
    # Maps each computed command to the path of its result.
    # `hash=False` keeps the mutable dict out of the generated `__hash__`,
    # which would otherwise raise `TypeError` (dicts are unhashable).
    _cache: dict[AbstractDataCommand, _IntermediatePath] = field(
        init=False, default_factory=dict, hash=False
    )

    def save(self, d: AbstractDataCommand, output: str | PathLike) -> None:
        """
        If `d` was previously computed, copy the cached result to `output`.
        Else, compute `d`, cache the result, and copy to `output`.

        The copy is performed by running `cp -r` through `self.shell`.
        """
        self.shell(('cp', '-r', self._cache_hit(d), output))

    def _force_save(self, d: AbstractDataCommand) -> _IntermediatePath:
        """
        Compute `d` and cache the result, regardless of whether `d` is already cached.

        Returns the path which `d` was asked to write to.

        Raises `NoOutputError` (with arguments `(d, output)`) if `require_output`
        is set and nothing exists at the output path after the command ran.
        """
        output = self.__temp(d.preferred_suffix)
        # Resolving the command computes any nested dependencies first.
        cmd = self._resolve_command(d.command(output))
        self.shell(cmd)
        if self.require_output and not output.exists():
            # The expected path travels with the exception instead of being
            # printed, so the failure path has no side effects of its own.
            raise NoOutputError(d, output)
        self._cache[d] = output
        return output

    def _cache_hit(self, d: AbstractDataCommand) -> _IntermediatePath:
        """
        If `d` was previously computed, return the path to its cached result.
        Else, compute `d` first and then return the path to its cached result.
        """
        if d in self._cache:
            return self._cache[d]
        return self._force_save(d)

    def _resolve_command(self, cmd: Sequence[str | PathLike | AbstractDataCommand]) -> Sequence[str | PathLike]:
        """
        Replace every `AbstractDataCommand` in `cmd` with a path to their cached output.
        The `AbstractDataCommand` will be computed if it was not computed before.

        Returns a tuple with the same length and order as `cmd`.
        """
        return tuple(self._resolve_component(c) for c in cmd)

    def _resolve_component(self, c: str | PathLike | AbstractDataCommand) -> str | PathLike:
        """
        Return `c` unchanged if it is a `str` or `PathLike`; if it is an
        `AbstractDataCommand`, return the path to its (possibly newly computed) output.

        Raises `TypeError` for any other type.
        """
        if isinstance(c, (str, PathLike)):
            return c
        if isinstance(c, AbstractDataCommand):
            return self._cache_hit(c)
        # TODO subshell support
        raise TypeError(f'{c} is not a [str | PathLike | AbstractDataCommand]')

    def __temp(self, suffix='') -> _IntermediatePath:
        """
        Create a temporary path name inside `temp_dir`.

        Only the name is kept: the temporary file is removed again when the
        `with` block closes it, so nothing exists at the returned path until
        a command writes there.
        """
        with NamedTemporaryFile(suffix=suffix, dir=self.temp_dir) as t:
            pass
        return _IntermediatePath(Path(t.name))
A Memoizer
executes civet.abstract_data.AbstractDataCommand
and
writes their outputs to temporary paths. These outputs are cached,
so that when given the same civet.abstract_data.AbstractDataCommand
to run again, its cached output is returned.
Dependency Tree
civet.abstract_data.AbstractDataCommand.command
produces a sequence
which usually contains other AbstractDataCommand
.
These nested objects are dependencies which need to be computed first.
Hence, AbstractDataCommand
can be thought of as
nodes of a dependency tree where the root is the desired output and the
leaves are input files. Memoizer
performs DFS on the tree, executing
the commands represented by each node, to produce the intermediate outputs
necessary to compute the root.
def save(self, d: AbstractDataCommand, output: str | PathLike) -> None:
    """
    Copy the result of `d` to `output`.

    A result which is already cached is reused; otherwise `d` is computed
    and cached first. The copy is performed by running `cp -r` through
    `self.shell`.
    """
    cached_result = self._cache_hit(d)
    self.shell(('cp', '-r', cached_result, output))
If d
was previously computed, copy the cached result to output
.
Else, compute d
, cache the result, and copy to output
.
@dataclass(frozen=True)
class Session(ContextManager[Memoizer]):
    """
    A `Session` manages the temporary directory of a `Memoizer`.

    Entering the session enters `temp_dir` and yields a `Memoizer` which
    writes its intermediate outputs there; leaving the session exits `temp_dir`.
    """

    require_output: bool = True
    """
    If True, raise `NoOutputError` if a command fails to produce output to its given path.
    """
    shell: Shell = subprocess_run
    """
    A function which executes its parameters as a subprocess.
    """
    temp_dir: ContextManager[str] = field(default_factory=TemporaryDirectory)
    """
    A context manager which yields the name of the directory used for intermediate outputs.
    By default, a new `tempfile.TemporaryDirectory`.
    """

    def __enter__(self) -> Memoizer:
        temp_dir_name = self.temp_dir.__enter__()
        return Memoizer(Path(temp_dir_name), require_output=self.require_output, shell=self.shell)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Hand back whatever the wrapped context manager decides, so that
        # `Session` behaves exactly like using `temp_dir` directly.
        return self.temp_dir.__exit__(exc_type, exc_val, exc_tb)
If True, raise NoOutputError
if a command fails to produce output to its given path.
class NoOutputError(Exception):
    """
    Raised when a subprocess run by `Memoizer` does not create its given output path.
    """
Raised when a subprocess run by Memoizer
does not create its given output path.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback