diff --git a/src/zimscraperlib/image/optimization.py b/src/zimscraperlib/image/optimization.py index 6a57936..15a0daf 100644 --- a/src/zimscraperlib/image/optimization.py +++ b/src/zimscraperlib/image/optimization.py @@ -22,6 +22,8 @@ import os import pathlib import subprocess +import tempfile +import warnings from dataclasses import dataclass from typing import overload @@ -38,9 +40,8 @@ ) from PIL import Image -from zimscraperlib.image.conversion import convert_image +from zimscraperlib.constants import ALPHA_NOT_SUPPORTED from zimscraperlib.image.probing import format_for -from zimscraperlib.image.utils import save_image def ensure_matches( @@ -82,43 +83,70 @@ class OptimizePngOptions: @overload def optimize_png( src: pathlib.Path | io.BytesIO, - dst: io.BytesIO | None = None, + dst: None, options: OptimizePngOptions | None = None, + *, + convert: bool = False, +) -> io.BytesIO: ... +@overload +def optimize_png( + src: pathlib.Path | io.BytesIO, + dst: io.BytesIO, + options: OptimizePngOptions | None = None, + *, + convert: bool = False, ) -> io.BytesIO: ... @overload def optimize_png( src: pathlib.Path | io.BytesIO, dst: pathlib.Path, options: OptimizePngOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path: ... def optimize_png( src: pathlib.Path | io.BytesIO, dst: pathlib.Path | io.BytesIO | None = None, options: OptimizePngOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path | io.BytesIO: - """method to optimize PNG files using a pure python external optimizer""" - - ensure_matches(src, "PNG") - - img = Image.open(src) + """method to convert (if needed) and optimize PNG files""" if options is None: options = OptimizePngOptions() - if options.remove_transparency: - img = remove_alpha(img, options.background_color) + if not convert: + ensure_matches(src, "PNG") + + with Image.open(src) as img: + if convert: + src_format = format_for(src, from_suffix=False) + if src_format and src_format != "PNG": + if img.mode == "RGBA" and "PNG" in ALPHA_NOT_SUPPORTED: + img = img.convert("RGB") + + if options.remove_transparency: + img = remove_alpha(img, options.background_color) - if options.reduce_colors: - img, _, _ = do_reduce_colors(img, options.max_colors) + if options.reduce_colors: + img, _, _ = do_reduce_colors(img, options.max_colors) - if not options.fast_mode and img.mode == "P": - img, _ = rebuild_palette(img) + if not options.fast_mode and img.mode == "P": + img, _ = rebuild_palette(img) + + if dst is None: + dst = io.BytesIO() + + try: + img.save(dst, optimize=True, format="PNG") + if isinstance(dst, io.BytesIO): + dst.seek(0) + except Exception as exc: # pragma: no cover + if isinstance(dst, pathlib.Path) and dst.exists(): + dst.unlink() + raise exc - if dst is None: - dst = io.BytesIO() - img.save(dst, optimize=True, format="PNG") - if not isinstance(dst, pathlib.Path): - dst.seek(0) return dst @@ -144,78 +172,113 @@ class OptimizeJpgOptions: @overload def optimize_jpeg( src: pathlib.Path | io.BytesIO, - dst: io.BytesIO | None = None, + dst: None, + options: OptimizeJpgOptions | None = None, + *, + convert: bool = False, +) -> io.BytesIO: ... +@overload +def optimize_jpeg( + src: pathlib.Path | io.BytesIO, + dst: io.BytesIO, options: OptimizeJpgOptions | None = None, + *, + convert: bool = False, ) -> io.BytesIO: ... @overload def optimize_jpeg( src: pathlib.Path | io.BytesIO, dst: pathlib.Path, options: OptimizeJpgOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path: ... def optimize_jpeg( src: pathlib.Path | io.BytesIO, dst: pathlib.Path | io.BytesIO | None = None, options: OptimizeJpgOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path | io.BytesIO: - """method to optimize JPEG files using a pure python external optimizer""" + """method to convert (if needed) and optimize JPEG files""" if options is None: options = OptimizeJpgOptions() - ensure_matches(src, "JPEG") + if not convert: + ensure_matches(src, "JPEG") - img = Image.open(src) - orig_size = ( - os.path.getsize(src) - if isinstance(src, pathlib.Path) - else src.getbuffer().nbytes - ) + with Image.open(src) as img: + orig_size = ( + os.path.getsize(src) + if isinstance(src, pathlib.Path) + else src.getbuffer().nbytes + ) - had_exif = False - if ( - not isinstance(src, pathlib.Path) - and piexif.load(src.getvalue())[ # pyright: ignore[reportUnknownMemberType] - "Exif" - ] - ) or ( - isinstance(src, pathlib.Path) - and piexif.load(str(src))["Exif"] # pyright: ignore[reportUnknownMemberType] - ): - had_exif = True + had_exif = False + if not convert: + if ( + not isinstance(src, pathlib.Path) + and piexif.load( + src.getvalue() + )[ # pyright: ignore[reportUnknownMemberType] + "Exif" + ] + ) or ( + isinstance(src, pathlib.Path) + and piexif.load(str(src))[ + "Exif" + ] # pyright: ignore[reportUnknownMemberType] + ): + had_exif = True - # only use progressive if file size is bigger - use_progressive_jpg = orig_size > 10240 # 10KiB # noqa: PLR2004 + if convert: + src_format = format_for(src, from_suffix=False) + if src_format and src_format != "JPEG": + if img.mode == "RGBA" and "JPEG" in ALPHA_NOT_SUPPORTED: + img = img.convert("RGB") - if options.fast_mode: - quality_setting = options.quality - else: - quality_setting, _ = jpeg_dynamic_quality(img) - - if dst is None: - dst = io.BytesIO() - - img.save( - dst, - quality=quality_setting, - optimize=True, - progressive=use_progressive_jpg, - format="JPEG", - ) - - if isinstance(dst, io.BytesIO): - dst.seek(0) - - if options.keep_exif and had_exif: - piexif.transplant( # pyright: ignore[reportUnknownMemberType] - exif_src=( - str(src.resolve()) if isinstance(src, pathlib.Path) else src.getvalue() - ), - image=( - str(dst.resolve()) if isinstance(dst, pathlib.Path) else dst.getvalue() - ), - new_file=dst, - ) + # only use progressive if file size is bigger + use_progressive_jpg = orig_size > 10240 # 10KiB # noqa: PLR2004 + + if options.fast_mode: + quality_setting = options.quality + else: + quality_setting, _ = jpeg_dynamic_quality(img) + + if dst is None: + dst = io.BytesIO() + + try: + img.save( + dst, + quality=quality_setting, + optimize=True, + progressive=use_progressive_jpg, + format="JPEG", + ) + + if isinstance(dst, io.BytesIO): + dst.seek(0) + + if options.keep_exif and had_exif: + piexif.transplant( # pyright: ignore[reportUnknownMemberType] + exif_src=( + str(src.resolve()) + if isinstance(src, pathlib.Path) + else src.getvalue() + ), + image=( + str(dst.resolve()) + if isinstance(dst, pathlib.Path) + else dst.getvalue() + ), + new_file=dst, + ) + except Exception as exc: # pragma: no cover + if isinstance(dst, pathlib.Path) and dst.exists(): + dst.unlink() + raise exc return dst @@ -246,49 +309,67 @@ class OptimizeWebpOptions: @overload def optimize_webp( src: pathlib.Path | io.BytesIO, - dst: io.BytesIO | None = None, + dst: None, + options: OptimizeWebpOptions | None = None, + *, + convert: bool = False, +) -> io.BytesIO: ... +@overload +def optimize_webp( + src: pathlib.Path | io.BytesIO, + dst: io.BytesIO, options: OptimizeWebpOptions | None = None, + *, + convert: bool = False, ) -> io.BytesIO: ... @overload def optimize_webp( src: pathlib.Path | io.BytesIO, dst: pathlib.Path, options: OptimizeWebpOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path: ... def optimize_webp( src: pathlib.Path | io.BytesIO, dst: pathlib.Path | io.BytesIO | None = None, options: OptimizeWebpOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path | io.BytesIO: - """method to optimize WebP using Pillow options""" + """method to convert (if needed) and optimize WebP using Pillow options""" if options is None: options = OptimizeWebpOptions() - ensure_matches(src, "WEBP") + if not convert: + ensure_matches(src, "WEBP") + params: dict[str, bool | int | None] = { "lossless": options.lossless, "quality": options.quality, "method": options.method, } - webp_image = Image.open(src) - if dst is None: - dst = io.BytesIO() - webp_image.save(dst, format="WEBP", **params) - dst.seek(0) - else: + with Image.open(src) as img: + if convert: + src_format = format_for(src, from_suffix=False) + if src_format and src_format != "WEBP": + if img.mode == "RGBA" and "WEBP" in ALPHA_NOT_SUPPORTED: + img = img.convert("RGB") + + if dst is None: + dst = io.BytesIO() + try: - save_image(webp_image, dst, fmt="WEBP", **params) - except Exception as exc: # pragma: no cover - if ( - isinstance(src, pathlib.Path) - and isinstance(dst, pathlib.Path) - and src.resolve() != dst.resolve() - and dst.exists() - ): + img.save(dst, format="WEBP", **params) + if isinstance(dst, io.BytesIO): + dst.seek(0) + except Exception as exc: # pragma: no cover + if isinstance(dst, pathlib.Path) and dst.exists(): dst.unlink() raise exc + return dst @@ -321,38 +402,98 @@ class OptimizeGifOptions: no_extensions: bool | None = True +@overload def optimize_gif( - src: pathlib.Path, dst: pathlib.Path, options: OptimizeGifOptions | None = None + src: pathlib.Path | io.BytesIO, + dst: pathlib.Path, + options: OptimizeGifOptions | None = None, + *, + convert: bool = False, ) -> pathlib.Path: - """method to optimize GIFs using gifsicle >= 1.92""" + ... + +@overload +def optimize_gif( + src: pathlib.Path | io.BytesIO, + dst: io.BytesIO, + options: OptimizeGifOptions | None = None, + *, + convert: bool = False, +) -> io.BytesIO: + ... +def optimize_gif( + src: pathlib.Path | io.BytesIO, + dst: pathlib.Path | io.BytesIO, + options: OptimizeGifOptions | None = None, + *, + convert: bool = False, +) -> pathlib.Path | io.BytesIO: + """method to convert (if needed) and optimize GIFs using gifsicle >= 1.92""" if options is None: options = OptimizeGifOptions() - ensure_matches(src, "GIF") - - # use gifsicle - args = ["/usr/bin/env", "gifsicle"] - if options.optimize_level: - args += [f"-O{options.optimize_level}"] - if options.max_colors: - args += ["--colors", str(options.max_colors)] - if options.lossiness: - args += [f"--lossy={options.lossiness}"] - if options.no_extensions: - args += ["--no-extensions"] - if options.interlace: - args += ["--interlace"] - args += [str(src)] - with open(dst, "w") as out_file: - gifsicle = subprocess.run(args, stdout=out_file, check=False) - - # remove dst if gifsicle failed and src is different from dst - if gifsicle.returncode != 0 and src.resolve() != dst.resolve() and dst.exists(): - dst.unlink() # pragma: no cover - - # raise error if unsuccessful - gifsicle.check_returncode() + temp_files = [] + src_path = src + + try: + if convert: + src_format = format_for(src, from_suffix=False) + if src_format and src_format != "GIF": + with tempfile.NamedTemporaryFile(suffix=".gif", delete=False) as tmp: + src_path = pathlib.Path(tmp.name) + temp_files.append(src_path) + with Image.open(src) as img: + if img.mode == "RGBA": + img = img.convert("RGB") + img.save(src_path, format="GIF") + elif isinstance(src, io.BytesIO): + with tempfile.NamedTemporaryFile(suffix=".gif", delete=False) as tmp: + src_path = pathlib.Path(tmp.name) + src_path.write_bytes(src.read()) + temp_files.append(src_path) + else: + ensure_matches(src, "GIF") + if isinstance(src, io.BytesIO): + with tempfile.NamedTemporaryFile(suffix=".gif", delete=False) as tmp: + src_path = pathlib.Path(tmp.name) + src_path.write_bytes(src.read()) + temp_files.append(src_path) + + # use gifsicle + args = ["/usr/bin/env", "gifsicle"] + if options.optimize_level: + args += [f"-O{options.optimize_level}"] + if options.max_colors: + args += ["--colors", str(options.max_colors)] + if options.lossiness: + args += [f"--lossy={options.lossiness}"] + if options.no_extensions: + args += ["--no-extensions"] + if options.interlace: + args += ["--interlace"] + args += [str(src_path)] + + if isinstance(dst, io.BytesIO): + gifsicle = subprocess.run(args, capture_output=True, check=False) + if gifsicle.returncode != 0: + dst.seek(0) + dst.truncate() + else: + dst.write(gifsicle.stdout) + gifsicle.check_returncode() + else: + with open(dst, "wb") as out_file: + gifsicle = subprocess.run(args, stdout=out_file, check=False) + # remove dst if gifsicle failed and src is different from dst + if gifsicle.returncode != 0 and src_path != dst and dst.exists(): + dst.unlink() + gifsicle.check_returncode() + + finally: + for temp_file in temp_files: + temp_file.unlink(missing_ok=True) + return dst @@ -382,57 +523,117 @@ def of( ) +@overload def optimize_image( - src: pathlib.Path, + src: pathlib.Path | io.BytesIO | bytes, dst: pathlib.Path, options: OptimizeOptions | None = None, *, - delete_src: bool | None = False, - convert: bool | str | None = False, -): + src_format: str | None = None, + dst_format: str | None = None, + delete_src: bool = False, + convert: bool | str = False, +) -> pathlib.Path: ... +@overload +def optimize_image( + src: pathlib.Path | io.BytesIO | bytes, + dst: io.BytesIO, + options: OptimizeOptions | None = None, + *, + src_format: str | None = None, + dst_format: str | None = None, + delete_src: bool = False, + convert: bool | str = False, +) -> io.BytesIO: ... +def optimize_image( + src: pathlib.Path | io.BytesIO | bytes, + dst: pathlib.Path | io.BytesIO, + options: OptimizeOptions | None = None, + *, + src_format: str | None = None, + dst_format: str | None = None, + delete_src: bool = False, + convert: bool | str = False, +) -> pathlib.Path | io.BytesIO: """Optimize image, automatically selecting correct optimizer Arguments: + src_format: format of the source image, required when src is io.BytesIO or bytes. + dst_format: format of the destination image, + required when dst is io.BytesIO and convert=True. delete_src: whether to remove src file upon success (boolean) values: True | False - convert: whether/how to convert from source before optimizing (str or boolean) + convert: whether to convert from source before optimizing (boolean) values: False: don't convert - True: convert to format implied by dst suffix - "FMT": convert to format FMT (use Pillow names)""" + True: convert to format implied by dst suffix""" + + if isinstance(convert, str): + warnings.warn( + f"Passing a string to 'convert' is deprecated. " + f"Use dst_format='{convert}' instead.", + DeprecationWarning, + stacklevel=2, + ) + dst_format = dst_format or convert + convert = True if options is None: options = OptimizeOptions.of() - src_format, dst_format = format_for(src, from_suffix=False), format_for(dst) + if isinstance(src, bytes): + src = io.BytesIO(src) - if src_format is None: # pragma: no cover - # never supposed to happens since we get format from suffix, but good for type - # checker + code safety / clean errors - raise ValueError("Impossible to guess format from src image") - if dst_format is None: - raise ValueError("Impossible to guess format from dst image") - # if requested, convert src to requested format into dst path - if convert and src_format != dst_format: - src_format = dst_format = convert if isinstance(convert, str) else dst_format - convert_image(src, dst, fmt=src_format) - src_img = pathlib.Path(dst) - else: - src_img = pathlib.Path(src) + if delete_src and isinstance(src, io.BytesIO): + raise ValueError("delete_src is not applicable when src is io.BytesIO or bytes") + if isinstance(src, pathlib.Path): + src_format = src_format or format_for(src) + if src_format is None: + raise ValueError("Impossible to guess format from src image") + else: + if src_format is None: + raise ValueError("src_format is required when src is io.BytesIO or bytes") src_format = src_format.lower() - if src_format in ("jpg", "jpeg"): - optimize_jpeg(src=src_img, dst=dst, options=options.jpg) - elif src_format == "gif": - optimize_gif(src=src_img, dst=dst, options=options.gif) - elif src_format == "png": - optimize_png(src=src_img, dst=dst, options=options.png) - elif src_format == "webp": - optimize_webp(src=src_img, dst=dst, options=options.webp) + + if isinstance(dst, pathlib.Path): + dst_format = dst_format or format_for(dst) + if dst_format is None: + raise ValueError("Impossible to guess format from dst image") + else: + if convert and dst_format is None: + raise ValueError("dst_format is required when dst is io.BytesIO and convert=True") + if dst_format is None: + dst_format = src_format + dst_format = dst_format.lower() + + # if requested, convert src to requested format into dst path + if convert: + needs_convert = src_format != dst_format + else: + dst_format = src_format + needs_convert = False + + if dst_format in ("jpg", "jpeg"): + optimize_jpeg(src=src, dst=dst, options=options.jpg, convert=needs_convert) + elif dst_format == "gif": + optimize_gif(src=src, dst=dst, options=options.gif, convert=needs_convert) + elif dst_format == "png": + optimize_png(src=src, dst=dst, options=options.png, convert=needs_convert) + elif dst_format == "webp": + optimize_webp(src=src, dst=dst, options=options.webp, convert=needs_convert) else: raise NotImplementedError( - f"Image format '{src_format}' cannot yet be optimized" + f"Image format '{dst_format}' cannot yet be optimized" ) - # delete src image if requested - if delete_src and src.exists() and src.resolve() != dst.resolve(): + if ( + delete_src + and isinstance(src, pathlib.Path) + and src.exists() + and isinstance(dst, pathlib.Path) + and dst.exists() + and not src.samefile(dst) + ): src.unlink() + + return dst