Source code for enjoyn.animator

"""
This module contains animators that join images into the desired animation format.
"""

import itertools
import os
import shlex
import shutil
import subprocess
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory, mkstemp
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import dask.bag
import dask.delayed
import dask.diagnostics
import imageio.v3 as iio
import numpy as np

try:
    from IPython.display import Image, Video
except ImportError:  # pragma: no cover
    Image = Video = None
try:
    import pygifsicle
except ImportError:  # pragma: no cover
    pygifsicle = None
try:
    import imageio_ffmpeg
except ImportError:  # pragma: no cover
    imageio_ffmpeg = None

from pydantic import BaseModel, Field, PrivateAttr, root_validator, validator

if TYPE_CHECKING:  # pragma: no cover
    from dask.distributed import Client
    from typing_extensions import Self

from .preprocessor import NullPreprocessor, Preprocessor


# Do not convert the Args section to Attributes,
# so that it shows up neatly in the output docs.
class BaseAnimator(BaseModel, ABC):
    """
    The base animator containing most of the common inputs and methods
    used in other animators inheriting from this. Note, this should not
    be used directly.

    Args:
        items: The items to animate; can be file names, bytes, numpy arrays,
            or anything that can be read with `imageio.imread`. If the
            `preprocessor` is provided, then the items can be anything the
            preprocessor function accepts.
        output_path: The path to save the output animation to.
        preprocessor: The preprocessor to apply to each item. More info
            can be found within the :meth:`enjoyn.Preprocessor` model's
            docstring.
        imwrite_kwds: Additional keywords to pass to `imageio.imwrite`.
        scratch_directory: The base directory to create the temporary
            directory for intermediary files.
        show_output: Whether to display the output inline; only available
            in an IPython environment.
    """

    items: List[Any] = Field(min_items=2)
    output_path: Path
    preprocessor: Optional[Union[Preprocessor, Callable]] = None
    imwrite_kwds: Optional[Dict[str, Any]] = None
    scratch_directory: Optional[Path] = None
    show_output: bool = True

    # Set by subclasses to the file suffix they produce (e.g. ".gif").
    _output_extension: Optional[str] = PrivateAttr(None)
    # Only populated while `compute` is running; None otherwise.
    _temporary_directory: Optional[str] = PrivateAttr(None)

    @root_validator(pre=True)
    @abstractmethod
    def _plugin_installed(cls, values):  # pragma: no cover
        """
        Check whether required libraries are installed.
        """
        pass

    @validator("items", pre=True)
    def _serialize_array(cls, value) -> List:
        """
        Serialize anything like `np.array` into a `list`.
        """
        return list(value)

    @validator("preprocessor", pre=True)
    def _serialize_callable(cls, value) -> Preprocessor:
        """
        Serialize callable into `Preprocessor`.
        """
        if callable(value):
            value = Preprocessor(func=value)
        return value

    @classmethod
    def from_directory(
        cls,
        directory: Union[Path, str],
        pattern: str = "*.*",
        limit: Optional[int] = None,
        **animator_kwds,
    ) -> "Self":
        """
        Searches a directory for file names that match the pattern and
        uses them as `items` in the animator.

        Args:
            directory: The directory to retrieve the file names.
            pattern: The pattern to subset the file names within the directory.
            limit: The maximum number of file names to use.
            **animator_kwds: Additional keywords to pass to the animator.

        Returns:
            An instantiated animator class.

        Raises:
            ValueError: If `directory` does not exist or is not a directory.
        """
        directory = Path(directory)
        if not directory.exists():
            raise ValueError(f"`{directory}` does not exist")
        elif not directory.is_dir():
            raise ValueError(f"`{directory}` must be a directory")

        # Sorted so that the frame order is deterministic.
        file_paths = sorted(path for path in directory.glob(pattern) if path.is_file())
        if limit:
            file_paths = file_paths[:limit]
        return cls(items=file_paths, **animator_kwds)

    def _serialize_item(self, item: Any, preprocessor: Preprocessor) -> np.ndarray:
        """
        Applies the preprocessor to the item and serialize as a `np.array`.
        """
        with preprocessor.apply_on(item) as item:
            # Arrays are used as-is; everything else is read through imageio.
            image = iio.imread(item) if not isinstance(item, np.ndarray) else item
        return image

    def _create_temporary_path(self) -> Path:
        """
        Creates an empty temporary file, carrying the animator's output
        extension, inside the temporary directory and returns its path.
        """
        file_descriptor, temporary_name = mkstemp(
            prefix="enjoyn_",
            suffix=self._output_extension,
            dir=self._temporary_directory,
        )
        # `mkstemp` hands back an open OS-level handle; only the path is
        # needed here, so close the handle right away to avoid leaking it.
        os.close(file_descriptor)
        return Path(temporary_name)

    def _animate_images(self, partitioned_items: List[Any]) -> Path:
        """
        Serializes items in the partition and creates an incomplete animation.

        Raises:
            RuntimeError: If called while no temporary directory is set,
                i.e. outside of `compute`.
        """
        if self._temporary_directory is None:
            raise RuntimeError(
                "Use the built-in `compute` method instead; the `plan` method "
                "does not have the temporary directory set internally yet."
            )

        preprocessor = self.preprocessor or NullPreprocessor()
        images = [
            self._serialize_item(item, preprocessor) for item in partitioned_items
        ]

        temporary_path = self._create_temporary_path()
        imwrite_kwds = self.imwrite_kwds or {}
        iio.imwrite(
            temporary_path,
            images,
            extension=self._output_extension,
            **imwrite_kwds,
        )
        return temporary_path

    @abstractmethod
    def _concat_animations(self, partitioned_animations: List[Path]) -> Path:
        """
        Concatenates the incomplete animations to create a more complete animation.
        """
        pass

    @dask.delayed
    def _transfer_output(self, temporary_path: Path) -> Path:
        """
        Transfers the final animation from a temporary directory to
        the desired output path.
        """
        output_path = self.output_path.absolute()
        shutil.move(temporary_path, output_path)
        return output_path

    def plan(
        self,
        partition_size: Optional[int] = None,
        split_every: Optional[int] = None,
        visualize: bool = True,
        **compute_kwds: Dict[str, Any],  # noqa
    ) -> dask.delayed:
        """
        Assemble the plan to create the animation, partitioning
        items across workers, applying the preprocessor, if any,
        serializing the items into an incomplete animation, and
        progressively joining those animations into the final animation.

        Args:
            partition_size: The number of items per partition to pass to workers.
            split_every: The number of partitions per group while reducing.
            visualize: Returns a visual of how the items are delegated if True
                which could be useful when using a Jupyter notebook;
                otherwise, returns a Delayed object.
            **compute_kwds: Not used for anything in `plan`; exists so it's
                straightforward to swap `plan` out for `compute` when ready.

        Returns:
            A visualization if `visualize=True`, otherwise dask.delayed object.

        Raises:
            ValueError: If the suffix of `output_path` does not match the
                extension this animator produces.
        """
        extension = self.output_path.suffix.lower()
        if extension != self._output_extension:
            raise ValueError(
                f"The output path must end in '{self._output_extension}' "
                f"to use {self.__class__.__name__}, but got '{extension}'"
            )

        input_bag = dask.bag.from_sequence(self.items, partition_size=partition_size)
        # Each partition becomes a partial animation; the partial animations
        # are then concatenated group by group into a single file.
        temporary_file = input_bag.reduction(
            self._animate_images,
            self._concat_animations,
            split_every=split_every,
        )
        output_path = self._transfer_output(temporary_file)

        if visualize:
            return output_path.visualize()
        else:
            return output_path

    @staticmethod
    @contextmanager
    def _display_progress_bar(show_progress):
        """
        Toggles displaying a progress bar.
        """
        if show_progress:
            with dask.diagnostics.ProgressBar() as progress_bar:
                yield progress_bar
        else:
            yield

    def compute(
        self,
        partition_size: Optional[int] = None,
        split_every: Optional[int] = None,
        client: Optional["Client"] = None,
        scheduler: Optional[str] = None,
        show_progress: bool = True,
        **compute_kwds: Dict[str, Any],
    ) -> Union[Image, Video, Path]:
        """
        Execute the plan to create the animation, partitioning
        items across workers, applying the preprocessor, if any,
        serializing the items into an incomplete animation, and
        progressively joining those animations into the final animation.

        Args:
            partition_size: The number of items per partition to pass to workers.
            split_every: The number of partitions per group while reducing.
            client: If a distributed client is not provided, will use the
                local client, which has limited options.
            scheduler: Whether to use `threads` or `processes` workers;
                if unspecified, defaults to `processes` if a `preprocessor`
                is provided, else `threads`.
            show_progress: Whether to display the progress bar; if `client` is
                provided, the progress bar will not show.
            **compute_kwds: Additional keywords to pass to `dask.compute`, or
                if `client` is provided, `client.compute`.

        Returns:
            An IPython `Image` or `Video` wrapping the output animation if
            `show_output` is set and IPython is importable; otherwise,
            the path to the output animation.
        """
        plan = self.plan(
            partition_size=partition_size, split_every=split_every, visualize=False
        )

        if scheduler is None:
            scheduler = "processes" if self.preprocessor else "threads"
        compute_kwds["scheduler"] = scheduler

        # Intermediary files live under `scratch_directory` when provided,
        # otherwise under the system default temporary location.
        with TemporaryDirectory(
            prefix="enjoyn_", dir=self.scratch_directory
        ) as temporary_directory:
            self._temporary_directory = temporary_directory
            try:
                if client is not None:
                    output_path = client.compute(plan, **compute_kwds).result()
                else:
                    with self._display_progress_bar(show_progress):
                        output_path = dask.compute(plan, **compute_kwds)[0]
            finally:
                # The directory is removed on exit; clear the reference so
                # the guard in `_animate_images` keeps working afterwards.
                self._temporary_directory = None

        if self.show_output and (Image or Video):
            try:
                return Image(output_path)
            except ValueError:
                # Fall back to a video display when `Image` rejects the file.
                return Video(output_path)
        else:
            return output_path
class GifAnimator(BaseAnimator):
    """
    Animator that joins images into a GIF animation.

    Args:
        gifsicle_options: A tuple of options to pass to `gifsicle`; see the
            [`gifsicle` manual](https://www.lcdf.org/gifsicle/man.html)
            for a full list of available options.
    """

    output_path: Path = Path("enjoyn.gif")
    gifsicle_options: List[str] = (
        "--optimize=2",
        "--loopcount=0",
        "--no-warnings",
        "--no-conserve-memory",
    )

    _output_extension: str = PrivateAttr(".gif")

    @root_validator(pre=True)
    def _plugin_installed(cls, values):  # pragma: no cover
        """
        Verify that both the `pygifsicle` package and the `gifsicle`
        executable are available before the model is built.
        """
        python_binding_missing = pygifsicle is None
        if python_binding_missing:
            raise ImportError(
                "Ensure pygifsicle is installed with `pip install -U pygifsicle`"
            )

        executable = shutil.which("gifsicle")
        if not executable:
            raise ImportError(
                "Ensure gifsicle is installed with the equivalent of "
                "`conda install -c conda-forge gifsicle`; "
                "visit the docs for other installation methods"
            )

        return values

    def _concat_animations(self, partitioned_animations: itertools.chain) -> Path:
        """
        Merge the partial GIFs into one GIF written to a fresh temporary
        file, and return the path of that file.
        """
        merged_path = self._create_temporary_path()
        gif_sources = list(partitioned_animations)

        pygifsicle.gifsicle(
            sources=gif_sources,
            destination=merged_path,
            options=self.gifsicle_options,
        )

        # An empty destination file means gifsicle produced no output.
        merged_size = merged_path.stat().st_size
        if merged_size == 0:
            raise RuntimeError(
                "gifsicle failed somewhere; ensure that the inputs, "
                "`items`, `output_path`, `gifsicle_options` are valid"
            )
        return merged_path
class Mp4Animator(BaseAnimator):
    """
    Used for animating images into a MP4 animation.

    Args:
        ffmpeg_options: A tuple of options to pass to `ffmpeg`; see the
            [`ffmpeg` manual](https://ffmpeg.org/ffmpeg.html#Options)
            for a full list of available options.
    """

    output_path: Path = Path("enjoyn.mp4")
    ffmpeg_options: List[str] = ("-loglevel warning",)

    _output_extension: str = PrivateAttr(".mp4")

    @root_validator(pre=True)
    def _plugin_installed(cls, values):  # pragma: no cover
        """
        Check whether required libraries are installed.
        """
        if imageio_ffmpeg is None:
            raise ImportError(
                "Ensure imageio_ffmpeg is installed with "
                "`pip install -U imageio_ffmpeg`"
            )

        if not shutil.which("ffmpeg"):
            raise ImportError(
                "Ensure ffmpeg is installed with the equivalent of "
                "`conda install -c conda-forge ffmpeg`; "
                "visit the docs for other installation methods"
            )

        return values

    def _concat_animations(self, partitioned_animations: itertools.chain) -> Path:
        """
        Concatenates the incomplete animations to create a more complete animation.

        The partial animations are listed in a temporary text file that is
        handed to ffmpeg's `concat` demuxer, and the streams are copied
        (`-c copy`) into a fresh temporary file.

        Returns:
            The path to the concatenated animation.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero return code; the
                message is ffmpeg's decoded stderr.
        """
        temporary_path = self._create_temporary_path()

        # Each entry is single-quoted; a literal single quote inside a path
        # has to be written as '\'' for the list file to stay parseable.
        input_text = "\n".join(
            "file '{}'".format(str(animation).replace("'", "'\\''"))
            for animation in partitioned_animations
        )

        # `delete=False` so the file can be closed before ffmpeg reads it;
        # reading a still-open NamedTemporaryFile by name fails on Windows.
        list_file = NamedTemporaryFile(
            mode="w",
            suffix=".txt",
            dir=self._temporary_directory,
            delete=False,
        )
        try:
            with list_file:
                list_file.write(input_text)

            # Pass an argument list rather than a quoted command string so
            # paths with spaces, quotes, or backslashes reach ffmpeg intact.
            # User options may hold several tokens per entry
            # (e.g. "-loglevel warning"), hence the split.
            cmd = [
                "ffmpeg",
                "-y",
                "-f",
                "concat",
                *shlex.split(" ".join(self.ffmpeg_options)),
                "-safe",
                "0",
                "-i",
                list_file.name,
                "-c",
                "copy",
                str(temporary_path),
            ]
            run = subprocess.run(cmd, capture_output=True)
        finally:
            Path(list_file.name).unlink()

        if run.returncode != 0:
            raise RuntimeError(f"{run.stderr.decode()}")
        return temporary_path