Python,爬虫与深度学习(17)——电影的信息爬取、下载与处理(二)
步骤/目录:
1.需求介绍
2.电影压缩
3.本地视频入库
4.ffmpeg加速
(1)显卡加速
(2)多电脑加速
本文首发于个人博客https://lisper517.top/index.php/archives/56/
,转载请注明出处。
本文的目的是在电影下载完成后,进行一些处理。
本文写作日期为2022年10月5日。运行的平台为win10,编辑器为VS Code。
1.需求介绍
首先,现在电影占用的空间越来越大,对电影进行一定压缩有助于存储。
其次,在之前mysql的movie表中,还有两列: downloaded
和 file_path
,这两列是用来判断一部电影是否被下载过的。
所以目前的需求是对电影进行压缩,并进行入库。
2.电影压缩
这里使用ffmpeg,它是一个被广泛使用的处理音视频、图片的开源软件,比如格式工厂就是在内部调用ffmpeg转换格式的,暴风影音等也可以使用ffmpeg进行播放。这里是 ffmpeg的官网下载地址 ,但是官网只提供源码下载,你可以自行make编译,或者到 github网页 下载由热心网友为windows系统编译好的ffmpeg压缩包,下载并解压到合适的位置,然后把ffmpeg添加到路径(windows添加路径的方法请自行搜索)。最后在cmd中输入 ffmpeg -h
,成功显示帮助信息即可。
在ffmpeg的bin文件夹中,除了最常用的ffmpeg,还有ffplay,用于播放;ffprobe,用于获取视频信息。
写一个python脚本,目的是对一个文件夹下的视频文件进行压缩处理,脚本名为 vimg_processor.py
,内容如下:
import os, time, json, subprocess, re, traceback
from MyPythonLib import log as mylog
CWD = os.getcwd()
TEMP_DIR = os.path.join(CWD, 'temp')
if not os.path.isdir(TEMP_DIR):
os.mkdir(TEMP_DIR)
#temp这个文件夹主要是在压缩图片时存放临时的调色板
log = mylog.get_logger(os.path.join(CWD, 'logs'))
start_time = time.time()
fail_msg_list = []
def get_suffix(file_name: str) -> str:
'''返回文件后缀名。若文件无后缀名,返回空字符串。
:param file_name:文件名(可带完整路径)
:return 大写的后缀名(不含 . )
'''
suffix = os.path.splitext(file_name)
suffix = (suffix[-1].lstrip('.')).upper()
return suffix
def special_format(file_name: str) -> bool:
'''判断文件是否特殊格式。
:rmvb格式必须指定输出为mp4,ts、mpg格式跳跃观看时卡顿严重,所以都应该输出为mp4格式。
:param file_name:文件名(可带完整路径)
:return bool
'''
videoSuffixSet = {"RMVB", "TS", "MPG"}
judge = (False, True)[get_suffix(file_name) in videoSuffixSet]
return judge
def is_video(file_name: str) -> bool:
'''根据后缀名判断文件是否视频。
:param file_name:文件名(可带完整路径)
:return bool
'''
videoSuffixSet = {
"WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
"M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
}
judge = (False, True)[get_suffix(file_name) in videoSuffixSet]
return judge
def is_img(file_name: str) -> bool:
'''根据后缀名判断文件是否图片。
:param file_name:文件名(可带完整路径)
:return bool
'''
imgSuffixSet = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
judge = (False, True)[get_suffix(file_name) in imgSuffixSet]
return judge
def cannot_process(file_name: str) -> bool:
'''判断文件是否暂时处理不了。
:param file_name:文件名(可带完整路径)
:return bool
'''
videoSuffixSet = set()
judge = (False, True)[get_suffix(file_name) in videoSuffixSet]
return judge
def is_origin(file_name: str) -> bool:
'''判断文件是否源文件(源文件中末尾没有#test#)。
:param file_name:文件名(可带完整路径)
:return bool
'''
prefix = (os.path.splitext(file_name))[0]
judge = (True, False)[re.match('.+#test#$', prefix) != None]
return judge
class vimg_processor(object):
'''图像视频类,初始化时会读取图像视频的一些参数。'''
def __init__(self,
inputName: str,
outName: str = "",
video_size: str = 'pc',
mp4_jpg: bool = False,
img_max_pix: int = 700):
'''用完整的文件名初始化对象
:param inputName: 源文件的绝对路径
:param outName: 输出文件的绝对路径。默认为同目录下文件名末加#test#保存
:param video_size: 视频文件的输出大小,有 phone 、 pad 、 pc 、 空字符串 三种选择,空串表示保留原宽高
:param mp4_jpg: 输出格式是否固定为mp4和jpg
:param img_max_pix: 输出图像的宽高同时超过该值时,宽高中较长的一个数值会被压缩到该值
'''
self.inputName = inputName
if outName == "":
prefix, suffix = os.path.splitext(inputName)
outName = prefix + "#test#" + suffix
if mp4_jpg == True:
prefix, suffix = os.path.splitext(outName)
if is_video(inputName):
outName = prefix + '.mp4'
elif is_img(inputName):
outName = prefix + '.jpg'
self.outName = outName
if is_img(self.inputName):
if mp4_jpg == False:
suffix = get_suffix(self.inputName).lower()
self.out_name_palette = os.path.join(TEMP_DIR,
'palette' + suffix)
self.temp_img = os.path.join(TEMP_DIR, 'temp_img' + suffix)
else:
self.out_name_palette = os.path.join(TEMP_DIR, 'palette.jpg')
self.temp_img = os.path.join(TEMP_DIR, 'temp_img.jpg')
# 获取视频文件的宽高,根据宽高比决定输出文件宽高。
# 这里只是在宽度超过上限时减小宽高
self.width, self.height = 0, 1
if is_video(inputName):
self.get_video_info()
w, h = self.width, self.height
self.ratio = w / h
if video_size == 'phone':
if self.ratio <= 0.6:
self.definition = (f'{w}:{h}', '360:640')[w >= 360]
elif self.ratio <= 1.0:
self.definition = (f'{w}:{h}', '360:480')[w >= 360]
elif self.ratio <= 1.66:
self.definition = (f'{w}:{h}', '480:360')[w >= 480]
elif self.ratio > 1.66:
self.definition = (f'{w}:{h}', '640:360')[w >= 640]
elif video_size == 'pad':
if self.ratio <= 0.6:
self.definition = (f'{w}:{h}', '720:1280')[w >= 720]
elif self.ratio <= 1.0:
self.definition = (f'{w}:{h}', '720:960')[w >= 720]
elif self.ratio <= 1.66:
self.definition = (f'{w}:{h}', '960:720')[w >= 960]
elif self.ratio > 1.66:
self.definition = (f'{w}:{h}', '1280:720')[w >= 1280]
elif video_size == 'pc':
if self.ratio <= 0.6:
self.definition = (f'{w}:{h}', '1080:1920')[w >= 1080]
elif self.ratio <= 1.0:
self.definition = (f'{w}:{h}', '1080:1440')[w >= 1080]
elif self.ratio <= 1.66:
self.definition = (f'{w}:{h}', '1440:1080')[w >= 1440]
elif self.ratio > 1.66:
self.definition = (f'{w}:{h}', '1920:1080')[w >= 1920]
elif video_size == '':
self.definition = f'{w}:{h}'
# 获取图片的编码格式
elif is_img(inputName):
self.pix_fmt = ''
self.get_img_info()
if self.width > img_max_pix and self.height > img_max_pix:
if self.width <= self.height:
ratio = self.width / img_max_pix
elif self.width > self.height:
ratio = self.height / img_max_pix
self.width, self.height = int(self.width / ratio), int(
self.height / ratio)
self.definition = '{}:{}'.format(self.width, self.height)
def get_video_info(self):
'''获取视频的宽高。'''
probe = 'ffprobe -select_streams v -show_entries format=bit_rate -show_streams -v quiet -of csv="p=0" -of json -i "{}"'.format(
self.inputName)
result = subprocess.Popen(probe, shell=False,
stdout=subprocess.PIPE).stdout
list_std = result.readlines()
str_tmp = ''
for item in list_std:
str_tmp += bytes.decode(item.strip())
json_data = json.loads(str_tmp)
# self.bitrate = int(json_data['format']['bit_rate']) / 1024 # 以后可能用到视频的比特率
# 视频的比特率有时不能用ffprobe读取到,建议自己根据视频大小和时长计算
try:
self.width = int(json_data['streams'][0]['width'])
self.height = int(json_data['streams'][0]['height'])
except:
self.width, self.height = 0, 1
def get_img_info(self):
'''获取图片的一些信息。'''
probe = 'ffprobe -hide_banner -v quiet -print_format json -show_format -show_streams "{}"'.format(
self.inputName)
result = subprocess.Popen(probe, shell=False,
stdout=subprocess.PIPE).stdout
list_std = result.readlines()
str_tmp = ''
for item in list_std:
str_tmp += bytes.decode(item.strip())
json_data = json.loads(str_tmp)
try:
self.pix_fmt = str(json_data['streams'][0]['pix_fmt'])
self.width = int(json_data['streams'][0]['width'])
self.height = int(json_data['streams'][0]['height'])
except:
self.pix_fmt, self.width, self.height = '', 0, 0
def Zip_Video(self):
'''通过ffmpeg压缩视频文件(输出的分辨率格式指定为yuv420p)。'''
compress = 'ffmpeg -threads 8 -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'.format(
self.inputName, self.definition,
self.outName) # -threads 8 限制线程数,解码和编码都设置,保护CPU。压缩视频费cpu
compress = os.system(compress)
if compress != 0:
return (compress, "视频压缩失败")
return True
def Zip_Img(self):
'''通过ffmpeg压缩图像文件。'''
get_palette = 'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(
self.inputName, self.out_name_palette)
compress = 'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(
self.inputName, self.out_name_palette, self.pix_fmt, self.temp_img)
get_palette = os.system(get_palette)
if get_palette != 0:
return (get_palette, "无法获得调色板")
compress = os.system(compress)
if compress != 0:
return (compress, "生成中间图片失败")
compress = 'ffmpeg -i "{}" -vf scale={} -y "{}"'.format(
self.temp_img, self.definition, self.outName)
compress = os.system(compress)
if compress != 0:
return (compress, "生成最终图片失败")
return True
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
def get_definitions_video(dir):
'''获取一个文件夹下所有视频文件的分辨率,按宽高比进行分类。'''
ratio_dict = dict()
for path_prefix, subdirs, file_names in os.walk(dir):
for file_name in file_names:
complete_path = os.path.join(path_prefix, file_name)
if is_video(complete_path):
print('\r视频文件:{:200}'.format(complete_path), end='')
vimg = vimg_processor(complete_path)
ratio = vimg.ratio
definition = "{}x{}".format(vimg.width, vimg.height)
if ratio_dict.get(ratio) == None:
ratio_dict[ratio] = [definition]
else:
if definition not in ratio_dict[ratio]:
ratio_dict[ratio].append(definition)
ratio_list = []
for key, value in ratio_dict.items():
ratio_list.append(key)
ratio_list.sort()
with open(os.path.join(CWD, 'definitions.txt'), 'w', encoding='utf8') as f:
for ratio in ratio_list:
ratio_dict[ratio].sort()
f.write('宽高比:{:.6},成员:'.format(ratio))
for definition in ratio_dict[ratio][:-1]:
f.write(definition + ' , ')
f.write(ratio_dict[ratio][-1] + '\n')
print('')
def zip_dir_video(dir, video_size='pc', mp4=False):
'''压缩一个文件夹下的视频文件。
:输出文件自动命名为同目录下 源文件名+#test# 文件,后缀名不变。
:对已经存在的带有#test#的文件,会跳过。
:param dir: 文件夹的绝对路径
:param video_size: 视频宽高, phone、pad、pc、空字符串 分别表示适配手机、平板、电脑、不改变
:param mp4: 是否仅输出mp4文件
'''
global log, start_time, fail_msg_list
msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
print(msg)
log.info(msg)
for path_prefix, subdirs, file_names in os.walk(dir):
for file_name in file_names:
if '#test#.' in file_name:
continue
complete_path = os.path.join(path_prefix, file_name)
if is_video(complete_path) and not cannot_process(complete_path):
vimg = vimg_processor(complete_path,
mp4_jpg=mp4,
video_size=video_size)
if os.path.exists(vimg.outName): # 输出文件已存在时跳过
continue
if vimg.width == 0 or vimg.height == 1:
msg = f'无法获得视频信息{complete_path}'
print(msg)
log.warning(msg)
fail_msg_list.append(msg)
continue
start_time = time.time()
log.info('{}压缩中...'.format(complete_path))
if vimg.Zip_Video() == True:
log.info('视频压缩成功,耗时{:.2f}min'.format(
(time.time() - start_time) / 60))
else:
msg = f'视频压缩失败{complete_path}'
log.warning(msg)
fail_msg_list.append(msg)
def zip_dir_img(dir, jpg=False, img_max_pix=700):
'''压缩一个文件夹下的图像文件。
:输出文件自动命名为同目录下 源文件名+#test# 文件,后缀名不变。
:对已经存在的带有#test#的文件,会跳过。
:param dir: 文件夹的绝对路径
:param img_max_pix: 宽高同时超过这个值时,宽高中较大的数值会缩小到这个数值
:param jpg: 是否仅输出jpg文件
'''
global log, start_time, fail_msg_list
msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
print(msg)
log.info(msg)
for path_prefix, subdirs, file_names in os.walk(dir):
for file_name in file_names:
if '#test#.' in file_name:
continue
complete_path = os.path.join(path_prefix, file_name)
if is_img(complete_path):
print('\r{:200}'.format(complete_path), end='')
vimg = vimg_processor(complete_path,
mp4_jpg=jpg,
img_max_pix=img_max_pix)
if os.path.isfile(vimg.outName): # 输出文件已存在时跳过
continue
if vimg.width == 0 or vimg.height == 1 or vimg.pix_fmt == '':
msg = f'无法获得图片信息{complete_path}'
print(msg)
log.warning(msg)
fail_msg_list.append(msg)
continue
if vimg.Zip_Img() == True:
log.info('图片压缩完毕{}'.format(complete_path))
else:
msg = f'图片压缩失败{complete_path}'
log.warning(msg)
fail_msg_list.append(msg)
def delete_test(dir, mp4_jpg=False):
'''在处理一个文件夹中的文件后,比较原文件与处理后文件,留下较小且大小非0的文件(对于一些特殊格式,不论大小仅留下处理后的文件)。
:由于ffmpeg处理效果不确定,建议检查一下处理后的文件,确认无误后再执行,因为源文件会被彻底删除。
:默认输出文件就是 源文件名+#test#。对于没有#test#的文件不做处理。
:param dir: 文件夹的绝对路径
:param mp4_jpg: 进行压缩时是否仅输出mp4/jpg文件
'''
global fail_msg_list
for path_prefix, subdirs, file_names in os.walk(dir):
for file_name in file_names:
complete_path = os.path.join(path_prefix, file_name)
if (is_video(complete_path)
or is_img(complete_path)) and is_origin(complete_path):
prefix, suffix = os.path.splitext(complete_path)
if mp4_jpg == False:
file_name_after_zip = prefix + "#test#" + suffix
elif mp4_jpg == True:
if is_video(complete_path):
file_name_after_zip = prefix + "#test#" + '.mp4'
elif is_img(complete_path):
file_name_after_zip = prefix + "#test#" + '.jpg'
if os.path.isfile(file_name_after_zip):
after_size = os.path.getsize(file_name_after_zip)
before_size = os.path.getsize(complete_path)
if (after_size < before_size or
special_format(complete_path)) and after_size != 0:
try:
os.remove(complete_path)
os.rename(
file_name_after_zip,
file_name_after_zip.replace('#test#.', '.'))
log.info('将{}替换为小文件'.format(complete_path))
except PermissionError:
msg = '删除失败:{complete_path}'
log.warning(msg)
fail_msg_list.append(msg)
continue
else:
os.remove(file_name_after_zip)
log.info('{}压缩后的文件比之前大(或大小为0),删掉了处理后的文件'.format(
file_name_after_zip))
# 最后检查一下有没有剩余的处理后文件
for path_prefix, subdirs, file_names in os.walk(dir):
for file_name in file_names:
if '#test#.' in file_name:
msg = '疑似还有#test#文件:{}'.format(
os.path.join(path_prefix, file_name))
log.warning(msg)
fail_msg_list.append(msg)
if __name__ == '__main__':
target_dir = r'D:\压缩中'
try:
#get_definitions_video(target_dir)
zip_dir_video(target_dir, mp4=True, video_size='pad')
#zip_dir_img(target_dir, jpg=True, img_max_pix=700)
#delete_test(target_dir, mp4_jpg=True)
for path_prefix, subdirs, file_names in os.walk(TEMP_DIR):
for file_name in file_names:
os.remove(os.path.join(path_prefix, file_name))
print('已完成')
if len(fail_msg_list) != 0:
print('本次压缩时出现了以下异常:')
for i in fail_msg_list:
print(i)
input()
except:
traceback.print_exc()
input()
这个脚本中, target_dir
是存放视频文件的目录, zip_dir_video
是对该目录下的视频文件进行压缩,压缩后的视频文件名字末尾(后缀之前)会加上 #test#
以示区分,和原视频放在同一目录下。
另外这个脚本也可以用于对图片进行压缩,笔者使用的图片压缩方法很省空间,但鲜艳度不够。
在脚本的使用上,把所有要压缩的视频文件都放到target_dir里(可以放到多层目录中),然后首先以原状态运行脚本,压缩完后随机抽取检查一下压缩后的视频文件,注意本次压缩有无提示异常,确认无误后注释掉 zip_dir_video(target_dir, mp4=True, video_size='pad')
这一行,删去 delete_test(target_dir, mp4_jpg=True)
这一行前的注释 # 号,再运行一遍,即可在原视频文件和压缩后的 #test#
文件中保留更小的那个。如果运行时需要暂停或停止,用鼠标点击cmd框中间可暂停;按ctrl+C可停止,停止时要把最后一个正在生成的 #test#
文件删除,然后下次有时间重新运行脚本,这个脚本会自动跳过带有 #test# 的文件,也不会对已经有 #test# 压缩后文件的视频重复压缩。
最后,注意 zip_dir_video(target_dir, mp4=True, video_size='pad')
中的 video_size
参数,可以取phone、pad、pc或空字符串,分别表示视频的高x宽适配手机(360x640)、平板(720x1280)、电脑(1080x1920)或原大小。输出视频的宽高越小,压缩速度越快(对2h的视频,用phone大小,笔者的电脑大概需要压缩8-25min,根据源文件大小浮动),一般选phone或pad即可,选pc时的压缩速度太慢了;大小方面,2h的视频,大概是500MB、1.5GB、3.5GB的区别。另外,视频压缩主要消耗cpu资源,在:
compress = 'ffmpeg -threads 8 -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'
这一行,两处 -threads 8
是限制调用的cpu核心数,可以根据自己的电脑情况修改。一般cpu使用率75-80%就比较合适,而此时显卡却不会怎么使用,所以可以边压缩视频边玩一些需要显卡的游戏。
3.本地视频入库
主要是将视频文件(夹)放到合适的目录下,并修改对应的movie表中downloaded和file_path这两列;如果没有对应的movie表,则可以选择在movie表中新建一行。
由于自动识别视频比较复杂,这里选择手工识别(但是能简化的地方都简化了,只是输入稍微有点繁琐)。写好的file_into_sql.py文件如下(放到scrapy项目里):
from MyPythonLib import log as mylog
from MyPythonLib.log import log_print
from MyPythonLib import torrent_parser as t_p
from libs import javbus_extensions as j_e
import os, re
TORRENT_DIRS = [r'D:\torrents'] #存放torrent文件的所有目录
VIDEO_DIR = r'D:\into_sql' #存放本次需要入库的文件(夹)
VIDEO_DIR_UNKNOWN = r'D:\unknown' #有些视频不太好处理,想要跳过。所有跳过的文件(夹)会被放到这个目录里
LOG_DIR = os.path.join(os.getcwd(), 'logs')
if not os.path.isdir(LOG_DIR):
os.mkdir(LOG_DIR)
log = mylog.get_logger(LOG_DIR, 'file_to_sql.txt')
class video_file_processor:
"""用于处理视频文件(夹)。主要工作流程:
:进入TORRENT_DIRS,记录torrent的hash值与文件名
:对VIDEO_DIR内的视频文件(夹),先在torrent字典中的文件查找,若文件名出现过(或者文件夹内的文件全部匹配),则认定视频文件(夹)对应该种子,那么查找数据库对应的torrent记录,找到对应的电影记录(movie_no)。
:若不能通过torrent记录找到movie_no,则尝试通过code查找movie_no(需要手动输入)
:若无法找到code,则尝试通过full_name查找movie_no(需要手动输入)
:在通过code或full_name查找movie_no的过程中,也可能新建movie_no
:只要找到或新建movie_no,最后该文件都会被放到合适的地方;否则该文件会被转移到VIDEO_DIR_UNKNOWN中"""
def __init__(self, logger=None) -> None:
""":param log: 可选的logger对象。若"""
global log
self.log = (logger, log)[logger == None]
self.torrents_files = self.get_torrents_files()
self.database_api_javbus = j_e.database_api_javbus(self.log)
self.conn = self.database_api_javbus.conn
self.m_cur = self.database_api_javbus.m_cur
self.r_cur = self.database_api_javbus.r_cur
self.movie_no = 0
self.code = "unknown"
self.full_name = "unknown"
def get_torrents_files(self) -> dict:
"""得到torrents_files。它是一个字典,键为hash_value(仅40位hash),值为该torrent包含的文件名列表(无类型后缀和路径前缀)"""
log = self.log
message = "正在建立hash与文件关系..."
log_print(log, 'info', message)
torrents_files = {}
p_torrent = re.compile('.torrent$', flags=re.I)
for torrent_dir in TORRENT_DIRS:
torrent_progress = 1
for dirpath, dirnames, filenames in os.walk(torrent_dir):
for filename in filenames:
if re.search(p_torrent, filename):
abs_torrent_path = os.path.join(dirpath, filename)
torrent = t_p.torrent(abs_torrent_path)
for i in range(len(torrent.files)):
torrent.files[i] = os.path.splitext(
torrent.files[i])[0]
torrents_files[torrent.hash_value] = torrent.files
torrent_progress += 1
print(
f'\rtorrent目录{torrent_dir}:{torrent_progress}/{len(filenames)}',
end='')
message = "已建立hash与文件关系"
log_print(log, 'info', message, print_prefix='\n')
return torrents_files
def process_videos(self):
"""进入VIDEO_DIR,处理包含的视频文件。"""
self.movie_no, self.code, self.full_name = 0, "unknown", "unknown"
dirpath, dirnames, filenames = next(
os.walk(VIDEO_DIR)) #只进入第一层,也就是说默认一个torrent对应第一层的一个文件或文件夹
for filename in filenames:
self.__process_videos_main(dirpath, filename, "file")
print('\n', end='')
for dirname in dirnames:
self.__process_videos_main(dirpath, dirname, "dir")
print('\n', end='')
def execute_mysql_query_command(self, command: str):
"""执行mysql查询语句。由于用到多次,所以写成一个方法
:param command: 查询语句"""
try:
self.m_cur.execute(command)
except:
message = f'mysql语句出错:{command}。已暂停'
log_print(self.log, 'error', message)
input()
#因为对文件和文件夹的处理差不多,所以写一个函数以便复用代码
def __process_videos_main(self,
dirpath,
file_dir_name,
file_or_dir: str = "file"):
"""这个函数为处理的主体。
:param dirpath: dirpath,用os.walk()得到,直接传入
:param file_dir_name: 文件或文件夹名
:param file_or_dir: file_dir_name是文件还是文件夹。可取值 "file" 或 "dir" (因为有可能文件和文件夹重名,还是需要这个参数)
:详细的选择分支图见注释"""
m_cur, log = self.m_cur, self.log
movie_found = False
abs_file_dir_path = os.path.join(dirpath, file_dir_name)
if file_or_dir == "file":
filenames_without_suffix = [os.path.splitext(file_dir_name)[0]]
elif file_or_dir == "dir":
filenames_without_suffix = []
for dirpath_temp, dirnames_temp, filenames_temp in os.walk(
abs_file_dir_path):
for filename_temp in filenames_temp:
filename_without_suffix = os.path.splitext(
filename_temp)[0]
filenames_without_suffix.append(filename_without_suffix)
else:
raise Exception(
'file_to_sql.video_file_processor.__process_videos_main错误:file_or_dir参数只应当取 "file" 或 "dir"'
)
filenames_without_suffix = set(filenames_without_suffix)
#分支图:
#---遍历torrents_files
#|__发现包含该文件(或该文件夹下所有文件)的torrent
# |__查找torrent表中有无记录该hash
# |__无记录,则查找code或full_name
# |__有1条记录:
# |__对应的movie_no为0(默认值),则查找code或full_name
# |__对应的movie_no非0,则在movie表中查找该no
# |__有1条记录,则请求用户输入
# |__按下Enter,表示使用该movie_no入库
# |__输入任意字符,则查找code或full_name
# |__有0条记录,则查找code或full_name
# |__其余情况,说明movie表有错误(不可能发生)
# |__有多条记录,说明torrent表有错误(几乎不可能发生)
#|__未发现对应的torrent,则查找code或full_name
for k, v in self.torrents_files.items():
if filenames_without_suffix.issubset(set(v)):
message = f'找到包含{file_dir_name}的hash'
log_print(log, 'info', message)
command_torrent = f'SELECT movie_no FROM torrent WHERE hash_value="{k}";'
self.execute_mysql_query_command(command_torrent)
results = m_cur.fetchall()
if len(results) == 0:
message = f'{file_dir_name}对应的hash在torrent表中无记录,查找code或full_name'
log_print(log, 'info', message)
break
elif len(results) == 1:
movie_no = int(results[0][0])
if movie_no == 0:
message = f'{file_dir_name}对应的hash在torrent表中有记录,但没有对应的movie_no。查找code或full_name'
log_print(log, 'info', message)
break
command_movie = f'SELECT code,full_name FROM movie WHERE no={movie_no};'
self.execute_mysql_query_command(command_movie)
results = m_cur.fetchall()
if len(results) == 1:
code, full_name = results[0]
message = f'{file_dir_name}对应的hash在torrent表中存在,对应的movie_no={movie_no},code={code},full_name={full_name}'
log.info(message)
affirm = input(message +
'\n确认请按下Enter键,或者输入任意字符来查找movie_no:')
if affirm == "":
self.movie_no = movie_no
movie_found = True
break
else:
break
elif len(results) == 0:
message = f'{file_dir_name}对应的hash在torrent表中存在、有movie_no,但没有对应的movie记录。查找code或full_name'
log_print(log, 'info', message)
break
else:
message = f'出现错误:{file_dir_name}对应的hash在torrent表中存在,但对应的movie记录有{len(results)}条(movie_no出现多次)。已暂停'
log_print(log, 'warning', message)
input()
else:
message = f'出现错误:{file_dir_name}对应的hash在torrent表中存在,但有{len(results)}条记录(hash_value出现多次)。已暂停'
log_print(log, 'warning', message)
input()
break
if not movie_found:
self.find_movie_no(abs_file_dir_path, file_or_dir)
else:
self.into_sql(abs_file_dir_path)
def find_movie_no(self, abs_file_dir_path: str, file_or_dir: str = "file"):
"""通过code或full_name为一个单独的文件或文件夹找到movie_no编号,或新建一个movie_no。需要手动确认code或full_name
:param abs_file_dir_path: 文件或文件夹的绝对路径
:param file_or_dir: abs_file_dir_path是文件还是文件夹。可以取 "file" 或 "dir"
:会将类的code、full_name、movie_no进行修改;最后一定会调用into_sql方法"""
def integrate_multi_percent_into_one(query_str: str) -> str:
"""将字符串中紧挨着的多个百分号整合为一个。用到2次,所以把这段代码写成函数
:param query_str: 用于mysql的LIKE查询的字符串
:return 处理后的string"""
while (re.search(r'%{2,}', query_str)):
multi_percent = re.findall(r'%{2,}', query_str)
multi_percent = list(set(multi_percent))
multi_percent.sort(key=lambda x: len(x), reverse=True)
for i in multi_percent:
query_str = query_str.replace(i, '%')
return query_str
m_cur, log = self.m_cur, self.log
file_dir_name = os.path.basename(abs_file_dir_path)
#先试着通过code确认movie_no
code = video_file_processor.get_code(file_dir_name, file_or_dir)
if isinstance(code, str):
#当能提取到code时,请求确认
affirm = input(
f"为{file_dir_name}自动匹配的code为{code},确认请直接按下Enter,否则请手动输入code:")
code = (affirm.upper(), code)[affirm == ""]
elif code == None:
#当不能提取到code时,需要手动输入
affirm = input(
f"无法为{file_dir_name}自动匹配code,将尝试用full_name确认编号。按下Enter确认,否则请手动输入code:"
)
code = (affirm.upper(), 'unknown')[affirm == ""]
#流程图
#------------------------------------------------------循环
#|__code=="unknown",表示用code无法找到movie_no,跳出循环
#|__code为其他字符串,表示疑似code,到movie表中查找
# |__无法根据code精确找到movie_no,则使用模糊查找(对code做一定处理),并输出找到的条目,请求用户输入
# |__直接按下Enter,说明用code无法找到movie_no,跳出循环
# |__输入y,确认现在的code,根据该code新建一条movie记录
# |__输入其他字符串,表示使用该字符串作为code,将会continue
# |__可以根据code精确找到1条movie_no,则输出该条目的信息,请求用户输入
# |__输入Enter,表示确认,将会进行入库
# |__输入n,说明用code无法找到movie_no,跳出循环
# |__输入其他字符串,表示使用该字符串作为code,将会continue
# |__可以根据code精确找到多条movie_no,则输出所有,并请求用户输入
# |__输入Enter,表示使用movie_no最小的一条进行入库
# |__输入一个数字,进入循环
# |__该数字是一个movie_no,表示选择该movie_no进行入库
# |__该数字不是一个movie_no,则继续要求输入
# |__输入一个数字,返回上述循环验证
# |__输入非纯数字的字符串,则以该字符串作为code,continue最外层循环
# |__输入n,说明用code无法找到movie_no,跳出循环
# |__输入其他字符串,表示使用该字符串作为code,将会continue
while (True):
if code == "unknown":
self.code = "unknown"
break
elif isinstance(code, str) and code != "unknown":
#code可能存在时,在mysql中查找
#若找不到,用LIKE模糊查找给出近似结果;若能找到,则返回
command_movie = f'SELECT no,code,full_name FROM movie WHERE code="{code}";'
self.execute_mysql_query_command(command_movie)
results = m_cur.fetchall()
if len(results) == 0:
#将多个 0 替换为 % ,在头尾也加 % ,用LIKE模糊查询
code_like = code.replace('0', '%')
code_like = '%' + code_like + '%'
code_like = integrate_multi_percent_into_one(code_like)
command_movie = f'SELECT no,code FROM movie WHERE code LIKE "{code_like}";'
self.execute_mysql_query_command(command_movie)
results = m_cur.fetchall()
codes_list = [i[1] for i in results]
affirm = input(
f'在movie中没有完全相同的code({code}),模糊查找的code有:\n{codes_list}\n按下Enter将尝试用full_name确认编号,或者输入y确认现在的code,或者输入其他code:'
)
if affirm == "":
self.code = "unknown"
break
elif affirm == "y":
self.movie_no = 0
self.code = code
self.into_sql(abs_file_dir_path)
return
else:
code = affirm.upper()
continue
elif len(results) == 1:
affirm = input(
f'查找到一条记录:code={results[0][1]},full_name={results[0][2]},\n按下Enter以确认,或者输入n来尝试用full_name确认编号,或者输入一个其他的code:'
)
if affirm == "":
self.movie_no = int(results[0][0])
self.into_sql(abs_file_dir_path)
return
elif affirm == "n":
self.code = "unknown"
break
else:
code = affirm.upper()
continue
else:
message = f'通过code={code}在movie表中查找到{len(results)}条记录,这可能是一个错误。'
log_print(log,
'warning',
message,
print_suffix='查找到的记录如下:(按 no,code,full_name 输出)')
for i in results:
print(f'{i[0]} , {i[1]} , {i[2]}')
affirm = input(
'按下Enter表示选择最小的一个movie_no作为编号,或者输入一个存在的no表示选择该编号,或者输入n来尝试用full_name确认编号,或者输入其他code:'
)
if affirm == "":
min_movie_no = int(results[0][0])
for i in results:
movie_no = int(i[0])
if min_movie_no > movie_no:
min_movie_no = movie_no
self.movie_no = min_movie_no
self.into_sql(abs_file_dir_path)
return
elif re.search(r'^\d+$', affirm):
while (re.search(r'^\d+$', affirm)):
movie_no = int(affirm)
if movie_no not in [int(i[0]) for i in results]:
affirm = input(
'输入的movie_no不在上面列出项中,请重新输入一个no,或者输入其他code(非纯数字):'
)
else:
self.movie_no = int(affirm)
self.into_sql(abs_file_dir_path)
return
code = affirm.upper()
continue
elif affirm == 'n':
self.code = "unknown"
break
else:
code = affirm.upper()
continue
#然后试着通过full_name确认movie_no
full_name = os.path.splitext(file_dir_name)[0]
matched_results = []
#分词查找,每次查询结果转为set存储在matched_results中,最后统计出现次数,根据匹配次数从高到低输出
ranked_matched_movie = [] #每个元素为:[(no, full_name), 出现次数]
affirm = input(
f'将以文件(夹)名作为full_name({full_name}),按下Enter确认,输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入一个full_name:'
)
if affirm == "":
pass
elif affirm == "n":
self.movie_no = 0
self.full_name = "unknown"
self.into_sql(abs_file_dir_path)
return
else:
full_name = affirm
#流程图
#------初始的full_name就是文件名(无后缀)/文件夹名,请求用户输入
# |__按下Enter,表示认可该full_name
# |__输入n,表示放弃用full_name寻找movie_no,将把文件(夹)放到VIDEO_DIR_UNKNOWN中
# |__输入其他字符串,表示以该字符串作为full_name继续
#------------------------------------------------------循环
#根据full_name得到一个keywords列表,对每个keyword分别进行查询。每次查询的结果非空时,放到matched_results列表中。
#|__若matched_results列表为空,说明通过所有的keyword都不能查询到,则请求用户输入
# |__按下Enter,确认当前的full_name,用它新建一条movie记录
# |__输入n,表示放弃用full_name寻找movie_no,将把文件(夹)放到VIDEO_DIR_UNKNOWN中
# |__输入其他字符串,表示将full_name替换为该字符串,并continue
#|__若matched_results列表非空,则统计每个movie条目出现的次数,从高到低进行排序得到ranked_matched_movie。进入循环:
# 输出前10条,请求用户输入
# |__按下Enter,以该full_name创建新的movie条目
# |__输入一个数字,进入循环
# |__该数字是一个存在的movie_no,则使用该no进行入库
# |__该数字不是一个存在的movie_no,则继续要求输入
# |__输入一个数字,返回上述循环验证
# |__输入非纯数字的字符串,则以该字符串作为full_name、回到最外层循环
# |__输入n,表示放弃用full_name寻找movie_no,将把文件(夹)放到VIDEO_DIR_UNKNOWN中
# |__输入all,显示ranked_matched_movie的全部条目并continue
# |__输入其他字符串,则以该字符串作为full_name、回到最外层循环
while (True):
words = self.get_keywords(full_name)
for i in range(len(words)):
#在头尾也加 % ,用LIKE模糊查询
word_like = '%' + words[i] + '%'
word_like = integrate_multi_percent_into_one(word_like)
command_movie = f'SELECT no,full_name FROM movie WHERE full_name LIKE "{word_like}";'
self.execute_mysql_query_command(command_movie)
results = m_cur.fetchall()
if len(results) != 0:
matched_results.append(set(results))
if len(matched_results) == 0:
affirm = input(
f'full_name={full_name}没有匹配结果。按下Enter以该full_name新建一条movie记录,输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入其他值来重新输入full_name:'
)
if affirm == "":
self.movie = 0
self.full_name = full_name
self.into_sql(abs_file_dir_path)
return
elif affirm == "n":
self.movie = 0
self.full_name = "unknown"
self.into_sql(abs_file_dir_path)
return
else:
full_name = affirm
continue
else:
for matched_result in matched_results:
for match in matched_result:
try:
exists_index = [
i[0] for i in ranked_matched_movie
].index(match)
except ValueError:
exists_index = -1
if exists_index == -1:
ranked_matched_movie.append([(match), 1])
else:
ranked_matched_movie[exists_index][1] += 1
ranked_matched_movie.sort(key=lambda x: x[1], reverse=True)
show_all = False
while (True):
print(
f'根据full_name={full_name}查找到以下可能匹配的记录:(按 no , 出现次数 , full_name 输出)'
)
for i in range(len(ranked_matched_movie)):
if not show_all:
if i >= 10:
break
print(
f'{ranked_matched_movie[i][0][0]} , {ranked_matched_movie[i][1]} , {ranked_matched_movie[i][0][1]}'
)
if not show_all:
affirm = input(
'按下Enter以full_name创建一条记录,或者从以上项目中挑选一个no输入,或者输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入all以显示全部匹配,或者输入其他字符串来作为full_name:'
)
else:
affirm = input(
'按下Enter以full_name创建一条记录,或者从以上项目中挑选一个no输入,或者输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入其他字符串(非all)来作为full_name:'
)
if affirm == "":
self.movie = 0
self.full_name = full_name
self.into_sql(abs_file_dir_path)
return
elif re.search(r'^\d+$', affirm):
while (re.search(r'^\d+$', affirm)):
movie_no = int(affirm)
if movie_no not in [
int(i[0][0]) for i in ranked_matched_movie
]:
affirm = input(
'输入的movie_no不在上面列出项中,请重新输入一个no,或者输入非纯数字字符串来作为full_name:'
)
else:
self.movie_no = int(affirm)
self.into_sql(abs_file_dir_path)
return
matched_results = []
ranked_matched_movie = []
full_name = affirm
break
elif affirm == "n":
self.movie = 0
self.full_name = "unknown"
self.into_sql(abs_file_dir_path)
return
elif affirm == "all":
show_all = True
continue
else:
matched_results = []
ranked_matched_movie = []
full_name = affirm
break
@staticmethod
def get_code(file_dir_name: str, file_or_dir: str = "file"):
"""根据文件或文件夹名得到对应的code。
:param file_dir_name: 文件名或文件夹名
:param file_or_dir: 表示file_dir_name为文件(为文件时会尝试去掉后缀名)还是文件夹。可取值 "file" 或 "dir"
:return string,表示可能的code;或者返回None,表示无法找到code"""
if file_or_dir == "file":
file_dir_name = os.path.splitext(file_dir_name)[0]
p_fc2 = re.compile(r'fc2[-_]{0,1}(ppv){0,1}[-_]{0,1}\d{7}', re.I)
code = re.search(p_fc2, file_dir_name)
if code:
code = code.group()
code = code.replace('_', '-')
p_ppv = re.compile(r'ppv', re.I)
if re.search(p_ppv, code):
start, end = re.search(p_ppv, code).span()
code = code[:start] + code[end:]
code = code.replace('--', '-')
if '-' not in code:
code = code[:3] + '-' + code[3:]
return code
p_123456_789 = re.compile(r'\d{6}[-_]{1}\d{3}')
code = re.search(p_123456_789, file_dir_name)
if code:
code = code.group()
code = code.replace('_', '-')
return code
p_code = re.compile(r'\d*[a-zA-Z]{2,6}[-_]{0,1}\d+')
code = re.search(p_code, file_dir_name)
if isinstance(code, re.Match):
code = code.group()
code = code.replace('_', '-')
if '-' not in code:
start, end = re.search('[a-zA-Z]{2,6}[^-]', code).span()
code = code[:end - 1] + '-' + code[end - 1:]
return code
return None
@staticmethod
def get_keywords(full_name: str):
"""根据full_name得到关键字列表(list[str]),该列表中的每项字符串都会用于mysql查询。
:param full_name: 文件名(无后缀)或文件夹名
:return list[str]"""
keywords_1 = re.findall(r'[a-zA-Z]+', full_name)
keywords_2 = re.findall(r'\d+', full_name)
keywords = list(set(keywords_1).union(keywords_2))
if len(keywords) == 0:
print('full_name中不含英文单词或数字,将作为一个整体在mysql中模糊查找')
keywords = [full_name]
print(f'对{full_name}提取到的关键词:\n{keywords}')
return keywords
def into_sql(self, abs_file_dir_path: str):
"""根据movie_no、code、full_name分情况处理。
:首先移动文件(夹)到合适的位置,然后将其入库(修改movie表中的downloaded和file_path)
:对于跳过的文件(夹),则只是转移到VIDEO_DIR_UNKNOWN
:param abs_file_dir_path: 绝对路径"""
m_cur = self.m_cur
print(self.movie_no, self.code, self.full_name)
if self.movie_no == 0 and self.code == "unknown" and self.full_name == "unknown":
print('跳过该文件(夹)')
elif self.movie_no == 0:
print(f'以code={self.code}、full_name={self.full_name}新建一条记录')
elif self.movie_no != 0:
command_movie = f'SELECT code,full_name FROM movie WHERE no={self.movie_no};'
self.execute_mysql_query_command(command_movie)
code, full_name = m_cur.fetchall()[0]
print(
f'将对已存在的{self.movie_no}号记录进行修改(code={code},full_name={full_name})'
)
self.movie_no, self.code, self.full_name = 0, "unknown", "unknown"
def main(self):
"""执行入口"""
self.process_videos()
if __name__ == "__main__":
v_f_p = video_file_processor().main()
input('入库完成')
这里的 video_file_processor.into_sql
方法还没有完成,可以根据自己的需要调整。另外, get_code 和 get_keywords 两个方法也可以根据需要调整。
另外,在import时,除了前文提到的自建库,还可以看到这样两行:
from MyPythonLib.log import log_print
from MyPythonLib import torrent_parser as t_p
这个 log_print 是自己写的一个小函数,用来进行一些既要log又要print的信息,内容如下:
def log_print(log: logging.Logger,
log_type: str,
message: str,
print_prefix: str = "",
print_suffix: str = ""):
"""同时log和print。
:param log: Logger对象
:param log_type: Logger的级别
:param message: 输出的信息
:param print_prefix: print时需要在message开头加的字符串
:param print_suffix: print时需要在message末尾加的字符串
:return None"""
try:
if log_type == 'debug':
log.debug(message)
elif log_type == 'info':
log.info(message)
elif log_type == 'warning':
log.warning(message)
elif log_type == 'error':
log.error(message)
elif log_type == 'critical':
log.critical(message)
message = print_prefix + message + print_suffix
print(message)
except:
raise Exception('error: MyPythonLib.log.log_print failed ')
torrent_parser则是解析torrent文件的一个自建库,内容如下:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import collections, hashlib
def bencode(elem):
if type(elem) == str:
elem = str.encode(elem)
if type(elem) == bytes:
result = str.encode(str(len(elem))) + b":" + elem
elif type(elem) == int:
result = str.encode("i" + str(elem) + "e")
elif type(elem) == list:
result = b"l"
for item in elem:
result += bencode(item)
result += b"e"
elif type(elem) in [dict, collections.OrderedDict]:
result = b"d"
for key in elem:
result += bencode(key) + bencode(elem[key])
result += b"e"
return result
def bdecode(bytestr, recursiveCall=False):
startingChars = dict({b"i": int, b":": str, b"l": list, b"d": dict})
digits = [b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9"]
started = ended = False
curtype = None
numstring = b"" # for str, int
result = None # for list, dict
key = None # for dict
while len(bytestr) > 0:
# reading and popping from the beginning
char = bytestr[:1]
if not started:
bytestr = bytestr[1:]
if char in digits:
numstring += char
elif char in startingChars:
started = True
curtype = startingChars[char]
if curtype == str:
size = int(bytes.decode(numstring))
# try to decode strings
try:
result = bytes.decode(bytestr[:size])
except UnicodeDecodeError:
result = bytestr[:size]
bytestr = bytestr[size:]
ended = True
break
elif curtype == list:
result = []
elif curtype == dict:
result = collections.OrderedDict()
else:
raise ValueError("Expected starting char, got ‘" +
bytes.decode(char) + "’")
else: # if started
if not char == b"e":
if curtype == int:
bytestr = bytestr[1:]
numstring += char
elif curtype == list:
item, bytestr = bdecode(bytestr, recursiveCall=True)
result.append(item)
elif curtype == dict:
if key == None:
key, bytestr = bdecode(bytestr, recursiveCall=True)
else:
result[key], bytestr = bdecode(bytestr,
recursiveCall=True)
key = None
else: # ending: char == b"e"
bytestr = bytestr[1:]
if curtype == int:
result = int(bytes.decode(numstring))
ended = True
break
if ended:
if recursiveCall:
return result, bytestr
else:
return result
else:
raise ValueError("String ended unexpectedly")
class torrent():
'''解析种子文件。'''
def __init__(self, abs_torrent_file_path: str) -> None:
''':param abs_torrent_file_path: 种子文件的绝对路径'''
self.abs_torrent_file_path = abs_torrent_file_path
bytes_stream = ''
with open(abs_torrent_file_path, 'rb') as f:
bytes_stream = f.read()
self.metadata = bdecode(bytes_stream)
encodedInfo = bencode(self.metadata["info"])
self.hash_value = hashlib.sha1(encodedInfo).hexdigest().upper()
self.files = []
if 'name' in self.metadata['info'].keys(
) and 'files' not in self.metadata['info'].keys():
self.files.append(self.metadata['info']['name'])
elif 'files' in self.metadata['info'].keys():
for i in self.metadata['info']['files']:
self.files.append(i['path'][0])
if __name__ == '__main__':
pass
为了能读取torrent文件、从torrent文件得到 正确的 40位的hash,笔者翻了很多教程,中文互联网上到处乱抄、有很多人讲课也是乱讲的,最后还是在 这个github项目 里得到了正确答案。
4.ffmpeg加速
实际运行 vimg_processor.py
脚本时,可能会觉得视频处理速度慢。这个问题有两种解决方法:使用显卡加速,或者多台电脑同时压缩视频。但在继续阅读之前,你需要先查看一下运行 vimg_processor.py
脚本时存放压缩视频的文件夹所在的那块硬盘的读写速度,比如在windows上可以打开 任务管理器 看看那块硬盘的读写速度是否已经达到极限(一般的机械硬盘大概150MB/s,固态硬盘则可达到1~1.5GB/s)。
(1)显卡加速
如果对ffmpeg有些了解,分析一下上文中的那条ffmpeg命令:
ffmpeg -threads 8 -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"
会发现,这样的命令只是调用了CPU、不调用GPU或其他计算的硬件,好处是可以在运行ffmpeg的同时玩一些需要显卡的游戏(但是ffmpeg即使调用显卡也未必占用很多),坏处是可能比较慢。
实际上,ffmpeg调用CPU处理视频的过程称为软编码(输出视频)、软解码(读取视频),调用GPU或其他计算硬件处理则是硬编码、硬解码。CPU和GPU在运算上的区别主要是CPU擅长处理多种性质不同的运算(加减乘除,逻辑运算等),而GPU擅长的是计算乘法和,比如 a1*b1 + a2*b2 + a3*b3 ...
这种(因为显卡输出画面时需要处理大量类似的运算),这样的区别也决定了GPU用来做深度学习一般更快。你可以这样理解:CPU软编解码,类似于人手,你可以用手拿起扇子扇风,这样的效率比较低,但是手能处理多种复杂的任务;GPU硬编解码,类似于用电风扇吹风,电风扇只能用来吹风(它在硬件层面就是为了吹风而设计的),但是它吹风的效率远非用手扇风可比。
回到ffmpeg上,用GPU加速ffmpeg首先要确定ffmpeg支持的显卡驱动和你的显卡型号。
用 ffmpeg -hwaccels
可以查看ffmpeg支持的加速方式。如果你用的是windows电脑,并且像前文一样从ffmpeg官网直接下载windows平台的压缩包来安装ffmpeg,那么多半会看到以下输出:
......
Hardware acceleration methods:
cuda
dxva2
qsv
d3d11va
opencl
vulkan
熟悉显卡的朋友应该知道,cuda是英伟达NVIDIA推出的针对自家产品的一种架构,cuda实现了一些CPU的指令集,这样你就可以把GPU当成CPU来用(虽然有些问题用GPU很慢);dxva2则是微软推出的。总之,如果你用英伟达显卡,就去英伟达官网下显驱;如果用AMD显卡,就去AMD官网下显卡驱动。
装好驱动后,把压缩命令加上三个参数:硬件加速方式,解码器,编码器。比如用英伟达显卡,就是:
ffmpeg -hwaccel cuvid -c:v h264_cuvid -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_nvenc -b:a 64k -ar 44100 "{}"
对应的是 -hwaccel cuvid
(加速方式), -c:v h264_cuvid
(硬解码器)和 -c:v h264_nvenc
(硬编码器)。
如果用AMD显卡,可以这样写:
ffmpeg -hwaccel dxva2 -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_amf -b:a 64k -ar 44100 "{}"
ffmpeg对AMD显卡没有实现硬解码器,所以只能软解码。对应的是 -hwaccel dxva2
(加速方式), -c:v h264_amf
(硬编码器)。
最后,你还可以查看ffmpeg支持的编解码器,如果只限定h264的话,在cmd中输入:ffmpeg -encoders | findstr "h264"
、 ffmpeg -decoders | findstr "h264"
或者 ffmpeg -codecs | findstr "h264"
即可。
笔者的CPU主频3.6GHz,AMD显卡,只用CPU软解编码时占用CPU 70-80%,GPU 0%;用GPU加速后CPU占用30%,GPU占用10%。对于一段40min、比特率12000kb/s的视频片段,以pad标准压缩,前者花费10.98min,后者花费4.58min;不足之处是后者生成的文件稍大,大概是前者的7/6,也就是多了1/6,实际上硬编码生成的文件都比软编码的大。如果对文件大小敏感的话,可以参考下一种方法进行压缩加速。
(2)多电脑加速
如果一台电脑不够用,你还可以选择用多台电脑同时对一个文件夹内的视频进行处理。首先需要共享这个文件夹(NAS),比如用笔者其他文章中提到的Samba共享出来,然后修改一下 vimg_processor.py
,目标是能在多台电脑上对同一个文件夹内的视频进行编解码(需要解决冲突问题,笔者的解决方法是在直连NAS的电脑上运行一个redis数据库,来存储不同状态的视频),并且能够根据电脑的配置调整ffmpeg命令。
但是,使用NAS、多电脑之前,先看一下网速和硬盘读写是否已达上限。如果所有电脑都在同一个内网路由器下,且是千兆路由器,那么NAS读写速度大概最高110~120MB/s之间,看一下直连分享NAS的机器在每台电脑压缩时进出网速是多少,按笔者的经验一般一台台式机,压缩时接收的流量更多(因为是压缩),需要30~60 Mbps,也即3.75~7.5 MB/s(笔记本根据配置不同可能在1~5 MB/s),所以可以多加几台电脑。另外,实际的机械硬盘读写速度在150MB/s左右,减去110~120MB/s还有30~40MB/s的读写速度只能被直连NAS的机器使用,有条件的可以用更高速的路由器。
然后开始编写脚本。首先在直连NAS的电脑上编写一个python脚本,比如叫 vimg_update_redis.py
,负责更新redis、移动文件(所有 #需要更改 的前一行是可能需要自定义的地方):
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os, re, traceback, redis, platform
from MyPythonLib import log as mylog
from MyPythonLib import redis as myredis
NAS_mounted_path = '/xxx/xxx/xxx/xxx/' # NAS挂载路径。由于函数get_redis_rel_path_from_abs_path,最后一个分隔符必须带上
#需要更改
r_pool = myredis.get_pool(host='192.168.1.1', port=56379)
#需要更改
r_cur = redis.Redis(connection_pool=r_pool)
log_path = os.path.join('/xxx/xxx/logs')
#需要更改
if not os.path.isdir(s=log_path):
os.mkdir(path=log_path)
log = mylog.get_logger(abs_path=log_path,
log_file_name='vimg_update_redis.log')
redis_key_set_todo = 'vimg_processor:set:todo' # 这个redis集合存放需要压缩的文件的相对路径(仅包含压缩前的文件名),路径分隔符为linux的/(下同)
redis_key_set_wrong = 'vimg_processor:set:wrong' #集合,存放压缩时出现错误的文件的相对路径(仅包含压缩前的文件名)
redis_key_str_ongoing = 'vimg_processor:str:ongoing:' # 存放正在压缩的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
redis_key_str_done = 'vimg_processor:str:done:' # 存放已压缩完的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
sys_kind = platform.system()
def get_suffix(filename: str) -> str:
'''返回文件后缀名。若文件无后缀名(以 . 结尾,或者不含 . )则返回空字符串。
:param filename: 文件名(可带路径前缀)
:return str,文件后缀名(不含 . )'''
suffix = os.path.splitext(filename)[-1]
return (suffix[1:], '')[suffix == '']
def is_video(filename: str) -> bool:
'''根据后缀名判断文件是否视频。
:param filename: 文件名(可带路径前缀)
:return bool,True表示文件是视频,False表示非视频'''
video_suffix_set = {
"WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
"M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
}
#需要更改
#iso文件不一定能用ffmpeg打开,可以自行取舍
return (False, True)[get_suffix(filename).upper() in video_suffix_set]
def is_img(filename: str) -> bool:
'''根据后缀名判断文件是否图片。
:param filename: 文件名(可带路径前缀)
:return bool,True表示文件是图片,False表示非图片'''
img_suffix_set = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
#需要更改
return (False, True)[get_suffix(filename).upper() in img_suffix_set]
def replace_path_seperator(path: str, sys_kind: str) -> str:
'''根据操作系统,替换路径中的分隔符。
:param path: 需要替换的路径
:param sys_kind: 目标操作系统
:return str,替换好的路径'''
if sys_kind == 'Windows':
return path.replace('/', '\\')
elif sys_kind == 'Linux':
return path.replace('\\', '/')
def is_origin(filename: str) -> bool:
'''接受文件名,判断这个文件是源文件还是压缩后的文件。
:param filename: 文件名(可带路径前缀)
:return bool,True表示该文件为源文件'''
prefix = os.path.splitext(filename)[0]
return (True, False)[re.search(r'#test#$', prefix) != None]
def get_redis_rel_path_from_abs_path(abs_path: str) -> str:
'''根据本机上文件的绝对路径,返回存储到redis中的相对路径。
:由于redis中存储的是使用linux分隔符的相对路径,所以需要这个函数转换:
:将abs_path首先去掉NAS_mounted_path,然后分隔符换成linux的。由于这个函数,NAS_mounted_path最后需要带上一个分隔符
:param abs_path: 文件在本机的绝对路径
:return str,redis中存储的相对路径,即 XX/XX/XX 这样的形式。注意最前面没有分隔符'''
global NAS_mounted_path
redis_rel_path = abs_path[len(NAS_mounted_path):]
return replace_path_seperator(redis_rel_path, sys_kind='Linux')
def get_abs_path_from_redis_rel_path(redis_rel_path: str) -> str:
'''根据redis中存储的相对路径,返回本机上该文件的绝对路径。
:将redis_rel_path连上NAS_mounted_path,然后替换分隔符为本机的
:param redis_rel_path: redis中存储的相对路径
:return str,文件在本机中的绝对路径'''
global NAS_mounted_path, sys_kind
abs_path = os.path.join(NAS_mounted_path, redis_rel_path)
return replace_path_seperator(abs_path, sys_kind=sys_kind)
def main():
'''更新文件信息。
:对于redis_key_set_wrong中记录的文件,把它们放到单独的目录里(压缩错误);
:对于redis_key_str_ongoing系列的文件,不作处理;
:对于redis_key_str_done系列的文件,把它们放到单独的目录里(压缩完);
:检查 压缩中 目录下的所有文件,如果在另外三个键中没有记录,把它放到redis_key_set_todo里;
:检查 redis_key_set_todo 中记录的文件,如果不存在则删除'''
global NAS_mounted_path, r_cur, log, sys_kind
global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
# 对于redis_key_set_wrong中记录的文件,把它们放到单独的目录里
# 对于单独文件,直接剪切;对于文件夹,则尝试新建相同的目录、剪切
while r_cur.scard(redis_key_set_wrong) != 0:
try:
redis_rel_path = r_cur.spop(redis_key_set_wrong, count=1)[0]
src_abs_path = replace_path_seperator(redis_rel_path,
sys_kind=sys_kind)
src_abs_path = os.path.join(NAS_mounted_path, src_abs_path)
# 将中间的文件夹替换
dst_abs_path = src_abs_path.replace('压缩中', '压缩错误', 1)
#需要更改
# 获得除了文件名以外,前面的目录路径
basename = os.path.basename(src_abs_path)
pre_dirs = dst_abs_path[:-len(basename)]
if not os.path.isdir(pre_dirs):
os.makedirs(pre_dirs)
os.rename(src_abs_path, dst_abs_path)
msg = f'redis_key_set_wrong: 将{src_abs_path}移动为{dst_abs_path}'
mylog.log_print(log, 'info', msg)
except:
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
msg = f'失败 redis_key_set_wrong: 移动{redis_rel_path}出错'
mylog.log_print(log, 'error', msg)
except_info = traceback.format_exc()
mylog.log_print(log, 'error', except_info)
break
# 对于redis_key_str_done系列的文件,把它们放到单独的目录里
# 对于单独文件,直接剪切;对于文件夹,则尝试新建相同的目录、剪切
redis_rel_path_keys = r_cur.keys(redis_key_str_done + '*')
for redis_rel_path_key in redis_rel_path_keys:
try:
# 获得压缩前后的源文件名
redis_rel_path_origin = redis_rel_path_key[len(redis_key_str_done
):]
redis_rel_path_compressed = r_cur.get(redis_rel_path_key)
src_abs_path_origin = replace_path_seperator(redis_rel_path_origin,
sys_kind=sys_kind)
src_abs_path_origin = os.path.join(NAS_mounted_path,
src_abs_path_origin)
src_abs_path_compressed = replace_path_seperator(
redis_rel_path_compressed, sys_kind=sys_kind)
src_abs_path_compressed = os.path.join(NAS_mounted_path,
src_abs_path_compressed)
# redis中存储的相对路径都是 压缩中
# 只有运行vimg_processor_multi中的delete_test时,会删除done系列键
# 有时来不及运行delete_test,这些剩余的done键不动,这里直接continue
if not os.path.isfile(src_abs_path_origin):
continue
# 获得压缩前后的目标文件名
dst_abs_path_origin = src_abs_path_origin.replace('压缩中', '压缩完', 1)
#需要更改
dst_abs_path_compressed = src_abs_path_compressed.replace(
'压缩中', '压缩完', 1)
#需要更改
# 获得除了文件名以外,前面的目录路径
basename = os.path.basename(src_abs_path_origin)
pre_dirs = dst_abs_path_origin[:-len(basename)]
if not os.path.isdir(pre_dirs):
os.makedirs(pre_dirs)
os.rename(src_abs_path_origin, dst_abs_path_origin)
os.rename(src_abs_path_compressed, dst_abs_path_compressed)
# r_cur.delete(redis_rel_path_key) 不要删除redis_key_str_done系列键,这些键在运行delete_test时删除
msg = f'redis_key_str_done: 移动{src_abs_path_origin}为{dst_abs_path_origin},\n移动{src_abs_path_compressed}为{dst_abs_path_compressed}'
mylog.log_print(log, 'info', msg)
except:
r_cur.set(redis_rel_path_key, redis_rel_path_compressed)
msg = f'失败 redis_key_str_done: 移动{redis_rel_path_origin}和{redis_rel_path_compressed}时出错'
mylog.log_print(log, 'error', msg)
except_info = traceback.format_exc()
mylog.log_print(log, 'error', except_info)
break
# 检查 压缩中 目录下的所有文件,如果在四个键中都没有记录,把它放到redis_key_set_todo里
dir_compressing = replace_path_seperator('xxx/xxx/压缩中', sys_kind=sys_kind)
#需要更改
dir_compressing = os.path.join(NAS_mounted_path, dir_compressing)
for dirpath, dirnames, filenames in os.walk(dir_compressing):
for filename in filenames:
# 跳过压缩后的文件
if not is_origin(filename):
continue
abs_path = os.path.join(dirpath, filename)
if (is_video(abs_path)
or is_img(abs_path)) and os.path.isfile(abs_path):
redis_rel_path = get_redis_rel_path_from_abs_path(abs_path)
condition = not r_cur.sismember(redis_key_set_todo,
redis_rel_path)
condition = condition and not r_cur.sismember(
redis_key_set_wrong, redis_rel_path)
condition = condition and r_cur.exists(redis_key_str_ongoing +
redis_rel_path) == 0
condition = condition and r_cur.exists(redis_key_str_done +
redis_rel_path) == 0
if condition:
r_cur.sadd(redis_key_set_todo, redis_rel_path)
msg = f'redis_key_set_todo: 将{redis_rel_path}添加到todo中'
mylog.log_print(log, 'info', msg)
# 如果有空的文件夹,删除掉。为了能一次删除完,需要深度优先遍历
for dirpath, dirnames, filenames in os.walk(dir_compressing,
topdown=False):
for dirname in dirnames:
abs_dir_path = os.path.join(dirpath, dirname)
dirpath_temp, dirnames_temp, filenames_temp = next(
os.walk(abs_dir_path))
if len(dirnames_temp) == 0 and len(
filenames_temp) == 0 and os.path.isdir(abs_dir_path):
os.rmdir(abs_dir_path)
msg = f'目录{abs_dir_path}为空,已删除'
mylog.log_print(log, 'info', msg)
# 检查 redis_key_set_todo 中记录的文件,如果不存在则删除redis记录
for redis_rel_path in r_cur.smembers(redis_key_set_todo):
abs_path = get_abs_path_from_redis_rel_path(redis_rel_path)
if not os.path.isfile(abs_path):
r_cur.srem(redis_key_set_todo, redis_rel_path)
msg = f'redis_key_set_todo: 文件{abs_path}不存在,已移除redis记录'
mylog.log_print(log, 'info', msg)
if __name__ == '__main__':
try:
main()
except:
except_info = traceback.format_exc()
mylog.log_print(log, 'error', except_info)
'''这个py文件用于更新vimg_processor需要使用的redis库,可以在任何连接到NAS的机器上使用。
推荐在直连NAS的机器上运行'''
如果在linux上,可以用crontab把这个脚本设置成定时启动,比如10min运行一次;windows上也有类似功能的软件,请自行寻找。
然后把 vimg_processor.py
修改为 vimg_processor_multi.py
,让它能适应不同的电脑和配置(所有 #需要更改 的前一行是可能需要自定义的地方):
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os, time, json, subprocess, re, traceback, redis, platform, sys
from MyPythonLib import log as mylog
from MyPythonLib import redis as myredis
CPU_THREADS = (12, 8) # ffmpeg软编解码时线程数,根据CPU配置调整。分别为解码用的线程、编码用的线程,在压缩时,前者大于后者
#需要更改
# 可以用一个文件实验最佳的CPU_THREADS。软编解码的快慢看ffmpeg最右边的 speed=数字x
NAS_mounted_path = '/xxx/xxx/xxx/' # NAS挂载路径。由于函数get_redis_rel_path_from_abs_path,最后一个分隔符必须带上
#需要更改
CWD = os.getcwd()
r_pool = myredis.get_pool(host='192.168.1.1', port=56379)
#需要更改
r_cur = redis.Redis(connection_pool=r_pool)
log_path = os.path.join(CWD, 'logs')
#需要更改
if not os.path.isdir(s=log_path):
os.mkdir(path=log_path)
log = mylog.get_logger(abs_path=log_path,
log_file_name='vimg_processor_multi.log')
TEMP_DIR = os.path.join(CWD, 'temp') # temp这个文件夹主要是在压缩图片时存放临时的调色板
#需要更改
if not os.path.isdir(s=TEMP_DIR):
os.mkdir(path=TEMP_DIR)
redis_key_set_todo = 'vimg_processor:set:todo' # 这个redis集合存放需要压缩的文件的相对路径(仅包含压缩前的文件名),路径分隔符为linux的/(下同)
redis_key_set_wrong = 'vimg_processor:set:wrong' #集合,存放压缩时出现错误的文件的相对路径(仅包含压缩前的文件名)
redis_key_str_ongoing = 'vimg_processor:str:ongoing:' # 存放正在压缩的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
redis_key_str_done = 'vimg_processor:str:done:' # 存放已压缩完的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
start_time = time.time()
fail_msg_list = [] # 每次有错误信息时都放到这个列表中
sys_kind = platform.system() # 字符串,表示系统类型
def get_suffix(filename: str) -> str:
'''返回文件后缀名。若文件无后缀名(以 . 结尾,或者不含 . )则返回空字符串。
:param filename: 文件名(可带路径前缀)
:return str,文件后缀名(不含 . )'''
suffix = os.path.splitext(filename)[-1]
return (suffix[1:], '')[suffix == '']
def is_video(filename: str) -> bool:
'''根据后缀名判断文件是否视频。
:param filename: 文件名(可带路径前缀)
:return bool,True表示文件是视频,False表示非视频'''
video_suffix_set = {
"WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
"M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
}
#需要更改
#iso文件不一定能用ffmpeg打开,可以自行取舍
return (False, True)[get_suffix(filename).upper() in video_suffix_set]
def is_img(filename: str) -> bool:
'''根据后缀名判断文件是否图片。
:param filename: 文件名(可带路径前缀)
:return bool,True表示文件是图片,False表示非图片'''
img_suffix_set = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
#需要更改
return (False, True)[get_suffix(filename).upper() in img_suffix_set]
def replace_path_seperator(path: str, sys_kind: str) -> str:
'''根据操作系统,替换路径中的分隔符。
:param path: 需要替换的路径
:param sys_kind: 目标操作系统
:return str,替换好的路径'''
if sys_kind == 'Windows':
return path.replace('/', '\\')
elif sys_kind == 'Linux':
return path.replace('\\', '/')
def is_origin(filename: str) -> bool:
'''接受文件名,判断这个文件是源文件还是压缩后的文件。
:param filename: 文件名(可带路径前缀)
:return bool,True表示该文件为源文件'''
prefix = os.path.splitext(filename)[0]
return (True, False)[re.search(r'#test#$', prefix) != None]
def get_redis_rel_path_from_abs_path(abs_path: str) -> str:
'''根据本机上文件的绝对路径,返回存储到redis中的相对路径。
:由于redis中存储的是使用linux分隔符的相对路径,所以需要这个函数转换:
:将abs_path首先去掉NAS_mounted_path,然后分隔符换成linux的。由于这个函数,NAS_mounted_path最后需要带上一个分隔符
:param abs_path: 文件在本机的绝对路径
:return str,redis中存储的相对路径,即 XX/XX/XX 这样的形式。注意最前面没有分隔符'''
global NAS_mounted_path
redis_rel_path = abs_path[len(NAS_mounted_path):]
return replace_path_seperator(redis_rel_path, sys_kind='Linux')
def get_abs_path_from_redis_rel_path(redis_rel_path: str) -> str:
'''根据redis中存储的相对路径,返回本机上该文件的绝对路径。
:将redis_rel_path连上NAS_mounted_path,然后替换分隔符为本机的
:param redis_rel_path: redis中存储的相对路径
:return str,文件在本机中的绝对路径'''
global NAS_mounted_path, sys_kind
abs_path = os.path.join(NAS_mounted_path, redis_rel_path)
return replace_path_seperator(abs_path, sys_kind=sys_kind)
def get_compressed_abs_path(abs_path: str, fmt: str = '') -> str:
'''根据源文件的绝对路径,返回压缩后文件的绝对路径。这里的绝对路径都是本机上的。
:压缩后的图像或者视频将放在同路径下,所以需要给转换后的文件名中加上字符串#test#(在后缀名前):1.mp4 -> 1#test#.mp4
:有些特殊情况下压缩后文件会重名,比如一个文件夹中同时存在1.mp4、1.rmvb时,又用fmt指定压缩后格式。此时会对压缩后的文件加上序号重命名(1#test#.mp4,1#test#(1).mp4,1#test#(2).mp4)
:param abs_path: 需要压缩的文件的绝对路径
:param fmt: 指定转换后文件的格式,为空字符串时不改变格式
:return str,压缩后文件的绝对路径,根据操作系统调整'''
prefix, suffix = os.path.splitext(abs_path)
suffix = (fmt, suffix.lstrip('.'))[fmt == '']
compressed_abs_path = prefix + '#test#.' + suffix
i = 1
while (os.path.exists(compressed_abs_path)):
compressed_abs_path = prefix + '#test#(' + str(i) + ').' + suffix
i += 1
return compressed_abs_path
def get_origin_abs_path(abs_path: str) -> str:
'''根据压缩后文件的绝对路径到redis中查询,返回源文件的绝对路径。这里的绝对路径都是本机上的。
:这个函数只应该用于已经压缩完的文件,函数将到redis_key_str_done的所有字符串中进行查询
:param abs_path: 压缩后文件的绝对路径
:return str,源文件的绝对路径,根据操作系统调整;若无法查询到,返回空字符串'''
global r_cur, sys_kind, NAS_mounted_path, redis_key_str_done
compressed_redis_rel_path = get_redis_rel_path_from_abs_path(abs_path)
redis_rel_paths_keys = r_cur.keys(redis_key_str_done + '*')
for redis_rel_path_key in redis_rel_paths_keys:
if r_cur.get(redis_rel_path_key) == compressed_redis_rel_path:
origin_abs_path = redis_rel_path_key[len(redis_key_str_done):]
origin_abs_path = replace_path_seperator(origin_abs_path,
sys_kind=sys_kind)
origin_abs_path = os.path.join(NAS_mounted_path, origin_abs_path)
return origin_abs_path
return ''
class vimg_processor(object):
'''用于压缩的图像视频类。初始化时会读取图像视频的一些参数。'''
def __init__(self,
origin_abs_path: str,
compressed_abs_path: str = "",
video_size: str = '',
img_max_pix: int = 0,
fmt: str = ''):
'''用文件的绝对路径等参数初始化对象。
:param origin_abs_path: 源文件的绝对路径
:param compressed_abs_path: 输出文件的绝对路径。默认为同目录下文件名末加#test#保存(1.mp4 -> 1#test#.mp4),有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名
:param video_size: 输出视频文件时,适配的屏幕大小。有 phone 、 pad 、 pc 、 空字符串 四种选择,空字符串表示保留原宽高
:param img_max_pix: 输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
:param fmt: 输出文件的格式,即后缀名。为空字符串时,表示按原格式输出。其他可取值如 mp4 、 jpg 等'''
self.origin_abs_path = origin_abs_path
self.compressed_abs_path = (compressed_abs_path,
get_compressed_abs_path(
origin_abs_path,
fmt=fmt))[compressed_abs_path == ""]
self.width, self.height = 0, 1
if is_video(origin_abs_path):
self.get_video_info()
w, h = self.width, self.height
ratio = w / h
self.ratio = ratio
definition = ''
if video_size == 'phone':
if ratio <= 0.6:
definition = (f'{w}:{h}', '360:640')[w >= 360]
elif ratio <= 1.0:
definition = (f'{w}:{h}', '360:480')[w >= 360]
elif ratio <= 1.66:
definition = (f'{w}:{h}', '480:360')[w >= 480]
elif ratio > 1.66:
definition = (f'{w}:{h}', '640:360')[w >= 640]
elif video_size == 'pad':
if ratio <= 0.6:
definition = (f'{w}:{h}', '720:1280')[w >= 720]
elif ratio <= 1.0:
definition = (f'{w}:{h}', '720:960')[w >= 720]
elif ratio <= 1.66:
definition = (f'{w}:{h}', '960:720')[w >= 960]
elif ratio > 1.66:
definition = (f'{w}:{h}', '1280:720')[w >= 1280]
elif video_size == 'pc':
if ratio <= 0.6:
definition = (f'{w}:{h}', '1080:1920')[w >= 1080]
elif ratio <= 1.0:
definition = (f'{w}:{h}', '1080:1440')[w >= 1080]
elif ratio <= 1.66:
definition = (f'{w}:{h}', '1440:1080')[w >= 1440]
elif ratio > 1.66:
definition = (f'{w}:{h}', '1920:1080')[w >= 1920]
elif video_size == '':
definition = f'{w}:{h}'
self.definition = definition
elif is_img(origin_abs_path):
self.pix_fmt = '' # pix_fmt即像素格式
fmt = (fmt, get_suffix(origin_abs_path))[fmt == '']
self.palette = os.path.join(TEMP_DIR, 'palette.' + fmt)
self.temp_img = os.path.join(TEMP_DIR, 'temp_img.' + fmt)
self.get_img_info()
w, h = self.width, self.height
img_max_pix = abs(img_max_pix)
if img_max_pix == 0:
pass
elif w > img_max_pix and h > img_max_pix:
ratio = max(w, h) / img_max_pix
w, h = int(w / ratio), int(h / ratio)
self.definition = f'{w}:{h}'
def get_video_info(self):
'''使用ffprobe获取视频的宽高(width、height)。'''
global sys_kind
command_probe = f'ffprobe -select_streams v -show_entries format=bit_rate -show_streams -v quiet -of csv="p=0" -of json -i "{self.origin_abs_path}"'
if sys_kind == 'Windows':
result = subprocess.Popen(command_probe,
shell=False,
stdout=subprocess.PIPE).stdout
elif sys_kind == 'Linux':
result = subprocess.Popen(command_probe,
shell=True,
stdout=subprocess.PIPE).stdout
list_std = result.readlines()
str_tmp = ''
for item in list_std:
str_tmp += bytes.decode(item.strip())
json_data = json.loads(str_tmp)
self.width = int(json_data['streams'][0]['width'])
self.height = int(json_data['streams'][0]['height'])
# self.bitrate = int(json_data['format']['bit_rate']) / 1024 # 以后可能用到视频的比特率
# 视频的比特率有时不能用ffprobe读取到,建议自己根据视频大小和时长计算
def get_img_info(self):
'''获取图片的width、height、pix_fmt。'''
global sys_kind
command_probe = f'ffprobe -hide_banner -v quiet -print_format json -show_format -show_streams "{self.origin_abs_path}"'
if sys_kind == 'Windows':
result = subprocess.Popen(command_probe,
shell=False,
stdout=subprocess.PIPE).stdout
elif sys_kind == 'Linux':
result = subprocess.Popen(command_probe,
shell=True,
stdout=subprocess.PIPE).stdout
list_std = result.readlines()
str_tmp = ''
for item in list_std:
str_tmp += bytes.decode(item.strip())
json_data = json.loads(str_tmp)
self.pix_fmt = str(json_data['streams'][0]['pix_fmt'])
self.width = int(json_data['streams'][0]['width'])
self.height = int(json_data['streams'][0]['height'])
def compress_video(self):
'''通过ffmpeg压缩视频文件(输出的像素格式指定为yuv420p)。'''
global CPU_THREADS
# AMD显卡加速,硬编码
'''command_compress = 'ffmpeg -hwaccel dxva2 -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_amf -b:a 64k -ar 44100 "{}"'.format(
self.origin_abs_path, self.definition, self.compressed_abs_path)'''
# NVIDIA显卡加速,硬编解码
'''command_compress = 'ffmpeg -hwaccel cuvid -c:v h264_cuvid -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_nvenc -b:a 64k -ar 44100 "{}"'.format(
self.origin_abs_path, self.definition, self.compressed_abs_path)'''
# CPU软编解码。-threads 参数可限制线程数,控制CPU占用。
command_compress = 'ffmpeg -threads {} -i "{}" -threads {} -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'.format(
CPU_THREADS[0], self.origin_abs_path, CPU_THREADS[1],
self.definition, self.compressed_abs_path)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_video failed,视频压缩失败:{command_compress}'
)
def compress_img(self):
'''通过ffmpeg压缩图像文件。'''
command_palette = 'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(
self.origin_abs_path, self.palette)
command_compress = 'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(
self.origin_abs_path, self.palette, self.pix_fmt, self.temp_img)
result = os.system(command_palette)
if result != 0:
raise Exception(
f'vimg_processor.compress_img failed,无法获得调色板:{command_palette}'
)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_img failed,生成中间图片失败:{command_compress}'
)
command_compress = 'ffmpeg -i "{}" -vf scale={} -y "{}"'.format(
self.temp_img, self.definition, self.compressed_abs_path)
result = os.system(command_compress)
if result != 0:
raise Exception(
f'vimg_processor.compress_img failed,生成最终图片失败:{command_compress}'
)
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
def get_definitions_video(target_dir: str):
'''获取一个文件夹下所有视频文件的分辨率,按宽高比进行分类。
:param target_dir: 目标文件夹的绝对路径
:结果会输出到当前目录下的 definitions.txt 文件中'''
global CWD
ratio_dict = dict() # 字典,键为宽高的比值,值为列表,包括该比值下的具体宽高数值
# 例:1.7777778:['640x360', '1280x720', '1920x1080']
for dirpath, dirnames, filenames in os.walk(target_dir):
for filename in filenames:
abs_path = os.path.join(dirpath, filename)
if is_video(abs_path):
print('\r视频文件:{:200}'.format(abs_path), end='')
vimg = vimg_processor(abs_path)
ratio = vimg.ratio
definition = "{}x{}".format(vimg.width, vimg.height)
if ratio_dict.get(ratio) == None:
ratio_dict[ratio] = [definition]
else:
if definition not in ratio_dict[ratio]:
ratio_dict[ratio].append(definition)
# 按宽高比值升序输出
ratio_list = []
for key, value in ratio_dict.items():
ratio_list.append(key)
ratio_list.sort()
with open(os.path.join(CWD, 'definitions.txt'), 'w', encoding='utf8') as f:
for ratio in ratio_list:
ratio_dict[ratio].sort()
f.write('宽高比:{:.6},成员:'.format(ratio))
for definition in ratio_dict[ratio][:-1]:
f.write(definition + ' , ')
f.write(ratio_dict[ratio][-1] + '\n')
print('')
def compress_videos(video_size: str = '', fmt: str = ''):
'''从redis中得到需要压缩的视频文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
:对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
:输出文件在源文件名末尾加 #test# 放在同目录下(1.mp4 -> 1#test#.mp4);输出文件有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名。
:param video_size: 视频适配的屏幕大小。 phone、pad、pc、空字符串 分别表示适配手机、平板、电脑、不改变原大小
:param fmt: 指定输出的文件格式,空字符串表示原格式。建议用mp4
'''
global NAS_mounted_path, r_cur, log, start_time, fail_msg_list, sys_kind
global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
mylog.log_print(log, 'info', msg)
while (True):
if r_cur.scard(redis_key_set_todo) == 0:
break
try:
redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
rel_path = replace_path_seperator(redis_rel_path,
sys_kind=sys_kind)
abs_path = os.path.join(NAS_mounted_path, rel_path)
if is_video(abs_path) and os.path.isfile(abs_path):
vimg = vimg_processor(origin_abs_path=abs_path,
video_size=video_size,
fmt=fmt)
compressed_abs_path = vimg.compressed_abs_path
if os.path.exists(compressed_abs_path): # 输出文件已存在时跳过
continue
start_time = time.time()
log.info('{}压缩中...'.format(abs_path))
compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
compressed_abs_path)
r_cur.set(redis_key_str_ongoing + redis_rel_path,
compressed_redis_rel_path)
vimg.compress_video()
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.set(redis_key_str_done + redis_rel_path,
compressed_redis_rel_path)
log.info('视频压缩成功,耗时{:.2f}min'.format(
(time.time() - start_time) / 60))
except:
except_info = traceback.format_exc()
fail_msg_list.append(except_info)
mylog.log_print(log, 'error', except_info)
if os.path.exists(compressed_abs_path):
os.remove(compressed_abs_path)
r_cur.srem(redis_key_set_todo, redis_rel_path)
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.delete(redis_key_str_done + redis_rel_path)
exc_type, exc_value, exc_traceback = sys.exc_info() # 获得异常名
exc_type = exc_type.__name__
if exc_type == 'KeyboardInterrupt': # 在linux上时,无法得到KeyboardInterrupt
r_cur.sadd(redis_key_set_todo, redis_rel_path)
else:
mylog.log_print(log, 'error', f'错误类型为{exc_type}')
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
break
def compress_imgs(fmt: str = '', img_max_pix: int = 0):
'''从redis中得到需要压缩的图片文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
:对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
:输出文件在源文件名末尾加 #test# 放在同目录下(1.jpg -> 1#test#.jpg);输出文件有重名时会自动按 1#test#.jpg 、1#test#(1).jpg 、1#test#(2).jpg 的顺序重命名。
:param fmt: 指定输出的文件格式,空字符串表示原格式。建议用jpg
:param img_max_pix: 输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
'''
global NAS_mounted_path, TEMP_DIR, r_cur, log, start_time, fail_msg_list, sys_kind
global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
mylog.log_print(log, 'info', msg)
while (True):
if r_cur.scard(redis_key_set_todo) == 0:
break
try:
redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
rel_path = replace_path_seperator(redis_rel_path,
sys_kind=sys_kind)
abs_path = os.path.join(NAS_mounted_path, rel_path)
if is_img(abs_path) and os.path.isfile(abs_path):
vimg = vimg_processor(origin_abs_path=abs_path,
img_max_pix=img_max_pix,
fmt=fmt)
compressed_abs_path = vimg.compressed_abs_path
if os.path.exists(compressed_abs_path): # 输出文件已存在时跳过
continue
log.info('{}压缩中...'.format(abs_path))
compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
compressed_abs_path)
r_cur.set(redis_key_str_ongoing + redis_rel_path,
compressed_redis_rel_path)
vimg.compress_img()
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.set(redis_key_str_done + redis_rel_path,
compressed_redis_rel_path)
except:
except_info = traceback.format_exc()
fail_msg_list.append(except_info)
mylog.log_print(log, 'error', except_info)
if os.path.exists(compressed_abs_path):
os.remove(compressed_abs_path)
r_cur.srem(redis_key_set_todo, redis_rel_path)
r_cur.delete(redis_key_str_ongoing + redis_rel_path)
r_cur.delete(redis_key_str_done + redis_rel_path)
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
exc_type, exc_value, exc_traceback = sys.exc_info() # 获得异常名
exc_type = exc_type.__name__
if exc_type == 'KeyboardInterrupt': # 在linux上时,无法得到KeyboardInterrupt
r_cur.sadd(redis_key_set_todo, redis_rel_path)
else:
mylog.log_print(log, 'error', f'错误类型为{exc_type}')
r_cur.sadd(redis_key_set_wrong, redis_rel_path)
break
# 最后清理一下TEMP_DIR
for dirpath, dirnames, filenames in os.walk(TEMP_DIR):
for filename in filenames:
os.remove(os.path.join(dirpath, filename))
def delete_test():
'''视频或图片压缩完毕后,比较原文件与压缩后的文件,留下较小且大小非0的文件。
:由于ffmpeg处理效果不确定,建议检查一下处理后的文件,确认无误后再执行,因为源文件会被彻底删除。具体见 下载-压缩-入库流程
:将到redis的redis_key_str_done中读取压缩前后的文件相对路径。删除#test#文件后,删除redis_key_str_done对应的键
'''
global r_cur, log, fail_msg_list
global redis_key_str_done
redis_rel_path_done_keys = r_cur.keys(redis_key_str_done + '*')
for redis_rel_path_done_key in redis_rel_path_done_keys:
origin_redis_rel_path = redis_rel_path_done_key[len(redis_key_str_done
):]
compressed_redis_rel_path = r_cur.get(redis_rel_path_done_key)
origin_abs_path = get_abs_path_from_redis_rel_path(
origin_redis_rel_path)
compressed_abs_path = get_abs_path_from_redis_rel_path(
compressed_redis_rel_path)
# redis中存储的相对路径都是 压缩中 ,但是现在这些文件已经手动放到了 检查中 里
origin_abs_path = origin_abs_path.replace('压缩中', '检查中', 1)
#需要更改
compressed_abs_path = compressed_abs_path.replace('压缩中', '检查中', 1)
#需要更改
if os.path.isfile(origin_abs_path) and os.path.isfile(
compressed_abs_path):
before_size = os.path.getsize(origin_abs_path)
after_size = os.path.getsize(compressed_abs_path)
if after_size < before_size and after_size != 0:
try:
os.remove(origin_abs_path)
# 注意这里的压缩后文件名改回原名时是把最右边的第一个#test#去掉,而不是直接改成源文件名
# 因为如果压缩时用fmt指定输出格式,则可能当前目录下有重名文件
# 1.avi、1.wmv -> 1#test#.mp4、1#test#(1).mp4 -> 1.mp4、1(1).mp4
prefix, suffix = os.path.splitext(compressed_abs_path)
reversed_prefix = prefix[::-1]
reversed_prefix = reversed_prefix.replace('#tset#', '', 1)
prefix = reversed_prefix[::-1]
temp = prefix + suffix
os.rename(compressed_abs_path, temp)
r_cur.delete(redis_rel_path_done_key)
log.info(f'将{origin_abs_path}替换为小文件')
except PermissionError:
r_cur.set(redis_rel_path_done_key,
compressed_redis_rel_path)
msg = f'delete_test failed,删除失败:{origin_abs_path}'
mylog.log_print(log, 'warning', msg)
fail_msg_list.append(msg)
continue
else:
os.remove(compressed_abs_path)
msg = f'{origin_abs_path}压缩后的文件比之前大(或大小为0),删掉了处理后的文件'
log.info(msg)
# 最后检查一下有没有剩余的处理后文件
dir_checking = replace_path_seperator('xxx/xxx/检查中', sys_kind=sys_kind)
#需要更改
dir_checking = os.path.join(NAS_mounted_path, dir_checking)
for dirpath, dirnames, filenames in os.walk(dir_checking):
for filename in filenames:
if '#test#' in filename:
abs_path = os.path.join(dirpath, filename)
msg = f'疑似还有#test#文件:{abs_path}'
mylog.log_print(log, 'warning', msg)
fail_msg_list.append(msg)
def main():
'''执行入口。'''
target_dir = r'D:\xxx\xxx'
#需要更改
global sys_kind
try:
#get_definitions_video(target_dir)
compress_videos(video_size='pad', fmt='mp4')
#compress_imgs(fmt='jpg', img_max_pix=700)
#delete_test()
print('已完成')
if len(fail_msg_list) != 0:
print('本次压缩时出现了以下异常:')
for i in fail_msg_list:
print(i)
if sys_kind == 'Windows':
input()
except:
except_info = traceback.format_exc()
mylog.log_print(log, 'error', except_info)
if sys_kind == 'Windows':
input()
if __name__ == '__main__':
main()
'''下载-压缩-入库流程:
1.将需要压缩的文件先暂时放到 待压缩 中,进行人工筛选
2.筛选完后,将 待压缩 中的文件手动剪切到 压缩中 文件夹里
3.直连NAS的机器运行python脚本,更新redis库、移动文件
4.其他连接到NAS的机器从redis中获取任务、进行压缩
5.从 压缩完 中将文件剪切到 检查中 里,进行人工检查
6.检查完后,手动运行vimg_processor_multi里的delete_test,保留源文件或压缩后文件中的一个
7.手动将 检查中 里的文件放到其他目录,比如 待迁移 或 入库中 里
8.运行file_to_sql,将本地文件入库
'''
在其他机器上运行这个脚本前先搭建环境,把需要的python库安装好( pip3 install redis
),装好ffmpeg、把ffmpeg和ffprobe的二进制文件放到PATH里。
如果有兴趣的话,可以写的更复杂一点,比如写一个分布式的,能通过网页UI管理。