如何压缩图片
步骤/目录:
1.需求介绍
2.ffmpeg
3.tinypng
4.squoosh
(1)Node.js
(2)克隆仓库并安装运行
a.本地运行网页服务器
b.运行桌面版
c.squoosh-cli
(3)squoosh的使用
5.修改vimg_processor_multi
本文首发于个人博客https://lisper517.top/index.php/archives/60/
,转载请注明出处。
本文的目的是介绍一些图片压缩的工具和使用方法。
本文写作日期为2022年10月19日。运行的平台为win10,编辑器为VS Code。
1.需求介绍
在处理前端时,常常需要放一些图片在网页中,但是一般素材图片占用空间都很大,直接放到网页上不太合适,如果有方法把图片压缩一下就very的nice。本文就介绍一些图片压缩的工具和方法。
2.ffmpeg
在之前的文章 Python,爬虫与深度学习(17)——电影的信息爬取、下载与处理(二) 中,笔者用python写了一个vimg_processor类,主要是用ffmpeg来获取图片/视频信息、压缩图片/视频。其中使用ffmpeg压缩图片的命令参考了 这篇文章 ,如下:
'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(self.origin_abs_path, self.palette)
'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(self.origin_abs_path, self.palette, self.pix_fmt, self.temp_img)
'ffmpeg -i "{}" -vf scale={} -y "{}"'.format(self.temp_img, self.definition, self.compressed_abs_path)
总的来说是使用调色板将原图使用的256色减少,然后根据调色板输出图片,最后再压缩图片尺寸。
笔者其实不太了解ffmpeg,使用上面的命令,虽然图片占用空间能压缩到很小,但色彩减少导致肉眼能看出很多差距,比较明显的是在人像中,如果涂了口红,压缩后的图片会呈现一种灰红色的质感,这是无法接受的。
经过一番寻找,笔者发现了其他压缩图片的方法,它们压缩图片的算法比较复杂,效果也远非简单的3句ffmpeg命令可比。
3.tinypng
TinyPNG 据说是一个久负盛名的图片压缩网站,可惜它的压缩算法没有开源。如果你只是想压缩几张图片,可以到它的网址 https://tinypng.com/ 手动上传图片,据说每天只能上传20张不超过5MB的图片;如果需要批量压缩图片,你可以到 https://tinypng.com/developers 填写姓名、邮箱申请一个API key(限制是每月只能压缩500张),然后使用python或者其他语言写一个客户端实现压缩脚本,可以参考 这个项目 。
使用TinyPNG确实压缩效果不错,可惜源码不开源,而且能压缩的图片张数有限制(据说有方法开源绕开限制,参考 这篇文章 ,原理是tinypng服务器限制上传次数时用到了ip进行判断,所以在构造请求头时修改了ip)。综合考虑下来,笔者放弃了TinyPNG,毕竟万一哪天这类闭源工具开始收费就不太合适了。
像TinyPNG这样的在线图片压缩网站还有很多,比如:
https://tinyjpg.com/ (TinyPNG的系列网站)
https://imagestool.com/zh_CN/compress-images.html
https://squoosh.app/
本文将重点介绍squoosh。
4.squoosh
squoosh 是google chrome labs推出的一款开源图片压缩工具,源码放到了github上: https://github.com/GoogleChromeLabs/squoosh ,这些源码主要是TypeScript、JavaScript写的,但是笔者希望用python调用这个图像压缩的工具,这需要一些技巧,下面以Win10为例讲解。
(1)Node.js
因为要用JavaScript,需要安装Node.js、npm。按照 runoob教程 ,进行如下操作:
到 https://nodejs.org/zh-cn/download/ 下载对应的安装包,笔者选择的是14.15.1的64位Windows安装包(下载地址为 https://nodejs.org/dist/v14.15.1/node-v14.15.1-x64.msi ,因为在 这个问题 中提到开发时用的就是14.15.1版本),一路确认,唯一需要注意的是中间有一步是勾选 Automatically install the neccessary tools
,把这个勾选去掉,因为它会安装其他版本的python3、污染环境。安装完后,在cmd中实验一下,输入 node --version
、 npm --version
,可输出版本号。
其他安装方法包括安装二进制文件、docker安装、linux上编译安装,这里就不介绍了。
(2)克隆仓库并安装运行
a.本地运行网页服务器
第一种方法是在本地运行网页服务器,这种方法并不是很推荐。
到 项目的github地址 下载zip包(或者你也可用pull到本地),解压到合适的位置,在cmd中进入该目录,进行如下操作:
npm install
npm run build
npm run dev
最后一行命令会开启一个本地的服务器,好比你把 https://squoosh.app/ 搬到了本地的服务器运行。笔者在实验时出现了一些错误,具体信息如下:
>npm install
npm WARN read-shrinkwrap This version of npm is compatible with lockfileVersion@1, but package-lock.json was generated for lockfileVersion@2. I'll try to do my best with it!
> squoosh@2.0.0 prepare D:\squoosh-dev
> husky install
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: fsevents@2.1.3 (node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@2.1.3: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})
added 816 packages from 299 contributors and audited 819 packages in 34.939s
43 packages are looking for funding
run `npm fund` for details
found 106 vulnerabilities (89 moderate, 12 high, 5 critical)
run `npm audit fix` to fix them, or `npm audit` for details
>npm run build
> squoosh@2.0.0 build D:\squoosh-dev
> rollup -c && node lib/move-output.js
src/static-build/index.tsx → .tmp/build...
Browserslist: caniuse-lite is outdated. Please run:
npx browserslist@latest --update-db
Unable to determine site origin, defaulting to https://squoosh.app
created .tmp/build in 26.7s
>npm run dev
> squoosh@2.0.0 dev D:\squoosh-dev
> DEV_PORT="${DEV_PORT:=5000}" run-p watch serve
'DEV_PORT' 不是内部或外部命令,也不是可运行的程序
或批处理文件。
npm ERR! code ELIFECYCLE
npm ERR! errno 1
npm ERR! squoosh@2.0.0 dev: `DEV_PORT="${DEV_PORT:=5000}" run-p watch serve`
npm ERR! Exit status 1
npm ERR!
npm ERR! Failed at the squoosh@2.0.0 dev script.
npm ERR! This is probably not a problem with npm. There is likely additional logging output above.
npm ERR! A complete log of this run can be found in:
npm ERR! C:\Users\xxx\AppData\Roaming\npm-cache\_logs\xxxx-debug.log
不知道这是为什么。即使运行 npm audit fix
、 npx browserslist@latest --update-db
也无济于事;安装16版本的Node.js,又报错 Could not load /squoosh-dev/src/shared/prerendered-app/colors.css
,最后只好放弃。根据报错信息,如果成功安装,大概访问本机的5000端口就能看到squoosh网页。
实际上,就算通过这种方式成功安装了squoosh,也和浏览器访问 https://squoosh.app/ 几乎一样,唯一的区别是不需要联网。如果有需要,笔者更推荐下一种方法。
b.运行桌面版
根据 https://github.com/matiasbenedetto/squoosh-desktop-app 这个项目的介绍,到 https://squoosh-desktop.vercel.app/ 网页去下载 Windows版的Portable文件 即可,是一个exe文件(只有45MB左右大小),打开来是一个隐藏了一些UI的浏览器,这种方式下都不需要安装Node.js了。
c.squoosh-cli
上面两种方法提供的主要是网页UI,只适合少量图片压缩。如果要大量压缩图片,最好还是能使用命令行、脚本。squoosh确实提供了这样的API。
同样安装Node.js-v14.15.1,下载squoosh项目的zip包,解压到合适的位置,在cmd中进入该目录,但是这次执行以下命令:
npm i -g @squoosh/cli
这样的话会安装squoosh-cli,以后在cmd中用 squoosh-cli <options...>
就可以调用squoosh进行图片压缩。或者你也可以不运行 npm i -g @squoosh/cli
安装,而是直接用 npx @squoosh/cli <options...>
的格式来调用squoosh(推荐还是用 npm i -g @squoosh/cli
安装一下)。最后记得把squoosh-dev文件夹加到PATH中,就可以随时调用squoosh-cli。
(3)squoosh的使用
前两种方法安装squoosh或者网页访问 https://squoosh.app/ 时使用都差不多,这里主要介绍一下命令行使用 squoosh-cli
的参数。cmd输入 squoosh-cli -h
可显示以下信息:
Usage: squoosh-cli [options] <files...>
Options:
-d, --output-dir <dir> Output directory (default: ".")
-s, --suffix <suffix> Append suffix to output files (default: "")
--max-optimizer-rounds <rounds> Maximum number of compressions to use for auto optimizations
(default: "6")
--optimizer-butteraugli-target <butteraugli distance> Target Butteraugli distance for auto optimizer (default:
"1.4")
--resize [config] Resize the image before compressing
--quant [config] Reduce the number of colors used (aka. paletting)
--rotate [config] Rotate image
--mozjpeg [config] Use MozJPEG to generate a .jpg file with the given
configuration
--webp [config] Use WebP to generate a .webp file with the given configuration
--avif [config] Use AVIF to generate a .avif file with the given configuration
--jxl [config] Use JPEG-XL to generate a .jxl file with the given
configuration
--wp2 [config] Use WebP2 to generate a .wp2 file with the given configuration
--oxipng [config] Use OxiPNG to generate a .png file with the given
configuration
-h, --help display help for command
参考 这篇文章 ,使用以下参数:
squoosh-cli --mozjpeg {quality:75,baseline:false,arithmetic:false,progressive:true,optimize_coding:true,smoothing:0,color_space:3,quant_table:3,trellis_multipass:false,trellis_opt_zero:false,trellis_opt_table:false,trellis_loops:1,auto_subsample:true,chroma_subsample:2,separate_chroma_quality:false,chroma_quality:75} "输入路径" -d "输出目录" -s "#test#"
就能达到非常不错的压缩效果。你可以自己在squoosh网页里实验不同的参数,笔者只实验调整了一下quality。
5.修改vimg_processor_multi
修改了之前的vimg_processor_multi.py,调整了vimg_processor的compress_img方法,稍微修改了compress_imgs函数,最终成品如下(所有标注 #需要更改 的上一行是可能需要更改的地方):
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os, time, json, subprocess, re, traceback, redis, platform, sys
from MyPythonLib import log as mylog
from MyPythonLib import redis as myredis
#CPU_THREADS = (12, 8)
CPU_THREADS = (15, 10) # ffmpeg软编解码时线程数,根据CPU配置调整。分别为解码用的线程、编码用的线程,在压缩时,前者大于后者
#需要更改
# 可以用一个文件实验最佳的CPU_THREADS。软编解码的快慢看ffmpeg最右边的 speed=数字x
NAS_mounted_path = 'D:\\' # NAS挂载路径。由于函数get_redis_rel_path_from_abs_path,最后一个分隔符必须带上
#需要更改
CWD = os.getcwd()
r_pool = myredis.get_pool(host='192.168.1.1', port=56379)
#需要更改
r_cur = redis.Redis(connection_pool=r_pool)
log_path = os.path.join(CWD, 'logs')
#需要更改
if not os.path.isdir(s=log_path):
os.mkdir(path=log_path)
log = mylog.get_logger(abs_path=log_path,
log_file_name='vimg_processor_multi.log')
TEMP_DIR = os.path.join(CWD, 'temp') # temp这个文件夹主要是在压缩图片时存放临时的调色板
#需要更改
if not os.path.isdir(s=TEMP_DIR):
os.mkdir(path=TEMP_DIR)
redis_key_set_todo = 'vimg_processor:set:todo' # 这个redis集合存放需要压缩的文件的相对路径(仅包含压缩前的文件名),路径分隔符为linux的/(下同)
redis_key_set_wrong = 'vimg_processor:set:wrong' #集合,存放压缩时出现错误的文件的相对路径(仅包含压缩前的文件名)
redis_key_str_ongoing = 'vimg_processor:str:ongoing:' # 存放正在压缩的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
redis_key_str_done = 'vimg_processor:str:done:' # 存放已压缩完的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
start_time = time.time()
fail_msg_list = [] # 每次有错误信息时都放到这个列表中
sys_kind = platform.system() # 字符串,表示系统类型
def get_suffix(filename: str) -> str:
'''返回文件后缀名。若文件无后缀名(以 . 结尾,或者不含 . )则返回空字符串。
:param filename: 文件名(可带路径前缀)
:return str,文件后缀名(不含 . )'''
suffix = os.path.splitext(filename)[-1]
return (suffix[1:], '')[suffix == '']
def is_video(filename: str) -> bool:
'''根据后缀名判断文件是否视频。
:param filename: 文件名(可带路径前缀)
:return bool,True表示文件是视频,False表示非视频'''
video_suffix_set = {
"WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
"M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
}
#需要更改
#iso文件不一定能用ffmpeg打开,可以自行取舍
return (False, True)[get_suffix(filename).upper() in video_suffix_set]
def is_img(filename: str) -> bool:
'''根据后缀名判断文件是否图片。
:param filename: 文件名(可带路径前缀)
:return bool,True表示文件是图片,False表示非图片'''
img_suffix_set = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
#需要更改
return (False, True)[get_suffix(filename).upper() in img_suffix_set]
def replace_path_seperator(path: str, sys_kind: str) -> str:
'''根据操作系统,替换路径中的分隔符。
:param path: 需要替换的路径
:param sys_kind: 目标操作系统
:return str,替换好的路径'''
if sys_kind == 'Windows':
return path.replace('/', '\\')
elif sys_kind == 'Linux':
return path.replace('\\', '/')
def is_origin(filename: str) -> bool:
'''接受文件名,判断这个文件是源文件还是压缩后的文件。
:param filename: 文件名(可带路径前缀)
:return bool,True表示该文件为源文件'''
prefix = os.path.splitext(filename)[0]
return (True, False)[re.search(r'#test#$', prefix) != None]
def get_redis_rel_path_from_abs_path(abs_path: str) -> str:
'''根据本机上文件的绝对路径,返回存储到redis中的相对路径。
:由于redis中存储的是使用linux分隔符的相对路径,所以需要这个函数转换:
:将abs_path首先去掉NAS_mounted_path,然后分隔符换成linux的。由于这个函数,NAS_mounted_path最后需要带上一个分隔符
:param abs_path: 文件在本机的绝对路径
:return str,redis中存储的相对路径,即 XX/XX/XX 这样的形式。注意最前面没有分隔符'''
global NAS_mounted_path
redis_rel_path = abs_path[len(NAS_mounted_path):]
return replace_path_seperator(redis_rel_path, sys_kind='Linux')
def get_abs_path_from_redis_rel_path(redis_rel_path: str) -> str:
'''根据redis中存储的相对路径,返回本机上该文件的绝对路径。
:将redis_rel_path连上NAS_mounted_path,然后替换分隔符为本机的
:param redis_rel_path: redis中存储的相对路径
:return str,文件在本机中的绝对路径'''
global NAS_mounted_path, sys_kind
abs_path = os.path.join(NAS_mounted_path, redis_rel_path)
return replace_path_seperator(abs_path, sys_kind=sys_kind)
def get_compressed_abs_path(abs_path: str, fmt: str = '') -> str:
'''根据源文件的绝对路径,返回压缩后文件的绝对路径。这里的绝对路径都是本机上的。
:压缩后的图像或者视频将放在同路径下,所以需要给转换后的文件名中加上字符串#test#(在后缀名前):1.mp4 -> 1#test#.mp4
:有些特殊情况下压缩后文件会重名,比如一个文件夹中同时存在1.mp4、1.rmvb时,又用fmt指定压缩后格式。此时会对压缩后的文件加上序号重命名(1#test#.mp4,1#test#(1).mp4,1#test#(2).mp4)
:param abs_path: 需要压缩的文件的绝对路径
:param fmt: 指定转换后文件的格式,为空字符串时不改变格式
:return str,压缩后文件的绝对路径,根据操作系统调整'''
prefix, suffix = os.path.splitext(abs_path)
suffix = (fmt, suffix.lstrip('.'))[fmt == '']
compressed_abs_path = prefix + '#test#.' + suffix
i = 1
while (os.path.exists(compressed_abs_path)):
compressed_abs_path = prefix + '#test#(' + str(i) + ').' + suffix
i += 1
return compressed_abs_path
def get_origin_abs_path(abs_path: str) -> str:
'''根据压缩后文件的绝对路径到redis中查询,返回源文件的绝对路径。这里的绝对路径都是本机上的。
:这个函数只应该用于已经压缩完的文件,函数将到redis_key_str_done的所有字符串中进行查询
:param abs_path: 压缩后文件的绝对路径
:return str,源文件的绝对路径,根据操作系统调整;若无法查询到,返回空字符串'''
global r_cur, sys_kind, NAS_mounted_path, redis_key_str_done
compressed_redis_rel_path = get_redis_rel_path_from_abs_path(abs_path)
redis_rel_paths_keys = r_cur.keys(redis_key_str_done + '*')
for redis_rel_path_key in redis_rel_paths_keys:
if r_cur.get(redis_rel_path_key) == compressed_redis_rel_path:
origin_abs_path = redis_rel_path_key[len(redis_key_str_done):]
origin_abs_path = replace_path_seperator(origin_abs_path,
sys_kind=sys_kind)
origin_abs_path = os.path.join(NAS_mounted_path, origin_abs_path)
return origin_abs_path
return ''
class vimg_processor(object):
'''用于压缩的图像视频类。初始化时会读取图像视频的一些参数。'''
def __init__(self,
origin_abs_path: str,
compressed_abs_path: str = "",
video_size: str = '',
img_max_pix: int = 0,
fmt: str = ''):
'''用文件的绝对路径等参数初始化对象。
:param origin_abs_path: 源文件的绝对路径
:param compressed_abs_path: 输出文件的绝对路径。默认为同目录下文件名末加#test#保存(1.mp4 -> 1#test#.mp4),有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名
:param video_size: 输出视频文件时,适配的屏幕大小。有 phone 、 pad 、 pc 、 空字符串 四种选择,空字符串表示保留原宽高
:param img_max_pix: 输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
:param fmt: 输出文件的格式,即后缀名。为空字符串时,表示按原格式输出。其他可取值如 mp4 、 jpg 等'''
self.origin_abs_path = origin_abs_path
self.compressed_abs_path = (compressed_abs_path,
get_compressed_abs_path(
origin_abs_path,
fmt=fmt))[compressed_abs_path == ""]
self.width, self.height = 0, 1
if is_video(origin_abs_path):
self.get_video_info()
w, h = self.width, self.height
ratio = w / h
self.ratio = ratio
definition = ''
if video_size == 'phone':
if ratio <= 0.6:
definition = (f'{w}:{h}', '360:640')[w >= 360]
elif ratio <= 1.0:
definition = (f'{w}:{h}', '360:480')[w >= 360]
elif ratio <= 1.66:
definition = (f'{w}:{h}', '480:360')[w >= 480]
elif ratio > 1.66:
definition = (f'{w}:{h}', '640:360')[w >= 640]
elif video_size == 'pad':
if ratio <= 0.6:
definition = (f'{w}:{h}', '720:1280')[w >= 720]
elif ratio <= 1.0:
definition = (f'{w}:{h}', '720:960')[w >= 720]
elif ratio <= 1.66:
definition = (f'{w}:{h}', '960:720')[w >= 960]
elif ratio > 1.66:
definition = (f'{w}:{h}', '1280:720')[w >= 1280]
elif video_size == 'pc':
if ratio <= 0.6:
definition = (f'{w}:{h}', '1080:1920')[w >= 1080]
elif ratio <= 1.0:
definition = (f'{w}:{h}', '1080:1440')[w >= 1080]
elif ratio <= 1.66:
definition = (f'{w}:{h}', '1440:1080')[w >= 1440]
elif ratio > 1.66:
definition = (f'{w}:{h}', '1920:1080')[w >= 1920]
elif video_size == '':
definition = f'{w}:{h}'
self.definition = definition
elif is_img(origin_abs_path):
self.pix_fmt = '' # pix_fmt即像素格式
fmt = (fmt, get_suffix(origin_abs_path))[fmt == '']
self.palette = os.path.join(TEMP_DIR, 'palette.' + fmt)
self.temp_img = os.path.join(TEMP_DIR, 'temp_img.' + fmt)
self.get_img_info()
w, h = self.width, self.height
img_max_pix = abs(img_max_pix)
if img_max_pix == 0:
pass
elif w > img_max_pix and h > img_max_pix:
ratio = max(w, h) / img_max_pix
w, h = int(w / ratio), int(h / ratio)
self.definition = f'{w}:{h}'
def get_video_info(self):
'''使用ffprobe获取视频的宽高(width、height)。'''
global sys_kind
command_probe = f'ffprobe -select_streams v -show_entries format=bit_rate -show_streams -v quiet -of csv="p=0" -of json -i "{self.origin_abs_path}"'
if sys_kind == 'Windows':
result = subprocess.Popen(command_probe,
shell=False,
stdout=subprocess.PIPE).stdout
elif sys_kind == 'Linux':
result = subprocess.Popen(command_probe,
shell=True,
stdout=subprocess.PIPE).stdout
list_std = result.readlines()
str_tmp = ''
for item in list_std:
str_tmp += bytes.decode(item.strip())
json_data = json.loads(str_tmp)
self.width = int(json_data['streams'][0]['width'])
self.height = int(json_data['streams'][0]['height'])
# self.bitrate = int(json_data['format']['bit_rate']) / 1024 # 以后可能用到视频的比特率
# 视频的比特率有时不能用ffprobe读取到,建议自己根据视频大小和时长计算
def get_img_info(self):
'''获取图片的width、height、pix_fmt。'''
global sys_kind
command_probe = f'ffprobe -hide_banner -v quiet -print_format json -show_format -show_streams "{self.origin_abs_path}"'
if sys_kind == 'Windows':
result = subprocess.Popen(command_probe,
shell=False,
stdout=subprocess.PIPE).stdout
elif sys_kind == 'Linux':
result = subprocess.Popen(command_probe,
shell=True,
stdout=subprocess.PIPE).stdout
list_std = result.readlines()
str_tmp = ''
for item in list_std:
str_tmp += bytes.decode(item.strip())
json_data = json.loads(str_tmp)
self.pix_fmt = str(json_data['streams'][0]['pix_fmt'])
self.width = int(json_data['streams'][0]['width'])
self.height = int(json_data['streams'][0]['height'])
def compress_video(self):
'''通过ffmpeg压缩视频文件(输出的像素格式指定为yuv420p)。'''
global CPU_THREADS
# AMD显卡加速,硬编码
'''command_compress = 'ffmpeg -hwaccel dxva2 -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_amf -b:a 64k -ar 44100 "{}"'.format(
self.origin_abs_path, self.definition, self.compressed_abs_path)'''
# NVIDIA显卡加速,硬编解码
'''command_compress = 'ffmpeg -hwaccel cuvid -c:v h264_cuvid -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_nvenc -b:a 64k -ar 44100 "{}"'.format(
self.origin_abs_path, self.definition, self.compressed_abs_path)'''
# CPU软编解码。-threads 参数可限制线程数,控制CPU占用。
command_compress = 'ffmpeg -threads {} -i "{}" -threads {} -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'.format(
CPU_THREADS[0], self.origin_abs_path, CPU_THREADS[1],
self.definition, self.compressed_abs_path)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_video failed,视频压缩失败:{command_compress}'
)
def compress_img_ffmpeg(self):
'''通过ffmpeg压缩图像文件。'''
command_palette = 'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(
self.origin_abs_path, self.palette)
command_compress = 'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(
self.origin_abs_path, self.palette, self.pix_fmt, self.temp_img)
result = os.system(command_palette)
if result != 0:
raise Exception(
f'vimg_processor.compress_img_ffmpeg failed,无法获得调色板:{command_palette}'
)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_img_ffmpeg failed,生成中间图片失败:{command_compress}'
)
command_compress = 'ffmpeg -i "{}" -vf scale={} -y "{}"'.format(
self.temp_img, self.definition, self.compressed_abs_path)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_img_ffmpeg failed,生成最终图片失败:{command_compress}'
)
def compress_img_squoosh(self, quality: int = 75):
'''通过squoosh-cli压缩图像文件。\n安装squoosh-cli见https://lisper517.top/index.php/archives/60/
:param quality: 压缩图片的品质,取值范围为0-100,详见squoosh的说明'''
basename_origin = os.path.basename(self.origin_abs_path)
abs_dir = self.origin_abs_path[:-len(basename_origin)]
basename_compressed = os.path.basename(self.compressed_abs_path)
prefix_origin = os.path.splitext(basename_origin)[0]
prefix_compressed = os.path.splitext(basename_compressed)[0]
suffix = prefix_compressed[len(prefix_origin):]
fmt = get_suffix(basename_compressed).upper()
if fmt == 'JPG':
fmt = 'mozjpeg'
elif fmt == 'PNG':
fmt = 'oxipng'
elif fmt == 'WEBP':
fmt = 'webp'
command_compress = '''squoosh-cli --{} {{quality:{},baseline:false,arithmetic:false,progressive:true,optimize_coding:true,smoothing:0,color_space:3,quant_table:3,trellis_multipass:false,trellis_opt_zero:false,trellis_opt_table:false,trellis_loops:1,auto_subsample:true,chroma_subsample:2,separate_chroma_quality:false,chroma_quality:75}} "{}" -d "{}" -s "{}"'''.format(
fmt, quality, self.origin_abs_path, abs_dir, suffix)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_img_squoosh failed,生成最终图片失败:{command_compress}'
)
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
def get_definitions_video(target_dir: str):
'''获取一个文件夹下所有视频文件的分辨率,按宽高比进行分类。
:param target_dir: 目标文件夹的绝对路径
:结果会输出到当前目录下的 definitions.txt 文件中'''
global CWD
ratio_dict = dict() # 字典,键为宽高的比值,值为列表,包括该比值下的具体宽高数值
# 例:1.7777778:['640x360', '1280x720', '1920x1080']
for dirpath, dirnames, filenames in os.walk(target_dir):
for filename in filenames:
abs_path = os.path.join(dirpath, filename)
if is_video(abs_path):
print('\r视频文件:{:200}'.format(abs_path), end='')
vimg = vimg_processor(abs_path)
ratio = vimg.ratio
definition = "{}x{}".format(vimg.width, vimg.height)
if ratio_dict.get(ratio) == None:
ratio_dict[ratio] = [definition]
else:
if definition not in ratio_dict[ratio]:
ratio_dict[ratio].append(definition)
# 按宽高比值升序输出
ratio_list = []
for key, value in ratio_dict.items():
ratio_list.append(key)
ratio_list.sort()
with open(os.path.join(CWD, 'definitions.txt'), 'w', encoding='utf8') as f:
for ratio in ratio_list:
ratio_dict[ratio].sort()
f.write('宽高比:{:.6},成员:'.format(ratio))
for definition in ratio_dict[ratio][:-1]:
f.write(definition + ' , ')
f.write(ratio_dict[ratio][-1] + '\n')
print('')
def compress_videos(video_size: str = '', fmt: str = ''):
'''从redis中得到需要压缩的视频文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
:对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
:输出文件在源文件名末尾加 #test# 放在同目录下(1.mp4 -> 1#test#.mp4);输出文件有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名。
:param video_size: 视频适配的屏幕大小。 phone、pad、pc、空字符串 分别表示适配手机、平板、电脑、不改变原大小
:param fmt: 指定输出的文件格式,空字符串表示原格式。建议用mp4
'''
global NAS_mounted_path, r_cur, log, start_time, fail_msg_list, sys_kind
global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
mylog.log_print(log, 'info', msg)
while (True):
if r_cur.scard(redis_key_set_todo) == 0:
break
try:
redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
rel_path = replace_path_seperator(redis_rel_path,
sys_kind=sys_kind)
abs_path = os.path.join(NAS_mounted_path, rel_path)
if is_video(abs_path) and os.path.isfile(abs_path):
vimg = vimg_processor(origin_abs_path=abs_path,
video_size=video_size,
fmt=fmt)
compressed_abs_path = vimg.compressed_abs_path
if os.path.exists(compressed_abs_path): # 输出文件已存在时跳过
continue
start_time = time.time()
log.info('{}压缩中...'.format(abs_path))
compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
compressed_abs_path)
r_cur.set(redis_key_str_ongoing + redis_rel_path,
compressed_redis_rel_path)
vimg.compress_video()
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.set(redis_key_str_done + redis_rel_path,
compressed_redis_rel_path)
log.info('视频压缩成功,耗时{:.2f}min'.format(
(time.time() - start_time) / 60))
except:
except_info = traceback.format_exc()
fail_msg_list.append(except_info)
mylog.log_print(log, 'error', except_info)
if os.path.exists(compressed_abs_path):
os.remove(compressed_abs_path)
r_cur.srem(redis_key_set_todo, redis_rel_path)
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.delete(redis_key_str_done + redis_rel_path)
exc_type, exc_value, exc_traceback = sys.exc_info() # 获得异常名
exc_type = exc_type.__name__
if exc_type == 'KeyboardInterrupt': # 在linux上时,无法得到KeyboardInterrupt
r_cur.sadd(redis_key_set_todo, redis_rel_path)
else:
mylog.log_print(log, 'error', f'错误类型为{exc_type}')
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
break
def compress_imgs(compressor: str = 'squoosh',
fmt: str = '',
img_max_pix: int = 0,
quality: int = 75):
'''从redis中得到需要压缩的图片文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
:对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
:输出文件在源文件名末尾加 #test# 放在同目录下(1.jpg -> 1#test#.jpg);输出文件有重名时会自动按 1#test#.jpg 、1#test#(1).jpg 、1#test#(2).jpg 的顺序重命名。
:param compressor: 指定压缩的方式。可选择 ffmpeg 或 squoosh ,推荐后者
:param fmt: 指定输出的文件格式,空字符串表示原格式。建议用jpg,目前squoosh只支持jpg、png、webp
:param img_max_pix: (仅在ffmpeg时可用)输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
:使用squoosh时,图片会保留原有尺寸
:param quality: (仅在squoosh时可用)压缩图片的品质,取值范围为0-100,详见squoosh的说明
'''
global NAS_mounted_path, TEMP_DIR, r_cur, log, start_time, fail_msg_list, sys_kind
global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
mylog.log_print(log, 'info', msg)
while (True):
if r_cur.scard(redis_key_set_todo) == 0:
break
try:
redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
rel_path = replace_path_seperator(redis_rel_path,
sys_kind=sys_kind)
abs_path = os.path.join(NAS_mounted_path, rel_path)
if is_img(abs_path) and os.path.isfile(abs_path):
vimg = vimg_processor(origin_abs_path=abs_path,
img_max_pix=img_max_pix,
fmt=fmt)
compressed_abs_path = vimg.compressed_abs_path
if os.path.exists(compressed_abs_path): # 输出文件已存在时跳过
continue
log.info('{}压缩中...'.format(abs_path))
compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
compressed_abs_path)
r_cur.set(redis_key_str_ongoing + redis_rel_path,
compressed_redis_rel_path)
if compressor == 'ffmpeg':
vimg.compress_img_ffmpeg()
elif compressor == 'squoosh':
vimg.compress_img_squoosh(quality=quality)
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.set(redis_key_str_done + redis_rel_path,
compressed_redis_rel_path)
except:
except_info = traceback.format_exc()
fail_msg_list.append(except_info)
mylog.log_print(log, 'error', except_info)
if os.path.exists(compressed_abs_path):
os.remove(compressed_abs_path)
r_cur.srem(redis_key_set_todo, redis_rel_path)
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.delete(redis_key_str_done + redis_rel_path)
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
exc_type, exc_value, exc_traceback = sys.exc_info() # 获得异常名
exc_type = exc_type.__name__
if exc_type == 'KeyboardInterrupt': # 在linux上时,无法得到KeyboardInterrupt
r_cur.sadd(redis_key_set_todo, redis_rel_path)
else:
mylog.log_print(log, 'error', f'错误类型为{exc_type}')
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
break
# 最后清理一下TEMP_DIR
for dirpath, dirnames, filenames in os.walk(TEMP_DIR):
for filename in filenames:
os.remove(os.path.join(dirpath, filename))
def delete_test():
'''视频或图片压缩完毕后,比较原文件与压缩后的文件,留下较小且大小非0的文件。
:由于ffmpeg处理效果不确定,建议检查一下处理后的文件,确认无误后再执行,因为源文件会被彻底删除。具体见 下载-压缩-入库流程
:将到redis的redis_key_str_done中读取压缩前后的文件相对路径。删除#test#文件后,删除redis_key_str_done对应的键
'''
global r_cur, log, fail_msg_list
global redis_key_str_done
redis_rel_path_done_keys = r_cur.keys(redis_key_str_done + '*')
for redis_rel_path_done_key in redis_rel_path_done_keys:
origin_redis_rel_path = redis_rel_path_done_key[len(redis_key_str_done
):]
compressed_redis_rel_path = r_cur.get(redis_rel_path_done_key)
origin_abs_path = get_abs_path_from_redis_rel_path(
origin_redis_rel_path)
compressed_abs_path = get_abs_path_from_redis_rel_path(
compressed_redis_rel_path)
# redis中存储的相对路径都是 压缩中 ,但是现在这些文件已经手动放到了 检查中 里
origin_abs_path = origin_abs_path.replace('压缩中', '检查中', 1)
#需要更改
compressed_abs_path = compressed_abs_path.replace('压缩中', '检查中', 1)
#需要更改
if os.path.isfile(origin_abs_path) and os.path.isfile(
compressed_abs_path):
before_size = os.path.getsize(origin_abs_path)
after_size = os.path.getsize(compressed_abs_path)
if after_size < before_size and after_size != 0:
try:
os.remove(origin_abs_path)
# 注意这里的压缩后文件名改回原名时是把最右边的第一个#test#去掉,而不是直接改成源文件名
# 因为如果压缩时用fmt指定输出格式,则可能当前目录下有重名文件
# 1.avi、1.wmv -> 1#test#.mp4、1#test#(1).mp4 -> 1.mp4、1(1).mp4
prefix, suffix = os.path.splitext(compressed_abs_path)
reversed_prefix = prefix[::-1]
reversed_prefix = reversed_prefix.replace('#tset#', '', 1)
prefix = reversed_prefix[::-1]
temp = prefix + suffix
os.rename(compressed_abs_path, temp)
r_cur.delete(redis_rel_path_done_key)
log.info(f'将{origin_abs_path}替换为小文件')
except PermissionError:
r_cur.set(redis_rel_path_done_key,
compressed_redis_rel_path)
msg = f'delete_test failed,删除失败:{origin_abs_path}'
mylog.log_print(log, 'warning', msg)
fail_msg_list.append(msg)
continue
else:
os.remove(compressed_abs_path)
msg = f'{origin_abs_path}压缩后的文件比之前大(或大小为0),删掉了处理后的文件'
log.info(msg)
# 最后检查一下有没有剩余的处理后文件
dir_checking = replace_path_seperator('xxx/xxx/检查中', sys_kind=sys_kind)
#需要更改
dir_checking = os.path.join(NAS_mounted_path, dir_checking)
for dirpath, dirnames, filenames in os.walk(dir_checking):
for filename in filenames:
if '#test#' in filename:
abs_path = os.path.join(dirpath, filename)
msg = f'疑似还有#test#文件:{abs_path}'
mylog.log_print(log, 'warning', msg)
fail_msg_list.append(msg)
def main():
'''执行入口。'''
target_dir = r'D:\xxx\xxx\压缩中'
#需要更改
global sys_kind
try:
#get_definitions_video(target_dir)
#compress_videos(video_size='pad', fmt='mp4')
compress_imgs(compressor='squoosh', fmt='jpg', quality=75)
#delete_test()
print('已完成')
if len(fail_msg_list) != 0:
print('本次压缩时出现了以下异常:')
for i in fail_msg_list:
print(i)
if sys_kind == 'Windows':
input()
except:
except_info = traceback.format_exc()
mylog.log_print(log, 'error', except_info)
if sys_kind == 'Windows':
input()
if __name__ == '__main__':
main()
'''下载-压缩-入库流程:
1.将需要压缩的文件先暂时放到 待压缩 中,进行人工筛选
2.筛选完后,将 待压缩 中的文件手动剪切到 压缩中 文件夹里
3.直连NAS的机器运行python脚本,更新redis库、移动文件
4.其他连接到NAS的机器从redis中获取任务、进行压缩
5.从 压缩完 中将文件剪切到 检查中 里,进行人工检查
6.检查完后,手动运行vimg_processor_multi里的delete_test,保留源文件或压缩后文件中的一个
7.手动将 检查中 里的文件放到其他目录,比如 待迁移 或 入库中 里
8.运行file_to_sql,将本地文件入库
'''
如前所述,这是一个NAS、多电脑协作压缩的版本,如果只是在本地压缩图片的话可以自行修改。