12 changed files with 519 additions and 31 deletions
@ -0,0 +1,3 @@
@@ -0,0 +1,3 @@
|
||||
"""Shared display limits for HTML fragments and Qt rich text.""" |
||||
|
||||
IMAGE_DISPLAY_MAX_WIDTH_PX = 400 |
||||
@ -0,0 +1,265 @@
@@ -0,0 +1,265 @@
|
||||
"""Resize + compress images and (optionally) audio/video before upload or local storage.""" |
||||
|
||||
from __future__ import annotations |
||||
|
||||
import io |
||||
import logging |
||||
import mimetypes |
||||
import shutil |
||||
import subprocess |
||||
import tempfile |
||||
from pathlib import Path |
||||
from typing import TYPE_CHECKING, Literal, cast |
||||
|
||||
if TYPE_CHECKING: |
||||
from imwald.core.database import Database |
||||
|
||||
log = logging.getLogger(__name__) |
||||
|
||||
SETTING_MEDIA_COMPRESS_STRENGTH = "media_compress_strength" # "0" light, "1" balanced, "2" strong |
||||
SETTING_MEDIA_MAX_IMAGE_WIDTH = "media_max_image_width" # pixels, default 1000 |
||||
|
||||
MEDIA_STORE_MAX_WIDTH_DEFAULT = 1000 |
||||
|
||||
MediaStrength = Literal[0, 1, 2] |
||||
|
||||
|
||||
def read_compression_strength(db: Database | None) -> MediaStrength: |
||||
"""0 = light (larger files), 1 = balanced, 2 = strong (smaller files).""" |
||||
if db is None: |
||||
return 1 |
||||
raw = (db.get_setting(SETTING_MEDIA_COMPRESS_STRENGTH, "1") or "1").strip() |
||||
try: |
||||
v = int(raw) |
||||
except ValueError: |
||||
v = 1 |
||||
v = max(0, min(2, v)) |
||||
return cast(MediaStrength, v) |
||||
|
||||
|
||||
def read_max_image_width(db: Database | None) -> int: |
||||
if db is None: |
||||
return MEDIA_STORE_MAX_WIDTH_DEFAULT |
||||
raw = (db.get_setting(SETTING_MEDIA_MAX_IMAGE_WIDTH, str(MEDIA_STORE_MAX_WIDTH_DEFAULT)) or "").strip() |
||||
try: |
||||
w = int(raw) |
||||
except ValueError: |
||||
return MEDIA_STORE_MAX_WIDTH_DEFAULT |
||||
return max(320, min(w, 8192)) |
||||
|
||||
|
||||
def _strength_params(level: MediaStrength) -> dict[str, int | str]: |
||||
if level == 0: |
||||
return {"jpeg": 90, "png": 6, "video_crf": 20, "audio_abr": "128k"} |
||||
if level == 1: |
||||
return {"jpeg": 78, "png": 9, "video_crf": 23, "audio_abr": "96k"} |
||||
return {"jpeg": 66, "png": 9, "video_crf": 28, "audio_abr": "64k"} |
||||
|
||||
|
||||
def _guess_kind(filename: str, data: bytes) -> Literal["image", "video", "audio", "unknown"]: |
||||
ext = Path(filename).suffix.lower() |
||||
audio_ext = frozenset({".mp3", ".m4a", ".aac", ".opus", ".ogg", ".oga", ".flac", ".wav"}) |
||||
video_ext = frozenset({".mp4", ".mkv", ".mov", ".avi", ".m4v"}) |
||||
if ext in audio_ext: |
||||
return "audio" |
||||
if ext == ".webm": |
||||
if data.startswith(b"\x1a\x45\xdf\xa3"): |
||||
return "video" |
||||
return "audio" |
||||
if ext in video_ext: |
||||
return "video" |
||||
if ext in {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".tif", ".tiff", ".avif"}: |
||||
return "image" |
||||
if data[:8] == b"\x89PNG\r\n\x1a\n" or data[:2] == b"\xff\xd8" or data[:6] in (b"GIF87a", b"GIF89a"): |
||||
return "image" |
||||
guessed, _ = mimetypes.guess_type(filename) |
||||
if guessed: |
||||
if guessed.startswith("image/"): |
||||
return "image" |
||||
if guessed.startswith("video/"): |
||||
return "video" |
||||
if guessed.startswith("audio/"): |
||||
return "audio" |
||||
return "unknown" |
||||
|
||||
|
||||
def _ffmpeg_available() -> bool: |
||||
return shutil.which("ffmpeg") is not None |
||||
|
||||
|
||||
def compress_image_bytes( |
||||
data: bytes, |
||||
*, |
||||
max_width: int, |
||||
strength: MediaStrength, |
||||
original_name: str, |
||||
) -> tuple[bytes, str, str]: |
||||
""" |
||||
Downscale to ``max_width`` on the wide edge (if wider), then compress. |
||||
|
||||
Returns ``(bytes, mime_type, filename)``. |
||||
""" |
||||
from PIL import Image, ImageOps # type: ignore[import-not-found, import-untyped] |
||||
p = _strength_params(strength) |
||||
jpeg_q = int(p["jpeg"]) |
||||
png_level = int(p["png"]) |
||||
|
||||
bio = io.BytesIO(data) |
||||
im_t = Image.open(bio) |
||||
im_t = ImageOps.exif_transpose(im_t) |
||||
im_t.load() |
||||
w0, _h0 = im_t.size |
||||
if w0 > max_width: |
||||
nh = max(1, int(round(im_t.size[1] * (max_width / float(w0))))) |
||||
im_t = im_t.resize((max_width, nh), Image.Resampling.LANCZOS) # pyright: ignore[reportUnknownMemberType] |
||||
im = im_t |
||||
|
||||
has_alpha = im.mode in ("RGBA", "LA") or (im.mode == "P" and "transparency" in im.info) |
||||
stem = Path(original_name).stem or "image" |
||||
if has_alpha: |
||||
out = im.convert("RGBA") |
||||
buf = io.BytesIO() |
||||
out.save(buf, format="PNG", compress_level=png_level, optimize=True) |
||||
blob = buf.getvalue() |
||||
return blob, "image/png", f"{stem}.png" |
||||
|
||||
rgb = im.convert("RGB") |
||||
buf = io.BytesIO() |
||||
rgb.save(buf, format="JPEG", quality=jpeg_q, optimize=True, progressive=True) |
||||
blob = buf.getvalue() |
||||
return blob, "image/jpeg", f"{stem}.jpg" |
||||
|
||||
|
||||
def _compress_video_ffmpeg( |
||||
src: Path, |
||||
dst: Path, |
||||
*, |
||||
max_width: int, |
||||
strength: MediaStrength, |
||||
) -> bool: |
||||
p = _strength_params(strength) |
||||
crf = int(p["video_crf"]) |
||||
abr = str(p["audio_abr"]) |
||||
vf = f"scale=min({max_width}\\,iw):-2" |
||||
cmd = [ |
||||
"ffmpeg", |
||||
"-hide_banner", |
||||
"-loglevel", |
||||
"error", |
||||
"-y", |
||||
"-i", |
||||
str(src), |
||||
"-vf", |
||||
vf, |
||||
"-c:v", |
||||
"libx264", |
||||
"-preset", |
||||
"medium", |
||||
"-crf", |
||||
str(crf), |
||||
"-c:a", |
||||
"aac", |
||||
"-b:a", |
||||
abr, |
||||
"-movflags", |
||||
"+faststart", |
||||
str(dst), |
||||
] |
||||
try: |
||||
subprocess.run(cmd, check=True, capture_output=True, timeout=600) |
||||
except (subprocess.CalledProcessError, OSError, subprocess.TimeoutExpired) as e: |
||||
log.warning("ffmpeg video compress failed: %s", e) |
||||
return False |
||||
return dst.is_file() and dst.stat().st_size > 0 |
||||
|
||||
|
||||
def _compress_audio_ffmpeg(src: Path, out_m4a: Path, *, strength: MediaStrength) -> bool: |
||||
p = _strength_params(strength) |
||||
abr = str(p["audio_abr"]) |
||||
cmd = [ |
||||
"ffmpeg", |
||||
"-hide_banner", |
||||
"-loglevel", |
||||
"error", |
||||
"-y", |
||||
"-i", |
||||
str(src), |
||||
"-vn", |
||||
"-c:a", |
||||
"aac", |
||||
"-b:a", |
||||
abr, |
||||
str(out_m4a), |
||||
] |
||||
try: |
||||
subprocess.run(cmd, check=True, capture_output=True, timeout=600) |
||||
except (subprocess.CalledProcessError, OSError, subprocess.TimeoutExpired) as e: |
||||
log.warning("ffmpeg audio compress failed: %s", e) |
||||
return False |
||||
return out_m4a.is_file() and out_m4a.stat().st_size > 0 |
||||
|
||||
|
||||
def prepare_bytes_for_storage( |
||||
data: bytes, |
||||
filename: str, |
||||
*, |
||||
mime_hint: str | None, |
||||
db: Database | None, |
||||
strength: MediaStrength | None = None, |
||||
) -> tuple[bytes, str, str]: |
||||
""" |
||||
Apply user-configured compression before upload or writing ``media_cache``. |
||||
|
||||
Returns ``(blob, mime_type, filename)``. On failure, returns the original bytes. |
||||
""" |
||||
if not data: |
||||
return data, mime_hint or "application/octet-stream", filename |
||||
|
||||
lvl: MediaStrength = strength if strength is not None else read_compression_strength(db) |
||||
max_w = read_max_image_width(db) |
||||
kind = _guess_kind(filename, data) |
||||
mime = mime_hint or mimetypes.guess_type(filename)[0] or "application/octet-stream" |
||||
|
||||
if kind == "image": |
||||
try: |
||||
out_b, out_m, out_n = compress_image_bytes( |
||||
data, max_width=max_w, strength=lvl, original_name=filename |
||||
) |
||||
return out_b, out_m, out_n |
||||
except Exception as e: # noqa: BLE001 |
||||
log.warning("image compress skipped: %s", e) |
||||
return data, mime, filename |
||||
|
||||
if kind in ("video", "audio") and not _ffmpeg_available(): |
||||
log.info("ffmpeg not found; skipping %s compression", kind) |
||||
return data, mime, filename |
||||
|
||||
if kind == "video" and _ffmpeg_available(): |
||||
suf = Path(filename).suffix.lower() or ".mp4" |
||||
with tempfile.TemporaryDirectory() as td: |
||||
td_path = Path(td) |
||||
src = td_path / f"in{suf}" |
||||
dst = td_path / "out.mp4" |
||||
src.write_bytes(data) |
||||
if _compress_video_ffmpeg(src, dst, max_width=max_w, strength=lvl): |
||||
out_b = dst.read_bytes() |
||||
stem = Path(filename).stem or "video" |
||||
if len(out_b) < len(data): |
||||
return out_b, "video/mp4", f"{stem}.mp4" |
||||
return data, mime, filename |
||||
|
||||
if kind == "audio" and _ffmpeg_available(): |
||||
suf = Path(filename).suffix.lower() or ".m4a" |
||||
with tempfile.TemporaryDirectory() as td: |
||||
td_path = Path(td) |
||||
src = td_path / f"in{suf}" |
||||
dst = td_path / "out.m4a" |
||||
src.write_bytes(data) |
||||
if _compress_audio_ffmpeg(src, dst, strength=lvl): |
||||
out_b = dst.read_bytes() |
||||
stem = Path(filename).stem or "audio" |
||||
if len(out_b) < len(data): |
||||
return out_b, "audio/mp4", f"{stem}.m4a" |
||||
return data, mime, filename |
||||
|
||||
return data, mime, filename |
||||
@ -0,0 +1,68 @@
@@ -0,0 +1,68 @@
|
||||
"""Media compression before storage / upload (Pillow images; ffmpeg optional for A/V).""" |
||||
|
||||
from __future__ import annotations |
||||
|
||||
import io |
||||
import tempfile |
||||
from pathlib import Path |
||||
|
||||
from PIL import Image |
||||
|
||||
from imwald.core.database import Database |
||||
from imwald.core.media_compress import ( |
||||
SETTING_MEDIA_COMPRESS_STRENGTH, |
||||
SETTING_MEDIA_MAX_IMAGE_WIDTH, |
||||
compress_image_bytes, |
||||
prepare_bytes_for_storage, |
||||
read_compression_strength, |
||||
read_max_image_width, |
||||
) |
||||
|
||||
|
||||
def _rgb_wide_png() -> bytes: |
||||
im = Image.new("RGB", (1600, 400), color=(200, 30, 30)) |
||||
buf = io.BytesIO() |
||||
im.save(buf, format="PNG", compress_level=3) |
||||
return buf.getvalue() |
||||
|
||||
|
||||
def test_compress_image_scales_to_max_width() -> None: |
||||
raw = _rgb_wide_png() |
||||
out, mime, name = compress_image_bytes(raw, max_width=1000, strength=1, original_name="x.png") |
||||
assert mime == "image/jpeg" |
||||
assert name.endswith(".jpg") |
||||
im = Image.open(io.BytesIO(out)) |
||||
assert im.size[0] <= 1000 |
||||
|
||||
|
||||
def test_strength_stronger_yields_smaller_or_equal_jpeg() -> None: |
||||
raw = _rgb_wide_png() |
||||
light, _, _ = compress_image_bytes(raw, max_width=1000, strength=0, original_name="w.png") |
||||
strong, _, _ = compress_image_bytes(raw, max_width=1000, strength=2, original_name="w.png") |
||||
assert len(strong) <= len(light) |
||||
|
||||
|
||||
def test_read_settings_from_db() -> None: |
||||
with tempfile.TemporaryDirectory() as td: |
||||
db = Database(Path(td) / "m.sqlite") |
||||
db.connect() |
||||
assert read_compression_strength(db) == 1 |
||||
assert read_max_image_width(db) == 1000 |
||||
db.set_setting(SETTING_MEDIA_COMPRESS_STRENGTH, "2") |
||||
db.set_setting(SETTING_MEDIA_MAX_IMAGE_WIDTH, "1200") |
||||
assert read_compression_strength(db) == 2 |
||||
assert read_max_image_width(db) == 1200 |
||||
|
||||
|
||||
def test_prepare_bytes_image_round_trip() -> None: |
||||
raw = _rgb_wide_png() |
||||
with tempfile.TemporaryDirectory() as td: |
||||
db = Database(Path(td) / "m2.sqlite") |
||||
db.connect() |
||||
db.set_setting(SETTING_MEDIA_MAX_IMAGE_WIDTH, "800") |
||||
out, mime, _name = prepare_bytes_for_storage( |
||||
raw, "wide.png", mime_hint="image/png", db=db, strength=1 |
||||
) |
||||
assert mime == "image/jpeg" |
||||
im = Image.open(io.BytesIO(out)) |
||||
assert im.size[0] <= 800 |
||||
Loading…
Reference in new issue