Steps / Table of contents:
1. Requirements
2. Preparation
    (0) Install Python libraries
    (1) Create the scrapy project
    (2) Create and run MySQL
    (3) Create and run Redis
    (4) Create a personal Python library
        a. log.py
        b. redis.py
        c. mysql.py
        d. interaction.py
        e. torrent_parser.py
        f. Others
3. Modifying the scrapy project
    (1) items.py
    (2) middlewares.py
    (3) pipelines.py
    (4) settings.py
    (5) The spider
4. Usage notes

This article was first published on my personal blog at https://lisper517.top/index.php/archives/55/; please credit the source when reposting.
The goal of this article is to crawl movie information with scrapy.
Written on October 3, 2022. Platforms: Win10 and a Raspberry Pi; editor: VS Code.

1. Requirements

In daily life we often want to watch good movies, but hunting for them takes time. It would be better to crawl movie information, filter out the movies we want by some criteria, pick the corresponding magnet (or other download) links by further criteria, download the movies locally, and finally catalogue the local files and compress the ones that take up too much space.
This is the first article of the series. Its goal is to crawl data from a movie-information website with scrapy. For convenience, I chose a site that also provides magnet links for each movie and hosts actor pages listing every movie an actor appears in. Since I pick movies by actor, I plan to bookmark the pages of actors I like and choose movies from there.

2. Preparation

(0) Install Python libraries

I am no longer sure exactly which libraries are required, but I recommend installing the following:
Scrapy (the crawler framework), requests (HTTP requests), bs4 (parsing responses), pymysql (connecting to MySQL), lxml (HTML parser), cryptography (password encryption when connecting to MySQL), selenium (browser automation), redis (the Python Redis client).
If other dependencies turn out to be missing later, install them as needed.

(1) Create the scrapy project

The magnet-link data on this site is generated dynamically by JavaScript, so the project is named javaspider. In cmd on the Win10 machine, run the following commands:

D:
cd D:\spiders\scrapy
scrapy startproject javaspider
scrapy genspider java java.com

(2) Create and run MySQL

Here MySQL runs in Docker on a Raspberry Pi; for the detailed steps see the article "Python,爬虫与深度学习(10)——scrapy+mysql构建ipool".
MySQL holds two main tables, movie and torrent, storing movie and magnet-link information respectively. The database and tables will be created from Python, so they are not created manually here.

(3) Create and run Redis

Redis is also needed: an actor appears in more than one movie, a movie usually has more than one tag, and filtering magnet links may involve the files inside a torrent (also a one-to-many relation). One-to-many, no-duplicate relations like these are easy to model with Redis sets, while they are clumsy in a relational database like MySQL. Since Redis is memory-hungry, it runs in Docker on the Win10 machine here.

For installing Docker on Windows (i.e. installing Docker Desktop), see "Docker入门(一)".

The procedure for running the container on Win10 is the same; for example, run in cmd:

D:
md D:\docker\redis\data\redis

Then create a compose file, D:\docker\redis\docker-compose.yml, with the following content:

version: "3.9"

services:
  redis:
    image: redis
    ports:
      - "56379:6379"
    volumes:
      - D:\docker\redis\data\redis:/data
    logging:
      driver: json-file
    restart: always

Finally run:

cd D:\docker\redis
docker-compose config
docker-compose up

Redis is now running. Note that Redis started this way has no security measures at all; do not expose it to the public internet. Also, to get redis-cli, you can build Redis inside WSL (if you installed Docker on Win10 following "Docker入门(一)", you will know what WSL is). In cmd:

wsl -d Ubuntu-20.04 # if you are also using Ubuntu-20.04
cd /mnt/d/docker/redis
wget https://download.redis.io/releases/redis-7.0.4.tar.gz # the latest Redis at the time of writing
tar -xzf redis-7.0.4.tar.gz
cd redis-7.0.4
apt-get install pkg-config make gcc libssl-dev # build dependencies
make BUILD_TLS=yes

After the build finishes, use this redis-cli to connect to the local Redis:

cd /mnt/d/docker/redis/redis-7.0.4/src
./redis-cli -p 56379
ping
# Redis replies PONG

Redis and redis-cli are now set up on Windows.

(4) Create a personal Python library

For how to build your own Python library, see "Python,爬虫与深度学习(7)——建立自己的库". The personal library used here mainly consists of the following files:

a.log.py

This file mainly provides a function that returns a logger object:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging, time, os


def get_logger(abs_path: str,
               log_file_name: str = '',
               getLogger_name: str = ''):
    '''Given an absolute path and a log file name, return a logger object.
    
    :param abs_path: absolute path of the directory storing the log file
    :param log_file_name: log file name; if empty, a default name is used
    :param getLogger_name: logger name; if one program writes several log files at once, use distinct names here
    :return logger object'''
    try:
        formatter = logging.Formatter(
            '%(lineno)d : %(asctime)s : %(levelname)s : %(funcName)s : %(message)s'
        )

        if log_file_name == '':
            log_file_name = 'log#{}.txt'.format(time.strftime('-%Y-%m-%d'))
        fileHandler = logging.FileHandler(
            (os.path.join(abs_path, log_file_name)),
            mode='w',
            encoding='utf-8')
        fileHandler.setFormatter(formatter)

        if getLogger_name == '':
            getLogger_name = 'logger'
        log = logging.getLogger(getLogger_name)
        log.setLevel(logging.DEBUG)
        log.addHandler(fileHandler)
        return log
    except:
        raise Exception('error: MyPythonLib.log.get_logger failed ')
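To see what get_logger produces, here is a self-contained sketch that reproduces the same Formatter and FileHandler setup on a temporary file (the logger name and directory are placeholders chosen for the demo):

```python
import logging, os, tempfile

# Reproduce get_logger's handler setup on a temporary file.
log_dir = tempfile.mkdtemp()
log_path = os.path.join(log_dir, 'demo-log.txt')
formatter = logging.Formatter(
    '%(lineno)d : %(asctime)s : %(levelname)s : %(funcName)s : %(message)s')
handler = logging.FileHandler(log_path, mode='w', encoding='utf-8')
handler.setFormatter(formatter)
log = logging.getLogger('demo_logger')
log.setLevel(logging.DEBUG)
log.addHandler(handler)

log.info('spider started')
handler.flush()
with open(log_path, encoding='utf-8') as f:
    # Each record looks like: "16 : 2022-10-03 12:00:00,000 : INFO : <module> : spider started"
    print(f.read())
```

Note that mode='w' truncates the log file on every run; the date suffix in the default file name is what keeps older logs around.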

b.redis.py

Returns a Redis connection pool:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import redis


def get_pool(database_num: int = 0, decode_responses=True):
    '''Given a database number, return a Redis connection pool.

    :param database_num: database number, 0 by default
    :param decode_responses: whether to decode responses automatically; use True if only strings are stored, False for binary objects
    :return redis.ConnectionPool'''
    try:
        pool = redis.ConnectionPool(
            host='192.168.1.2',
            port=56379,
            db=database_num,
            decode_responses=decode_responses,
        )
        return pool
    except:
        raise Exception('MyPythonLib.redis.get_pool failed ')

Remember to change the arguments of redis.ConnectionPool to your own.

c.mysql.py

Mainly connects to MySQL and escapes strings.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql, os, logging, time


def get_conn(database_name: str = 'mysql'):
    '''Given a database name, return a MySQL connection object.

    :param database_name: database name; the mysql database is used by default
    :return Connection'''
    try:
        conn = pymysql.connect(host='192.168.1.1',
                               port=53306,
                               user='root',
                               passwd='mypasswd',
                               db=database_name,
                               charset='utf8')
        return conn
    except:
        raise Exception('MyPythonLib.mysql.get_conn failed ')


def str_into_mysql(string: str) -> str:
    '''Convert a string into a form that can be stored in MySQL.

    The string is meant to be used as a column value, e.g. the value in
    "SELECT * FROM table WHERE column='value';". If it contains \\ , ' or " ,
    it must be processed by this function first.

    :param string: target string
    :return a string safe to store in MySQL
    '''
    try:
        string = string.replace('\\', '\\\\')
        string = string.replace("'", "\\'")
        string = string.replace('"', '\\"')
        return string
    except:
        raise Exception('MyPythonLib.mysql.str_into_mysql failed ')

Remember to change the arguments of pymysql.connect to your own.
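To illustrate the escaping, here is a standalone copy of the str_into_mysql logic (inlined so it runs without the library):

```python
def str_into_mysql(string: str) -> str:
    # Escape backslashes first, then quotes, so the backslashes
    # just inserted are not escaped a second time.
    string = string.replace('\\', '\\\\')
    string = string.replace("'", "\\'")
    string = string.replace('"', '\\"')
    return string

print(str_into_mysql('it\'s a "test"'))  # it\'s a \"test\"
```

Manual escaping like this only works for statements built by string formatting; pymysql's parameterized queries (passing the values as the second argument of cursor.execute) handle quoting automatically and are the more robust choice.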

d.interaction.py

Only one function from this file is needed for now; it prints progress:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time, os


def print_progress(start_time: float,
                   progress: int,
                   total: int,
                   msg: str,
                   cls=False):
    '''Print progress and an extra message as a progress bar (optionally clearing the screen).

    :param start_time: start time, a float from time.time()
    :param progress: integer, current progress
    :param total: integer, total amount
    :param msg: extra message
    :param cls: whether to clear the screen (on Windows)'''
    try:
        percent = 100. * progress / total
        x = int(2 * percent / 5)
        duration = time.time() - start_time
        h, m, s = duration // 3600, duration // 60 % 60, duration % 60
        time_format = '{:0>2.0f}:{:0>2.0f}:{:0>2.0f}'.format(h, m, s)
        print('\rElapsed ' + time_format + ' [' + '#' * x + '.' * (40 - x) + ']',
              '{:.2f}%'.format(percent),
              str(progress) + '/' + str(total),
              msg,
              end='')
        if cls:
            os.system('cls')
    except:
        raise Exception(
            'error: MyPythonLib.interaction.print_progress failed ')
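The bar arithmetic above can be checked in isolation. For a duration of 3725 seconds and a progress of 30/100 (values chosen just for illustration):

```python
# Same arithmetic as print_progress, on fixed inputs.
progress, total, duration = 30, 100, 3725.0

percent = 100. * progress / total           # 30.0
x = int(2 * percent / 5)                    # 12 of the 40 bar cells are filled
h, m, s = duration // 3600, duration // 60 % 60, duration % 60
time_format = '{:0>2.0f}:{:0>2.0f}:{:0>2.0f}'.format(h, m, s)

print(time_format)                          # 01:02:05
print('[' + '#' * x + '.' * (40 - x) + ']')
```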

e.torrent_parser.py

Parses torrent files; mainly used to derive a magnet link from a torrent file.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import collections, hashlib


def bencode(elem):
    if type(elem) == str:
        elem = str.encode(elem)

    if type(elem) == bytes:
        result = str.encode(str(len(elem))) + b":" + elem
    elif type(elem) == int:
        result = str.encode("i" + str(elem) + "e")
    elif type(elem) == list:
        result = b"l"
        for item in elem:
            result += bencode(item)
        result += b"e"
    elif type(elem) in [dict, collections.OrderedDict]:
        result = b"d"
        for key in elem:
            result += bencode(key) + bencode(elem[key])
        result += b"e"
    return result


def bdecode(bytestr, recursiveCall=False):
    startingChars = dict({b"i": int, b":": str, b"l": list, b"d": dict})
    digits = [b"0", b"1", b"2", b"3", b"4", b"5", b"6", b"7", b"8", b"9"]

    started = ended = False
    curtype = None

    numstring = b""  # for str, int
    result = None  # for list, dict
    key = None  # for dict

    while len(bytestr) > 0:
        # reading and popping from the beginning
        char = bytestr[:1]

        if not started:

            bytestr = bytestr[1:]

            if char in digits:
                numstring += char

            elif char in startingChars:

                started = True
                curtype = startingChars[char]

                if curtype == str:
                    size = int(bytes.decode(numstring))
                    # try to decode strings
                    try:
                        result = bytes.decode(bytestr[:size])
                    except UnicodeDecodeError:
                        result = bytestr[:size]
                    bytestr = bytestr[size:]
                    ended = True
                    break

                elif curtype == list:
                    result = []

                elif curtype == dict:
                    result = collections.OrderedDict()
            else:
                raise ValueError("Expected starting char, got ‘" +
                                 bytes.decode(char) + "’")

        else:  # if started

            if not char == b"e":

                if curtype == int:
                    bytestr = bytestr[1:]
                    numstring += char

                elif curtype == list:
                    item, bytestr = bdecode(bytestr, recursiveCall=True)
                    result.append(item)

                elif curtype == dict:

                    if key == None:
                        key, bytestr = bdecode(bytestr, recursiveCall=True)

                    else:
                        result[key], bytestr = bdecode(bytestr,
                                                       recursiveCall=True)
                        key = None

            else:  # ending: char == b"e"
                bytestr = bytestr[1:]
                if curtype == int:
                    result = int(bytes.decode(numstring))
                ended = True
                break
    if ended:
        if recursiveCall:
            return result, bytestr
        else:
            return result
    else:
        raise ValueError("String ended unexpectedly")


class torrent():
    '''Parse a torrent file.'''

    def __init__(self, abs_torrent_file_path: str) -> None:
        ''':param abs_torrent_file_path: absolute path of the torrent file'''
        self.abs_torrent_file_path = abs_torrent_file_path
        bytes_stream = b''
        with open(abs_torrent_file_path, 'rb') as f:
            bytes_stream = f.read()
        self.metadata = bdecode(bytes_stream)
        encodedInfo = bencode(self.metadata["info"])
        self.hash_value = hashlib.sha1(encodedInfo).hexdigest().upper()


if __name__ == '__main__':
    pass

The code above is adapted from a project on GitHub. I have to say that for problems with any depth, GitHub is far more reliable than CSDN, where articles mostly just copy one another.
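The info hash computed above is exactly what a magnet link carries. Here is a compact, self-contained sketch of the whole chain; the tiny encoder covers the same four bencode cases as the code above, and the toy info dict (a real one holds piece hashes, file lists, etc.) is made up for the demo:

```python
import hashlib

def bencode(elem):
    # Minimal bencode encoder: bytes/str, int, list, dict.
    if isinstance(elem, str):
        elem = elem.encode()
    if isinstance(elem, bytes):
        return str(len(elem)).encode() + b':' + elem
    if isinstance(elem, int):
        return b'i' + str(elem).encode() + b'e'
    if isinstance(elem, list):
        return b'l' + b''.join(bencode(i) for i in elem) + b'e'
    if isinstance(elem, dict):
        return b'd' + b''.join(
            bencode(k) + bencode(v) for k, v in elem.items()) + b'e'

assert bencode('spam') == b'4:spam'     # string: <length>:<bytes>
assert bencode(42) == b'i42e'           # integer: i<n>e
assert bencode(['a', 1]) == b'l1:ai1ee' # list: l...e

# The info hash is the SHA-1 of the bencoded "info" dict.
info = {'name': 'demo.mkv', 'length': 123456}
info_hash = hashlib.sha1(bencode(info)).hexdigest().upper()
magnet = 'magnet:?xt=urn:btih:' + info_hash
print(magnet)  # magnet:?xt=urn:btih:<40 hex chars>
```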

f. Others

In the earlier article "Python,爬虫与深度学习(10)——scrapy+mysql构建ipool" I put the MySQL API into mysql.py of my personal library. In retrospect that is not a good fit: the personal library should hold basic, general-purpose functionality, so this code belongs in the scrapy project instead. Create a libs directory, D:\spiders\scrapy\javaspider\javaspider\libs, containing an empty __init__.py and a java_extensions.py. The latter provides functionality for the scrapy project that does not belong in the personal library, such as the MySQL and Redis APIs. Its content:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os, logging, time, redis, re, datetime
from MyPythonLib import log as mylog
from MyPythonLib import mysql
from MyPythonLib import redis as myredis
from MyPythonLib.torrent_parser import torrent
import urllib.parse

ORIGIN_URL_JAVA = r'https://www.java.com' # the movie site's url
TEMP_URL_JAVA = r'https://www.seejava.com' # sometimes the site is unreachable and a temporary mirror is needed instead
TORRENT_DIR = r'D:\torrents' # directory holding downloaded torrents


def get_actor_main_pages() -> list[str]:
    '''Bookmark actor pages (or search pages, or the root page) -> copy the urls into a txt file -> build the urls to crawl from each relative url and TEMP_URL_JAVA.
    :return list of urls to crawl'''
    with open(os.path.join(os.getcwd(), 'actor_main_pages.txt'),
              'r',
              encoding='utf-8') as f:
        abs_urls = f.read().split('\n')
        rel_urls_set = set()
        for abs_url in abs_urls:
            if re.search(r'\w', abs_url) != None:
                base_url = re.search(
                    r'((https://)|(http://)){0,1}(\w+\.){0,1}(\w+\.){1}\w+',
                    abs_url).group()
                rel_url = abs_url.replace(base_url, '')
                if rel_url in rel_urls_set:
                    print('duplicate url: {}'.format(rel_url))
                rel_urls_set.add(rel_url)
        abs_urls = []
        for rel_url in rel_urls_set:
            abs_url = urllib.parse.urljoin(TEMP_URL_JAVA, rel_url)
            abs_urls.append(abs_url)
        return abs_urls


class database_api_java():
    """通过这个接口实现提交爬取到的作品信息。还可以更新数据、取出磁链。"""
    conn = mysql.get_conn()
    m_cur = conn.cursor()  #这个cursor是类公用的,用于不需要实例化对象的mysql操作
    pool = myredis.get_pool(database_num=0)
    r_cur = redis.Redis(
        connection_pool=pool)  #这个cursor是类公用的,用于不需要实例化对象的redis操作

    def __init__(self, log: logging.Logger = None):
        """:param log: 可选的log对象,若不提供,将在当前文件夹下新建logs目录、创建log文件"""
        if log == None:
            log_dir = os.path.join(os.getcwd(), 'logs')
            if not os.path.isdir(log_dir):
                os.mkdir(log_dir)
            log_file_name = 'database_api_java#{}.txt'.format(
                time.strftime('-%Y-%m-%d_%H_%M_%S'))
            log = mylog.get_logger(log_dir, log_file_name, 'mysql_api_java')
        self.log = log
        message = 'database_api_java object initialized'
        self.log.info(message)
        self.m_cur = database_api_java.conn.cursor()  # the instance's own cursor
        self.r_cur = redis.Redis(connection_pool=database_api_java.pool)
        database_api_java.check_table()
        message = 'database java and tables checked'
        self.log.info(message)

    def __del__(self):
        self.m_cur.close()
        self.r_cur.close()
        message = 'database_api_java object deleted'
        self.log.info(message)

    @staticmethod
    def check_table():
        """检查一下mysql中是否已经建好了表,若表不存在则新建表。
        :这里并没有检查:若表存在,表的格式是否正确。
        
        mysql中movie表各列含义与默认值:
        :no,自增主键;
        :code,作品识别码("unknown");
        :full_name,作品全名("unknown");
        :release_date,发行日期("1000-01-01");
        :duration,持续时间,单位为分钟(0);
        :director,导演名("unknown");
        :producer,制作商("unknown");
        :publisher,发行商("unknown");
        :series,系列("unknown");
        :url,来源网页("unknown");
        :downloaded,是否下载(0);
        :file_path,如果已下载,文件位置("unknown");
        :created,该行创建时间。

        mysql中torrent表各列含义与默认值:
        :no,自增主键;
        :hash_value,hash值("unknown");
        :movie_no,对应电影编号(0);
        :name,名称("unknown");
        :file_num,包含文件数量(0);
        :size,文件大小(单位为MB,0);
        :high_definition,是否高清(0);
        :record_date,网站收录日期("1000-01-01")。
        
        redis中actor_movie:
        集合,键为 java:actor:名字 ,值为电影在mysql中的编号。
        比较特殊的是 java:actor:unknown ,这个键里存的是演员未知的电影。
        
        redis中tag_movie:
        集合,键为 java:tag:标签名 ,值为电影在mysql中的编号。
        比较特殊的是 java:tag:no ,这个键里存的是没有tag的电影。
        
        redis中torrent_files:
        集合,键为 java:torrent:编号 ,值为文件名。"""
        conn, m_cur = database_api_java.conn, database_api_java.m_cur
        command_movie = '''
        CREATE TABLE IF NOT EXISTS movie ( 
        no BIGINT UNSIGNED AUTO_INCREMENT, 
        code VARCHAR(30) NOT NULL DEFAULT "unknown" COMMENT "movie identification code", 
        full_name VARCHAR(400) NOT NULL DEFAULT "unknown" COMMENT "full movie title", 
        release_date DATE NOT NULL DEFAULT "1000-01-01" COMMENT "release date", 
        duration SMALLINT UNSIGNED NOT NULL DEFAULT 0 COMMENT "duration (min)", 
        director VARCHAR(200) NOT NULL DEFAULT "unknown" COMMENT "director name", 
        producer VARCHAR(200) NOT NULL DEFAULT "unknown" COMMENT "production company", 
        publisher VARCHAR(200) NOT NULL DEFAULT "unknown" COMMENT "publisher", 
        series VARCHAR(200) NOT NULL DEFAULT "unknown" COMMENT "series", 
        url VARCHAR(200) NOT NULL DEFAULT "unknown" COMMENT "source page", 
        downloaded BIT(1) NOT NULL DEFAULT 0 COMMENT "downloaded or not", 
        file_path VARCHAR(500) NOT NULL DEFAULT "unknown" COMMENT "file location if downloaded", 
        created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 
        PRIMARY KEY (no));'''
        command_movie = command_movie.replace('\n', '')
        command_movie = command_movie.replace('    ', '')
        command_torrent = '''
        CREATE TABLE IF NOT EXISTS torrent ( 
        no BIGINT UNSIGNED AUTO_INCREMENT, 
        hash_value VARCHAR(200) NOT NULL DEFAULT "unknown" COMMENT "hash value", 
        movie_no BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT "movie number", 
        name VARCHAR(400) NOT NULL DEFAULT "unknown" COMMENT "name", 
        file_num INT UNSIGNED NOT NULL DEFAULT 0 COMMENT "number of files", 
        size INT UNSIGNED NOT NULL DEFAULT 0 COMMENT "file size (MB)", 
        high_definition BIT(1) NOT NULL DEFAULT 0 COMMENT "high-definition or not", 
        record_date DATE NOT NULL DEFAULT "1000-01-01" COMMENT "date recorded by the site", 
        PRIMARY KEY (no));'''
        command_torrent = command_torrent.replace('\n', '')
        command_torrent = command_torrent.replace('    ', '')
        try:
            m_cur.execute("CREATE DATABASE IF NOT EXISTS java;")
            m_cur.execute("USE java;")
            m_cur.execute(command_movie)
            m_cur.execute(command_torrent)
            conn.commit()
        except:
            conn.rollback()
            raise Exception('java_extensions.database_api_java failed')

    def save_movie(self, movie_dict: dict):
        """存入一个movie(以字典形式提交)。注意这里只是对不存在的数据添加默认值,没有验证数据类型等。
        工作流程:向mysql的movie表中存入数据;向redis的tag_movie存入数据;向redis的actor_movie存入数据。
        :param movie_dict: 包含movie数据的字典。
        :return movie_no: 电影的编号"""
        conn, m_cur, r_cur, log = database_api_java.conn, self.m_cur, self.r_cur, self.log
        if len(movie_dict) == 0:
            message = 'no movie data submitted'
            log.info(message)
            return
        command_movie = '''INSERT INTO movie (code,full_name,release_date,duration,director,producer,publisher,series,url,downloaded,file_path) VALUES '''
        code, full_name, release_date, duration, director, producer, publisher, series, url, downloaded, file_path = 'unknown', 'unknown', "1000-01-01", 0, 'unknown', 'unknown', 'unknown', 'unknown', 'unknown', 0, 'unknown'
        tags, actors = set(['no']), set(['unknown'])
        if 'code' in movie_dict.keys():
            code = mysql.str_into_mysql(movie_dict['code'])
        if 'full_name' in movie_dict.keys():
            full_name = mysql.str_into_mysql(movie_dict['full_name'])
        if 'release_date' in movie_dict.keys():
            release_date = movie_dict['release_date']
            if release_date == '0000-00-00':
                release_date = '1000-01-01'
        if 'duration' in movie_dict.keys():
            duration = movie_dict['duration']
        if 'director' in movie_dict.keys():
            director = mysql.str_into_mysql(movie_dict['director'])
        if 'producer' in movie_dict.keys():
            producer = mysql.str_into_mysql(movie_dict['producer'])
        if 'publisher' in movie_dict.keys():
            publisher = mysql.str_into_mysql(movie_dict['publisher'])
        if 'series' in movie_dict.keys():
            series = mysql.str_into_mysql(movie_dict['series'])
        if 'url' in movie_dict.keys():
            url = mysql.str_into_mysql(movie_dict['url'])
        if 'downloaded' in movie_dict.keys():
            downloaded = movie_dict['downloaded']
        if 'file_path' in movie_dict.keys():
            file_path = mysql.str_into_mysql(movie_dict['file_path'])
        if 'tags' in movie_dict.keys():
            tags = movie_dict['tags']
            if tags == None:
                tags = set(['no'])
        if 'actors' in movie_dict.keys():
            actors = movie_dict['actors']
            if actors == None:
                actors = set(['unknown'])
        command_movie += '("{}","{}","{}",{},"{}","{}","{}","{}","{}",{},"{}"),'.format(
            code, full_name, release_date, duration, director, producer,
            publisher, series, url, downloaded, file_path)
        command_movie = command_movie[:-1] + ';'
        try:
            m_cur.execute(command_movie)
            conn.commit()
            m_cur.execute(
                'SELECT MAX(no) FROM movie WHERE code="{}" AND full_name="{}";'
                .format(code, full_name))
            movie_no = str(m_cur.fetchall()[0][0])
            pipe = r_cur.pipeline()
            for tag in tags:
                key = 'java:tag:{}'.format(tag)
                value = movie_no
                pipe.sadd(key, value)
            for actor in actors:
                key = 'java:actor:{}'.format(actor)
                value = movie_no
                pipe.sadd(key, value)
            pipe.execute()
            message = 'saved movie successfully, code:full_name={}:{}'.format(
                code, full_name)
            log.info(message)
            return movie_no
        except:
            conn.rollback()
            log.info('failing mysql command: ' + command_movie)
            raise Exception('java_extensions.save_movie failed')

    def save_torrent(self, torrent_dict: dict):
        """存入一个torrent(以字典形式提交)。注意这里只是对不存在的数据添加默认值,没有验证数据类型等。
        工作流程:向mysql的torrent表中存入数据;向redis的torrent_files存入数据。
        :param torrent_dict: 包含torrent数据的字典。"""
        conn, m_cur, r_cur, log = database_api_java.conn, self.m_cur, self.r_cur, self.log
        if len(torrent_dict) == 0:
            message = 'no torrent data submitted'
            log.info(message)
            return
        command_torrent = '''INSERT INTO torrent (hash_value,movie_no,name,file_num,size,high_definition,record_date) VALUES '''
        hash_value, movie_no, name, file_num, size, high_definition, record_date = 'unknown', 0, 'unknown', 0, 0, 0, "1000-01-01"
        torrent_files = set()
        if 'hash_value' in torrent_dict.keys():
            hash_value = mysql.str_into_mysql(torrent_dict['hash_value'])
        if 'movie_no' in torrent_dict.keys():
            movie_no = int(torrent_dict['movie_no'])
        if 'name' in torrent_dict.keys():
            name = mysql.str_into_mysql(torrent_dict['name'])
        if 'file_num' in torrent_dict.keys():
            file_num = torrent_dict['file_num']
        if 'size' in torrent_dict.keys():
            size = torrent_dict['size']
            if size == '':
                size = 0
        if 'high_definition' in torrent_dict.keys():
            high_definition = torrent_dict['high_definition']
        if 'record_date' in torrent_dict.keys():
            record_date = torrent_dict['record_date']
            if record_date == '0000-00-00':
                record_date = '1000-01-01'
        if 'torrent_files' in torrent_dict.keys():
            torrent_files = torrent_dict['torrent_files']
        command_torrent += '("{}",{},"{}",{},{},{},"{}"),'.format(
            hash_value, movie_no, name, file_num, size, high_definition,
            record_date)
        command_torrent = command_torrent[:-1] + ';'
        try:
            m_cur.execute(command_torrent)
            conn.commit()
            m_cur.execute(
                'SELECT MIN(no) FROM torrent WHERE hash_value="{}";'.format(
                    hash_value))
            torrent_no = str(m_cur.fetchall()[0][0])
            key = 'java:torrent:{}'.format(torrent_no)
            pipe = r_cur.pipeline()
            for torrent_file in torrent_files:
                value = torrent_file
                pipe.sadd(key, value)
            pipe.execute()
            message = 'saved torrent successfully, hash_value={}'.format(
                hash_value)
            log.info(message)
        except:
            conn.rollback()
            log.info('failing mysql command: ' + command_torrent)
            raise Exception('java_extensions.save_torrent failed')

    def update_movie(self, movie_dict: dict):
        """更新一个movie(以字典形式提交)。注意这里只是对不存在的数据添加默认值,没有验证数据类型等。
        工作流程:向mysql的movie表中存入数据;向redis的tag_movie存入数据;向redis的actor_movie存入数据。
        必须提供的数据有:no,即电影编号。未提供的部分不更新。
        :param movie_dict: 包含movie数据的字典。"""
        conn, m_cur, r_cur, log = database_api_java.conn, self.m_cur, self.r_cur, self.log
        if len(movie_dict) <= 1:
            message = 'java_extensions.update_movie failed: movie_dict must include at least 2 items'
            log.warning(message)
            return
        try:
            no = movie_dict['no']
        except:
            message = 'java_extensions.update_movie failed: no not offered'
            log.warning(message)
            return
        m_cur.execute('USE java;')
        command_movie = 'SELECT no FROM movie WHERE no={};'.format(no)
        m_cur.execute(command_movie)
        if len(m_cur.fetchall()) == 0:
            message = 'movie no.{} not exists'.format(no)
            log.warning(message)
            return
        command_movie = '''UPDATE movie SET '''
        if 'code' in movie_dict.keys():
            code = mysql.str_into_mysql(movie_dict['code'])
            command_movie += 'code="{}",'.format(code)
        if 'full_name' in movie_dict.keys():
            full_name = mysql.str_into_mysql(movie_dict['full_name'])
            command_movie += 'full_name="{}",'.format(full_name)
        if 'release_date' in movie_dict.keys():
            release_date = movie_dict['release_date']
            command_movie += 'release_date="{}",'.format(release_date)
        if 'duration' in movie_dict.keys():
            duration = movie_dict['duration']
            command_movie += 'duration={},'.format(duration)
        if 'director' in movie_dict.keys():
            director = mysql.str_into_mysql(movie_dict['director'])
            command_movie += 'director="{}",'.format(director)
        if 'producer' in movie_dict.keys():
            producer = mysql.str_into_mysql(movie_dict['producer'])
            command_movie += 'producer="{}",'.format(producer)
        if 'publisher' in movie_dict.keys():
            publisher = mysql.str_into_mysql(movie_dict['publisher'])
            command_movie += 'publisher="{}",'.format(publisher)
        if 'series' in movie_dict.keys():
            series = mysql.str_into_mysql(movie_dict['series'])
            command_movie += 'series="{}",'.format(series)
        if 'url' in movie_dict.keys():
            url = mysql.str_into_mysql(movie_dict['url'])
            command_movie += 'url="{}",'.format(url)
        if 'downloaded' in movie_dict.keys():
            downloaded = movie_dict['downloaded']
            command_movie += 'downloaded={},'.format(downloaded)
        if 'file_path' in movie_dict.keys():
            file_path = mysql.str_into_mysql(movie_dict['file_path'])
            command_movie += 'file_path="{}",'.format(file_path)
        command_movie = command_movie[:-1] + ' WHERE no={};'.format(no)
        try:
            if command_movie.count('=') > 1:
                m_cur.execute(command_movie)
                conn.commit()
            pipe = r_cur.pipeline()
            if 'tags' in movie_dict.keys() and movie_dict['tags'] != None:
                pipe.srem('java:tag:no', str(no))
                for tag in movie_dict['tags']:
                    key = 'java:tag:{}'.format(tag)
                    value = str(no)
                    pipe.sadd(key, value)
            if 'actors' in movie_dict.keys() and movie_dict['actors'] != None:
                pipe.srem('java:actor:unknown', str(no))
                for actor in movie_dict['actors']:
                    key = 'java:actor:{}'.format(actor)
                    value = str(no)
                    pipe.sadd(key, value)
            pipe.execute()
            message = 'updated movie successfully, no={}'.format(no)
            log.info(message)
        except:
            conn.rollback()
            log.info('failing mysql command: ' + command_movie)
            raise Exception('java_extensions.update_movie failed')

    def update_torrent(self, torrent_dict: dict):
        """更新一个torrent(以字典形式提交)。注意这里只是对不存在的数据添加默认值,没有验证数据类型等。
        工作流程:向mysql的torrent表中存入数据;向redis的torrent_files存入数据。
        必须提供的数据有:no和hash_value,即torrent编号及hash值。未提供的部分不更新。
        :param torrent_dict: 包含torrent数据的字典。"""
        conn, m_cur, r_cur, log = database_api_java.conn, self.m_cur, self.r_cur, self.log
        if len(torrent_dict) <= 2:
            message = 'java_extensions.update_torrent failed: torrent_dict must include at least 3 items'
            log.warning(message)
            return
        try:
            no, hash_value = torrent_dict['no'], torrent_dict['hash_value']
        except:
            message = 'java_extensions.update_torrent failed: no or hash_value not offered'
            log.warning(message)
            return
        m_cur.execute('USE java;')
        command_torrent = 'SELECT hash_value FROM torrent WHERE no={};'.format(
            no)
        m_cur.execute(command_torrent)
        rows = m_cur.fetchall()  # fetchall() consumes the result set, so read it once
        if len(rows) == 0:
            message = 'torrent no.{} not exists'.format(no)
            log.warning(message)
            return
        if hash_value != rows[0][0]:
            message = 'torrent no.{} hash_value not match: {} versus {}'.format(
                no, hash_value, rows[0][0])
            log.warning(message)
            return
        command_torrent = '''UPDATE torrent SET '''
        if 'movie_no' in torrent_dict.keys():
            movie_no = torrent_dict['movie_no']
            command_torrent += 'movie_no={},'.format(movie_no)
        if 'name' in torrent_dict.keys():
            name = mysql.str_into_mysql(torrent_dict['name'])
            command_torrent += 'name="{}",'.format(name)
        if 'file_num' in torrent_dict.keys():
            file_num = torrent_dict['file_num']
            command_torrent += 'file_num={},'.format(file_num)
        if 'size' in torrent_dict.keys():
            size = torrent_dict['size']
            command_torrent += 'size={},'.format(size)
        if 'high_definition' in torrent_dict.keys():
            high_definition = torrent_dict['high_definition']
            command_torrent += 'high_definition={},'.format(high_definition)
        if 'record_date' in torrent_dict.keys():
            record_date = torrent_dict['record_date']
            command_torrent += 'record_date="{}",'.format(record_date)
        command_torrent = command_torrent[:-1] + ' WHERE no={};'.format(no)
        try:
            if command_torrent.count('=') > 1:
                m_cur.execute(command_torrent)
                conn.commit()
            pipe = r_cur.pipeline()
            key = 'java:torrent:{}'.format(no)
            if 'torrent_files' in torrent_dict.keys():
                for torrent_file in torrent_dict['torrent_files']:
                    value = torrent_file
                    pipe.sadd(key, value)
            pipe.execute()
            message = 'updated torrent successfully, no={}'.format(no)
            log.info(message)
        except:
            conn.rollback()
            log.info('failing mysql command: ' + command_torrent)
            raise Exception('java_extensions.update_torrent failed')

    def check_repeat(self):
        """检查mysql的movie、torrent表里有没有重复的,只检查 (code和full_name) 、 (hash_value) 相同的。如果有重复的,保留no最小的一条记录。
        检查redis的 java:actor: 、 java:tag: 、 java:torrent: ,是否对应的编号在mysql中还存在。"""
        conn, m_cur, r_cur, log = database_api_java.conn, self.m_cur, self.r_cur, self.log
        m_cur.execute('USE java;')
        # check the movie table
        m_cur.execute(
            'SELECT code,full_name FROM movie GROUP BY code,full_name HAVING COUNT(*)>1;'
        )
        repeated_items = m_cur.fetchall()
        if len(repeated_items) == 0:
            message = 'no repeated code:full_name in movie'
            log.info(message)
        else:
            message = 'found repeated code:full_name {} kinds'.format(
                len(repeated_items))
            log.info(message)
            command_movie = '''
            DELETE FROM movie WHERE no IN (SELECT no FROM
            (SELECT no FROM movie WHERE 
            (code,full_name) IN (SELECT code,full_name FROM movie GROUP BY code,full_name HAVING COUNT(*)>1) 
            AND no NOT IN (SELECT MIN(no) FROM movie GROUP BY code,full_name HAVING COUNT(*)>1)) AS a);'''
            command_movie = command_movie.replace('\n', '')
            command_movie = command_movie.replace('    ', '')
            try:
                m_cur.execute(command_movie)
                conn.commit()
                message = 'repeated code:full_name deleted successfully. '
                log.info(message)
            except:
                conn.rollback()
                log.info('failed mysql command: ' + command_movie)
                raise Exception(
                    'java_extensions.check_repeat failed: table movie check_repeat failed'
                )
        # check the torrent table
        m_cur.execute(
            'SELECT hash_value FROM torrent GROUP BY hash_value HAVING COUNT(*)>1;'
        )
        repeated_items = m_cur.fetchall()
        if len(repeated_items) == 0:
            message = 'no repeated hash_value in torrent'
            log.info(message)
        else:
            message = 'found repeated hash_value {} kinds'.format(
                len(repeated_items))
            log.info(message)
            command_torrent = '''
            DELETE FROM torrent WHERE no IN (SELECT no FROM
            (SELECT no FROM torrent WHERE 
            (hash_value) IN (SELECT hash_value FROM torrent GROUP BY hash_value HAVING COUNT(*)>1) 
            AND no NOT IN (SELECT MIN(no) FROM torrent GROUP BY hash_value HAVING COUNT(*)>1)) AS a);'''
            command_torrent = command_torrent.replace('\n', '')
            command_torrent = command_torrent.replace('    ', '')
            try:
                m_cur.execute(command_torrent)
                conn.commit()
                command_torrent = 'DELETE FROM torrent WHERE hash_value="unknown";'
                m_cur.execute(command_torrent)
                conn.commit()
                message = 'repeated and unknown hash_value deleted successfully. '
                log.info(message)
            except:
                conn.rollback()
                log.info('failed mysql command: ' + command_torrent)
                raise Exception(
                    'java_extensions.check_repeat failed: table torrent check_repeat failed'
                )
        try:
            pipe = r_cur.pipeline()
            # check java:actor:<name> keys
            actor_keys = r_cur.keys('java:actor:*')
            for actor_key in actor_keys:
                movie_nos = r_cur.smembers(actor_key)
                for movie_no in movie_nos:
                    m_cur.execute(
                        'SELECT no FROM movie WHERE no={};'.format(movie_no))
                    if len(m_cur.fetchall()) == 0:
                        pipe.srem(actor_key, movie_no)
            pipe.execute()
            # check java:tag:<tag> keys
            tag_keys = r_cur.keys('java:tag:*')
            for tag_key in tag_keys:
                movie_nos = r_cur.smembers(tag_key)
                for movie_no in movie_nos:
                    m_cur.execute(
                        'SELECT no FROM movie WHERE no={};'.format(movie_no))
                    if len(m_cur.fetchall()) == 0:
                        pipe.srem(tag_key, movie_no)
            pipe.execute()
            # check java:torrent:<no> keys
            torrent_keys = r_cur.keys('java:torrent:*')
            for torrent_key in torrent_keys:
                torrent_no = torrent_key.replace('java:torrent:', '')
                m_cur.execute(
                    'SELECT no FROM torrent WHERE no={};'.format(torrent_no))
                if len(m_cur.fetchall()) == 0:
                    pipe.delete(torrent_key)
            pipe.execute()
        except:
            raise Exception(
                'java_extensions.check_repeat failed: redis check_repeat failed'
            )

    def get_to_be_updated_torrents(self) -> list:
        '''Fetch the torrents whose name, file_num or file list is still unknown.
        :return [(no1,hash_value1), (no2,hash_value2), ...]'''
        m_cur, log = self.m_cur, self.log
        m_cur.execute('USE java;')
        command_torrent = '''SELECT no,hash_value FROM torrent WHERE name="unknown" UNION 
        SELECT no,hash_value FROM torrent WHERE file_num=0 UNION 
        SELECT no,hash_value FROM torrent WHERE size=0;'''
        command_torrent = command_torrent.replace('\n', '')
        command_torrent = command_torrent.replace('    ', '')
        try:
            m_cur.execute(command_torrent)
            message = 'to_be_updated_torrents selected. '
            log.info(message)
            result = m_cur.fetchall()
        except:
            log.info('failed mysql command: ' + command_torrent)
            raise Exception(
                'java_extensions.get_to_be_updated_torrents failed: sql syntax wrong'
            )
        return list(result)

    def torrent_selector(self,
                         actors: list = None,
                         tags: list = None,
                         check_repeat=True,
                         limit: int = 0):
        '''Filter movies and magnets, picking the single most suitable magnet per movie. Results are written to torrent_selector.txt.
        :param actors: restrict by actors. Multiple actors are OR-ed: one match is enough. Actor names are fuzzy-matched (a substring is enough).
        :param tags: restrict by tags. Multiple tags are OR-ed: one match is enough.
        :param check_repeat: whether to drop magnets that were already downloaded. Requires a directory of previously downloaded torrent files, specified via TORRENT_DIR.
        :param limit: maximum number of movies; 0 means no limit.
        When actors and tags are both given, they are independent: a movie containing a wanted actor but none of the tags (or vice versa) is still selected.'''
        conn, m_cur, r_cur, log = database_api_java.conn, self.m_cur, self.r_cur, self.log
        downloaded_hash = set()
        if check_repeat:
            for dirpath, dirnames, filenames in os.walk(TORRENT_DIR):
                for filename in filenames:
                    abs_file_path = os.path.join(dirpath, filename)
                    hash_value = torrent(abs_file_path).hash_value
                    downloaded_hash.add(hash_value)
                    print(f'\rdownloaded_hash added:{len(downloaded_hash)}',
                          end='')
        m_cur.execute('USE java;')
        command_movie = 'SELECT no,release_date FROM movie WHERE downloaded=0 AND file_path="unknown"'
        movie_no_set = set()
        if actors != None:
            for actor in actors:
                actor_keys = r_cur.keys(f'java:actor:*{actor}*')
                for actor_key in actor_keys:
                    movie_no = r_cur.smembers(actor_key)
                    movie_no_set = movie_no_set.union(movie_no)
        if tags != None:
            for tag in tags:
                movie_no = r_cur.smembers(f'java:tag:{tag}')
                movie_no_set = movie_no_set.union(movie_no)
        if len(movie_no_set) != 0:
            # join explicitly; formatting a tuple would emit invalid SQL such
            # as "IN (5,)" when the set holds exactly one element
            command_movie += ' AND no IN ({})'.format(','.join(
                str(int(i)) for i in movie_no_set))
        if limit != 0:
            command_movie += f' LIMIT {limit}'
        command_movie += ';'
        m_cur.execute(command_movie)
        allowed_files = set()
        # the allowed_files directory holds file names that, in my experience,
        # mark a better-quality release when they appear inside a torrent
        for dirpath, dirnames, filenames in os.walk(
                os.path.join(os.getcwd(), 'allowed_files')):
            # update(), not add(): filenames is a list and lists are unhashable
            allowed_files.update(filenames)
        for movie_no, release_date in m_cur.fetchall():
            command_torrent = 'SELECT * FROM torrent WHERE movie_no={};'.format(
                movie_no)
            m_cur.execute(command_torrent)
            torrent_tuples_list = list(m_cur.fetchall())
            torrent_tuples_list.sort(key=lambda x: x[7])
            # pre-sort once by record_date
            torrent_score_list = []
            for torrent_tuple in torrent_tuples_list:
                no, hash_value, movie_no, name, file_num, size, high_definition, record_date = torrent_tuple
                torrent_files = r_cur.smembers('java:torrent:{}'.format(no))
                score = 0
                # score the torrent by its name; this is the main scoring criterion
                p_thz = re.compile('thz', flags=re.I)
                p_hnd = re.compile('hnd', flags=re.I)
                p_C = re.compile('CH?$', flags=re.I)
                p_HD = re.compile('(HD$)|(^HD)', flags=re.I)
                p_2048 = re.compile('^hjd-2048', flags=re.I)
                p_zip = re.compile(r'\.zip$', flags=re.I)
                p_mp4 = re.compile(r'[^C ]\.mp4$', flags=re.I)
                if re.search(p_thz, name) != None or re.search(
                        p_hnd, name) != None or ("原版首发" in name) or re.search(
                            p_mp4, name) != None:
                    score += 50
                if re.search(p_C, name) != None or re.search(
                        p_2048, name) != None or re.search(
                            p_zip, name) != None or re.search(
                                p_HD, name) != None or name.startswith(
                                    '-') or name.startswith(
                                        '@') or name.startswith('_'):
                    score -= 50
                    # Chinese subtitles are not a priority (and subtitled releases often carry embedded ads), so these markers deduct points
                letters = re.findall('[a-zA-Z]+', name)
                all_upper_letter = True
                for letter in letters:
                    if letter == 'mp' or letter.isupper():
                        continue
                    else:
                        all_upper_letter = False
                        break
                if all_upper_letter == True:
                    score += 30
                #file_num
                if file_num == 1:
                    score += 100
                #high_definition
                if high_definition == 1:
                    score += 50
                #record_date
                # For movies released before 2018, a magnet that still downloads matters most, so newer magnets score higher; for movies after 2018, earlier magnets score higher, since late uploads have often been re-processed.
                # The scoring is coarse: magnets sharing the same date do not receive equal scores.
                if release_date > datetime.date(2018, 1, 1):
                    score += 100 / len(torrent_tuples_list) * (
                        len(torrent_tuples_list) - 1 -
                        torrent_tuples_list.index(torrent_tuple))
                else:
                    score += 100 / len(torrent_tuples_list) * (
                        1 + torrent_tuples_list.index(torrent_tuple))
                #torrent_files
                for torrent_file in torrent_files:
                    if torrent_file in allowed_files or 'hnd' in torrent_file:
                        score += 100
                        break
                torrent_score_list.append((score, name, hash_value))
            torrent_score_list.sort(key=lambda x: x[0])
            if len(torrent_score_list) != 0:
                score, name, hash_value = torrent_score_list[-1]
                hash_value = hash_value.upper()
                if check_repeat:
                    if hash_value not in downloaded_hash:
                        with open(os.path.join(os.getcwd(),
                                               'torrent_selector.txt'),
                                  'a',
                                  encoding='utf-8') as f:
                            f.write(
                                f'magnet:?xt=urn:btih:{hash_value}&dn={name}' +
                                '\n')
                    else:
                        message = f'torrent file downloaded, hash_value={hash_value}'
                        log.info(message)
                else:
                    with open(os.path.join(os.getcwd(),
                                           'torrent_selector.txt'),
                              'a',
                              encoding='utf-8') as f:
                        f.write(f'magnet:?xt=urn:btih:{hash_value}&dn={name}' +
                                '\n')
            else:
                message = f'cannot find torrent for movie_no={movie_no}'
                log.warning(message)


if __name__ == '__main__':
    actors = ['1', '2', '3']
    tags = []
    database_api_java().torrent_selector(actors=actors, tags=tags, limit=0)

There was originally also a btsow_extensions.py for querying magnet-search sites, which can look up a magnet's metadata from its hash value; but those sites have fairly strict anti-scraping measures, so that file and its spider are omitted here. As a consequence, the get_to_be_updated_torrents method is effectively unused, and the feature of scoring magnets by the file names they contain is also missing.

3. Modifying the scrapy project

(1)items.py

# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy


class JavaspiderItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    movie_dict = scrapy.Field()
    torrent_dicts_list = scrapy.Field()


class TorrentDetailItem(scrapy.Item):
    torrent_dict = scrapy.Field()

(2)middlewares.py

# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html

from scrapy import signals

# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter


class JavaspiderSpiderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.

        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.

        # Must return an iterable of Request, or item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.

        # Should return either None or an iterable of Request or item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn’t have a response associated.

        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)


import random


class JavaspiderDownloaderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.
    UA_list = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:104.0) Gecko/20100101 Firefox/104.0'
    ]

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.

        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        request.headers['User-Agent'] = random.choice(self.UA_list)
        return None

    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.

        # Must either;
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.

        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

(3)pipelines.py

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from javaspider.libs.java_extensions import database_api_java


class JavaspiderPipeline:

    def open_spider(self, spider):
        self.database_api_java = database_api_java()

    def process_item(self, item, spider):
        movie_no = '0'
        if 'movie_dict' in item.keys():
            movie_no = self.database_api_java.save_movie(item['movie_dict'])
        if 'torrent_dicts_list' in item.keys():
            for torrent_dict in item['torrent_dicts_list']:
                if movie_no != None:
                    torrent_dict['movie_no'] = int(movie_no)
                    self.database_api_java.save_torrent(torrent_dict)
        if 'torrent_dict' in item.keys():
            #self.database_api_java.update_torrent(torrent_dict)
            pass
        return item

    def close_spider(self, spider):
        self.database_api_java.check_repeat()

(4)settings.py

# Scrapy settings for javaspider project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html

LOG_LEVEL = 'ERROR'

BOT_NAME = 'javaspider'

SPIDER_MODULES = ['javaspider.spiders']
NEWSPIDER_MODULE = 'javaspider.spiders'

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = 'javaspider (+http://www.yourdomain.com)'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
#DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
#   'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
#   'Accept-Language': 'en',
#}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
#    'javaspider.middlewares.JavaspiderSpiderMiddleware': 543,
#}

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
    'javaspider.middlewares.JavaspiderDownloaderMiddleware': 543,
}

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
#    'scrapy.extensions.telnet.TelnetConsole': None,
#}

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'javaspider.pipelines.JavaspiderPipeline': 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = 'httpcache'
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'

(5)the spider

The spider file is java.py , created earlier; its contents are:

import scrapy, re, random, os, time
import urllib.parse
from javaspider.items import JavaspiderItem
from javaspider.libs import java_extensions as j_e
from MyPythonLib import interaction

error_html_file_path = os.path.join(os.getcwd(), 'wrong_response_java.html')
#this html file stores responses that failed to parse, to help debugging
RECURSIVE = True
#If the actor homepages are being crawled for the first time, enable recursive crawling so the spider follows "next page" links automatically. For actors crawled not long ago (within the last 3 months), crawling only the first page is enough: set this to False.


class JavaSpider(scrapy.Spider):
    name = 'java'
    start_urls = j_e.get_actor_main_pages()
    custom_settings = {
        'CONCURRENT_REQUESTS': 16,
        'DOWNLOAD_DELAY': 2,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 1
    }  # throttle the crawl speed

    origin_url = j_e.ORIGIN_URL_JAVA
    temp_url = j_e.TEMP_URL_JAVA
    start_time = time.time()
    total = len(start_urls)
    progress = 0
    msg = ''

    def parse(self, response):
        try:
            movie_nodes = response.xpath(
                '//*[@id="waterfall"]/div/a[@class="movie-box" and @href]')
            for movie_node in movie_nodes:
                detail_url = movie_node.xpath('./@href').get()
                if isinstance(detail_url, str):
                    detail_url = urllib.parse.urljoin(JavaSpider.temp_url,
                                                      detail_url.strip())
                movie_dict = {}
                code = movie_node.xpath('.//date[1]/text()').get()
                if isinstance(code, str):
                    movie_dict['code'] = code.strip()
                full_name = movie_node.xpath('.//span/text()[1]').get()
                if isinstance(full_name, str):
                    movie_dict['full_name'] = full_name.strip()
                release_date = movie_node.xpath('.//date[2]/text()').get()
                if isinstance(release_date, str):
                    movie_dict['release_date'] = release_date.strip()
                movie_dict['url'] = detail_url.replace(JavaSpider.temp_url,
                                                       JavaSpider.origin_url)
                item = JavaspiderItem()
                item['movie_dict'] = movie_dict
                request = scrapy.Request(url=detail_url,
                                         callback=self.parse_movie)
                request.cb_kwargs['item'] = item
                yield request
            next_url = response.xpath(
                '//*[@id="next" and text()="下一頁"]/@href').get()
            if RECURSIVE and isinstance(next_url, str):
                next_url = urllib.parse.urljoin(JavaSpider.temp_url,
                                                next_url.strip())
                yield scrapy.Request(url=next_url, callback=self.parse)
            if response.url in JavaSpider.start_urls:
                JavaSpider.progress = 1 + JavaSpider.start_urls.index(
                    response.url)
                JavaSpider.msg = response.url.replace(
                    JavaSpider.temp_url, '')
        except:
            with open(error_html_file_path, 'a', encoding='utf-8') as f:
                f.write(response.text)
            message = 'JavaSpider.parse failed'
            raise Exception(message)

    def parse_movie(self, response, item):
        try:
            movie_dict = item['movie_dict']
            info_node = response.xpath('/html/body/div[5]/div[1]/div[2]')
            duration = info_node.xpath(
                './p/span[text()="長度:"]/../text()').get()
            if isinstance(duration, str):
                duration = re.search(r'\d+', duration.strip()).group()
                movie_dict['duration'] = duration
            director = info_node.xpath(
                './p/span[text()="導演:"]/../a/text()').get()
            if isinstance(director, str):
                movie_dict['director'] = director.strip()
            producer = info_node.xpath(
                './p/span[text()="製作商:"]/../a/text()').get()
            if isinstance(producer, str):
                movie_dict['producer'] = producer.strip()
            publisher = info_node.xpath(
                './p/span[text()="發行商:"]/../a/text()').get()
            if isinstance(publisher, str):
                movie_dict['publisher'] = publisher.strip()
            series = info_node.xpath(
                './p/span[text()="系列:"]/../a/text()').get()
            if isinstance(series, str):
                movie_dict['series'] = series.strip()
            movie_dict['tags'] = info_node.xpath(
                './p[text()="類別:"]/following-sibling::p[1]/span//a/text()'
            ).getall()
            if movie_dict['tags'] == None or len(movie_dict['tags']) == 0:
                movie_dict['tags'] = set(['no'])
            movie_dict['actors'] = info_node.xpath(
                './p/span[text()="演員"]/../following-sibling::p[1]/span/a/text()'
            ).getall()
            if movie_dict['actors'] == None or len(movie_dict['actors']) == 0:
                movie_dict['actors'] = set(['unknown'])

            js = response.xpath(r'/html/body/script[3]/text()').get().strip()
            gid = re.search(r'\d+', re.search(r'gid.+;', js).group()).group()
            uc = re.search(r'\d+', re.search(r'uc.+;', js).group()).group()
            img = re.search(r"""['"].+['"]""",
                            re.search(r'img.+;', js).group()).group()[1:-1]

            torrents_url = urllib.parse.urljoin(JavaSpider.temp_url,
                                                'ajax/uncledatoolsbyajax.php')
            torrents_url = torrents_url + '?gid=' + gid + '&lang=zh&img=' + img + '&uc=' + uc + '&floor=' + str(
                random.randint(10, 999))
            request = scrapy.Request(url=torrents_url,
                                     callback=self.parse_torrents)
            interaction.print_progress(self.start_time, self.progress,
                                       self.total, self.msg)
            request.cb_kwargs['item'] = item
            yield request
        except:
            with open(error_html_file_path, 'a', encoding='utf-8') as f:
                f.write(response.text)
            message = 'JavaSpider.parse_movie failed'
            raise Exception(message)

    def parse_torrents(self, response, item):
        try:
            torrent_dicts_list = []
            torrent_nodes = response.xpath('//tr')
            for torrent_node in torrent_nodes:
                torrent_dict = {}
                hash_value = torrent_node.xpath('./td[1]/a/@href').get()
                if isinstance(hash_value, str):
                    hash_value = hash_value.strip().replace(
                        r'magnet:?xt=urn:btih:', '')
                    torrent_dict['hash_value'] = hash_value.split('&')[0]
                name = torrent_node.xpath('./td[1]/a/text()').get()
                if isinstance(name, str):
                    torrent_dict['name'] = name.strip()
                size = torrent_node.xpath('./td[2]/a/text()').get()
                #the site's reported torrent sizes are sometimes malformed: missing units, two decimal points, etc.
                if isinstance(size, str):
                    size = size.strip()
                    p_gb = re.compile(r'gb$', flags=re.I)
                    p_mb = re.compile(r'mb$', flags=re.I)
                    p_tb = re.compile(r'tb$', flags=re.I)
                    p_kb = re.compile(r'kb$', flags=re.I)
                    if re.search(p_gb, size) != None:
                        size = int(1024 *
                                   float(re.search(r'[\d.]+', size).group()))
                    elif re.search(p_mb, size) != None:
                        size = int(float(re.search(r'[\d.]+', size).group()))
                    elif re.search(p_tb, size) != None:
                        size = int(1024 * 1024 *
                                   float(re.search(r'[\d.]+', size).group()))
                    elif re.search(p_kb, size) != None:
                        size = int(
                            float(re.search(r'[\d.]+', size).group()) / 1024)
                    elif size == '':
                        message = 'JavaSpider.parse_torrents warning: get size from empty string "{}"'.format(
                            size)
                        print(message)
                        size = 0
                    else:
                        message = 'JavaSpider.parse_torrents warning: size no unit "{}"'.format(
                            size)
                        print(message)
                        try:
                            size = int(
                                1024 *
                                float(re.search(r'[\d.]+', size).group()))
                        except:
                            message = 'JavaSpider.parse_torrents warning: size wrong "{}"'.format(
                                size)
                            print(message)
                            size = 0
                    torrent_dict['size'] = size
                record_date = torrent_node.xpath('./td[3]/a/text()').get()
                if isinstance(record_date, str):
                    torrent_dict['record_date'] = record_date.strip()
                high_definition = torrent_node.xpath('.//a[text()="高清"]')
                # 高清 is the site's "high definition" marker
                torrent_dict['high_definition'] = 1 if len(
                    high_definition) != 0 else 0
                torrent_dicts_list.append(torrent_dict)

            item['torrent_dicts_list'] = torrent_dicts_list
            yield item
        except:
            with open(error_html_file_path, 'a', encoding='utf-8') as f:
                f.write(response.text)
            message = 'JavaSpider.parse_torrents failed'
            raise Exception(message)

4. Usage

First, visit the movie site in Chrome or another browser, pick the actors you prefer, and save each actor's homepage as a bookmark, for example 50 actor homepages per bookmark folder. When it is time to crawl, open the bookmark manager, select all the actor homepages in one folder with ctrl+A, and copy the URLs into D:\spiders\scrapy\javaspider\javaspider\actor_main_pages.txt , one URL per line. Then consider: if these actors were already crawled within the last 3 months, change RECURSIVE = True to False in the spider file java.py . Check that mysql and redis are running, and finally launch the spider from cmd:

D:
cd D:\spiders\scrapy\javaspider\javaspider
scrapy crawl java
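
Before launching, it can help to sanity-check actor_main_pages.txt, since a stray blank line or non-URL entry would produce a bad start URL. A minimal sketch; the helper function and its name are illustrative, not part of the project:

```python
import re


def load_actor_urls(text: str) -> list:
    """Keep only lines that look like http(s) URLs, one URL per line."""
    urls = []
    for line in text.splitlines():
        line = line.strip()
        if re.match(r'https?://\S+$', line):
            urls.append(line)
        elif line:
            # report anything that is neither blank nor a URL
            print('skipping non-URL line: {!r}'.format(line))
    return urls


# e.g. the text read from actor_main_pages.txt
sample = 'https://example.org/actor/1\n\nnot-a-url\nhttps://example.org/actor/2\n'
print(load_actor_urls(sample))
```

get_actor_main_pages (used for start_urls in java.py) could apply the same kind of filtering when it reads the file.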

You should see a graphical progress bar. Wait for the crawl to finish. In my experience, with the throttling configuration shown earlier:

    custom_settings = {
        'CONCURRENT_REQUESTS': 16,
        'DOWNLOAD_DELAY': 2,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 1
    }

With these settings, crawling 50 actor homepages takes roughly 4 hours.
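
The 4-hour figure is consistent with a back-of-the-envelope estimate: with CONCURRENT_REQUESTS_PER_DOMAIN = 1 and DOWNLOAD_DELAY = 2, the spider issues at most one request to the site roughly every 2 seconds, regardless of CONCURRENT_REQUESTS:

```python
DOWNLOAD_DELAY = 2  # seconds between same-domain requests
requests_per_hour = 3600 // DOWNLOAD_DELAY   # ~1800 requests/hour upper bound
requests_in_4_hours = 4 * requests_per_hour  # ~7200 requests
print(requests_per_hour, requests_in_4_hours)
```

So about 7200 page fetches in 4 hours: listing pages, movie detail pages and the torrent ajax calls combined, spread over the 50 actor homepages.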

After the crawl completes, log into mysql and redis to check the data. For redis-cli, remember to add the --raw option so that utf-8 characters, such as Chinese and Japanese, display correctly.

Finally, the end of java_extensions.py calls the torrent_selector method. Adjust the actors and tags and run the file; it generates D:\spiders\scrapy\javaspider\torrent_selector.txt , which holds the selected magnet link for each movie, one link per line. These can be pasted straight into a client such as qbittorrent or aria2 for download (for convenience reasons, Thunder is not recommended).
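
Each line of torrent_selector.txt follows the magnet URI scheme as written by the code above: magnet:?xt=urn:btih:<40-hex-digit info hash>&dn=<display name>. A small sketch for splitting such a line back into hash and name; the helper is illustrative, not part of the project:

```python
import re


def split_magnet(line: str):
    """Return (info_hash, display_name) from a magnet line, or None if malformed."""
    m = re.match(r'magnet:\?xt=urn:btih:([0-9A-Fa-f]{40})&dn=(.*)$', line.strip())
    if m is None:
        return None
    # normalize the info hash to upper case, as torrent_selector does
    return m.group(1).upper(), m.group(2)


print(split_magnet('magnet:?xt=urn:btih:' + 'a' * 40 + '&dn=EXAMPLE-001'))
```

This is handy for deduplicating or auditing the generated file before feeding it to a download client.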

This article has covered crawling movie information (from actor homepages), crawling magnet links, and selecting and downloading magnets. The next article will cover cataloguing the downloaded movies locally and compressing them to save disk space.

Tags: python, scrapy
