步骤/目录:
1.需求介绍
2.ffmpeg
3.tinypng
4.squoosh
    (1)Node.js
    (2)克隆仓库并安装运行
        a.本地运行网页服务器
        b.运行桌面版
        c.squoosh-cli
    (3)squoosh的使用
5.修改vimg_processor_multi

本文首发于个人博客https://lisper517.top/index.php/archives/60/,转载请注明出处。
本文的目的是介绍一些图片压缩的工具和使用方法。
本文写作日期为2022年10月19日。运行的平台为win10,编辑器为VS Code。

1.需求介绍

在处理前端时,常常需要放一些图片在网页中,但是一般素材图片占用空间都很大,直接放到网页上不太合适,如果有方法把图片压缩一下就very的nice。本文就介绍一些图片压缩的工具和方法。

2.ffmpeg

在之前的文章 Python,爬虫与深度学习(17)——电影的信息爬取、下载与处理(二) 中,笔者用python写了一个vimg_processor类,主要是用ffmpeg来获取图片/视频信息、压缩图片/视频。其中使用ffmpeg压缩图片的命令参考了 这篇文章 ,如下:

'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(self.origin_abs_path, self.palette)
'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(self.origin_abs_path, self.palette, self.pix_fmt, self.temp_img)
'ffmpeg -i "{}" -vf scale={}  -y "{}"'.format(self.temp_img, self.definition, self.compressed_abs_path)

总的来说是使用调色板将原图使用的256色减少,然后根据调色板输出图片,最后再压缩图片尺寸。
笔者其实不太了解ffmpeg,使用上面的命令,虽然图片占用空间能压缩到很小,但色彩减少导致肉眼能看出很多差距,比较明显的是在人像中,如果涂了口红,压缩后的图片会呈现一种灰红色的质感,这是无法接受的。

经过一番寻找,笔者发现了其他压缩图片的方法,它们压缩图片的算法比较复杂,效果也远非简单的3句ffmpeg命令可比。

3.tinypng

TinyPNG 据说是一个久负盛名的图片压缩网站,可惜它的压缩算法没有开源。如果你只是想压缩几张图片,可以到它的网址 https://tinypng.com/ 手动上传图片,据说每天只能上传20张不超过5MB的图片;如果需要批量压缩图片,你可以到 https://tinypng.com/developers 填写姓名、邮箱申请一个API key(限制是每月只能压缩500张),然后使用python或者其他语言写一个客户端实现压缩脚本,可以参考 这个项目

使用TinyPNG确实压缩效果不错,可惜源码不开源,而且能压缩的图片张数有限制(据说有方法开源绕开限制,参考 这篇文章 ,原理是tinypng服务器限制上传次数时用到了ip进行判断,所以在构造请求头时修改了ip)。综合考虑下来,笔者放弃了TinyPNG,毕竟万一哪天这类闭源工具开始收费就不太合适了。

像TinyPNG这样的在线图片压缩网站还有很多,比如:
https://tinyjpg.com/ (TinyPNG的系列网站)
https://imagestool.com/zh_CN/compress-images.html
https://squoosh.app/

本文将重点介绍squoosh。

4.squoosh

squoosh 是google chrome labs推出的一款开源图片压缩工具,源码放到了github上: https://github.com/GoogleChromeLabs/squoosh ,这些源码主要是TypeScript、JavaScript写的,但是笔者希望用python调用这个图像压缩的工具,这需要一些技巧,下面以Win10为例讲解。

(1)Node.js

因为要用JavaScript,需要安装Node.js、npm。按照 runoob教程 ,进行如下操作:
https://nodejs.org/zh-cn/download/ 下载对应的安装包,笔者选择的是14.15.1的64位Windows安装包(下载地址为 https://nodejs.org/dist/v14.15.1/node-v14.15.1-x64.msi ,因为在 这个问题 中提到开发时用的就是14.15.1版本),一路确认,唯一需要注意的是中间有一步是勾选 Automatically install the neccessary tools ,把这个勾选去掉,因为它会安装其他版本的python3、污染环境。安装完后,在cmd中实验一下,输入 node --versionnpm --version ,可输出版本号。

其他安装方法包括安装二进制文件、docker安装、linux上编译安装,这里就不介绍了。

(2)克隆仓库并安装运行

a.本地运行网页服务器

第一种方法是在本地运行网页服务器,这种方法并不是很推荐。
项目的github地址 下载zip包(或者你也可用pull到本地),解压到合适的位置,在cmd中进入该目录,进行如下操作:

npm install
npm run build
npm run dev

最后一行命令会开启一个本地的服务器,好比你把 https://squoosh.app/ 搬到了本地的服务器运行。笔者在实验时出现了一些错误,具体信息如下:

>npm install
npm WARN read-shrinkwrap This version of npm is compatible with lockfileVersion@1, but package-lock.json was generated for lockfileVersion@2. I'll try to do my best with it!

> squoosh@2.0.0 prepare D:\squoosh-dev
> husky install

npm WARN optional SKIPPING OPTIONAL DEPENDENCY: fsevents@2.1.3 (node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@2.1.3: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})

added 816 packages from 299 contributors and audited 819 packages in 34.939s

43 packages are looking for funding
  run `npm fund` for details

found 106 vulnerabilities (89 moderate, 12 high, 5 critical)
  run `npm audit fix` to fix them, or `npm audit` for details
>npm run build

> squoosh@2.0.0 build D:\squoosh-dev
> rollup -c && node lib/move-output.js


src/static-build/index.tsx → .tmp/build...
Browserslist: caniuse-lite is outdated. Please run:
npx browserslist@latest --update-db
Unable to determine site origin, defaulting to https://squoosh.app
created .tmp/build in 26.7s
>npm run dev

> squoosh@2.0.0 dev D:\squoosh-dev
> DEV_PORT="${DEV_PORT:=5000}" run-p watch serve

'DEV_PORT' 不是内部或外部命令,也不是可运行的程序
或批处理文件。
npm ERR! code ELIFECYCLE
npm ERR! errno 1
npm ERR! squoosh@2.0.0 dev: `DEV_PORT="${DEV_PORT:=5000}" run-p watch serve`
npm ERR! Exit status 1
npm ERR!
npm ERR! Failed at the squoosh@2.0.0 dev script.
npm ERR! This is probably not a problem with npm. There is likely additional logging output above.

npm ERR! A complete log of this run can be found in:
npm ERR!     C:\Users\xxx\AppData\Roaming\npm-cache\_logs\xxxx-debug.log

不知道这是为什么。即使运行 npm audit fixnpx browserslist@latest --update-db 也无济于事;安装16版本的Node.js,又报错 Could not load /squoosh-dev/src/shared/prerendered-app/colors.css ,最后只好放弃。根据报错信息,如果成功安装,大概访问本机的5000端口就能看到squoosh网页。
实际上,就算通过这种方式成功安装了squoosh,也和浏览器访问 https://squoosh.app/ 几乎一样,唯一的区别是不需要联网。如果有需要,笔者更推荐下一种方法。

b.运行桌面版

根据 https://github.com/matiasbenedetto/squoosh-desktop-app 这个项目的介绍,到 https://squoosh-desktop.vercel.app/ 网页去下载 Windows版的Portable文件 即可,是一个exe文件(只有45MB左右大小),打开来是一个隐藏了一些UI的浏览器,这种方式下都不需要安装Node.js了。

c.squoosh-cli

上面两种方法提供的主要是网页UI,只适合少量图片压缩。如果要大量压缩图片,最好还是能使用命令行、脚本。squoosh确实提供了这样的API。

同样安装Node.js-v14.15.1,下载squoosh项目的zip包,解压到合适的位置,在cmd中进入该目录,但是这次执行以下命令:

npm i -g @squoosh/cli

这样的话会安装squoosh-cli,以后在cmd中用 squoosh-cli <options...> 就可以调用squoosh进行图片压缩。或者你也可以不运行 npm i -g @squoosh/cli 安装,而是直接用 npx @squoosh/cli <options...> 的格式来调用squoosh(推荐还是用 npm i -g @squoosh/cli 安装一下)。最后记得把squoosh-dev文件夹加到PATH中,就可以随时调用squoosh-cli。

(3)squoosh的使用

前两种方法安装squoosh或者网页访问 https://squoosh.app/ 时使用都差不多,这里主要介绍一下命令行使用 squoosh-cli 的参数。cmd输入 squoosh-cli -h 可显示以下信息:

Usage: squoosh-cli [options] <files...>

Options:
  -d, --output-dir <dir>                                 Output directory (default: ".")
  -s, --suffix <suffix>                                  Append suffix to output files (default: "")
  --max-optimizer-rounds <rounds>                        Maximum number of compressions to use for auto optimizations
                                                         (default: "6")
  --optimizer-butteraugli-target <butteraugli distance>  Target Butteraugli distance for auto optimizer (default:
                                                         "1.4")
  --resize [config]                                      Resize the image before compressing
  --quant [config]                                       Reduce the number of colors used (aka. paletting)
  --rotate [config]                                      Rotate image
  --mozjpeg [config]                                     Use MozJPEG to generate a .jpg file with the given
                                                         configuration
  --webp [config]                                        Use WebP to generate a .webp file with the given configuration
  --avif [config]                                        Use AVIF to generate a .avif file with the given configuration
  --jxl [config]                                         Use JPEG-XL to generate a .jxl file with the given
                                                         configuration
  --wp2 [config]                                         Use WebP2 to generate a .wp2 file with the given configuration
  --oxipng [config]                                      Use OxiPNG to generate a .png file with the given
                                                         configuration
  -h, --help                                             display help for command

参考 这篇文章 ,使用以下参数:

squoosh-cli --mozjpeg {quality:75,baseline:false,arithmetic:false,progressive:true,optimize_coding:true,smoothing:0,color_space:3,quant_table:3,trellis_multipass:false,trellis_opt_zero:false,trellis_opt_table:false,trellis_loops:1,auto_subsample:true,chroma_subsample:2,separate_chroma_quality:false,chroma_quality:75} "输入路径" -d "输出目录" -s "#test#"

就能达到非常不错的压缩效果。你可以自己在squoosh网页里实验不同的参数,笔者只实验调整了一下quality。

5.修改vimg_processor_multi

修改了之前的vimg_processor_multi.py,调整了vimg_processor的compress_img方法,稍微修改了compress_imgs函数,最终成品如下(所有标注 #需要更改 的上一行是可能需要更改的地方):

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os, time, json, subprocess, re, traceback, redis, platform, sys
from MyPythonLib import log as mylog
from MyPythonLib import redis as myredis

#CPU_THREADS = (12, 8)
CPU_THREADS = (15, 10)  # ffmpeg软编解码时线程数,根据CPU配置调整。分别为解码用的线程、编码用的线程,在压缩时,前者大于后者
#需要更改
# 可以用一个文件实验最佳的CPU_THREADS。软编解码的快慢看ffmpeg最右边的 speed=数字x
NAS_mounted_path = 'D:\\'  # NAS挂载路径。由于函数get_redis_rel_path_from_abs_path,最后一个分隔符必须带上
#需要更改
CWD = os.getcwd()
r_pool = myredis.get_pool(host='192.168.1.1', port=56379)
#需要更改
r_cur = redis.Redis(connection_pool=r_pool)
log_path = os.path.join(CWD, 'logs')
#需要更改
if not os.path.isdir(s=log_path):
    os.mkdir(path=log_path)
log = mylog.get_logger(abs_path=log_path,
                       log_file_name='vimg_processor_multi.log')
TEMP_DIR = os.path.join(CWD, 'temp')  # temp这个文件夹主要是在压缩图片时存放临时的调色板
#需要更改
if not os.path.isdir(s=TEMP_DIR):
    os.mkdir(path=TEMP_DIR)
redis_key_set_todo = 'vimg_processor:set:todo'  # 这个redis集合存放需要压缩的文件的相对路径(仅包含压缩前的文件名),路径分隔符为linux的/(下同)
redis_key_set_wrong = 'vimg_processor:set:wrong'  #集合,存放压缩时出现错误的文件的相对路径(仅包含压缩前的文件名)
redis_key_str_ongoing = 'vimg_processor:str:ongoing:'  # 存放正在压缩的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
redis_key_str_done = 'vimg_processor:str:done:'  # 存放已压缩完的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
start_time = time.time()
fail_msg_list = []  # 每次有错误信息时都放到这个列表中
sys_kind = platform.system()  # 字符串,表示系统类型


def get_suffix(filename: str) -> str:
    '''返回文件后缀名。若文件无后缀名(以 . 结尾,或者不含 . )则返回空字符串。
    :param filename: 文件名(可带路径前缀)
    :return str,文件后缀名(不含 . )'''
    suffix = os.path.splitext(filename)[-1]
    return (suffix[1:], '')[suffix == '']


def is_video(filename: str) -> bool:
    '''根据后缀名判断文件是否视频。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示文件是视频,False表示非视频'''
    video_suffix_set = {
        "WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
        "M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
    }
    #需要更改
    #iso文件不一定能用ffmpeg打开,可以自行取舍
    return (False, True)[get_suffix(filename).upper() in video_suffix_set]


def is_img(filename: str) -> bool:
    '''根据后缀名判断文件是否图片。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示文件是图片,False表示非图片'''
    img_suffix_set = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
    #需要更改
    return (False, True)[get_suffix(filename).upper() in img_suffix_set]


def replace_path_seperator(path: str, sys_kind: str) -> str:
    '''根据操作系统,替换路径中的分隔符。
    :param path: 需要替换的路径
    :param sys_kind: 目标操作系统
    :return str,替换好的路径'''
    if sys_kind == 'Windows':
        return path.replace('/', '\\')
    elif sys_kind == 'Linux':
        return path.replace('\\', '/')


def is_origin(filename: str) -> bool:
    '''接受文件名,判断这个文件是源文件还是压缩后的文件。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示该文件为源文件'''
    prefix = os.path.splitext(filename)[0]
    return (True, False)[re.search(r'#test#$', prefix) != None]


def get_redis_rel_path_from_abs_path(abs_path: str) -> str:
    '''根据本机上文件的绝对路径,返回存储到redis中的相对路径。
    :由于redis中存储的是使用linux分隔符的相对路径,所以需要这个函数转换:
    :将abs_path首先去掉NAS_mounted_path,然后分隔符换成linux的。由于这个函数,NAS_mounted_path最后需要带上一个分隔符
    :param abs_path: 文件在本机的绝对路径
    :return str,redis中存储的相对路径,即 XX/XX/XX 这样的形式。注意最前面没有分隔符'''
    global NAS_mounted_path
    redis_rel_path = abs_path[len(NAS_mounted_path):]
    return replace_path_seperator(redis_rel_path, sys_kind='Linux')


def get_abs_path_from_redis_rel_path(redis_rel_path: str) -> str:
    '''根据redis中存储的相对路径,返回本机上该文件的绝对路径。
    :将redis_rel_path连上NAS_mounted_path,然后替换分隔符为本机的
    :param redis_rel_path: redis中存储的相对路径
    :return str,文件在本机中的绝对路径'''
    global NAS_mounted_path, sys_kind
    abs_path = os.path.join(NAS_mounted_path, redis_rel_path)
    return replace_path_seperator(abs_path, sys_kind=sys_kind)


def get_compressed_abs_path(abs_path: str, fmt: str = '') -> str:
    '''根据源文件的绝对路径,返回压缩后文件的绝对路径。这里的绝对路径都是本机上的。
    :压缩后的图像或者视频将放在同路径下,所以需要给转换后的文件名中加上字符串#test#(在后缀名前):1.mp4 -> 1#test#.mp4
    :有些特殊情况下压缩后文件会重名,比如一个文件夹中同时存在1.mp4、1.rmvb时,又用fmt指定压缩后格式。此时会对压缩后的文件加上序号重命名(1#test#.mp4,1#test#(1).mp4,1#test#(2).mp4)
    :param abs_path: 需要压缩的文件的绝对路径
    :param fmt: 指定转换后文件的格式,为空字符串时不改变格式
    :return str,压缩后文件的绝对路径,根据操作系统调整'''
    prefix, suffix = os.path.splitext(abs_path)
    suffix = (fmt, suffix.lstrip('.'))[fmt == '']
    compressed_abs_path = prefix + '#test#.' + suffix
    i = 1
    while (os.path.exists(compressed_abs_path)):
        compressed_abs_path = prefix + '#test#(' + str(i) + ').' + suffix
        i += 1
    return compressed_abs_path


def get_origin_abs_path(abs_path: str) -> str:
    '''根据压缩后文件的绝对路径到redis中查询,返回源文件的绝对路径。这里的绝对路径都是本机上的。
    :这个函数只应该用于已经压缩完的文件,函数将到redis_key_str_done的所有字符串中进行查询
    :param abs_path: 压缩后文件的绝对路径
    :return str,源文件的绝对路径,根据操作系统调整;若无法查询到,返回空字符串'''
    global r_cur, sys_kind, NAS_mounted_path, redis_key_str_done
    compressed_redis_rel_path = get_redis_rel_path_from_abs_path(abs_path)
    redis_rel_paths_keys = r_cur.keys(redis_key_str_done + '*')
    for redis_rel_path_key in redis_rel_paths_keys:
        if r_cur.get(redis_rel_path_key) == compressed_redis_rel_path:
            origin_abs_path = redis_rel_path_key[len(redis_key_str_done):]
            origin_abs_path = replace_path_seperator(origin_abs_path,
                                                     sys_kind=sys_kind)
            origin_abs_path = os.path.join(NAS_mounted_path, origin_abs_path)
            return origin_abs_path
    return ''


class vimg_processor(object):
    '''用于压缩的图像视频类。初始化时会读取图像视频的一些参数。'''

    def __init__(self,
                 origin_abs_path: str,
                 compressed_abs_path: str = "",
                 video_size: str = '',
                 img_max_pix: int = 0,
                 fmt: str = ''):
        '''用文件的绝对路径等参数初始化对象。
        :param origin_abs_path: 源文件的绝对路径
        :param compressed_abs_path: 输出文件的绝对路径。默认为同目录下文件名末加#test#保存(1.mp4 -> 1#test#.mp4),有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名
        :param video_size: 输出视频文件时,适配的屏幕大小。有 phone 、 pad 、 pc 、 空字符串 四种选择,空字符串表示保留原宽高
        :param img_max_pix: 输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
        :param fmt: 输出文件的格式,即后缀名。为空字符串时,表示按原格式输出。其他可取值如 mp4 、 jpg 等'''
        self.origin_abs_path = origin_abs_path
        self.compressed_abs_path = (compressed_abs_path,
                                    get_compressed_abs_path(
                                        origin_abs_path,
                                        fmt=fmt))[compressed_abs_path == ""]
        self.width, self.height = 0, 1
        if is_video(origin_abs_path):
            self.get_video_info()
            w, h = self.width, self.height
            ratio = w / h
            self.ratio = ratio
            definition = ''
            if video_size == 'phone':
                if ratio <= 0.6:
                    definition = (f'{w}:{h}', '360:640')[w >= 360]
                elif ratio <= 1.0:
                    definition = (f'{w}:{h}', '360:480')[w >= 360]
                elif ratio <= 1.66:
                    definition = (f'{w}:{h}', '480:360')[w >= 480]
                elif ratio > 1.66:
                    definition = (f'{w}:{h}', '640:360')[w >= 640]
            elif video_size == 'pad':
                if ratio <= 0.6:
                    definition = (f'{w}:{h}', '720:1280')[w >= 720]
                elif ratio <= 1.0:
                    definition = (f'{w}:{h}', '720:960')[w >= 720]
                elif ratio <= 1.66:
                    definition = (f'{w}:{h}', '960:720')[w >= 960]
                elif ratio > 1.66:
                    definition = (f'{w}:{h}', '1280:720')[w >= 1280]
            elif video_size == 'pc':
                if ratio <= 0.6:
                    definition = (f'{w}:{h}', '1080:1920')[w >= 1080]
                elif ratio <= 1.0:
                    definition = (f'{w}:{h}', '1080:1440')[w >= 1080]
                elif ratio <= 1.66:
                    definition = (f'{w}:{h}', '1440:1080')[w >= 1440]
                elif ratio > 1.66:
                    definition = (f'{w}:{h}', '1920:1080')[w >= 1920]
            elif video_size == '':
                definition = f'{w}:{h}'
            self.definition = definition
        elif is_img(origin_abs_path):
            self.pix_fmt = ''  # pix_fmt即像素格式
            fmt = (fmt, get_suffix(origin_abs_path))[fmt == '']
            self.palette = os.path.join(TEMP_DIR, 'palette.' + fmt)
            self.temp_img = os.path.join(TEMP_DIR, 'temp_img.' + fmt)
            self.get_img_info()
            w, h = self.width, self.height
            img_max_pix = abs(img_max_pix)
            if img_max_pix == 0:
                pass
            elif w > img_max_pix and h > img_max_pix:
                ratio = max(w, h) / img_max_pix
                w, h = int(w / ratio), int(h / ratio)
            self.definition = f'{w}:{h}'

    def get_video_info(self):
        '''使用ffprobe获取视频的宽高(width、height)。'''
        global sys_kind
        command_probe = f'ffprobe -select_streams v -show_entries format=bit_rate -show_streams -v quiet -of csv="p=0" -of json -i "{self.origin_abs_path}"'
        if sys_kind == 'Windows':
            result = subprocess.Popen(command_probe,
                                      shell=False,
                                      stdout=subprocess.PIPE).stdout
        elif sys_kind == 'Linux':
            result = subprocess.Popen(command_probe,
                                      shell=True,
                                      stdout=subprocess.PIPE).stdout
        list_std = result.readlines()
        str_tmp = ''
        for item in list_std:
            str_tmp += bytes.decode(item.strip())
        json_data = json.loads(str_tmp)
        self.width = int(json_data['streams'][0]['width'])
        self.height = int(json_data['streams'][0]['height'])
        # self.bitrate = int(json_data['format']['bit_rate']) / 1024 # 以后可能用到视频的比特率
        # 视频的比特率有时不能用ffprobe读取到,建议自己根据视频大小和时长计算

    def get_img_info(self):
        '''获取图片的width、height、pix_fmt。'''
        global sys_kind
        command_probe = f'ffprobe -hide_banner -v quiet -print_format json -show_format -show_streams "{self.origin_abs_path}"'
        if sys_kind == 'Windows':
            result = subprocess.Popen(command_probe,
                                      shell=False,
                                      stdout=subprocess.PIPE).stdout
        elif sys_kind == 'Linux':
            result = subprocess.Popen(command_probe,
                                      shell=True,
                                      stdout=subprocess.PIPE).stdout
        list_std = result.readlines()
        str_tmp = ''
        for item in list_std:
            str_tmp += bytes.decode(item.strip())
        json_data = json.loads(str_tmp)
        self.pix_fmt = str(json_data['streams'][0]['pix_fmt'])
        self.width = int(json_data['streams'][0]['width'])
        self.height = int(json_data['streams'][0]['height'])

    def compress_video(self):
        '''通过ffmpeg压缩视频文件(输出的像素格式指定为yuv420p)。'''
        global CPU_THREADS
        # AMD显卡加速,硬编码
        '''command_compress = 'ffmpeg -hwaccel dxva2 -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_amf -b:a 64k -ar 44100 "{}"'.format(
            self.origin_abs_path, self.definition, self.compressed_abs_path)'''
        # NVIDIA显卡加速,硬编解码
        '''command_compress = 'ffmpeg -hwaccel cuvid -c:v h264_cuvid -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_nvenc -b:a 64k -ar 44100 "{}"'.format(
            self.origin_abs_path, self.definition, self.compressed_abs_path)'''
        # CPU软编解码。-threads 参数可限制线程数,控制CPU占用。
        command_compress = 'ffmpeg -threads {} -i "{}" -threads {} -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'.format(
            CPU_THREADS[0], self.origin_abs_path, CPU_THREADS[1],
            self.definition, self.compressed_abs_path)
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_video failed,视频压缩失败:{command_compress}'
            )

    def compress_img_ffmpeg(self):
        '''通过ffmpeg压缩图像文件。'''
        command_palette = 'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(
            self.origin_abs_path, self.palette)
        command_compress = 'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(
            self.origin_abs_path, self.palette, self.pix_fmt, self.temp_img)
        result = os.system(command_palette)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img_ffmpeg failed,无法获得调色板:{command_palette}'
            )
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img_ffmpeg failed,生成中间图片失败:{command_compress}'
            )
        command_compress = 'ffmpeg -i "{}" -vf scale={}  -y "{}"'.format(
            self.temp_img, self.definition, self.compressed_abs_path)
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img_ffmpeg failed,生成最终图片失败:{command_compress}'
            )

    def compress_img_squoosh(self, quality: int = 75):
        '''通过squoosh-cli压缩图像文件。\n安装squoosh-cli见https://lisper517.top/index.php/archives/60/
        :param quality: 压缩图片的品质,取值范围为0-100,详见squoosh的说明'''
        basename_origin = os.path.basename(self.origin_abs_path)
        abs_dir = self.origin_abs_path[:-len(basename_origin)]
        basename_compressed = os.path.basename(self.compressed_abs_path)
        prefix_origin = os.path.splitext(basename_origin)[0]
        prefix_compressed = os.path.splitext(basename_compressed)[0]
        suffix = prefix_compressed[len(prefix_origin):]
        fmt = get_suffix(basename_compressed).upper()
        if fmt == 'JPG':
            fmt = 'mozjpeg'
        elif fmt == 'PNG':
            fmt = 'oxipng'
        elif fmt == 'WEBP':
            fmt = 'webp'
        command_compress = '''squoosh-cli --{} {{quality:{},baseline:false,arithmetic:false,progressive:true,optimize_coding:true,smoothing:0,color_space:3,quant_table:3,trellis_multipass:false,trellis_opt_zero:false,trellis_opt_table:false,trellis_loops:1,auto_subsample:true,chroma_subsample:2,separate_chroma_quality:false,chroma_quality:75}} "{}" -d "{}" -s "{}"'''.format(
            fmt, quality, self.origin_abs_path, abs_dir, suffix)
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img_squoosh failed,生成最终图片失败:{command_compress}'
            )


'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''


def get_definitions_video(target_dir: str):
    '''获取一个文件夹下所有视频文件的分辨率,按宽高比进行分类。
    :param target_dir: 目标文件夹的绝对路径
    :结果会输出到当前目录下的 definitions.txt 文件中'''
    global CWD
    ratio_dict = dict()  # 字典,键为宽高的比值,值为列表,包括该比值下的具体宽高数值
    # 例:1.7777778:['640x360', '1280x720', '1920x1080']
    for dirpath, dirnames, filenames in os.walk(target_dir):
        for filename in filenames:
            abs_path = os.path.join(dirpath, filename)
            if is_video(abs_path):
                print('\r视频文件:{:200}'.format(abs_path), end='')
                vimg = vimg_processor(abs_path)
                ratio = vimg.ratio
                definition = "{}x{}".format(vimg.width, vimg.height)
                if ratio_dict.get(ratio) == None:
                    ratio_dict[ratio] = [definition]
                else:
                    if definition not in ratio_dict[ratio]:
                        ratio_dict[ratio].append(definition)
    # 按宽高比值升序输出
    ratio_list = []
    for key, value in ratio_dict.items():
        ratio_list.append(key)
    ratio_list.sort()
    with open(os.path.join(CWD, 'definitions.txt'), 'w', encoding='utf8') as f:
        for ratio in ratio_list:
            ratio_dict[ratio].sort()
            f.write('宽高比:{:.6},成员:'.format(ratio))
            for definition in ratio_dict[ratio][:-1]:
                f.write(definition + ' , ')
            f.write(ratio_dict[ratio][-1] + '\n')
    print('')


def compress_videos(video_size: str = '', fmt: str = ''):
    '''从redis中得到需要压缩的视频文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
    :对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
    :输出文件在源文件名末尾加 #test# 放在同目录下(1.mp4 -> 1#test#.mp4);输出文件有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名。
    :param video_size: 视频适配的屏幕大小。 phone、pad、pc、空字符串 分别表示适配手机、平板、电脑、不改变原大小
    :param fmt: 指定输出的文件格式,空字符串表示原格式。建议用mp4
    '''
    global NAS_mounted_path, r_cur, log, start_time, fail_msg_list, sys_kind
    global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
    msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
    mylog.log_print(log, 'info', msg)
    while (True):
        if r_cur.scard(redis_key_set_todo) == 0:
            break
        try:
            redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
            rel_path = replace_path_seperator(redis_rel_path,
                                              sys_kind=sys_kind)
            abs_path = os.path.join(NAS_mounted_path, rel_path)
            if is_video(abs_path) and os.path.isfile(abs_path):
                vimg = vimg_processor(origin_abs_path=abs_path,
                                      video_size=video_size,
                                      fmt=fmt)
                compressed_abs_path = vimg.compressed_abs_path
                if os.path.exists(compressed_abs_path):  # 输出文件已存在时跳过
                    continue
                start_time = time.time()
                log.info('{}压缩中...'.format(abs_path))
                compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
                    compressed_abs_path)
                r_cur.set(redis_key_str_ongoing + redis_rel_path,
                          compressed_redis_rel_path)
                vimg.compress_video()
                r_cur.delete(redis_key_str_ongoing + redis_rel_path)
                r_cur.set(redis_key_str_done + redis_rel_path,
                          compressed_redis_rel_path)
                log.info('视频压缩成功,耗时{:.2f}min'.format(
                    (time.time() - start_time) / 60))
        except:
            except_info = traceback.format_exc()
            fail_msg_list.append(except_info)
            mylog.log_print(log, 'error', except_info)
            if os.path.exists(compressed_abs_path):
                os.remove(compressed_abs_path)
            r_cur.srem(redis_key_set_todo, redis_rel_path)
            r_cur.delete(redis_key_str_ongoing + redis_rel_path)
            r_cur.delete(redis_key_str_done + redis_rel_path)
            exc_type, exc_value, exc_traceback = sys.exc_info()  # 获得异常名
            exc_type = exc_type.__name__
            if exc_type == 'KeyboardInterrupt':  # 在linux上时,无法得到KeyboardInterrupt
                r_cur.sadd(redis_key_set_todo, redis_rel_path)
            else:
                mylog.log_print(log, 'error', f'错误类型为{exc_type}')
                r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            break


def compress_imgs(compressor: str = 'squoosh',
                  fmt: str = '',
                  img_max_pix: int = 0,
                  quality: int = 75):
    '''从redis中得到需要压缩的图片文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
    :对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
    :输出文件在源文件名末尾加 #test# 放在同目录下(1.jpg -> 1#test#.jpg);输出文件有重名时会自动按 1#test#.jpg 、1#test#(1).jpg 、1#test#(2).jpg 的顺序重命名。
    :param compressor: 指定压缩的方式。可选择 ffmpeg 或 squoosh ,推荐后者
    :param fmt: 指定输出的文件格式,空字符串表示原格式。建议用jpg,目前squoosh只支持jpg、png、webp
    :param img_max_pix: (仅在ffmpeg时可用)输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
    :使用squoosh时,图片会保留原有尺寸
    :param quality: (仅在squoosh时可用)压缩图片的品质,取值范围为0-100,详见squoosh的说明
    '''
    global NAS_mounted_path, TEMP_DIR, r_cur, log, start_time, fail_msg_list, sys_kind
    global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
    msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
    mylog.log_print(log, 'info', msg)
    while (True):
        if r_cur.scard(redis_key_set_todo) == 0:
            break
        try:
            redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
            rel_path = replace_path_seperator(redis_rel_path,
                                              sys_kind=sys_kind)
            abs_path = os.path.join(NAS_mounted_path, rel_path)
            if is_img(abs_path) and os.path.isfile(abs_path):
                vimg = vimg_processor(origin_abs_path=abs_path,
                                      img_max_pix=img_max_pix,
                                      fmt=fmt)
                compressed_abs_path = vimg.compressed_abs_path
                if os.path.exists(compressed_abs_path):  # 输出文件已存在时跳过
                    continue
                log.info('{}压缩中...'.format(abs_path))
                compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
                    compressed_abs_path)
                r_cur.set(redis_key_str_ongoing + redis_rel_path,
                          compressed_redis_rel_path)
                if compressor == 'ffmpeg':
                    vimg.compress_img_ffmpeg()
                elif compressor == 'squoosh':
                    vimg.compress_img_squoosh(quality=quality)
                r_cur.delete(redis_key_str_ongoing + redis_rel_path)
                r_cur.set(redis_key_str_done + redis_rel_path,
                          compressed_redis_rel_path)
        except:
            except_info = traceback.format_exc()
            fail_msg_list.append(except_info)
            mylog.log_print(log, 'error', except_info)
            if os.path.exists(compressed_abs_path):
                os.remove(compressed_abs_path)
            r_cur.srem(redis_key_set_todo, redis_rel_path)
            r_cur.delete(redis_key_str_ongoing + redis_rel_path)
            r_cur.delete(redis_key_str_done + redis_rel_path)
            r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            exc_type, exc_value, exc_traceback = sys.exc_info()  # 获得异常名
            exc_type = exc_type.__name__
            if exc_type == 'KeyboardInterrupt':  # 在linux上时,无法得到KeyboardInterrupt
                r_cur.sadd(redis_key_set_todo, redis_rel_path)
            else:
                mylog.log_print(log, 'error', f'错误类型为{exc_type}')
                r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            break
    # 最后清理一下TEMP_DIR
    for dirpath, dirnames, filenames in os.walk(TEMP_DIR):
        for filename in filenames:
            os.remove(os.path.join(dirpath, filename))


def delete_test():
    '''视频或图片压缩完毕后,比较原文件与压缩后的文件,留下较小且大小非0的文件。
    :由于ffmpeg处理效果不确定,建议检查一下处理后的文件,确认无误后再执行,因为源文件会被彻底删除。具体见 下载-压缩-入库流程
    :将到redis的redis_key_str_done中读取压缩前后的文件相对路径。删除#test#文件后,删除redis_key_str_done对应的键
    '''
    global r_cur, log, fail_msg_list
    global redis_key_str_done
    redis_rel_path_done_keys = r_cur.keys(redis_key_str_done + '*')
    for redis_rel_path_done_key in redis_rel_path_done_keys:
        origin_redis_rel_path = redis_rel_path_done_key[len(redis_key_str_done
                                                            ):]
        compressed_redis_rel_path = r_cur.get(redis_rel_path_done_key)
        origin_abs_path = get_abs_path_from_redis_rel_path(
            origin_redis_rel_path)
        compressed_abs_path = get_abs_path_from_redis_rel_path(
            compressed_redis_rel_path)
        # redis中存储的相对路径都是 压缩中 ,但是现在这些文件已经手动放到了 检查中 里
        origin_abs_path = origin_abs_path.replace('压缩中', '检查中', 1)
        #需要更改
        compressed_abs_path = compressed_abs_path.replace('压缩中', '检查中', 1)
        #需要更改

        if os.path.isfile(origin_abs_path) and os.path.isfile(
                compressed_abs_path):
            before_size = os.path.getsize(origin_abs_path)
            after_size = os.path.getsize(compressed_abs_path)
            if after_size < before_size and after_size != 0:
                try:
                    os.remove(origin_abs_path)
                    # 注意这里的压缩后文件名改回原名时是把最右边的第一个#test#去掉,而不是直接改成源文件名
                    # 因为如果压缩时用fmt指定输出格式,则可能当前目录下有重名文件
                    # 1.avi、1.wmv -> 1#test#.mp4、1#test#(1).mp4 -> 1.mp4、1(1).mp4
                    prefix, suffix = os.path.splitext(compressed_abs_path)
                    reversed_prefix = prefix[::-1]
                    reversed_prefix = reversed_prefix.replace('#tset#', '', 1)
                    prefix = reversed_prefix[::-1]
                    temp = prefix + suffix
                    os.rename(compressed_abs_path, temp)
                    r_cur.delete(redis_rel_path_done_key)
                    log.info(f'将{origin_abs_path}替换为小文件')
                except PermissionError:
                    r_cur.set(redis_rel_path_done_key,
                              compressed_redis_rel_path)
                    msg = f'delete_test failed,删除失败:{origin_abs_path}'
                    mylog.log_print(log, 'warning', msg)
                    fail_msg_list.append(msg)
                    continue
            else:
                os.remove(compressed_abs_path)
                msg = f'{origin_abs_path}压缩后的文件比之前大(或大小为0),删掉了处理后的文件'
                log.info(msg)
    # 最后检查一下有没有剩余的处理后文件
    dir_checking = replace_path_seperator('xxx/xxx/检查中', sys_kind=sys_kind)
    #需要更改
    dir_checking = os.path.join(NAS_mounted_path, dir_checking)
    for dirpath, dirnames, filenames in os.walk(dir_checking):
        for filename in filenames:
            if '#test#' in filename:
                abs_path = os.path.join(dirpath, filename)
                msg = f'疑似还有#test#文件:{abs_path}'
                mylog.log_print(log, 'warning', msg)
                fail_msg_list.append(msg)


def main():
    '''执行入口。'''
    target_dir = r'D:\xxx\xxx\压缩中'
    #需要更改
    global sys_kind
    try:
        #get_definitions_video(target_dir)
        #compress_videos(video_size='pad', fmt='mp4')
        compress_imgs(compressor='squoosh', fmt='jpg', quality=75)
        #delete_test()
        print('已完成')
        if len(fail_msg_list) != 0:
            print('本次压缩时出现了以下异常:')
            for i in fail_msg_list:
                print(i)
        if sys_kind == 'Windows':
            input()
    except:
        except_info = traceback.format_exc()
        mylog.log_print(log, 'error', except_info)
        if sys_kind == 'Windows':
            input()


if __name__ == '__main__':
    main()
'''下载-压缩-入库流程:
1.将需要压缩的文件先暂时放到 待压缩 中,进行人工筛选
2.筛选完后,将 待压缩 中的文件手动剪切到 压缩中 文件夹里
3.直连NAS的机器运行python脚本,更新redis库、移动文件
4.其他连接到NAS的机器从redis中获取任务、进行压缩
5.从 压缩完 中将文件剪切到 检查中 里,进行人工检查
6.检查完后,手动运行vimg_processor_multi里的delete_test,保留源文件或压缩后文件中的一个
7.手动将 检查中 里的文件放到其他目录,比如 待迁移 或 入库中 里
8.运行file_to_sql,将本地文件入库
'''

如前所述,这是一个NAS、多电脑协作压缩的版本,如果只是在本地压缩图片的话可以自行修改。

标签: python, tinypng, squoosh

添加新评论