步骤/目录:
1.需求介绍
2.电影压缩
3.本地视频入库
4.ffmpeg加速
    (1)显卡加速
    (2)多电脑加速

本文首发于个人博客https://lisper517.top/index.php/archives/56/,转载请注明出处。
本文的目的是在电影下载完成后,进行一些处理。
本文写作日期为2022年10月5日。运行的平台为win10,编辑器为VS Code。

1.需求介绍

首先,现在电影占用的空间越来越大,对电影进行一定压缩有助于存储。
其次,在之前mysql的movie表中,还有两列: downloadedfile_path ,这两列是用来判断一部电影是否被下载过的。
所以目前的需求是对电影进行压缩,并进行入库。

2.电影压缩

这里使用ffmpeg,它是一个被广泛使用的处理音视频、图片的开源软件,比如格式工厂就是在内部调用ffmpeg转换格式的,暴风影音等也可以使用ffmpeg进行播放。这里是 ffmpeg的官网下载地址 ,但是官网只提供源码下载,你可以自行make编译,或者到 github网页 下载由热心网友为windows系统编译好的ffmpeg压缩包,下载并解压到合适的位置,然后把ffmpeg添加到路径(windows添加路径的方法请自行搜索)。最后在cmd中输入 ffmpeg -h ,成功显示帮助信息即可。
在ffmpeg的bin文件夹中,除了最常用的ffmpeg,还有ffplay,用于播放;ffprobe,用于获取视频信息。

写一个python脚本,目的是对一个文件夹下的视频文件进行压缩处理,脚本名为 vimg_processor.py ,内容如下:

import os, time, json, subprocess, re, traceback
from MyPythonLib import log as mylog

CWD = os.getcwd()
TEMP_DIR = os.path.join(CWD, 'temp')
if not os.path.isdir(TEMP_DIR):
    os.mkdir(TEMP_DIR)
#temp这个文件夹主要是在压缩图片时存放临时的调色板
log = mylog.get_logger(os.path.join(CWD, 'logs'))
start_time = time.time()
fail_msg_list = []


def get_suffix(file_name: str) -> str:
    '''返回文件后缀名。若文件无后缀名,返回空字符串。
    :param file_name:文件名(可带完整路径)
    :return 大写的后缀名(不含 . )
    '''
    suffix = os.path.splitext(file_name)
    suffix = (suffix[-1].lstrip('.')).upper()
    return suffix


def special_format(file_name: str) -> bool:
    '''判断文件是否特殊格式。
    :rmvb格式必须指定输出为mp4,ts、mpg格式跳跃观看时卡顿严重,所以都应该输出为mp4格式。
    :param file_name:文件名(可带完整路径)
    :return bool
    '''
    videoSuffixSet = {"RMVB", "TS", "MPG"}
    judge = (False, True)[get_suffix(file_name) in videoSuffixSet]
    return judge


def is_video(file_name: str) -> bool:
    '''根据后缀名判断文件是否视频。
    :param file_name:文件名(可带完整路径)
    :return bool
    '''
    videoSuffixSet = {
        "WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
        "M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
    }
    judge = (False, True)[get_suffix(file_name) in videoSuffixSet]
    return judge


def is_img(file_name: str) -> bool:
    '''根据后缀名判断文件是否图片。
    :param file_name:文件名(可带完整路径)
    :return bool
    '''
    imgSuffixSet = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
    judge = (False, True)[get_suffix(file_name) in imgSuffixSet]
    return judge


def cannot_process(file_name: str) -> bool:
    '''判断文件是否暂时处理不了。
    :param file_name:文件名(可带完整路径)
    :return bool
    '''
    videoSuffixSet = set()
    judge = (False, True)[get_suffix(file_name) in videoSuffixSet]
    return judge


def is_origin(file_name: str) -> bool:
    '''判断文件是否源文件(源文件中末尾没有#test#)。
    :param file_name:文件名(可带完整路径)
    :return bool
    '''
    prefix = (os.path.splitext(file_name))[0]
    judge = (True, False)[re.match('.+#test#$', prefix) != None]
    return judge


class vimg_processor(object):
    '''图像视频类,初始化时会读取图像视频的一些参数。'''

    def __init__(self,
                 inputName: str,
                 outName: str = "",
                 video_size: str = 'pc',
                 mp4_jpg: bool = False,
                 img_max_pix: int = 700):
        '''用完整的文件名初始化对象
        :param inputName: 源文件的绝对路径
        :param outName: 输出文件的绝对路径。默认为同目录下文件名末加#test#保存
        :param video_size: 视频文件的输出大小,有 phone 、 pad 、 pc 、 空字符串 三种选择,空串表示保留原宽高
        :param mp4_jpg: 输出格式是否固定为mp4和jpg
        :param img_max_pix: 输出图像的宽高同时超过该值时,宽高中较长的一个数值会被压缩到该值
        '''
        self.inputName = inputName
        if outName == "":
            prefix, suffix = os.path.splitext(inputName)
            outName = prefix + "#test#" + suffix
        if mp4_jpg == True:
            prefix, suffix = os.path.splitext(outName)
            if is_video(inputName):
                outName = prefix + '.mp4'
            elif is_img(inputName):
                outName = prefix + '.jpg'
        self.outName = outName
        if is_img(self.inputName):
            if mp4_jpg == False:
                suffix = get_suffix(self.inputName).lower()
                self.out_name_palette = os.path.join(TEMP_DIR,
                                                     'palette' + suffix)
                self.temp_img = os.path.join(TEMP_DIR, 'temp_img' + suffix)
            else:
                self.out_name_palette = os.path.join(TEMP_DIR, 'palette.jpg')
                self.temp_img = os.path.join(TEMP_DIR, 'temp_img.jpg')
        # 获取视频文件的宽高,根据宽高比决定输出文件宽高。
        # 这里只是在宽度超过上限时减小宽高
        self.width, self.height = 0, 1
        if is_video(inputName):
            self.get_video_info()
            w, h = self.width, self.height
            self.ratio = w / h
            if video_size == 'phone':
                if self.ratio <= 0.6:
                    self.definition = (f'{w}:{h}', '360:640')[w >= 360]
                elif self.ratio <= 1.0:
                    self.definition = (f'{w}:{h}', '360:480')[w >= 360]
                elif self.ratio <= 1.66:
                    self.definition = (f'{w}:{h}', '480:360')[w >= 480]
                elif self.ratio > 1.66:
                    self.definition = (f'{w}:{h}', '640:360')[w >= 640]
            elif video_size == 'pad':
                if self.ratio <= 0.6:
                    self.definition = (f'{w}:{h}', '720:1280')[w >= 720]
                elif self.ratio <= 1.0:
                    self.definition = (f'{w}:{h}', '720:960')[w >= 720]
                elif self.ratio <= 1.66:
                    self.definition = (f'{w}:{h}', '960:720')[w >= 960]
                elif self.ratio > 1.66:
                    self.definition = (f'{w}:{h}', '1280:720')[w >= 1280]
            elif video_size == 'pc':
                if self.ratio <= 0.6:
                    self.definition = (f'{w}:{h}', '1080:1920')[w >= 1080]
                elif self.ratio <= 1.0:
                    self.definition = (f'{w}:{h}', '1080:1440')[w >= 1080]
                elif self.ratio <= 1.66:
                    self.definition = (f'{w}:{h}', '1440:1080')[w >= 1440]
                elif self.ratio > 1.66:
                    self.definition = (f'{w}:{h}', '1920:1080')[w >= 1920]
            elif video_size == '':
                self.definition = f'{w}:{h}'
        # 获取图片的编码格式
        elif is_img(inputName):
            self.pix_fmt = ''
            self.get_img_info()
            if self.width > img_max_pix and self.height > img_max_pix:
                if self.width <= self.height:
                    ratio = self.width / img_max_pix
                elif self.width > self.height:
                    ratio = self.height / img_max_pix
                self.width, self.height = int(self.width / ratio), int(
                    self.height / ratio)
            self.definition = '{}:{}'.format(self.width, self.height)

    def get_video_info(self):
        '''获取视频的宽高。'''
        probe = 'ffprobe -select_streams v -show_entries format=bit_rate -show_streams -v quiet -of csv="p=0" -of json -i "{}"'.format(
            self.inputName)
        result = subprocess.Popen(probe, shell=False,
                                  stdout=subprocess.PIPE).stdout
        list_std = result.readlines()
        str_tmp = ''
        for item in list_std:
            str_tmp += bytes.decode(item.strip())
        json_data = json.loads(str_tmp)
        # self.bitrate = int(json_data['format']['bit_rate']) / 1024 # 以后可能用到视频的比特率
        # 视频的比特率有时不能用ffprobe读取到,建议自己根据视频大小和时长计算
        try:
            self.width = int(json_data['streams'][0]['width'])
            self.height = int(json_data['streams'][0]['height'])
        except:
            self.width, self.height = 0, 1

    def get_img_info(self):
        '''获取图片的一些信息。'''
        probe = 'ffprobe -hide_banner -v quiet -print_format json -show_format -show_streams "{}"'.format(
            self.inputName)
        result = subprocess.Popen(probe, shell=False,
                                  stdout=subprocess.PIPE).stdout
        list_std = result.readlines()
        str_tmp = ''
        for item in list_std:
            str_tmp += bytes.decode(item.strip())
        json_data = json.loads(str_tmp)
        try:
            self.pix_fmt = str(json_data['streams'][0]['pix_fmt'])
            self.width = int(json_data['streams'][0]['width'])
            self.height = int(json_data['streams'][0]['height'])
        except:
            self.pix_fmt, self.width, self.height = '', 0, 0

    def Zip_Video(self):
        '''通过ffmpeg压缩视频文件(输出的分辨率格式指定为yuv420p)。'''
        compress = 'ffmpeg -threads 8 -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'.format(
            self.inputName, self.definition,
            self.outName)  # -threads 8 限制线程数,解码和编码都设置,保护CPU。压缩视频费cpu
        compress = os.system(compress)
        if compress != 0:
            return (compress, "视频压缩失败")
        return True

    def Zip_Img(self):
        '''通过ffmpeg压缩图像文件。'''
        get_palette = 'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(
            self.inputName, self.out_name_palette)
        compress = 'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(
            self.inputName, self.out_name_palette, self.pix_fmt, self.temp_img)
        get_palette = os.system(get_palette)
        if get_palette != 0:
            return (get_palette, "无法获得调色板")
        compress = os.system(compress)
        if compress != 0:
            return (compress, "生成中间图片失败")
        compress = 'ffmpeg -i "{}" -vf scale={}  -y "{}"'.format(
            self.temp_img, self.definition, self.outName)
        compress = os.system(compress)
        if compress != 0:
            return (compress, "生成最终图片失败")
        return True


'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''


def get_definitions_video(dir):
    '''获取一个文件夹下所有视频文件的分辨率,按宽高比进行分类。'''
    ratio_dict = dict()
    for path_prefix, subdirs, file_names in os.walk(dir):
        for file_name in file_names:
            complete_path = os.path.join(path_prefix, file_name)
            if is_video(complete_path):
                print('\r视频文件:{:200}'.format(complete_path), end='')
                vimg = vimg_processor(complete_path)
                ratio = vimg.ratio
                definition = "{}x{}".format(vimg.width, vimg.height)
                if ratio_dict.get(ratio) == None:
                    ratio_dict[ratio] = [definition]
                else:
                    if definition not in ratio_dict[ratio]:
                        ratio_dict[ratio].append(definition)
    ratio_list = []
    for key, value in ratio_dict.items():
        ratio_list.append(key)
    ratio_list.sort()
    with open(os.path.join(CWD, 'definitions.txt'), 'w', encoding='utf8') as f:
        for ratio in ratio_list:
            ratio_dict[ratio].sort()
            f.write('宽高比:{:.6},成员:'.format(ratio))
            for definition in ratio_dict[ratio][:-1]:
                f.write(definition + ' , ')
            f.write(ratio_dict[ratio][-1] + '\n')
    print('')


def zip_dir_video(dir, video_size='pc', mp4=False):
    '''压缩一个文件夹下的视频文件。
    :输出文件自动命名为同目录下 源文件名+#test# 文件,后缀名不变。
    :对已经存在的带有#test#的文件,会跳过。
    :param dir: 文件夹的绝对路径
    :param video_size: 视频宽高, phone、pad、pc、空字符串 分别表示适配手机、平板、电脑、不改变
    :param mp4: 是否仅输出mp4文件
    '''
    global log, start_time, fail_msg_list
    msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
    print(msg)
    log.info(msg)
    for path_prefix, subdirs, file_names in os.walk(dir):
        for file_name in file_names:
            if '#test#.' in file_name:
                continue
            complete_path = os.path.join(path_prefix, file_name)
            if is_video(complete_path) and not cannot_process(complete_path):
                vimg = vimg_processor(complete_path,
                                      mp4_jpg=mp4,
                                      video_size=video_size)
                if os.path.exists(vimg.outName):  # 输出文件已存在时跳过
                    continue
                if vimg.width == 0 or vimg.height == 1:
                    msg = f'无法获得视频信息{complete_path}'
                    print(msg)
                    log.warning(msg)
                    fail_msg_list.append(msg)
                    continue
                start_time = time.time()
                log.info('{}压缩中...'.format(complete_path))
                if vimg.Zip_Video() == True:
                    log.info('视频压缩成功,耗时{:.2f}min'.format(
                        (time.time() - start_time) / 60))
                else:
                    msg = f'视频压缩失败{complete_path}'
                    log.warning(msg)
                    fail_msg_list.append(msg)


def zip_dir_img(dir, jpg=False, img_max_pix=700):
    '''压缩一个文件夹下的图像文件。
    :输出文件自动命名为同目录下 源文件名+#test# 文件,后缀名不变。
    :对已经存在的带有#test#的文件,会跳过。
    :param dir: 文件夹的绝对路径
    :param img_max_pix: 宽高同时超过这个值时,宽高中较大的数值会缩小到这个数值
    :param jpg: 是否仅输出jpg文件
    '''
    global log, start_time, fail_msg_list
    msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
    print(msg)
    log.info(msg)
    for path_prefix, subdirs, file_names in os.walk(dir):
        for file_name in file_names:
            if '#test#.' in file_name:
                continue
            complete_path = os.path.join(path_prefix, file_name)
            if is_img(complete_path):
                print('\r{:200}'.format(complete_path), end='')
                vimg = vimg_processor(complete_path,
                                      mp4_jpg=jpg,
                                      img_max_pix=img_max_pix)
                if os.path.isfile(vimg.outName):  # 输出文件已存在时跳过
                    continue
                if vimg.width == 0 or vimg.height == 1 or vimg.pix_fmt == '':
                    msg = f'无法获得图片信息{complete_path}'
                    print(msg)
                    log.warning(msg)
                    fail_msg_list.append(msg)
                    continue
                if vimg.Zip_Img() == True:
                    log.info('图片压缩完毕{}'.format(complete_path))
                else:
                    msg = f'图片压缩失败{complete_path}'
                    log.warning(msg)
                    fail_msg_list.append(msg)


def delete_test(dir, mp4_jpg=False):
    '''在处理一个文件夹中的文件后,比较原文件与处理后文件,留下较小且大小非0的文件(对于一些特殊格式,不论大小仅留下处理后的文件)。
    :由于ffmpeg处理效果不确定,建议检查一下处理后的文件,确认无误后再执行,因为源文件会被彻底删除。
    :默认输出文件就是 源文件名+#test#。对于没有#test#的文件不做处理。
    :param dir: 文件夹的绝对路径
    :param mp4_jpg: 进行压缩时是否仅输出mp4/jpg文件
    '''
    global fail_msg_list
    for path_prefix, subdirs, file_names in os.walk(dir):
        for file_name in file_names:
            complete_path = os.path.join(path_prefix, file_name)
            if (is_video(complete_path)
                    or is_img(complete_path)) and is_origin(complete_path):
                prefix, suffix = os.path.splitext(complete_path)
                if mp4_jpg == False:
                    file_name_after_zip = prefix + "#test#" + suffix
                elif mp4_jpg == True:
                    if is_video(complete_path):
                        file_name_after_zip = prefix + "#test#" + '.mp4'
                    elif is_img(complete_path):
                        file_name_after_zip = prefix + "#test#" + '.jpg'
                if os.path.isfile(file_name_after_zip):
                    after_size = os.path.getsize(file_name_after_zip)
                    before_size = os.path.getsize(complete_path)
                    if (after_size < before_size or
                            special_format(complete_path)) and after_size != 0:
                        try:
                            os.remove(complete_path)
                            os.rename(
                                file_name_after_zip,
                                file_name_after_zip.replace('#test#.', '.'))
                            log.info('将{}替换为小文件'.format(complete_path))
                        except PermissionError:
                            msg = '删除失败:{complete_path}'
                            log.warning(msg)
                            fail_msg_list.append(msg)
                            continue
                    else:
                        os.remove(file_name_after_zip)
                        log.info('{}压缩后的文件比之前大(或大小为0),删掉了处理后的文件'.format(
                            file_name_after_zip))
    # 最后检查一下有没有剩余的处理后文件
    for path_prefix, subdirs, file_names in os.walk(dir):
        for file_name in file_names:
            if '#test#.' in file_name:
                msg = '疑似还有#test#文件:{}'.format(
                    os.path.join(path_prefix, file_name))
                log.warning(msg)
                fail_msg_list.append(msg)


if __name__ == '__main__':
    target_dir = r'D:\压缩中'
    try:
        #get_definitions_video(target_dir)
        zip_dir_video(target_dir, mp4=True, video_size='pad')
        #zip_dir_img(target_dir, jpg=True, img_max_pix=700)
        #delete_test(target_dir, mp4_jpg=True)
        for path_prefix, subdirs, file_names in os.walk(TEMP_DIR):
            for file_name in file_names:
                os.remove(os.path.join(path_prefix, file_name))
        print('已完成')
        if len(fail_msg_list) != 0:
            print('本次压缩时出现了以下异常:')
            for i in fail_msg_list:
                print(i)
        input()
    except:
        traceback.print_exc()
        input()

这个脚本中, target_dir 是存放视频文件的目录, zip_dir_video 是对该目录下的视频文件进行压缩,压缩后的视频文件名字末尾(后缀之前)会加上 #test# 以示区分,和原视频放在同一目录下。
另外这个脚本也可以用于对图片进行压缩,笔者使用的图片压缩方法很省空间,但鲜艳度不够。

在脚本的使用上,把所有要压缩的视频文件都放到target_dir里(可以放到多层目录中),然后首先以原状态运行脚本,压缩完后随机抽取检查一下压缩后的视频文件,注意本次压缩有无提示异常,确认无误后注释掉 zip_dir_video(target_dir, mp4=True, video_size='pad') 这一行,删去 delete_test(target_dir, mp4_jpg=True) 这一行前的注释 # 号,再运行一遍,即可在原视频文件和压缩后的 #test# 文件中保留更小的那个。如果运行时需要暂停或停止,用鼠标点击cmd框中间可暂停;按ctrl+C可停止,停止时要把最后一个正在生成的 #test# 文件删除,然后下次有时间重新运行脚本,这个脚本会自动跳过带有 #test# 的文件,也不会对已经有 #test# 压缩后文件的视频重复压缩。

最后,注意 zip_dir_video(target_dir, mp4=True, video_size='pad') 中的 video_size 参数,可以取phone、pad、pc或空字符串,分别表示视频的高x宽适配手机(360x640)、平板(720x1280)、电脑(1080x1920)或原大小。输出视频的宽高越小,压缩速度越快(对2h的视频,用phone大小,笔者的电脑大概需要压缩8-25min,根据源文件大小浮动),一般选phone或pad即可,选pc时的压缩速度太慢了;大小方面,2h的视频,大概是500MB、1.5GB、3.5GB的区别。另外,视频压缩主要消耗cpu资源,在:

compress = 'ffmpeg -threads 8 -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'

这一行,两处 -threads 8 是限制调用的cpu核心数,可以根据自己的电脑情况修改。一般cpu使用率75-80%就比较合适,而此时显卡却不会怎么使用,所以可以边压缩视频边玩一些需要显卡的游戏。

3.本地视频入库

主要是将视频文件(夹)放到合适的目录下,并修改对应的movie表中downloaded和file_path这两列;如果没有对应的movie表,则可以选择在movie表中新建一行。
由于自动识别视频比较复杂,这里选择手工识别(但是能简化的地方都简化了,只是输入稍微有点繁琐)。写好的file_into_sql.py文件如下(放到scrapy项目里):

from MyPythonLib import log as mylog
from MyPythonLib.log import log_print
from MyPythonLib import torrent_parser as t_p
from libs import javbus_extensions as j_e
import os, re

TORRENT_DIRS = [r'D:\torrents'] #存放torrent文件的所有目录
VIDEO_DIR = r'D:\into_sql' #存放本次需要入库的文件(夹)
VIDEO_DIR_UNKNOWN = r'D:\unknown' #有些视频不太好处理,想要跳过。所有跳过的文件(夹)会被放到这个目录里
LOG_DIR = os.path.join(os.getcwd(), 'logs')
if not os.path.isdir(LOG_DIR):
    os.mkdir(LOG_DIR)
log = mylog.get_logger(LOG_DIR, 'file_to_sql.txt')


class video_file_processor:
    """用于处理视频文件(夹)。主要工作流程:
    :进入TORRENT_DIRS,记录torrent的hash值与文件名
    :对VIDEO_DIR内的视频文件(夹),先在torrent字典中的文件查找,若文件名出现过(或者文件夹内的文件全部匹配),则认定视频文件(夹)对应该种子,那么查找数据库对应的torrent记录,找到对应的电影记录(movie_no)。
    :若不能通过torrent记录找到movie_no,则尝试通过code查找movie_no(需要手动输入)
    :若无法找到code,则尝试通过full_name查找movie_no(需要手动输入)
    :在通过code或full_name查找movie_no的过程中,也可能新建movie_no
    :只要找到或新建movie_no,最后该文件都会被放到合适的地方;否则该文件会被转移到VIDEO_DIR_UNKNOWN中"""

    def __init__(self, logger=None) -> None:
        """:param log: 可选的logger对象。若"""
        global log
        self.log = (logger, log)[logger == None]
        self.torrents_files = self.get_torrents_files()
        self.database_api_javbus = j_e.database_api_javbus(self.log)
        self.conn = self.database_api_javbus.conn
        self.m_cur = self.database_api_javbus.m_cur
        self.r_cur = self.database_api_javbus.r_cur
        self.movie_no = 0
        self.code = "unknown"
        self.full_name = "unknown"

    def get_torrents_files(self) -> dict:
        """得到torrents_files。它是一个字典,键为hash_value(仅40位hash),值为该torrent包含的文件名列表(无类型后缀和路径前缀)"""
        log = self.log
        message = "正在建立hash与文件关系..."
        log_print(log, 'info', message)
        torrents_files = {}
        p_torrent = re.compile('.torrent$', flags=re.I)
        for torrent_dir in TORRENT_DIRS:
            torrent_progress = 1
            for dirpath, dirnames, filenames in os.walk(torrent_dir):
                for filename in filenames:
                    if re.search(p_torrent, filename):
                        abs_torrent_path = os.path.join(dirpath, filename)
                        torrent = t_p.torrent(abs_torrent_path)
                        for i in range(len(torrent.files)):
                            torrent.files[i] = os.path.splitext(
                                torrent.files[i])[0]
                        torrents_files[torrent.hash_value] = torrent.files
                        torrent_progress += 1
                        print(
                            f'\rtorrent目录{torrent_dir}:{torrent_progress}/{len(filenames)}',
                            end='')
        message = "已建立hash与文件关系"
        log_print(log, 'info', message, print_prefix='\n')
        return torrents_files

    def process_videos(self):
        """进入VIDEO_DIR,处理包含的视频文件。"""
        self.movie_no, self.code, self.full_name = 0, "unknown", "unknown"
        dirpath, dirnames, filenames = next(
            os.walk(VIDEO_DIR))  #只进入第一层,也就是说默认一个torrent对应第一层的一个文件或文件夹
        for filename in filenames:
            self.__process_videos_main(dirpath, filename, "file")
            print('\n', end='')
        for dirname in dirnames:
            self.__process_videos_main(dirpath, dirname, "dir")
            print('\n', end='')

    def execute_mysql_query_command(self, command: str):
        """执行mysql查询语句。由于用到多次,所以写成一个方法
        :param command: 查询语句"""
        try:
            self.m_cur.execute(command)
        except:
            message = f'mysql语句出错:{command}。已暂停'
            log_print(self.log, 'error', message)
            input()

    #因为对文件和文件夹的处理差不多,所以写一个函数以便复用代码
    def __process_videos_main(self,
                              dirpath,
                              file_dir_name,
                              file_or_dir: str = "file"):
        """这个函数为处理的主体。
        :param dirpath: dirpath,用os.walk()得到,直接传入
        :param file_dir_name: 文件或文件夹名
        :param file_or_dir: file_dir_name是文件还是文件夹。可取值 "file" 或 "dir" (因为有可能文件和文件夹重名,还是需要这个参数)
        :详细的选择分支图见注释"""
        m_cur, log = self.m_cur, self.log
        movie_found = False
        abs_file_dir_path = os.path.join(dirpath, file_dir_name)
        if file_or_dir == "file":
            filenames_without_suffix = [os.path.splitext(file_dir_name)[0]]
        elif file_or_dir == "dir":
            filenames_without_suffix = []
            for dirpath_temp, dirnames_temp, filenames_temp in os.walk(
                    abs_file_dir_path):
                for filename_temp in filenames_temp:
                    filename_without_suffix = os.path.splitext(
                        filename_temp)[0]
                    filenames_without_suffix.append(filename_without_suffix)
        else:
            raise Exception(
                'file_to_sql.video_file_processor.__process_videos_main错误:file_or_dir参数只应当取 "file" 或 "dir"'
            )
        filenames_without_suffix = set(filenames_without_suffix)
        #分支图:
        #---遍历torrents_files
        #|__发现包含该文件(或该文件夹下所有文件)的torrent
        #   |__查找torrent表中有无记录该hash
        #       |__无记录,则查找code或full_name
        #       |__有1条记录:
        #           |__对应的movie_no为0(默认值),则查找code或full_name
        #           |__对应的movie_no非0,则在movie表中查找该no
        #               |__有1条记录,则请求用户输入
        #                   |__按下Enter,表示使用该movie_no入库
        #                   |__输入任意字符,则查找code或full_name
        #               |__有0条记录,则查找code或full_name
        #               |__其余情况,说明movie表有错误(不可能发生)
        #       |__有多条记录,说明torrent表有错误(几乎不可能发生)
        #|__未发现对应的torrent,则查找code或full_name
        for k, v in self.torrents_files.items():
            if filenames_without_suffix.issubset(set(v)):
                message = f'找到包含{file_dir_name}的hash'
                log_print(log, 'info', message)
                command_torrent = f'SELECT movie_no FROM torrent WHERE hash_value="{k}";'
                self.execute_mysql_query_command(command_torrent)
                results = m_cur.fetchall()
                if len(results) == 0:
                    message = f'{file_dir_name}对应的hash在torrent表中无记录,查找code或full_name'
                    log_print(log, 'info', message)
                    break
                elif len(results) == 1:
                    movie_no = int(results[0][0])
                    if movie_no == 0:
                        message = f'{file_dir_name}对应的hash在torrent表中有记录,但没有对应的movie_no。查找code或full_name'
                        log_print(log, 'info', message)
                        break
                    command_movie = f'SELECT code,full_name FROM movie WHERE no={movie_no};'
                    self.execute_mysql_query_command(command_movie)
                    results = m_cur.fetchall()
                    if len(results) == 1:
                        code, full_name = results[0]
                        message = f'{file_dir_name}对应的hash在torrent表中存在,对应的movie_no={movie_no},code={code},full_name={full_name}'
                        log.info(message)
                        affirm = input(message +
                                       '\n确认请按下Enter键,或者输入任意字符来查找movie_no:')
                        if affirm == "":
                            self.movie_no = movie_no
                            movie_found = True
                            break
                        else:
                            break
                    elif len(results) == 0:
                        message = f'{file_dir_name}对应的hash在torrent表中存在、有movie_no,但没有对应的movie记录。查找code或full_name'
                        log_print(log, 'info', message)
                        break
                    else:
                        message = f'出现错误:{file_dir_name}对应的hash在torrent表中存在,但对应的movie记录有{len(results)}条(movie_no出现多次)。已暂停'
                        log_print(log, 'warning', message)
                        input()
                else:
                    message = f'出现错误:{file_dir_name}对应的hash在torrent表中存在,但有{len(results)}条记录(hash_value出现多次)。已暂停'
                    log_print(log, 'warning', message)
                    input()
                break
        if not movie_found:
            self.find_movie_no(abs_file_dir_path, file_or_dir)
        else:
            self.into_sql(abs_file_dir_path)

    def find_movie_no(self, abs_file_dir_path: str, file_or_dir: str = "file"):
        """通过code或full_name为一个单独的文件或文件夹找到movie_no编号,或新建一个movie_no。需要手动确认code或full_name
        :param abs_file_dir_path: 文件或文件夹的绝对路径
        :param file_or_dir: abs_file_dir_path是文件还是文件夹。可以取 "file" 或 "dir"
        :会将类的code、full_name、movie_no进行修改;最后一定会调用into_sql方法"""

        def integrate_multi_percent_into_one(query_str: str) -> str:
            """将字符串中紧挨着的多个百分号整合为一个。用到2次,所以把这段代码写成函数
            :param query_str: 用于mysql的LIKE查询的字符串
            :return 处理后的string"""
            while (re.search(r'%{2,}', query_str)):
                multi_percent = re.findall(r'%{2,}', query_str)
                multi_percent = list(set(multi_percent))
                multi_percent.sort(key=lambda x: len(x), reverse=True)
                for i in multi_percent:
                    query_str = query_str.replace(i, '%')
            return query_str

        m_cur, log = self.m_cur, self.log
        file_dir_name = os.path.basename(abs_file_dir_path)

        #先试着通过code确认movie_no
        code = video_file_processor.get_code(file_dir_name, file_or_dir)
        if isinstance(code, str):
            #当能提取到code时,请求确认
            affirm = input(
                f"为{file_dir_name}自动匹配的code为{code},确认请直接按下Enter,否则请手动输入code:")
            code = (affirm.upper(), code)[affirm == ""]
        elif code == None:
            #当不能提取到code时,需要手动输入
            affirm = input(
                f"无法为{file_dir_name}自动匹配code,将尝试用full_name确认编号。按下Enter确认,否则请手动输入code:"
            )
            code = (affirm.upper(), 'unknown')[affirm == ""]
        #流程图
        #------------------------------------------------------循环
        #|__code=="unknown",表示用code无法找到movie_no,跳出循环
        #|__code为其他字符串,表示疑似code,到movie表中查找
        #   |__无法根据code精确找到movie_no,则使用模糊查找(对code做一定处理),并输出找到的条目,请求用户输入
        #       |__直接按下Enter,说明用code无法找到movie_no,跳出循环
        #       |__输入y,确认现在的code,根据该code新建一条movie记录
        #       |__输入其他字符串,表示使用该字符串作为code,将会continue
        #   |__可以根据code精确找到1条movie_no,则输出该条目的信息,请求用户输入
        #       |__输入Enter,表示确认,将会进行入库
        #       |__输入n,说明用code无法找到movie_no,跳出循环
        #       |__输入其他字符串,表示使用该字符串作为code,将会continue
        #   |__可以根据code精确找到多条movie_no,则输出所有,并请求用户输入
        #       |__输入Enter,表示使用movie_no最小的一条进行入库
        #       |__输入一个数字,进入循环
        #           |__该数字是一个movie_no,表示选择该movie_no进行入库
        #           |__该数字不是一个movie_no,则继续要求输入
        #               |__输入一个数字,返回上述循环验证
        #               |__输入非纯数字的字符串,则以该字符串作为code,continue最外层循环
        #       |__输入n,说明用code无法找到movie_no,跳出循环
        #       |__输入其他字符串,表示使用该字符串作为code,将会continue
        while (True):
            if code == "unknown":
                self.code = "unknown"
                break
            elif isinstance(code, str) and code != "unknown":
                #code可能存在时,在mysql中查找
                #若找不到,用LIKE模糊查找给出近似结果;若能找到,则返回
                command_movie = f'SELECT no,code,full_name FROM movie WHERE code="{code}";'
                self.execute_mysql_query_command(command_movie)
                results = m_cur.fetchall()
                if len(results) == 0:
                    #将多个 0 替换为 % ,在头尾也加 % ,用LIKE模糊查询
                    code_like = code.replace('0', '%')
                    code_like = '%' + code_like + '%'
                    code_like = integrate_multi_percent_into_one(code_like)
                    command_movie = f'SELECT no,code FROM movie WHERE code LIKE "{code_like}";'
                    self.execute_mysql_query_command(command_movie)
                    results = m_cur.fetchall()
                    codes_list = [i[1] for i in results]
                    affirm = input(
                        f'在movie中没有完全相同的code({code}),模糊查找的code有:\n{codes_list}\n按下Enter将尝试用full_name确认编号,或者输入y确认现在的code,或者输入其他code:'
                    )
                    if affirm == "":
                        self.code = "unknown"
                        break
                    elif affirm == "y":
                        self.movie_no = 0
                        self.code = code
                        self.into_sql(abs_file_dir_path)
                        return
                    else:
                        code = affirm.upper()
                        continue
                elif len(results) == 1:
                    affirm = input(
                        f'查找到一条记录:code={results[0][1]},full_name={results[0][2]},\n按下Enter以确认,或者输入n来尝试用full_name确认编号,或者输入一个其他的code:'
                    )
                    if affirm == "":
                        self.movie_no = int(results[0][0])
                        self.into_sql(abs_file_dir_path)
                        return
                    elif affirm == "n":
                        self.code = "unknown"
                        break
                    else:
                        code = affirm.upper()
                        continue
                else:
                    message = f'通过code={code}在movie表中查找到{len(results)}条记录,这可能是一个错误。'
                    log_print(log,
                              'warning',
                              message,
                              print_suffix='查找到的记录如下:(按 no,code,full_name 输出)')
                    for i in results:
                        print(f'{i[0]} , {i[1]} , {i[2]}')
                    affirm = input(
                        '按下Enter表示选择最小的一个movie_no作为编号,或者输入一个存在的no表示选择该编号,或者输入n来尝试用full_name确认编号,或者输入其他code:'
                    )
                    if affirm == "":
                        min_movie_no = int(results[0][0])
                        for i in results:
                            movie_no = int(i[0])
                            if min_movie_no > movie_no:
                                min_movie_no = movie_no
                        self.movie_no = min_movie_no
                        self.into_sql(abs_file_dir_path)
                        return
                    elif re.search(r'^\d+$', affirm):
                        while (re.search(r'^\d+$', affirm)):
                            movie_no = int(affirm)
                            if movie_no not in [int(i[0]) for i in results]:
                                affirm = input(
                                    '输入的movie_no不在上面列出项中,请重新输入一个no,或者输入其他code(非纯数字):'
                                )
                            else:
                                self.movie_no = int(affirm)
                                self.into_sql(abs_file_dir_path)
                                return
                        code = affirm.upper()
                        continue
                    elif affirm == 'n':
                        self.code = "unknown"
                        break
                    else:
                        code = affirm.upper()
                        continue

        #然后试着通过full_name确认movie_no
        full_name = os.path.splitext(file_dir_name)[0]
        matched_results = []
        #分词查找,每次查询结果转为set存储在matched_results中,最后统计出现次数,根据匹配次数从高到低输出
        ranked_matched_movie = []  #每个元素为:[(no, full_name), 出现次数]
        affirm = input(
            f'将以文件(夹)名作为full_name({full_name}),按下Enter确认,输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入一个full_name:'
        )
        if affirm == "":
            pass
        elif affirm == "n":
            self.movie_no = 0
            self.full_name = "unknown"
            self.into_sql(abs_file_dir_path)
            return
        else:
            full_name = affirm
        #流程图
        #------初始的full_name就是文件名(无后缀)/文件夹名,请求用户输入
        #   |__按下Enter,表示认可该full_name
        #   |__输入n,表示放弃用full_name寻找movie_no,将把文件(夹)放到VIDEO_DIR_UNKNOWN中
        #   |__输入其他字符串,表示以该字符串作为full_name继续
        #------------------------------------------------------循环
        #根据full_name得到一个keywords列表,对每个keyword分别进行查询。每次查询的结果非空时,放到matched_results列表中。
        #|__若matched_results列表为空,说明通过所有的keyword都不能查询到,则请求用户输入
        #   |__按下Enter,确认当前的full_name,用它新建一条movie记录
        #   |__输入n,表示放弃用full_name寻找movie_no,将把文件(夹)放到VIDEO_DIR_UNKNOWN中
        #   |__输入其他字符串,表示将full_name替换为该字符串,并continue
        #|__若matched_results列表非空,则统计每个movie条目出现的次数,从高到低进行排序得到ranked_matched_movie。进入循环:
        #   输出前10条,请求用户输入
        #       |__按下Enter,以该full_name创建新的movie条目
        #       |__输入一个数字,进入循环
        #           |__该数字是一个存在的movie_no,则使用该no进行入库
        #           |__该数字不是一个存在的movie_no,则继续要求输入
        #               |__输入一个数字,返回上述循环验证
        #               |__输入非纯数字的字符串,则以该字符串作为full_name、回到最外层循环
        #       |__输入n,表示放弃用full_name寻找movie_no,将把文件(夹)放到VIDEO_DIR_UNKNOWN中
        #       |__输入all,显示ranked_matched_movie的全部条目并continue
        #       |__输入其他字符串,则以该字符串作为full_name、回到最外层循环
        while (True):
            words = self.get_keywords(full_name)
            for i in range(len(words)):
                #在头尾也加 % ,用LIKE模糊查询
                word_like = '%' + words[i] + '%'
                word_like = integrate_multi_percent_into_one(word_like)
                command_movie = f'SELECT no,full_name FROM movie WHERE full_name LIKE "{word_like}";'
                self.execute_mysql_query_command(command_movie)
                results = m_cur.fetchall()
                if len(results) != 0:
                    matched_results.append(set(results))
            if len(matched_results) == 0:
                affirm = input(
                    f'full_name={full_name}没有匹配结果。按下Enter以该full_name新建一条movie记录,输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入其他值来重新输入full_name:'
                )
                if affirm == "":
                    self.movie = 0
                    self.full_name = full_name
                    self.into_sql(abs_file_dir_path)
                    return
                elif affirm == "n":
                    self.movie = 0
                    self.full_name = "unknown"
                    self.into_sql(abs_file_dir_path)
                    return
                else:
                    full_name = affirm
                    continue
            else:
                for matched_result in matched_results:
                    for match in matched_result:
                        try:
                            exists_index = [
                                i[0] for i in ranked_matched_movie
                            ].index(match)
                        except ValueError:
                            exists_index = -1
                        if exists_index == -1:
                            ranked_matched_movie.append([(match), 1])
                        else:
                            ranked_matched_movie[exists_index][1] += 1
                ranked_matched_movie.sort(key=lambda x: x[1], reverse=True)
                show_all = False
                while (True):
                    print(
                        f'根据full_name={full_name}查找到以下可能匹配的记录:(按 no , 出现次数 , full_name 输出)'
                    )
                    for i in range(len(ranked_matched_movie)):
                        if not show_all:
                            if i >= 10:
                                break
                        print(
                            f'{ranked_matched_movie[i][0][0]} , {ranked_matched_movie[i][1]} , {ranked_matched_movie[i][0][1]}'
                        )
                    if not show_all:
                        affirm = input(
                            '按下Enter以full_name创建一条记录,或者从以上项目中挑选一个no输入,或者输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入all以显示全部匹配,或者输入其他字符串来作为full_name:'
                        )
                    else:
                        affirm = input(
                            '按下Enter以full_name创建一条记录,或者从以上项目中挑选一个no输入,或者输入n以跳过这个文件(夹)并将它放入VIDEO_DIR_UNKNOWN中,或者输入其他字符串(非all)来作为full_name:'
                        )
                    if affirm == "":
                        self.movie = 0
                        self.full_name = full_name
                        self.into_sql(abs_file_dir_path)
                        return
                    elif re.search(r'^\d+$', affirm):
                        while (re.search(r'^\d+$', affirm)):
                            movie_no = int(affirm)
                            if movie_no not in [
                                    int(i[0][0]) for i in ranked_matched_movie
                            ]:
                                affirm = input(
                                    '输入的movie_no不在上面列出项中,请重新输入一个no,或者输入非纯数字字符串来作为full_name:'
                                )
                            else:
                                self.movie_no = int(affirm)
                                self.into_sql(abs_file_dir_path)
                                return
                        matched_results = []
                        ranked_matched_movie = []
                        full_name = affirm
                        break
                    elif affirm == "n":
                        self.movie = 0
                        self.full_name = "unknown"
                        self.into_sql(abs_file_dir_path)
                        return
                    elif affirm == "all":
                        show_all = True
                        continue
                    else:
                        matched_results = []
                        ranked_matched_movie = []
                        full_name = affirm
                        break

    @staticmethod
    def get_code(file_dir_name: str, file_or_dir: str = "file"):
        """根据文件或文件夹名得到对应的code。
        :param file_dir_name: 文件名或文件夹名
        :param file_or_dir: 表示file_dir_name为文件(为文件时会尝试去掉后缀名)还是文件夹。可取值 "file" 或 "dir"
        :return string,表示可能的code;或者返回None,表示无法找到code"""
        if file_or_dir == "file":
            file_dir_name = os.path.splitext(file_dir_name)[0]

        p_fc2 = re.compile(r'fc2[-_]{0,1}(ppv){0,1}[-_]{0,1}\d{7}', re.I)
        code = re.search(p_fc2, file_dir_name)
        if code:
            code = code.group()
            code = code.replace('_', '-')
            p_ppv = re.compile(r'ppv', re.I)
            if re.search(p_ppv, code):
                start, end = re.search(p_ppv, code).span()
                code = code[:start] + code[end:]
            code = code.replace('--', '-')
            if '-' not in code:
                code = code[:3] + '-' + code[3:]
            return code

        p_123456_789 = re.compile(r'\d{6}[-_]{1}\d{3}')
        code = re.search(p_123456_789, file_dir_name)
        if code:
            code = code.group()
            code = code.replace('_', '-')
            return code

        p_code = re.compile(r'\d*[a-zA-Z]{2,6}[-_]{0,1}\d+')
        code = re.search(p_code, file_dir_name)
        if isinstance(code, re.Match):
            code = code.group()
            code = code.replace('_', '-')
            if '-' not in code:
                start, end = re.search('[a-zA-Z]{2,6}[^-]', code).span()
                code = code[:end - 1] + '-' + code[end - 1:]
            return code

        return None

    @staticmethod
    def get_keywords(full_name: str):
        """根据full_name得到关键字列表(list[str]),该列表中的每项字符串都会用于mysql查询。
        :param full_name: 文件名(无后缀)或文件夹名
        :return list[str]"""
        keywords_1 = re.findall(r'[a-zA-Z]+', full_name)
        keywords_2 = re.findall(r'\d+', full_name)
        keywords = list(set(keywords_1).union(keywords_2))
        if len(keywords) == 0:
            print('full_name中不含英文单词或数字,将作为一个整体在mysql中模糊查找')
            keywords = [full_name]
        print(f'对{full_name}提取到的关键词:\n{keywords}')
        return keywords

    def into_sql(self, abs_file_dir_path: str):
        """根据movie_no、code、full_name分情况处理。
        :首先移动文件(夹)到合适的位置,然后将其入库(修改movie表中的downloaded和file_path)
        :对于跳过的文件(夹),则只是转移到VIDEO_DIR_UNKNOWN
        :param abs_file_dir_path: 绝对路径"""
        m_cur = self.m_cur
        print(self.movie_no, self.code, self.full_name)
        if self.movie_no == 0 and self.code == "unknown" and self.full_name == "unknown":
            print('跳过该文件(夹)')
        elif self.movie_no == 0:
            print(f'以code={self.code}、full_name={self.full_name}新建一条记录')
        elif self.movie_no != 0:
            command_movie = f'SELECT code,full_name FROM movie WHERE no={self.movie_no};'
            self.execute_mysql_query_command(command_movie)
            code, full_name = m_cur.fetchall()[0]
            print(
                f'将对已存在的{self.movie_no}号记录进行修改(code={code},full_name={full_name})'
            )
        self.movie_no, self.code, self.full_name = 0, "unknown", "unknown"

    def main(self):
        """执行入口"""
        self.process_videos()


if __name__ == "__main__":
    v_f_p = video_file_processor().main()
    input('入库完成')

这里的 video_file_processor.into_sql 方法还没有完成,可以根据自己的需要调整。另外, get_code 和 get_keywords 两个方法也可以根据需要调整。

另外,在import时,除了前文提到的自建库,还可以看到这样两行:

from MyPythonLib.log import log_print
from MyPythonLib import torrent_parser as t_p

这个 log_print 是自己写的一个小函数,用来进行一些既要log又要print的信息,内容如下:

def log_print(log: logging.Logger,
              log_type: str,
              message: str,
              print_prefix: str = "",
              print_suffix: str = ""):
    """同时log和print。
    
    :param log: Logger对象
    :param log_type: Logger的级别
    :param message: 输出的信息
    :param print_prefix: print时需要在message开头加的字符串
    :param print_suffix: print时需要在message末尾加的字符串
    :return None"""
    try:
        if log_type == 'debug':
            log.debug(message)
        elif log_type == 'info':
            log.info(message)
        elif log_type == 'warning':
            log.warning(message)
        elif log_type == 'error':
            log.error(message)
        elif log_type == 'critical':
            log.critical(message)
        message = print_prefix + message + print_suffix
        print(message)
    except:
        raise Exception('error: MyPythonLib.log.log_print failed ')

torrent_parser则是解析torrent文件的一个自建库,内容如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import collections, hashlib


def bencode(elem):
    if type(elem) == str:
        elem = str.encode(elem)

    if type(elem) == bytes:
        result = str.encode(str(len(elem))) + b":" + elem
    elif type(elem) == int:
        result = str.encode("i" + str(elem) + "e")
    elif type(elem) == list:
        result = b"l"
        for item in elem:
            result += bencode(item)
        result += b"e"
    elif type(elem) in [dict, collections.OrderedDict]:
        result = b"d"
        for key in elem:
            result += bencode(key) + bencode(elem[key])
        result += b"e"
    return result


def bdecode(bytestr, recursiveCall=False):
    startingChars = dict({b"i": int, b":": str, b"l": list, b"d": dict})
    digits = [b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9"]

    started = ended = False
    curtype = None

    numstring = b""  # for str, int
    result = None  # for list, dict
    key = None  # for dict

    while len(bytestr) > 0:
        # reading and popping from the beginning
        char = bytestr[:1]

        if not started:

            bytestr = bytestr[1:]

            if char in digits:
                numstring += char

            elif char in startingChars:

                started = True
                curtype = startingChars[char]

                if curtype == str:
                    size = int(bytes.decode(numstring))
                    # try to decode strings
                    try:
                        result = bytes.decode(bytestr[:size])
                    except UnicodeDecodeError:
                        result = bytestr[:size]
                    bytestr = bytestr[size:]
                    ended = True
                    break

                elif curtype == list:
                    result = []

                elif curtype == dict:
                    result = collections.OrderedDict()
            else:
                raise ValueError("Expected starting char, got ‘" +
                                 bytes.decode(char) + "’")

        else:  # if started

            if not char == b"e":

                if curtype == int:
                    bytestr = bytestr[1:]
                    numstring += char

                elif curtype == list:
                    item, bytestr = bdecode(bytestr, recursiveCall=True)
                    result.append(item)

                elif curtype == dict:

                    if key == None:
                        key, bytestr = bdecode(bytestr, recursiveCall=True)

                    else:
                        result[key], bytestr = bdecode(bytestr,
                                                       recursiveCall=True)
                        key = None

            else:  # ending: char == b"e"
                bytestr = bytestr[1:]
                if curtype == int:
                    result = int(bytes.decode(numstring))
                ended = True
                break
    if ended:
        if recursiveCall:
            return result, bytestr
        else:
            return result
    else:
        raise ValueError("String ended unexpectedly")


class torrent():
    '''解析种子文件。'''

    def __init__(self, abs_torrent_file_path: str) -> None:
        ''':param abs_torrent_file_path: 种子文件的绝对路径'''
        self.abs_torrent_file_path = abs_torrent_file_path
        bytes_stream = ''
        with open(abs_torrent_file_path, 'rb') as f:
            bytes_stream = f.read()
        self.metadata = bdecode(bytes_stream)
        encodedInfo = bencode(self.metadata["info"])
        self.hash_value = hashlib.sha1(encodedInfo).hexdigest().upper()
        self.files = []
        if 'name' in self.metadata['info'].keys(
        ) and 'files' not in self.metadata['info'].keys():
            self.files.append(self.metadata['info']['name'])
        elif 'files' in self.metadata['info'].keys():
            for i in self.metadata['info']['files']:
                self.files.append(i['path'][0])


if __name__ == '__main__':
    pass

为了能读取torrent文件、从torrent文件得到 正确的 40位的hash,笔者翻了很多教程,中文互联网上到处乱抄、有很多人讲课也是乱讲的,最后还是在 这个github项目 里得到了正确答案。

4.ffmpeg加速

实际运行 vimg_processor.py 脚本时,可能会觉得视频处理速度慢。这个问题有两种解决方法:使用显卡加速,或者多台电脑同时压缩视频。但在继续阅读之前,你需要先查看一下运行 vimg_processor.py 脚本时存放压缩视频的文件夹所在的那块硬盘的读写速度,比如在windows上可以打开 任务管理器 看看那块硬盘的读写速度是否已经达到极限(一般的机械硬盘大概150MB/s,固态硬盘则可达到1~1.5GB/s)。

(1)显卡加速

如果对ffmpeg有些了解,分析一下上文中的那条ffmpeg命令:

ffmpeg -threads 8 -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"

会发现,这样的命令只是调用了CPU、不调用GPU或其他计算的硬件,好处是可以在运行ffmpeg的同时玩一些需要显卡的游戏(但是ffmpeg即使调用显卡也未必占用很多),坏处是可能比较慢。

实际上,ffmpeg调用CPU处理视频的过程称为软编码(输出视频)、软解码(读取视频),调用GPU或其他计算硬件处理则是硬编码、硬解码。CPU和GPU在运算上的区别主要是CPU擅长处理多种性质不同的运算(加减乘除,逻辑运算等),而GPU擅长的是计算乘法和,比如 a1*b1 + a2*b2 + a3*b3 ... 这种(因为显卡输出画面时需要处理大量类似的运算),这样的区别也决定了GPU用来做深度学习一般更快。你可以这样理解:CPU软编解码,类似于人手,你可以用手拿起扇子扇风,这样的效率比较低,但是手能处理多种复杂的任务;GPU硬编解码,类似于用电风扇吹风,电风扇只能用来吹风(它在硬件层面就是为了吹风而设计的),但是它吹风的效率远非用手扇风可比。

回到ffmpeg上,用GPU加速ffmpeg首先要确定ffmpeg支持的显卡驱动和你的显卡型号。
ffmpeg -hwaccels 可以查看ffmpeg支持的加速方式。如果你用的是windows电脑,并且像前文一样从ffmpeg官网直接下载windows平台的压缩包来安装ffmpeg,那么多半会看到以下输出:

......
Hardware acceleration methods:
cuda
dxva2
qsv
d3d11va
opencl
vulkan

熟悉显卡的朋友应该知道,cuda是英伟达NVIDIA推出的针对自家产品的一种架构,cuda实现了一些CPU的指令集,这样你就可以把GPU当成CPU来用(虽然有些问题用GPU很慢);dxva2则是微软推出的。总之,如果你用英伟达显卡,就去英伟达官网下显驱;如果用AMD显卡,就去AMD官网下显卡驱动。

装好驱动后,把压缩命令加上三个参数:硬件加速方式,解码器,编码器。比如用英伟达显卡,就是:

ffmpeg -hwaccel cuvid -c:v h264_cuvid -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_nvenc -b:a 64k -ar 44100 "{}"

对应的是 -hwaccel cuvid (加速方式), -c:v h264_cuvid (硬解码器)和 -c:v h264_nvenc (硬编码器)。
如果用AMD显卡,可以这样写:

ffmpeg -hwaccel dxva2 -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_amf -b:a 64k -ar 44100 "{}"

ffmpeg对AMD显卡没有实现硬解码器,所以只能软解码。对应的是 -hwaccel dxva2 (加速方式), -c:v h264_amf (硬编码器)。
最后,你还可以查看ffmpeg支持的编解码器,如果只限定h264的话,在cmd中输入:ffmpeg -encoders | findstr "h264"ffmpeg -decoders | findstr "h264" 或者 ffmpeg -codecs | findstr "h264" 即可。

笔者的CPU主频3.6GHz,AMD显卡,只用CPU软解编码时占用CPU 70-80%,GPU 0%;用GPU加速后CPU占用30%,GPU占用10%。对于一段40min、比特率12000kb/s的视频片段,以pad标准压缩,前者花费10.98min,后者花费4.58min;不足之处是后者生成的文件稍大,大概是前者的7/6,也就是多了1/6,实际上硬编码生成的文件都比软编码的大。如果对文件大小敏感的话,可以参考下一种方法进行压缩加速。

(2)多电脑加速

如果一台电脑不够用,你还可以选择用多台电脑同时对一个文件夹内的视频进行处理。首先需要共享这个文件夹(NAS),比如用笔者其他文章中提到的Samba共享出来,然后修改一下 vimg_processor.py ,目标是能在多台电脑上对同一个文件夹内的视频进行编解码(需要解决冲突问题,笔者的解决方法是在直连NAS的电脑上运行一个redis数据库,来存储不同状态的视频),并且能够根据电脑的配置调整ffmpeg命令。

但是,使用NAS、多电脑之前,先看一下网速和硬盘读写是否已达上限。如果所有电脑都在同一个内网路由器下,且是千兆路由器,那么NAS读写速度大概最高110~120MB/s之间,看一下直连分享NAS的机器在每台电脑压缩时进出网速是多少,按笔者的经验一般一台台式机,压缩时接收的流量更多(因为是压缩),需要30~60 Mbps,也即3.75~7.5 MB/s(笔记本根据配置不同可能在1~5 MB/s),所以可以多加几台电脑。另外,实际的机械硬盘读写速度在150MB/s左右,减去110~120MB/s还有30~40MB/s的读写速度只能被直连NAS的机器使用,有条件的可以用更高速的路由器。

然后开始编写脚本。首先在直连NAS的电脑上编写一个python脚本,比如叫 vimg_update_redis.py ,负责更新redis、移动文件(所有 #需要更改 的前一行是可能需要自定义的地方):

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os, re, traceback, redis, platform
from MyPythonLib import log as mylog
from MyPythonLib import redis as myredis

NAS_mounted_path = '/xxx/xxx/xxx/xxx/'  # NAS挂载路径。由于函数get_redis_rel_path_from_abs_path,最后一个分隔符必须带上
#需要更改
r_pool = myredis.get_pool(host='192.168.1.1', port=56379)
#需要更改
r_cur = redis.Redis(connection_pool=r_pool)
log_path = os.path.join('/xxx/xxx/logs')
#需要更改
if not os.path.isdir(s=log_path):
    os.mkdir(path=log_path)
log = mylog.get_logger(abs_path=log_path,
                       log_file_name='vimg_update_redis.log')
redis_key_set_todo = 'vimg_processor:set:todo'  # 这个redis集合存放需要压缩的文件的相对路径(仅包含压缩前的文件名),路径分隔符为linux的/(下同)
redis_key_set_wrong = 'vimg_processor:set:wrong'  #集合,存放压缩时出现错误的文件的相对路径(仅包含压缩前的文件名)
redis_key_str_ongoing = 'vimg_processor:str:ongoing:'  # 存放正在压缩的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
redis_key_str_done = 'vimg_processor:str:done:'  # 存放已压缩完的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
sys_kind = platform.system()


def get_suffix(filename: str) -> str:
    '''返回文件后缀名。若文件无后缀名(以 . 结尾,或者不含 . )则返回空字符串。
    :param filename: 文件名(可带路径前缀)
    :return str,文件后缀名(不含 . )'''
    suffix = os.path.splitext(filename)[-1]
    return (suffix[1:], '')[suffix == '']


def is_video(filename: str) -> bool:
    '''根据后缀名判断文件是否视频。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示文件是视频,False表示非视频'''
    video_suffix_set = {
        "WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
        "M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
    }
    #需要更改
    #iso文件不一定能用ffmpeg打开,可以自行取舍
    return (False, True)[get_suffix(filename).upper() in video_suffix_set]


def is_img(filename: str) -> bool:
    '''根据后缀名判断文件是否图片。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示文件是图片,False表示非图片'''
    img_suffix_set = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
    #需要更改
    return (False, True)[get_suffix(filename).upper() in img_suffix_set]


def replace_path_seperator(path: str, sys_kind: str) -> str:
    '''根据操作系统,替换路径中的分隔符。
    :param path: 需要替换的路径
    :param sys_kind: 目标操作系统
    :return str,替换好的路径'''
    if sys_kind == 'Windows':
        return path.replace('/', '\\')
    elif sys_kind == 'Linux':
        return path.replace('\\', '/')


def is_origin(filename: str) -> bool:
    '''接受文件名,判断这个文件是源文件还是压缩后的文件。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示该文件为源文件'''
    prefix = os.path.splitext(filename)[0]
    return (True, False)[re.search(r'#test#$', prefix) != None]


def get_redis_rel_path_from_abs_path(abs_path: str) -> str:
    '''根据本机上文件的绝对路径,返回存储到redis中的相对路径。
    :由于redis中存储的是使用linux分隔符的相对路径,所以需要这个函数转换:
    :将abs_path首先去掉NAS_mounted_path,然后分隔符换成linux的。由于这个函数,NAS_mounted_path最后需要带上一个分隔符
    :param abs_path: 文件在本机的绝对路径
    :return str,redis中存储的相对路径,即 XX/XX/XX 这样的形式。注意最前面没有分隔符'''
    global NAS_mounted_path
    redis_rel_path = abs_path[len(NAS_mounted_path):]
    return replace_path_seperator(redis_rel_path, sys_kind='Linux')


def get_abs_path_from_redis_rel_path(redis_rel_path: str) -> str:
    '''根据redis中存储的相对路径,返回本机上该文件的绝对路径。
    :将redis_rel_path连上NAS_mounted_path,然后替换分隔符为本机的
    :param redis_rel_path: redis中存储的相对路径
    :return str,文件在本机中的绝对路径'''
    global NAS_mounted_path, sys_kind
    abs_path = os.path.join(NAS_mounted_path, redis_rel_path)
    return replace_path_seperator(abs_path, sys_kind=sys_kind)


def main():
    '''更新文件信息。
    :对于redis_key_set_wrong中记录的文件,把它们放到单独的目录里(压缩错误);
    :对于redis_key_str_ongoing系列的文件,不作处理;
    :对于redis_key_str_done系列的文件,把它们放到单独的目录里(压缩完);
    :检查 压缩中 目录下的所有文件,如果在另外三个键中没有记录,把它放到redis_key_set_todo里;
    :检查 redis_key_set_todo 中记录的文件,如果不存在则删除'''
    global NAS_mounted_path, r_cur, log, sys_kind
    global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
    # 对于redis_key_set_wrong中记录的文件,把它们放到单独的目录里
    # 对于单独文件,直接剪切;对于文件夹,则尝试新建相同的目录、剪切
    while r_cur.scard(redis_key_set_wrong) != 0:
        try:
            redis_rel_path = r_cur.spop(redis_key_set_wrong, count=1)[0]
            src_abs_path = replace_path_seperator(redis_rel_path,
                                                  sys_kind=sys_kind)
            src_abs_path = os.path.join(NAS_mounted_path, src_abs_path)
            # 将中间的文件夹替换
            dst_abs_path = src_abs_path.replace('压缩中', '压缩错误', 1)
            #需要更改
            # 获得除了文件名以外,前面的目录路径
            basename = os.path.basename(src_abs_path)
            pre_dirs = dst_abs_path[:-len(basename)]
            if not os.path.isdir(pre_dirs):
                os.makedirs(pre_dirs)
            os.rename(src_abs_path, dst_abs_path)
            msg = f'redis_key_set_wrong: 将{src_abs_path}移动为{dst_abs_path}'
            mylog.log_print(log, 'info', msg)
        except:
            r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            msg = f'失败 redis_key_set_wrong: 移动{redis_rel_path}出错'
            mylog.log_print(log, 'error', msg)
            except_info = traceback.format_exc()
            mylog.log_print(log, 'error', except_info)
            break

    # 对于redis_key_str_done系列的文件,把它们放到单独的目录里
    # 对于单独文件,直接剪切;对于文件夹,则尝试新建相同的目录、剪切
    redis_rel_path_keys = r_cur.keys(redis_key_str_done + '*')
    for redis_rel_path_key in redis_rel_path_keys:
        try:
            # 获得压缩前后的源文件名
            redis_rel_path_origin = redis_rel_path_key[len(redis_key_str_done
                                                           ):]
            redis_rel_path_compressed = r_cur.get(redis_rel_path_key)
            src_abs_path_origin = replace_path_seperator(redis_rel_path_origin,
                                                         sys_kind=sys_kind)
            src_abs_path_origin = os.path.join(NAS_mounted_path,
                                               src_abs_path_origin)
            src_abs_path_compressed = replace_path_seperator(
                redis_rel_path_compressed, sys_kind=sys_kind)
            src_abs_path_compressed = os.path.join(NAS_mounted_path,
                                                   src_abs_path_compressed)
            # redis中存储的相对路径都是 压缩中
            # 只有运行vimg_processor_multi中的delete_test时,会删除done系列键
            # 有时来不及运行delete_test,这些剩余的done键不动,这里直接continue
            if not os.path.isfile(src_abs_path_origin):
                continue
            # 获得压缩前后的目标文件名
            dst_abs_path_origin = src_abs_path_origin.replace('压缩中', '压缩完', 1)
            #需要更改
            dst_abs_path_compressed = src_abs_path_compressed.replace(
                '压缩中', '压缩完', 1)
            #需要更改
            # 获得除了文件名以外,前面的目录路径
            basename = os.path.basename(src_abs_path_origin)
            pre_dirs = dst_abs_path_origin[:-len(basename)]
            if not os.path.isdir(pre_dirs):
                os.makedirs(pre_dirs)
            os.rename(src_abs_path_origin, dst_abs_path_origin)
            os.rename(src_abs_path_compressed, dst_abs_path_compressed)
            # r_cur.delete(redis_rel_path_key) 不要删除redis_key_str_done系列键,这些键在运行delete_test时删除
            msg = f'redis_key_str_done: 移动{src_abs_path_origin}为{dst_abs_path_origin},\n移动{src_abs_path_compressed}为{dst_abs_path_compressed}'
            mylog.log_print(log, 'info', msg)
        except:
            r_cur.set(redis_rel_path_key, redis_rel_path_compressed)
            msg = f'失败 redis_key_str_done: 移动{redis_rel_path_origin}和{redis_rel_path_compressed}时出错'
            mylog.log_print(log, 'error', msg)
            except_info = traceback.format_exc()
            mylog.log_print(log, 'error', except_info)
            break

    # 检查 压缩中 目录下的所有文件,如果在四个键中都没有记录,把它放到redis_key_set_todo里
    dir_compressing = replace_path_seperator('xxx/xxx/压缩中', sys_kind=sys_kind)
    #需要更改
    dir_compressing = os.path.join(NAS_mounted_path, dir_compressing)
    for dirpath, dirnames, filenames in os.walk(dir_compressing):
        for filename in filenames:
            # 跳过压缩后的文件
            if not is_origin(filename):
                continue
            abs_path = os.path.join(dirpath, filename)
            if (is_video(abs_path)
                    or is_img(abs_path)) and os.path.isfile(abs_path):
                redis_rel_path = get_redis_rel_path_from_abs_path(abs_path)
                condition = not r_cur.sismember(redis_key_set_todo,
                                                redis_rel_path)
                condition = condition and not r_cur.sismember(
                    redis_key_set_wrong, redis_rel_path)
                condition = condition and r_cur.exists(redis_key_str_ongoing +
                                                       redis_rel_path) == 0
                condition = condition and r_cur.exists(redis_key_str_done +
                                                       redis_rel_path) == 0
                if condition:
                    r_cur.sadd(redis_key_set_todo, redis_rel_path)
                    msg = f'redis_key_set_todo: 将{redis_rel_path}添加到todo中'
                    mylog.log_print(log, 'info', msg)

    # 如果有空的文件夹,删除掉。为了能一次删除完,需要深度优先遍历
    for dirpath, dirnames, filenames in os.walk(dir_compressing,
                                                topdown=False):
        for dirname in dirnames:
            abs_dir_path = os.path.join(dirpath, dirname)
            dirpath_temp, dirnames_temp, filenames_temp = next(
                os.walk(abs_dir_path))
            if len(dirnames_temp) == 0 and len(
                    filenames_temp) == 0 and os.path.isdir(abs_dir_path):
                os.rmdir(abs_dir_path)
                msg = f'目录{abs_dir_path}为空,已删除'
                mylog.log_print(log, 'info', msg)

    # 检查 redis_key_set_todo 中记录的文件,如果不存在则删除redis记录
    for redis_rel_path in r_cur.smembers(redis_key_set_todo):
        abs_path = get_abs_path_from_redis_rel_path(redis_rel_path)
        if not os.path.isfile(abs_path):
            r_cur.srem(redis_key_set_todo, redis_rel_path)
            msg = f'redis_key_set_todo: 文件{abs_path}不存在,已移除redis记录'
            mylog.log_print(log, 'info', msg)


if __name__ == '__main__':
    try:
        main()
    except:
        except_info = traceback.format_exc()
        mylog.log_print(log, 'error', except_info)
'''这个py文件用于更新vimg_processor需要使用的redis库,可以在任何连接到NAS的机器上使用。
推荐在直连NAS的机器上运行'''

如果在linux上,可以用crontab把这个脚本设置成定时启动,比如10min运行一次;windows上也有类似功能的软件,请自行寻找。

然后把 vimg_processor.py 修改为 vimg_processor_multi.py ,让它能适应不同的电脑和配置(所有 #需要更改 的前一行是可能需要自定义的地方):

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os, time, json, subprocess, re, traceback, redis, platform, sys
from MyPythonLib import log as mylog
from MyPythonLib import redis as myredis

CPU_THREADS = (12, 8)  # ffmpeg软编解码时线程数,根据CPU配置调整。分别为解码用的线程、编码用的线程,在压缩时,前者大于后者
#需要更改
# 可以用一个文件实验最佳的CPU_THREADS。软编解码的快慢看ffmpeg最右边的 speed=数字x
NAS_mounted_path = '/xxx/xxx/xxx/'  # NAS挂载路径。由于函数get_redis_rel_path_from_abs_path,最后一个分隔符必须带上
#需要更改
CWD = os.getcwd()
r_pool = myredis.get_pool(host='192.168.1.1', port=56379)
#需要更改
r_cur = redis.Redis(connection_pool=r_pool)
log_path = os.path.join(CWD, 'logs')
#需要更改
if not os.path.isdir(s=log_path):
    os.mkdir(path=log_path)
log = mylog.get_logger(abs_path=log_path,
                       log_file_name='vimg_processor_multi.log')
TEMP_DIR = os.path.join(CWD, 'temp')  # temp这个文件夹主要是在压缩图片时存放临时的调色板
#需要更改
if not os.path.isdir(s=TEMP_DIR):
    os.mkdir(path=TEMP_DIR)
redis_key_set_todo = 'vimg_processor:set:todo'  # 这个redis集合存放需要压缩的文件的相对路径(仅包含压缩前的文件名),路径分隔符为linux的/(下同)
redis_key_set_wrong = 'vimg_processor:set:wrong'  #集合,存放压缩时出现错误的文件的相对路径(仅包含压缩前的文件名)
redis_key_str_ongoing = 'vimg_processor:str:ongoing:'  # 存放正在压缩的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
redis_key_str_done = 'vimg_processor:str:done:'  # 存放已压缩完的文件的相对路径,键名包含压缩前的文件相对路径,值为压缩后的文件相对路径
start_time = time.time()
fail_msg_list = []  # 每次有错误信息时都放到这个列表中
sys_kind = platform.system()  # 字符串,表示系统类型


def get_suffix(filename: str) -> str:
    '''返回文件后缀名。若文件无后缀名(以 . 结尾,或者不含 . )则返回空字符串。
    :param filename: 文件名(可带路径前缀)
    :return str,文件后缀名(不含 . )'''
    suffix = os.path.splitext(filename)[-1]
    return (suffix[1:], '')[suffix == '']


def is_video(filename: str) -> bool:
    '''根据后缀名判断文件是否视频。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示文件是视频,False表示非视频'''
    video_suffix_set = {
        "WMV", "ASF", "ASX", "RM", "RMVB", "MP4", "TS", "MPG", "3GP", "MOV",
        "M4V", "AVI", "MKV", "FIV", "VOB", "FLV", "MPEG", "ISO"
    }
    #需要更改
    #iso文件不一定能用ffmpeg打开,可以自行取舍
    return (False, True)[get_suffix(filename).upper() in video_suffix_set]


def is_img(filename: str) -> bool:
    '''根据后缀名判断文件是否图片。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示文件是图片,False表示非图片'''
    img_suffix_set = {"JPG", "JPEG", "BMP", "PNG", "WEBP"}
    #需要更改
    return (False, True)[get_suffix(filename).upper() in img_suffix_set]


def replace_path_seperator(path: str, sys_kind: str) -> str:
    '''根据操作系统,替换路径中的分隔符。
    :param path: 需要替换的路径
    :param sys_kind: 目标操作系统
    :return str,替换好的路径'''
    if sys_kind == 'Windows':
        return path.replace('/', '\\')
    elif sys_kind == 'Linux':
        return path.replace('\\', '/')


def is_origin(filename: str) -> bool:
    '''接受文件名,判断这个文件是源文件还是压缩后的文件。
    :param filename: 文件名(可带路径前缀)
    :return bool,True表示该文件为源文件'''
    prefix = os.path.splitext(filename)[0]
    return (True, False)[re.search(r'#test#$', prefix) != None]


def get_redis_rel_path_from_abs_path(abs_path: str) -> str:
    '''根据本机上文件的绝对路径,返回存储到redis中的相对路径。
    :由于redis中存储的是使用linux分隔符的相对路径,所以需要这个函数转换:
    :将abs_path首先去掉NAS_mounted_path,然后分隔符换成linux的。由于这个函数,NAS_mounted_path最后需要带上一个分隔符
    :param abs_path: 文件在本机的绝对路径
    :return str,redis中存储的相对路径,即 XX/XX/XX 这样的形式。注意最前面没有分隔符'''
    global NAS_mounted_path
    redis_rel_path = abs_path[len(NAS_mounted_path):]
    return replace_path_seperator(redis_rel_path, sys_kind='Linux')


def get_abs_path_from_redis_rel_path(redis_rel_path: str) -> str:
    '''根据redis中存储的相对路径,返回本机上该文件的绝对路径。
    :将redis_rel_path连上NAS_mounted_path,然后替换分隔符为本机的
    :param redis_rel_path: redis中存储的相对路径
    :return str,文件在本机中的绝对路径'''
    global NAS_mounted_path, sys_kind
    abs_path = os.path.join(NAS_mounted_path, redis_rel_path)
    return replace_path_seperator(abs_path, sys_kind=sys_kind)


def get_compressed_abs_path(abs_path: str, fmt: str = '') -> str:
    '''根据源文件的绝对路径,返回压缩后文件的绝对路径。这里的绝对路径都是本机上的。
    :压缩后的图像或者视频将放在同路径下,所以需要给转换后的文件名中加上字符串#test#(在后缀名前):1.mp4 -> 1#test#.mp4
    :有些特殊情况下压缩后文件会重名,比如一个文件夹中同时存在1.mp4、1.rmvb时,又用fmt指定压缩后格式。此时会对压缩后的文件加上序号重命名(1#test#.mp4,1#test#(1).mp4,1#test#(2).mp4)
    :param abs_path: 需要压缩的文件的绝对路径
    :param fmt: 指定转换后文件的格式,为空字符串时不改变格式
    :return str,压缩后文件的绝对路径,根据操作系统调整'''
    prefix, suffix = os.path.splitext(abs_path)
    suffix = (fmt, suffix.lstrip('.'))[fmt == '']
    compressed_abs_path = prefix + '#test#.' + suffix
    i = 1
    while (os.path.exists(compressed_abs_path)):
        compressed_abs_path = prefix + '#test#(' + str(i) + ').' + suffix
        i += 1
    return compressed_abs_path


def get_origin_abs_path(abs_path: str) -> str:
    '''根据压缩后文件的绝对路径到redis中查询,返回源文件的绝对路径。这里的绝对路径都是本机上的。
    :这个函数只应该用于已经压缩完的文件,函数将到redis_key_str_done的所有字符串中进行查询
    :param abs_path: 压缩后文件的绝对路径
    :return str,源文件的绝对路径,根据操作系统调整;若无法查询到,返回空字符串'''
    global r_cur, sys_kind, NAS_mounted_path, redis_key_str_done
    compressed_redis_rel_path = get_redis_rel_path_from_abs_path(abs_path)
    redis_rel_paths_keys = r_cur.keys(redis_key_str_done + '*')
    for redis_rel_path_key in redis_rel_paths_keys:
        if r_cur.get(redis_rel_path_key) == compressed_redis_rel_path:
            origin_abs_path = redis_rel_path_key[len(redis_key_str_done):]
            origin_abs_path = replace_path_seperator(origin_abs_path,
                                                     sys_kind=sys_kind)
            origin_abs_path = os.path.join(NAS_mounted_path, origin_abs_path)
            return origin_abs_path
    return ''


class vimg_processor(object):
    '''用于压缩的图像视频类。初始化时会读取图像视频的一些参数。'''

    def __init__(self,
                 origin_abs_path: str,
                 compressed_abs_path: str = "",
                 video_size: str = '',
                 img_max_pix: int = 0,
                 fmt: str = ''):
        '''用文件的绝对路径等参数初始化对象。
        :param origin_abs_path: 源文件的绝对路径
        :param compressed_abs_path: 输出文件的绝对路径。默认为同目录下文件名末加#test#保存(1.mp4 -> 1#test#.mp4),有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名
        :param video_size: 输出视频文件时,适配的屏幕大小。有 phone 、 pad 、 pc 、 空字符串 四种选择,空字符串表示保留原宽高
        :param img_max_pix: 输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
        :param fmt: 输出文件的格式,即后缀名。为空字符串时,表示按原格式输出。其他可取值如 mp4 、 jpg 等'''
        self.origin_abs_path = origin_abs_path
        self.compressed_abs_path = (compressed_abs_path,
                                    get_compressed_abs_path(
                                        origin_abs_path,
                                        fmt=fmt))[compressed_abs_path == ""]
        self.width, self.height = 0, 1
        if is_video(origin_abs_path):
            self.get_video_info()
            w, h = self.width, self.height
            ratio = w / h
            self.ratio = ratio
            definition = ''
            if video_size == 'phone':
                if ratio <= 0.6:
                    definition = (f'{w}:{h}', '360:640')[w >= 360]
                elif ratio <= 1.0:
                    definition = (f'{w}:{h}', '360:480')[w >= 360]
                elif ratio <= 1.66:
                    definition = (f'{w}:{h}', '480:360')[w >= 480]
                elif ratio > 1.66:
                    definition = (f'{w}:{h}', '640:360')[w >= 640]
            elif video_size == 'pad':
                if ratio <= 0.6:
                    definition = (f'{w}:{h}', '720:1280')[w >= 720]
                elif ratio <= 1.0:
                    definition = (f'{w}:{h}', '720:960')[w >= 720]
                elif ratio <= 1.66:
                    definition = (f'{w}:{h}', '960:720')[w >= 960]
                elif ratio > 1.66:
                    definition = (f'{w}:{h}', '1280:720')[w >= 1280]
            elif video_size == 'pc':
                if ratio <= 0.6:
                    definition = (f'{w}:{h}', '1080:1920')[w >= 1080]
                elif ratio <= 1.0:
                    definition = (f'{w}:{h}', '1080:1440')[w >= 1080]
                elif ratio <= 1.66:
                    definition = (f'{w}:{h}', '1440:1080')[w >= 1440]
                elif ratio > 1.66:
                    definition = (f'{w}:{h}', '1920:1080')[w >= 1920]
            elif video_size == '':
                definition = f'{w}:{h}'
            self.definition = definition
        elif is_img(origin_abs_path):
            self.pix_fmt = ''  # pix_fmt即像素格式
            fmt = (fmt, get_suffix(origin_abs_path))[fmt == '']
            self.palette = os.path.join(TEMP_DIR, 'palette.' + fmt)
            self.temp_img = os.path.join(TEMP_DIR, 'temp_img.' + fmt)
            self.get_img_info()
            w, h = self.width, self.height
            img_max_pix = abs(img_max_pix)
            if img_max_pix == 0:
                pass
            elif w > img_max_pix and h > img_max_pix:
                ratio = max(w, h) / img_max_pix
                w, h = int(w / ratio), int(h / ratio)
            self.definition = f'{w}:{h}'

    def get_video_info(self):
        '''使用ffprobe获取视频的宽高(width、height)。'''
        global sys_kind
        command_probe = f'ffprobe -select_streams v -show_entries format=bit_rate -show_streams -v quiet -of csv="p=0" -of json -i "{self.origin_abs_path}"'
        if sys_kind == 'Windows':
            result = subprocess.Popen(command_probe,
                                      shell=False,
                                      stdout=subprocess.PIPE).stdout
        elif sys_kind == 'Linux':
            result = subprocess.Popen(command_probe,
                                      shell=True,
                                      stdout=subprocess.PIPE).stdout
        list_std = result.readlines()
        str_tmp = ''
        for item in list_std:
            str_tmp += bytes.decode(item.strip())
        json_data = json.loads(str_tmp)
        self.width = int(json_data['streams'][0]['width'])
        self.height = int(json_data['streams'][0]['height'])
        # self.bitrate = int(json_data['format']['bit_rate']) / 1024 # 以后可能用到视频的比特率
        # 视频的比特率有时不能用ffprobe读取到,建议自己根据视频大小和时长计算

    def get_img_info(self):
        '''获取图片的width、height、pix_fmt。'''
        global sys_kind
        command_probe = f'ffprobe -hide_banner -v quiet -print_format json -show_format -show_streams "{self.origin_abs_path}"'
        if sys_kind == 'Windows':
            result = subprocess.Popen(command_probe,
                                      shell=False,
                                      stdout=subprocess.PIPE).stdout
        elif sys_kind == 'Linux':
            result = subprocess.Popen(command_probe,
                                      shell=True,
                                      stdout=subprocess.PIPE).stdout
        list_std = result.readlines()
        str_tmp = ''
        for item in list_std:
            str_tmp += bytes.decode(item.strip())
        json_data = json.loads(str_tmp)
        self.pix_fmt = str(json_data['streams'][0]['pix_fmt'])
        self.width = int(json_data['streams'][0]['width'])
        self.height = int(json_data['streams'][0]['height'])

    def compress_video(self):
        '''通过ffmpeg压缩视频文件(输出的像素格式指定为yuv420p)。'''
        global CPU_THREADS
        # AMD显卡加速,硬编码
        '''command_compress = 'ffmpeg -hwaccel dxva2 -i "{}" -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_amf -b:a 64k -ar 44100 "{}"'.format(
            self.origin_abs_path, self.definition, self.compressed_abs_path)'''
        # NVIDIA显卡加速,硬编解码
        '''command_compress = 'ffmpeg -hwaccel cuvid -c:v h264_cuvid -i "{}" -threads 8 -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264_nvenc -b:a 64k -ar 44100 "{}"'.format(
            self.origin_abs_path, self.definition, self.compressed_abs_path)'''
        # CPU软编解码。-threads 参数可限制线程数,控制CPU占用。
        command_compress = 'ffmpeg -threads {} -i "{}" -threads {} -y -r 24 -pix_fmt yuv420p -vf scale={} -c:v h264 -b:a 64k -ar 44100 "{}"'.format(
            CPU_THREADS[0], self.origin_abs_path, CPU_THREADS[1],
            self.definition, self.compressed_abs_path)
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_video failed,视频压缩失败:{command_compress}'
            )

    def compress_img(self):
        '''通过ffmpeg压缩图像文件。'''
        command_palette = 'ffmpeg -i "{}" -vf palettegen=max_colors=256:stats_mode=single -y "{}"'.format(
            self.origin_abs_path, self.palette)
        command_compress = 'ffmpeg -i "{}" -i "{}" -lavfi "[0][1:v] paletteuse" -pix_fmt "{}" -y "{}"'.format(
            self.origin_abs_path, self.palette, self.pix_fmt, self.temp_img)
        result = os.system(command_palette)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img failed,无法获得调色板:{command_palette}'
            )
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img failed,生成中间图片失败:{command_compress}'
            )
        command_compress = 'ffmpeg -i "{}" -vf scale={}  -y "{}"'.format(
            self.temp_img, self.definition, self.compressed_abs_path)
        result = os.system(command_compress)
        if result != 0:
            raise Exception(
                f'vimg_processor.compress_img failed,生成最终图片失败:{command_compress}'
            )


'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''
'''------------------------------------------------------------主要功能区------------------------------------------------------------'''


def get_definitions_video(target_dir: str):
    '''获取一个文件夹下所有视频文件的分辨率,按宽高比进行分类。
    :param target_dir: 目标文件夹的绝对路径
    :结果会输出到当前目录下的 definitions.txt 文件中'''
    global CWD
    ratio_dict = dict()  # 字典,键为宽高的比值,值为列表,包括该比值下的具体宽高数值
    # 例:1.7777778:['640x360', '1280x720', '1920x1080']
    for dirpath, dirnames, filenames in os.walk(target_dir):
        for filename in filenames:
            abs_path = os.path.join(dirpath, filename)
            if is_video(abs_path):
                print('\r视频文件:{:200}'.format(abs_path), end='')
                vimg = vimg_processor(abs_path)
                ratio = vimg.ratio
                definition = "{}x{}".format(vimg.width, vimg.height)
                if ratio_dict.get(ratio) == None:
                    ratio_dict[ratio] = [definition]
                else:
                    if definition not in ratio_dict[ratio]:
                        ratio_dict[ratio].append(definition)
    # 按宽高比值升序输出
    ratio_list = []
    for key, value in ratio_dict.items():
        ratio_list.append(key)
    ratio_list.sort()
    with open(os.path.join(CWD, 'definitions.txt'), 'w', encoding='utf8') as f:
        for ratio in ratio_list:
            ratio_dict[ratio].sort()
            f.write('宽高比:{:.6},成员:'.format(ratio))
            for definition in ratio_dict[ratio][:-1]:
                f.write(definition + ' , ')
            f.write(ratio_dict[ratio][-1] + '\n')
    print('')


def compress_videos(video_size: str = '', fmt: str = ''):
    '''从redis中得到需要压缩的视频文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
    :对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
    :输出文件在源文件名末尾加 #test# 放在同目录下(1.mp4 -> 1#test#.mp4);输出文件有重名时会自动按 1#test#.mp4 、1#test#(1).mp4 、1#test#(2).mp4 的顺序重命名。
    :param video_size: 视频适配的屏幕大小。 phone、pad、pc、空字符串 分别表示适配手机、平板、电脑、不改变原大小
    :param fmt: 指定输出的文件格式,空字符串表示原格式。建议用mp4
    '''
    global NAS_mounted_path, r_cur, log, start_time, fail_msg_list, sys_kind
    global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
    msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
    mylog.log_print(log, 'info', msg)
    while (True):
        if r_cur.scard(redis_key_set_todo) == 0:
            break
        try:
            redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
            rel_path = replace_path_seperator(redis_rel_path,
                                              sys_kind=sys_kind)
            abs_path = os.path.join(NAS_mounted_path, rel_path)
            if is_video(abs_path) and os.path.isfile(abs_path):
                vimg = vimg_processor(origin_abs_path=abs_path,
                                      video_size=video_size,
                                      fmt=fmt)
                compressed_abs_path = vimg.compressed_abs_path
                if os.path.exists(compressed_abs_path):  # 输出文件已存在时跳过
                    continue
                start_time = time.time()
                log.info('{}压缩中...'.format(abs_path))
                compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
                    compressed_abs_path)
                r_cur.set(redis_key_str_ongoing + redis_rel_path,
                          compressed_redis_rel_path)
                vimg.compress_video()
                r_cur.delete(redis_key_str_ongoing + redis_rel_path)
                r_cur.set(redis_key_str_done + redis_rel_path,
                          compressed_redis_rel_path)
                log.info('视频压缩成功,耗时{:.2f}min'.format(
                    (time.time() - start_time) / 60))
        except:
            except_info = traceback.format_exc()
            fail_msg_list.append(except_info)
            mylog.log_print(log, 'error', except_info)
            if os.path.exists(compressed_abs_path):
                os.remove(compressed_abs_path)
            r_cur.srem(redis_key_set_todo, redis_rel_path)
            r_cur.delete(redis_key_str_ongoing + redis_rel_path)
            r_cur.delete(redis_key_str_done + redis_rel_path)
            exc_type, exc_value, exc_traceback = sys.exc_info()  # 获得异常名
            exc_type = exc_type.__name__
            if exc_type == 'KeyboardInterrupt':  # 在linux上时,无法得到KeyboardInterrupt
                r_cur.sadd(redis_key_set_todo, redis_rel_path)
            else:
                mylog.log_print(log, 'error', f'错误类型为{exc_type}')
                r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            break


def compress_imgs(fmt: str = '', img_max_pix: int = 0):
    '''从redis中得到需要压缩的图片文件,进行压缩,并完成压缩;直到redis中没有剩余任务。
    :对应地,该项目会在redis的key中移动,从todo到ongoing,最后到done;若出错,则放到wrong。
    :输出文件在源文件名末尾加 #test# 放在同目录下(1.jpg -> 1#test#.jpg);输出文件有重名时会自动按 1#test#.jpg 、1#test#(1).jpg 、1#test#(2).jpg 的顺序重命名。
    :param fmt: 指定输出的文件格式,空字符串表示原格式。建议用jpg
    :param img_max_pix: 输出图片文件时,若源图片的宽高同时超过该值,宽高中较大的一个数值会被缩小到该值,另一个数值也等比例缩小;为0时,表示图片的宽高不缩小(宽高的单位为像素点)
    '''
    global NAS_mounted_path, TEMP_DIR, r_cur, log, start_time, fail_msg_list, sys_kind
    global redis_key_set_todo, redis_key_set_wrong, redis_key_str_ongoing, redis_key_str_done
    msg = '开始时间:{}'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
    mylog.log_print(log, 'info', msg)
    while (True):
        if r_cur.scard(redis_key_set_todo) == 0:
            break
        try:
            redis_rel_path = r_cur.spop(redis_key_set_todo, count=1)[0]
            rel_path = replace_path_seperator(redis_rel_path,
                                              sys_kind=sys_kind)
            abs_path = os.path.join(NAS_mounted_path, rel_path)
            if is_img(abs_path) and os.path.isfile(abs_path):
                vimg = vimg_processor(origin_abs_path=abs_path,
                                      img_max_pix=img_max_pix,
                                      fmt=fmt)
                compressed_abs_path = vimg.compressed_abs_path
                if os.path.exists(compressed_abs_path):  # 输出文件已存在时跳过
                    continue
                log.info('{}压缩中...'.format(abs_path))
                compressed_redis_rel_path = get_redis_rel_path_from_abs_path(
                    compressed_abs_path)
                r_cur.set(redis_key_str_ongoing + redis_rel_path,
                          compressed_redis_rel_path)
                vimg.compress_img()
                r_cur.delete(redis_key_str_ongoing + redis_rel_path)
                r_cur.set(redis_key_str_done + redis_rel_path,
                          compressed_redis_rel_path)
        except:
            except_info = traceback.format_exc()
            fail_msg_list.append(except_info)
            mylog.log_print(log, 'error', except_info)
            if os.path.exists(compressed_abs_path):
                os.remove(compressed_abs_path)
            r_cur.srem(redis_key_set_todo, redis_rel_path)
            r_cur.delete(redis_key_str_ongoing + redis_rel_path)
            r_cur.delete(redis_key_str_done + redis_rel_path)
            r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            exc_type, exc_value, exc_traceback = sys.exc_info()  # 获得异常名
            exc_type = exc_type.__name__
            if exc_type == 'KeyboardInterrupt':  # 在linux上时,无法得到KeyboardInterrupt
                r_cur.sadd(redis_key_set_todo, redis_rel_path)
            else:
                mylog.log_print(log, 'error', f'错误类型为{exc_type}')
                r_cur.sadd(redis_key_set_wrong, redis_rel_path)
            break
    # 最后清理一下TEMP_DIR
    for dirpath, dirnames, filenames in os.walk(TEMP_DIR):
        for filename in filenames:
            os.remove(os.path.join(dirpath, filename))


def delete_test():
    '''视频或图片压缩完毕后,比较原文件与压缩后的文件,留下较小且大小非0的文件。
    :由于ffmpeg处理效果不确定,建议检查一下处理后的文件,确认无误后再执行,因为源文件会被彻底删除。具体见 下载-压缩-入库流程
    :将到redis的redis_key_str_done中读取压缩前后的文件相对路径。删除#test#文件后,删除redis_key_str_done对应的键
    '''
    global r_cur, log, fail_msg_list
    global redis_key_str_done
    redis_rel_path_done_keys = r_cur.keys(redis_key_str_done + '*')
    for redis_rel_path_done_key in redis_rel_path_done_keys:
        origin_redis_rel_path = redis_rel_path_done_key[len(redis_key_str_done
                                                            ):]
        compressed_redis_rel_path = r_cur.get(redis_rel_path_done_key)
        origin_abs_path = get_abs_path_from_redis_rel_path(
            origin_redis_rel_path)
        compressed_abs_path = get_abs_path_from_redis_rel_path(
            compressed_redis_rel_path)
        # redis中存储的相对路径都是 压缩中 ,但是现在这些文件已经手动放到了 检查中 里
        origin_abs_path = origin_abs_path.replace('压缩中', '检查中', 1)
        #需要更改
        compressed_abs_path = compressed_abs_path.replace('压缩中', '检查中', 1)
        #需要更改

        if os.path.isfile(origin_abs_path) and os.path.isfile(
                compressed_abs_path):
            before_size = os.path.getsize(origin_abs_path)
            after_size = os.path.getsize(compressed_abs_path)
            if after_size < before_size and after_size != 0:
                try:
                    os.remove(origin_abs_path)
                    # 注意这里的压缩后文件名改回原名时是把最右边的第一个#test#去掉,而不是直接改成源文件名
                    # 因为如果压缩时用fmt指定输出格式,则可能当前目录下有重名文件
                    # 1.avi、1.wmv -> 1#test#.mp4、1#test#(1).mp4 -> 1.mp4、1(1).mp4
                    prefix, suffix = os.path.splitext(compressed_abs_path)
                    reversed_prefix = prefix[::-1]
                    reversed_prefix = reversed_prefix.replace('#tset#', '', 1)
                    prefix = reversed_prefix[::-1]
                    temp = prefix + suffix
                    os.rename(compressed_abs_path, temp)
                    r_cur.delete(redis_rel_path_done_key)
                    log.info(f'将{origin_abs_path}替换为小文件')
                except PermissionError:
                    r_cur.set(redis_rel_path_done_key,
                              compressed_redis_rel_path)
                    msg = f'delete_test failed,删除失败:{origin_abs_path}'
                    mylog.log_print(log, 'warning', msg)
                    fail_msg_list.append(msg)
                    continue
            else:
                os.remove(compressed_abs_path)
                msg = f'{origin_abs_path}压缩后的文件比之前大(或大小为0),删掉了处理后的文件'
                log.info(msg)
    # 最后检查一下有没有剩余的处理后文件
    dir_checking = replace_path_seperator('xxx/xxx/检查中', sys_kind=sys_kind)
    #需要更改
    dir_checking = os.path.join(NAS_mounted_path, dir_checking)
    for dirpath, dirnames, filenames in os.walk(dir_checking):
        for filename in filenames:
            if '#test#' in filename:
                abs_path = os.path.join(dirpath, filename)
                msg = f'疑似还有#test#文件:{abs_path}'
                mylog.log_print(log, 'warning', msg)
                fail_msg_list.append(msg)


def main():
    '''执行入口。'''
    target_dir = r'D:\xxx\xxx'
    #需要更改
    global sys_kind
    try:
        #get_definitions_video(target_dir)
        compress_videos(video_size='pad', fmt='mp4')
        #compress_imgs(fmt='jpg', img_max_pix=700)
        #delete_test()
        print('已完成')
        if len(fail_msg_list) != 0:
            print('本次压缩时出现了以下异常:')
            for i in fail_msg_list:
                print(i)
        if sys_kind == 'Windows':
            input()
    except:
        except_info = traceback.format_exc()
        mylog.log_print(log, 'error', except_info)
        if sys_kind == 'Windows':
            input()


if __name__ == '__main__':
    main()
'''下载-压缩-入库流程:
1.将需要压缩的文件先暂时放到 待压缩 中,进行人工筛选
2.筛选完后,将 待压缩 中的文件手动剪切到 压缩中 文件夹里
3.直连NAS的机器运行python脚本,更新redis库、移动文件
4.其他连接到NAS的机器从redis中获取任务、进行压缩
5.从 压缩完 中将文件剪切到 检查中 里,进行人工检查
6.检查完后,手动运行vimg_processor_multi里的delete_test,保留源文件或压缩后文件中的一个
7.手动将 检查中 里的文件放到其他目录,比如 待迁移 或 入库中 里
8.运行file_to_sql,将本地文件入库
'''

在其他机器上运行这个脚本前先搭建环境,把需要的python库安装好( pip3 install redis ),装好ffmpeg、把ffmpeg和ffprobe的二进制文件放到PATH里。
如果有兴趣的话,可以写的更复杂一点,比如写一个分布式的,能通过网页UI管理。

标签: python

添加新评论