Steps / Contents:
1. Run the MySQL database and preparation work
2. The custom library
3. Create, modify and run the scrapy project
    (1) Other configuration
    (2) Spider configuration
        a.a2u.py
        b.clarketm.py
        c.cool_proxy.py
        d.free_proxy_list.py
        e.ip3366.py
        f.ipaddress.py
        g.jiangxianli.py
        h.kuaidaili.py
        i.pubproxy.py
        j.rmccurdy.py
        k.sunny9577.py
        l.TheSpeedX.py
    (3) Running the spiders
4. Validating the IPs
5. Daily use

This article was first published on my personal blog at https://lisper517.top/index.php/archives/48/. Please credit the source when reposting.
The goal of this article is to build my own proxy IP pool ("ipool") with scrapy + mysql. Since it both summarizes the previous articles and extends them, it is fairly long; the hope is that it can be followed step by step on its own.
It was written on 2022-09-11. The mysql database runs on a Raspberry Pi and everything else on a win10 machine; the editor used is VS Code.

As the joke goes, write your crawlers well and you will see the inside of a jail early. When crawling, do not take up too much of a server's traffic, or you may get yourself into trouble.

My abilities are limited and I am not a CS professional, so there are bound to be mistakes or places that go against good coding practice; please point them out in the comments.

1. Run the MySQL database and preparation work

I install mysql with docker on a Raspberry Pi, which requires docker and docker-compose to be installed first; see Docker入门(一) for installing them on the Pi. You do not have to run mysql on a Raspberry Pi, or under docker, and you could even replace mysql with redis or another database (the later py files would then need adjusting); the only real goal is to have a database running. You can also follow the runoob tutorial to install and run mysql in some other way.

Run the following on the Raspberry Pi:

mkdir -p /docker/mysql/data/mysql
mkdir -p /docker/mysql/conf/mysql
mkdir -p /docker/mysql/backup/mysql
docker pull mysql:8.0.29
nano /docker/mysql/docker-compose.yml

Write the following into docker-compose.yml:

version: "3.9"

services:
  mysql:
    image: mysql:8.0.29
    environment:
      MYSQL_ROOT_PASSWORD: mysqlpasswd
    ports:
      - "53306:3306"
    command:
      - mysqld
      - --character-set-server=utf8mb4
    volumes:
      - /docker/mysql/data/mysql:/var/lib/mysql
      - /docker/mysql/conf/mysql:/etc/mysql/conf.d
      - /docker/mysql/backup/mysql:/backup
    logging: 
      driver: syslog
    restart: always

Remember to change MYSQL_ROOT_PASSWORD: mysqlpasswd to the password you want to set, and you can also change 53306 to a port of your choice (using the default 3306 is not recommended). The /docker/mysql/backup/mysql directory is there to make exporting and backing up data from the dockerized mysql easier.
Then run:

cd /docker/mysql
docker-compose config
docker-compose up -d

Next, go into the container to check that mysql is running properly:

docker exec -it mysql-mysql-1 bash #for older versions of docker-compose, replace every - in this command with _
mysql -uroot -p
#enter the password
SHOW DATABASES;

Once mysql is confirmed to be running, this part is done. So that the commands that create the database and table are kept on record, they will be written into the python files.

Then prepare the environment on the win10 machine: install python3, install scrapy and the other libraries, and create the custom-library folder and the scrapy project folder.
Installing python3 on win10 is not covered here. Once it is installed, run pip install Scrapy in cmd. Other libraries you may need are requests (HTTP requests), bs4 (parsing responses), pymysql (connecting to mysql), lxml (HTML parsing), cryptography (password encryption when connecting to mysql) and selenium (driving a browser); install them with pip install requests bs4 pymysql lxml cryptography selenium in cmd (not all of them are used in this article, but you will need them sooner or later when learning to write crawlers). If pip keeps failing, just rerun the command until it succeeds, or search for how to switch pip to a mirror. Finally, create the folders D:\PythonLib\MyPythonLib and D:\spiders\scrapy, or pick other paths if you prefer.
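
Before moving on, you can check from the win10 machine that the dockerized mysql is reachable with pymysql; a minimal sketch (host, port and password are placeholders, use your Raspberry Pi's address and the values you set above):

import pymysql

conn = pymysql.connect(host='192.168.0.100', port=53306, user='root',
                       passwd='mysqlpasswd', db='mysql', charset='utf8')
cur = conn.cursor()
cur.execute('SELECT VERSION();')
print(cur.fetchone())  # e.g. ('8.0.29',)
conn.close()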

2. The custom library

D:\PythonLib\MyPythonLib 中新建一个空白的 __init__.py 文件,然后到python的安装目录里存放第三方库的文件夹(win10上一般是site-packages目录),比如 C:\Program Files\Python310\Lib\site-packages 中,新建一个 PythonLib.pth 文件,写入一行内容: D:\PythonLib 。接下来创建自建库文件,本次需要用到的自建库主要与log、mysql相关。

Create D:\PythonLib\MyPythonLib\log.py and write:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging, time, os


def get_logger(abs_path: str,
               log_file_name: str = '',
               getLogger_name: str = ''):
    '''Take an absolute path and a log file name, return a logger object.
    
    :param abs_path: absolute path where the log file is stored
    :param log_file_name: name of the log file; if empty, a default name is used
    :param getLogger_name: name of the logger object; if the same program writes several log files at once, use a different name for each
    :return logger object'''
    try:
        formatter = logging.Formatter(
            '%(lineno)d : %(asctime)s : %(levelname)s : %(funcName)s : %(message)s'
        )

        if log_file_name == '':
            log_file_name = 'log#{}.txt'.format(time.strftime('-%Y-%m-%d'))
        fileHandler = logging.FileHandler(
            (os.path.join(abs_path, log_file_name)),
            mode='w',
            encoding='utf-8')
        fileHandler.setFormatter(formatter)

        if getLogger_name == '':
            getLogger_name = 'logger'
        log = logging.getLogger(getLogger_name)
        log.setLevel(logging.DEBUG)
        log.addHandler(fileHandler)
        return log
    except:
        raise Exception('error: MyPythonLib.log.get_logger failed ')
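
As a quick usage sketch (the path and file name here are only examples), get_logger is used like this:

from MyPythonLib import log as mylog

logger = mylog.get_logger(r'D:\spiders\scrapy\ipool', 'test_log.txt', 'test_logger')
logger.info('logger is working')  # written to D:\spiders\scrapy\ipool\test_log.txt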

Create D:\PythonLib\MyPythonLib\mysql.py and write the following (remember to change host, port and the password in get_conn to your own mysql settings):

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql, os, logging, time
from MyPythonLib import log as mylog


def get_conn(database_name: str = 'mysql'):
    '''Given a database name, return a mysql Connection object.

    :param database_name: database name; the mysql database is used if not specified
    :return Connection'''
    try:
        conn = pymysql.connect(host='127.0.0.1',
                               port=53306,
                               user='root',
                               passwd='mysqlpasswd',
                               db=database_name,
                               charset='utf8')
        return conn
    except:
        raise Exception('MyPythonLib.mysql.get_conn failed ')


def str_into_mysql(string: str) -> str:
    '''Take a string and convert it into a form that can be stored in mysql.

    The string is meant to be used as a column value, e.g. the value in
    "SELECT * FROM table WHERE column='value';". If it contains any of the
    three characters \\ , ' or " it needs to go through this function first.

    :param string: the target string
    :return a string that can be stored in mysql
    '''
    try:
        string = string.replace('\\', '\\\\')
        string = string.replace("'", "\\'")
        string = string.replace('"', '\\"')
        return string
    except:
        raise Exception('MyPythonLib.mysql.str_into_mysql failed ')


def table_exists(cur, table_name: str) -> bool:
    '''Check whether a table exists in the current database.
    
    :param cur: cursor object
    :param table_name: table name
    :return True if the table exists, otherwise False
    '''
    try:
        cur.execute("SHOW TABLES;")
        results = cur.fetchall()
        for table in results:
            table = table[0]
            if table == table_name:
                return True
        return False
    except:
        raise Exception('MyPythonLib.mysql.table_exists failed ')


class mysql_api_ipool():
    """通过这个接口实现提交爬取的ip、取出ip的操作。"""
    conn = get_conn()
    cur = conn.cursor()  #this cursor is shared by the whole class, for mysql operations that do not need an instance

    def __init__(self, log: logging.Logger = None):
        """:param log: 可选的log对象,若不提供,将在当前文件夹下新建logs目录、创建log文件"""
        if log == None:
            log_dir = os.path.join(os.getcwd(), 'logs')
            if not os.path.isdir(log_dir):
                os.mkdir(log_dir)
            log_file_name = 'mysql_api_ipool_log#{}.txt'.format(
                time.strftime('-%Y-%m-%d_%H_%M_%S'))
            log = mylog.get_logger(log_dir, log_file_name, 'mysql_api_ipool')
        self.log = log
        message = 'mysql_api_ipool object initialized'
        self.log.info(message)
        self.cur = mysql_api_ipool.conn.cursor()  #the object's own cursor
        mysql_api_ipool.check_table()
        message = 'database ipool and table ipool checked'
        self.log.info(message)

    def __del__(self):
        self.cur.close()
        message = 'mysql_api_ipool object deleted'
        self.log.info(message)

    @staticmethod
    def check_table():
        """检查一下mysql中是否已经建好了表,若表不存在则新建表。
        :这里并没有检查:若表存在,表的格式是否正确。
        
        数据库的各列含义与默认值:
        :no,自增主键;
        :ip,代理ip("0.0.0.0");
        :port,代理ip端口(0);
        :anonymous,是否为高匿,0为否,1为是,3为其他(3);
        :type,0为HTTP,1为HTTPS,2为都可,3为其他(3);
        :physical_address,代理ip的地理位置("unknown");
        :response_time,响应时间,单位为ms(-1);
        :last_verify,上次验证该代理ip的时间("1000-01-01 00:00:00");
        :grade,该代理ip的分数。每验证成功一次会加分,失败则扣分,扣到0会被删除,在爬取ip时不会使用到(100);
        :created,该行创建时间。"""
        cur, conn = mysql_api_ipool.cur, mysql_api_ipool.conn
        command = '''
        CREATE TABLE IF NOT EXISTS ipool ( 
        no BIGINT UNSIGNED AUTO_INCREMENT, 
        ip VARCHAR(50) NOT NULL DEFAULT "0.0.0.0" COMMENT "IP address", 
        port SMALLINT UNSIGNED NOT NULL DEFAULT 0 COMMENT "port", 
        anonymous BIT(2) NOT NULL DEFAULT 3 COMMENT "whether ip is anonymous", 
        type BIT(2) NOT NULL DEFAULT 3 COMMENT "HTTP or HTTPS or both or others", 
        physical_address VARCHAR(100) NOT NULL DEFAULT "unknown" COMMENT "where is the server", 
        response_time MEDIUMINT NOT NULL DEFAULT -1 COMMENT "response_time in milliseconds", 
        last_verify DATETIME NOT NULL DEFAULT '1000-01-01 00:00:00' COMMENT "last verify time", 
        grade TINYINT UNSIGNED NOT NULL DEFAULT 100 COMMENT "grade of ip, used for validation", 
        created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 
        PRIMARY KEY (no));'''
        command = command.replace('\n', '')
        command = command.replace('    ', '')
        try:
            cur.execute("CREATE DATABASE IF NOT EXISTS ipool;")
            cur.execute("USE ipool;")
            cur.execute(command)
            conn.commit()
        except:
            conn.rollback()
            raise Exception(
                'MyPythonLib.mysql.mysql_api_ipool.check_table failed')

    def save_ips(self, ips_dict_list: list):
        """存入多个ip(以字典的列表形式提交)。注意这里只是对不存在的数据添加默认值,没有验证数据类型等。
        :param ips_dict_list: ip字典的列表,每个ip是一个字典。"""
        cur, conn, log = self.cur, mysql_api_ipool.conn, self.log
        if len(ips_dict_list) == 0:
            message = 'no ip submitted'
            log.info(message)
            return
        command = '''INSERT INTO ipool (ip,port,anonymous,type,physical_address,response_time,last_verify) VALUES '''
        for ip_dict in ips_dict_list:
            ip, port, anonymous, type_, physical_address, response_time, last_verify = "0.0.0.0", 0, 3, 3, 'unknown', -1, "1000-01-01 00:00:00"
            if 'ip' in ip_dict.keys():
                ip = ip_dict['ip']
            if 'port' in ip_dict.keys():
                port = ip_dict['port']
            if 'anonymous' in ip_dict.keys():
                anonymous = ip_dict['anonymous']
            if 'type' in ip_dict.keys():
                type_ = ip_dict['type']
            if 'physical_address' in ip_dict.keys():
                physical_address = ip_dict['physical_address']
                physical_address = str_into_mysql(physical_address)
            if 'response_time' in ip_dict.keys():
                response_time = ip_dict['response_time']
            if 'last_verify' in ip_dict.keys():
                last_verify = ip_dict['last_verify']
            command += '("{}",{},{},{},"{}",{},"{}"),'.format(
                ip, port, anonymous, type_, physical_address, response_time,
                last_verify)
            message = 'trying to save ip:port={}:{}'.format(ip, port)
            log.info(message)
        command = command[:-1] + ';'
        try:
            cur.execute(command)
            conn.commit()
            message = 'saving ips successfully'
            log.info(message)
        except:
            conn.rollback()
            log.info('failing command: ' + command)
            raise Exception(
                'MyPythonLib.mysql.mysql_api_ipool.save_ips failed')

    def check_repeat(self):
        """检查爬到的数据里有没有重复的,只检查ip和port都相同的。
        
        如果有重复的,保留no最小的一条记录。"""
        cur, conn, log = mysql_api_ipool.cur, mysql_api_ipool.conn, self.log
        cur.execute('USE ipool;')
        cur.execute(
            'SELECT ip,port FROM ipool GROUP BY ip,port HAVING COUNT(*)>1;')
        repeated_items = cur.fetchall()
        if len(repeated_items) == 0:
            message = 'no repeated ip:port in ipool'
            log.info(message)
            return
        else:
            message = 'found repeated ip:port {} kinds'.format(
                len(repeated_items))
            log.info(message)
            command = '''
            DELETE FROM ipool WHERE no IN (SELECT no FROM
            (SELECT no FROM ipool WHERE 
            (ip,port) IN (SELECT ip,port FROM ipool GROUP BY ip,port HAVING COUNT(*)>1) 
            AND no NOT IN (SELECT MIN(no) FROM ipool GROUP BY ip,port HAVING COUNT(*)>1)) AS a);'''
            command = command.replace('\n', '')
            command = command.replace('    ', '')
            try:
                cur.execute(command)
                conn.commit()
                message = 'repeated ip:port deleted successfully. '
                log.info(message)
            except:
                conn.rollback()
                raise Exception(
                    'MyPythonLib.mysql.mysql_api_ipool.check_repeat failed')

    @staticmethod
    def get_ips(random: bool = False,
                anonymous: list = [0, 1, 2, 3],
                type_: list = [0, 1, 2, 3],
                response_time: int = 30000,
                last_verify_interval: str = '48:00:00',
                grade: int = 100,
                limit: int = 1000) -> tuple:
        """根据要求返回多个ip,只返回ip和port。
        
        :param random: 是否随机返回ip
        :param anonymous: 以列表形式指定匿名性
        :param type_: 对HTTP还是HTTPS或其他代理
        :param response_time: 响应时间在多少ms之内
        :param last_verify_interval: 用于指定上次验证的时间距现在不超过多久,若为None,则是不限定验证时间
        :param grade: ip的分数必须 >= grade
        :param limit: 最多返回几条代理ip
        :return tuple('ip:port','ip:port', ...)"""
        cur = mysql_api_ipool.cur
        cur.execute('USE ipool;')
        anonymous, type_ = tuple(anonymous), tuple(type_)
        if last_verify_interval == None:
            last_verify_interval = '99999999:00:00'
        command = '''
        SELECT ip,port FROM ipool 
        WHERE anonymous IN {} AND type IN {} 
        AND response_time BETWEEN 0 AND {} AND (NOW()-last_verify)<"{}" 
        AND grade >= {} 
        ORDER BY response_time,last_verify DESC LIMIT {};
        '''.format(anonymous, type_, response_time, last_verify_interval,
                   grade, limit)
        if random:
            command = '''
            SELECT ip,port FROM (SELECT ip,port FROM ipool 
            WHERE anonymous IN {} AND type IN {} 
            AND response_time BETWEEN 0 AND {} AND (NOW()-last_verify)<"{}" 
            AND grade >= {} 
            ) AS a ORDER BY RAND() LIMIT {};
            '''.format(anonymous, type_, response_time, last_verify_interval,
                       grade, limit)
        command = command.replace('\n', '')
        command = command.replace('    ', '')
        try:
            cur.execute(command)
        except:
            raise Exception('MyPythonLib.mysql.mysql_api_ipool.get_ips failed')
        ips = cur.fetchall()
        ips = tuple(
            [ips[i][0] + ':' + str(ips[i][1]) for i in range(len(ips))])
        return ips

    def get_to_be_validated_ips(self,
                                last_verify_interval: str = '48:00:00',
                                limit: int = -1):
        """返回需要验证的ip,只返回no、ip和port。
        
        :param last_verify_interval: 用于指定上次验证的时间距现在大于多久
        :param limit: 最多返回几条代理ip,为-1时全部取出
        :return tuple((no,ip,port,garde), (no,ip,port,garde), ...)"""
        cur, log = self.cur, self.log
        cur.execute('USE ipool;')
        command = '''
        SELECT no,ip,port,grade FROM ipool 
        WHERE (NOW()-last_verify)>"{}" 
        ORDER BY last_verify,no LIMIT {};
        '''.format(last_verify_interval, limit)
        command = command.replace('\n', '')
        command = command.replace('    ', '')
        if limit == -1:
            command = command.replace(' LIMIT -1', '')
        try:
            cur.execute(command)
        except:
            raise Exception(
                'MyPythonLib.mysql.mysql_api_ipool.get_to_be_validated_ips failed'
            )
        message = 'get {} to_be_validated ips'.format(limit)
        log.info(message)
        return cur.fetchall()

    def update_ip(self, ip_dict: dict):
        '''Update a row according to ip_dict. ip_dict must contain no, ip and port, plus at least one column to change; columns that are not mentioned stay unchanged.'''
        cur, conn, log = self.cur, mysql_api_ipool.conn, self.log
        try:
            no, ip, port = ip_dict['no'], ip_dict['ip'], ip_dict['port']
        except:
            raise Exception(
                'MyPythonLib.mysql.mysql_api_ipool.update_ip failed: ip_dict must include no,ip,port'
            )
        if len(ip_dict) <= 3:
            raise Exception(
                'MyPythonLib.mysql.mysql_api_ipool.update_ip failed: at least 1 column must be updated'
            )
        cur.execute('USE ipool;')
        command = 'SELECT no,ip,port FROM ipool WHERE no={};'.format(no)
        cur.execute(command)
        results = cur.fetchall()
        if len(results) == 0:
            message = 'ip no.{} not exists'.format(no)
            log.warning(message)
            return
        if ip != results[0][1] or port != results[0][2]:
            message = 'ip no.{} ip:port not right: {}:{}, received {}:{}, update given up'.format(
                no, results[0][1], results[0][2], ip, port)
            log.error(message)
            return

        command = '''UPDATE ipool SET '''
        if 'anonymous' in ip_dict.keys():
            command += 'anonymous={},'.format(ip_dict['anonymous'])
        if 'type' in ip_dict.keys():
            command += 'type={},'.format(ip_dict['type'])
        if 'physical_address' in ip_dict.keys():
            command += 'physical_address="{}",'.format(
                str_into_mysql(ip_dict['physical_address']))
        if 'response_time' in ip_dict.keys():
            command += 'response_time={},'.format(ip_dict['response_time'])
        if 'last_verify' in ip_dict.keys():
            command += 'last_verify="{}",'.format(ip_dict['last_verify'])
        if 'grade' in ip_dict.keys():
            command += 'grade={},'.format(ip_dict['grade'])
        command = command[:-1] + ' WHERE no={};'.format(no)
        try:
            cur.execute(command)
            conn.commit()
            message = 'update ip no.{} successfully'.format(no)
            log.info(message)
        except:
            conn.rollback()
            log.info('failing command: ' + command)
            raise Exception(
                'MyPythonLib.mysql.mysql_api_ipool.update_ip failed')
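
As a usage sketch of the whole interface (all ip values below are made up), submitting, deduplicating, updating and fetching look like this:

from MyPythonLib.mysql import mysql_api_ipool

api = mysql_api_ipool()

# submit crawled ips; missing keys fall back to the defaults described in check_table
api.save_ips([
    {'ip': '1.2.3.4', 'port': 8080},  # only ip and port known
    {'ip': '5.6.7.8', 'port': 3128, 'anonymous': 1, 'type': 2,
     'physical_address': 'Germany', 'response_time': 300,
     'last_verify': '2022-09-11 12:00:00'},
])
api.check_repeat()  # drop duplicated ip:port rows, keeping the smallest no

# update one row; no, ip and port must match an existing row
api.update_ip({'no': 1, 'ip': '1.2.3.4', 'port': 8080,
               'type': 2, 'response_time': 450, 'grade': 100})

# fetch up to 20 random HTTPS-capable proxies verified within the last 48 hours
ips = mysql_api_ipool.get_ips(random=True, type_=[1, 2], response_time=5000,
                              last_verify_interval='48:00:00', limit=20)
print(ips)  # e.g. ('1.2.3.4:8080', '5.6.7.8:3128', ...)

One caveat: anonymous and type_ are formatted via tuple(), so a single-element list would be rendered as (0,), which mysql rejects; pass at least two values or adapt the formatting if you need that case.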

3. Create, modify and run the scrapy project

The 12 sites providing free proxy ips used this time are the following:

http://www.ip3366.net/free/
https://www.kuaidaili.com
https://ip.jiangxianli.com/
https://www.cool-proxy.net/
https://free-proxy-list.net
https://www.ipaddress.com/proxy-list/
http://pubproxy.com/api/proxy?limit=5&format=txt&type=http&level=anonymous&last_check=60&no_country=CN
https://www.rmccurdy.com/.scripts/proxy/good.txt
https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.json
https://raw.githubusercontent.com/a2u/free-proxy-list/master/free-proxy-list.txt
https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list.txt
https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt

Of these, https://www.kuaidaili.com has a lot of pages (its earliest data is from 2014, more than 4000 pages), but crawling it with scrapy gets your ip banned, so only the first 5 pages are crawled.

In addition, there are 3 more sites that provide proxy ips:

http://proxy-list.org/english/
https://www.proxynova.com/proxy-server-list/
http://spys.one/en/anonymous-proxy-list/

These three sites generate their proxy tables with js; the simplest way to crawl them is selenium, but they do not offer many proxies, and proxynova can be replaced by https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.json, so they are not crawled here.
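
For reference, crawling such a js-rendered table with selenium would look roughly like the sketch below (not used in this project; it assumes Chrome with a matching chromedriver, and the element selector is only a placeholder to be adjusted against the real page):

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()  # needs Chrome and a chromedriver available on PATH
try:
    driver.get('http://proxy-list.org/english/')
    driver.implicitly_wait(10)  # give the js-generated table time to appear
    for row in driver.find_elements(By.CSS_SELECTOR, 'ul'):  # placeholder selector
        print(row.text)
finally:
    driver.quit()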

(1) Other configuration

In cmd, run the following:

D:
cd D:\spiders\scrapy
scrapy startproject ipool

Open D:\spiders\scrapy\ipool\ipool in Explorer and modify the following four files. Change settings.py to:

# Scrapy settings for ipool project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html

LOG_LEVEL = 'ERROR'  # only print error messages

BOT_NAME = 'ipool'

SPIDER_MODULES = ['ipool.spiders']
NEWSPIDER_MODULE = 'ipool.spiders'

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = 'ipool (+http://www.yourdomain.com)'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False  #do not obey robots.txt

# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
#DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
#   'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
#   'Accept-Language': 'en',
#}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
#    'ipool.middlewares.IpoolSpiderMiddleware': 543,
#}

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
    'ipool.middlewares.IpoolDownloaderMiddleware': 543,
}

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
#    'scrapy.extensions.telnet.TelnetConsole': None,
#}

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {'ipool.pipelines.MysqlPipeline': 1}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = 'httpcache'
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'

Change pipelines.py to:

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from MyPythonLib.mysql import mysql_api_ipool
import os


class MysqlPipeline:

    def open_spider(self, spider):
        self.mysql_api_ipool = mysql_api_ipool()

    def process_item(self, item, spider):
        self.mysql_api_ipool.save_ips(item['ips_dict_list'])
        return item

    def close_spider(self, spider):
        self.mysql_api_ipool.check_repeat()

Change middlewares.py to:

# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html

from scrapy import signals

# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter


# spider middleware
class IpoolSpiderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.

        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.

        # Must return an iterable of Request, or item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.

        # Should return either None or an iterable of Request or item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn’t have a response associated.

        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)


import random


# downloader middleware
class IpoolDownloaderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.
    UA_list = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:104.0) Gecko/20100101 Firefox/104.0'
    ]

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    # intercepts requests that did not raise an exception; UA spoofing can be done here
    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.

        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        request.headers['User-Agent'] = random.choice(self.UA_list)
        return None

    # intercepts every response; can be used to crawl dynamically loaded data
    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.

        # Must either;
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    # intercepts requests that raised an exception; proxy ips can be set here
    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.

        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

Change items.py to:

# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy


class IpoolItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    ips_dict_list = scrapy.Field()

(2) Spider configuration

Since 12 sites are crawled, 12 spiders are needed. Several of the sites actually share more or less the same data format, so these spiders could be slimmed down and merged (a sketch of such a merged base class follows right below).
All of these spiders go into the D:\spiders\scrapy\ipool\ipool\spiders directory.
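
For example, the five plain-text sources (a2u, clarketm, pubproxy, rmccurdy, TheSpeedX) all return bare ip:port lines, so they could share a single parse() through a base class. A sketch of that idea (the spiders below are still kept separate, so this is only an option; the subclass name here is hypothetical):

import scrapy, re
from ipool.items import IpoolItem


class PlainTextIpSpider(scrapy.Spider):
    # no name attribute, so scrapy does not register this base class as a runnable spider
    ip_re = re.compile(
        r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}')

    def parse(self, response):
        ips_dict_list = []
        for ip_node in self.ip_re.findall(response.text):
            ip, port = ip_node.split(':')
            ips_dict_list.append({'ip': ip.strip(), 'port': int(port.strip())})
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item


class A2uTextSpider(PlainTextIpSpider):
    # each concrete spider shrinks to a name and a start_urls list
    name = 'a2u_text'
    start_urls = [
        'https://raw.githubusercontent.com/a2u/free-proxy-list/master/free-proxy-list.txt'
    ]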

a.a2u.py

import scrapy, re
from ipool.items import IpoolItem


class A2uSpider(scrapy.Spider):
    name = 'a2u'
    start_urls = [
        'https://raw.githubusercontent.com/a2u/free-proxy-list/master/free-proxy-list.txt'
    ]

    def parse(self, response):
        ip_nodes = re.findall(
            r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
            response.text)
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip, port = ip_node.split(':')
            port = int(port)

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

b.clarketm.py

import scrapy, re
from ipool.items import IpoolItem


class ClarketmSpider(scrapy.Spider):
    name = 'clarketm'
    start_urls = [
        'https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list.txt'
    ]

    def parse(self, response):
        ip_nodes = re.findall(
            r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
            response.text)
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip, port = ip_node.split(':')
            ip = ip.strip()
            port = int(port.strip())

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

c.cool_proxy.py

import scrapy, json, time
from ipool.items import IpoolItem


class CoolProxySpider(scrapy.Spider):
    name = 'cool_proxy'
    start_urls = ['https://www.cool-proxy.net/proxies.json']

    def parse(self, response):
        ip_nodes = json.loads(response.text.replace('\'', '\"'))
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip = ip_node['ip'].strip()
            port = int(ip_node['port'])
            anonymous = (0, 1)[1 == ip_node['anonymous']]
            type_ = 3
            physical_address = ip_node['country_name'].strip()
            response_time = int(1000 * float(ip_node['response_time_average']))
            last_verify = float(ip_node['update_time'])
            last_verify = time.localtime(last_verify)
            last_verify = time.strftime('%Y-%m-%d %H:%M:%S', last_verify)

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ip_dict['response_time'] = response_time
            ip_dict['last_verify'] = last_verify
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

d.free_proxy_list.py

import scrapy, time
from ipool.items import IpoolItem


class FreeProxyListSpider(scrapy.Spider):
    name = 'free_proxy_list'
    start_urls = ['https://free-proxy-list.net/']

    def parse(self, response):
        ip_nodes = response.xpath(
            r'//*[@id="list"]/div/div[2]/div/table/tbody/tr')
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip = ip_node.xpath('./td[1]/text()').extract()[0].strip()
            port = int(ip_node.xpath('./td[2]/text()').extract()[0].strip())
            anonymous = (
                0,
                1)['anonymous' in ip_node.xpath('./td[5]/text()').extract()[0]]
            # (a,b)[condition] is another way to write a ternary expression in python; note that when the condition is True, the whole expression evaluates to b
            type_ = (0,
                     1)['yes' in ip_node.xpath('./td[7]/text()').extract()[0]]
            physical_address = ip_node.xpath(
                './td[4]/text()').extract()[0].strip()
            response_time = 1000
            last_verify = time.strftime('%Y-%m-%d %H:%M:%S')

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ip_dict['response_time'] = response_time
            ip_dict['last_verify'] = last_verify
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

e.ip3366.py

import scrapy
from ipool.items import IpoolItem


class Ip3366Spider(scrapy.Spider):
    name = 'ip3366'
    #allowed_domains = ['www.ip3366.net']  # allowed domains; urls outside them will not be crawled
    start_urls = [
        'http://www.ip3366.net/free/', 'http://www.ip3366.net/free/?stype=2'
    ]  # list of start urls
    base_url = 'http://www.ip3366.net/free/'

    def parse(self, response):
        ip_nodes = response.xpath(r'//*[@id="list"]/table/tbody/tr')
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip = ip_node.xpath(r'./td[1]/text()').extract()[0].strip()
            port = int(ip_node.xpath(r'./td[2]/text()').extract()[0].strip())
            anonymous = (
                0, 1)['高匿' in ip_node.xpath(r'./td[3]/text()').extract()[0]]
            # (a,b)[condition] is another way to write a ternary expression in python; note that when the condition is True, the whole expression evaluates to b
            type_ = (
                0, 1)['HTTPS' in ip_node.xpath(r'./td[4]/text()').extract()[0]]
            physical_address = ip_node.xpath(
                r'./td[5]/text()').extract()[0].strip()
            response_time = 1000 * int(
                ip_node.xpath(r'./td[6]/text()').extract()[0].strip().replace(
                    '秒', ''))
            response_time = (response_time, 250)[response_time == 0]
            #many proxy ips on this site report a latency of 0, which is not reliable, so they are all set to 250, i.e. 0.25s
            last_verify = ip_node.xpath(
                r'./td[7]/text()').extract()[0].strip().replace('/', '-')

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ip_dict['response_time'] = response_time
            ip_dict['last_verify'] = last_verify
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

        next_url = response.xpath(
            r'//*[@id="listnav"]/ul/a[text()="下一页"]/@href').extract()
        if next_url != None and len(next_url) != 0:
            next_url = Ip3366Spider.base_url + next_url[0]
            yield scrapy.Request(url=next_url,
                                 callback=self.parse)  # the callback is this method itself, i.e. the pages are parsed recursively

    # this method is called when the spider finishes
    def closed(self, spider):
        pass

f.ipaddress.py

import scrapy
from ipool.items import IpoolItem


class IpaddressSpider(scrapy.Spider):
    name = 'ipaddress'
    start_urls = ['https://www.ipaddress.com/proxy-list/']

    def parse(self, response):
        ip_nodes = response.xpath(r'/html/body/div[1]/main/table/tbody/tr')
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip = ip_node.xpath('./td[1]/a[1]/text()').extract()[0].strip()
            port = int(
                ip_node.xpath('./td[1]/text()').extract()[0].strip().replace(
                    '"', '').replace(':', ''))
            anonymous = (
                0,
                1)['anonymous' in ip_node.xpath('./td[2]/text()').extract()[0]]
            # (a,b)[condition] is another way to write a ternary expression in python; note that when the condition is True, the whole expression evaluates to b
            type_ = 3
            physical_address = ip_node.xpath(
                './td[3]/text()').extract()[0].strip()[3:]
            response_time = 1000
            last_verify = ip_node.xpath(
                './td[4]/text()').extract()[0].strip() + ':00'

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ip_dict['response_time'] = response_time
            ip_dict['last_verify'] = last_verify
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

g.jiangxianli.py

import scrapy, json
from ipool.items import IpoolItem


class JiangxianliSpider(scrapy.Spider):
    name = 'jiangxianli'
    start_urls = ['https://ip.jiangxianli.com/api/proxy_ips?page=1']

    def parse(self, response):
        ip_nodes = json.loads(response.text)['data']['data']
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for i in range(len(ip_nodes)):
            ip_dict = {}
            ip = ip_nodes[i]['ip'].strip()
            port = int(ip_nodes[i]['port'].strip())
            anonymous = (0, 1)[2 == ip_nodes[i]['anonymity']]
            type_ = (0, 1)['https' == ip_nodes[i]['protocol']]
            physical_address = ip_nodes[i]['ip_address'].strip()
            response_time = ip_nodes[i]['speed']
            last_verify = ip_nodes[i]['validated_at'].strip()

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ip_dict['response_time'] = response_time
            ip_dict['last_verify'] = last_verify
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

        next_url = json.loads(response.text)['data']['next_page_url']
        if next_url != None:
            yield scrapy.Request(url=next_url, callback=self.parse)

h.kuaidaili.py

import scrapy
from ipool.items import IpoolItem


class KuaidailiSpider(scrapy.Spider):
    name = 'kuaidaili'
    #allowed_domains = ['www.kuaidaili.com']
    start_urls = [
        'https://free.kuaidaili.com/free/inha/1/',
        'https://free.kuaidaili.com/free/intr/1/'
    ]
    base_url = 'https://free.kuaidaili.com'
    custom_settings = {
        'CONCURRENT_REQUESTS': 1,
        'DOWNLOAD_DELAY': 10,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 1
    }

    def parse(self, response):
        ip_nodes = response.xpath(r'//*[@id="list"]/table/tbody/tr')
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip = ip_node.xpath(r'./td[1]/text()').extract()[0].strip()
            port = int(ip_node.xpath(r'./td[2]/text()').extract()[0].strip())
            anonymous = (
                0, 1)['高匿' in ip_node.xpath(r'./td[3]/text()').extract()[0]]
            # (a,b)[condition] is another way to write a ternary expression in python; note that when the condition is True, the whole expression evaluates to b
            type_ = (
                0, 1)['HTTPS' in ip_node.xpath(r'./td[4]/text()').extract()[0]]
            physical_address = ip_node.xpath(
                r'./td[5]/text()').extract()[0].strip()
            response_time = 1000 * float(
                ip_node.xpath(r'./td[6]/text()').extract()[0].strip().replace(
                    '秒', ''))
            last_verify = ip_node.xpath(r'./td[7]/text()').extract()[0].strip()

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ip_dict['response_time'] = response_time
            ip_dict['last_verify'] = last_verify
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

        page_num = response.url.split('/')[-2]
        next_url = response.url.replace(page_num, str(int(page_num) + 1))
        other_urls_list = response.xpath(
            r'//*[@id="listnav"]/ul/li/a/@href').extract()
        other_urls_list = [
            KuaidailiSpider.base_url + other_urls_list[i]
            for i in range(len(other_urls_list))
        ]
        if int(page_num) < 5 and next_url in other_urls_list:
            yield scrapy.Request(url=next_url, callback=self.parse)

custom_settings specifies per-spider settings; in this case it throttles the crawl rate, and only the first 5 pages are crawled.

i.pubproxy.py

import scrapy, re
from ipool.items import IpoolItem


class PubproxySpider(scrapy.Spider):
    name = 'pubproxy'
    start_urls = [
        'http://pubproxy.com/api/proxy?limit=5&format=txt&type=http&level=anonymous&last_check=60&no_country=CN'
    ]

    def parse(self, response):
        ip_nodes = re.findall(
            r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
            response.text)
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip, port = ip_node.split(':')
            port = int(port)

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

j.rmccurdy.py

import scrapy, re
from ipool.items import IpoolItem


class RmccurdySpider(scrapy.Spider):
    name = 'rmccurdy'
    start_urls = ['https://www.rmccurdy.com/.scripts/proxy/good.txt']

    def parse(self, response):
        ip_nodes = re.findall(
            r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
            response.text)
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip, port = ip_node.split(':')
            port = int(port)

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

k.sunny9577.py

import scrapy, json
from ipool.items import IpoolItem


class Sunny9577Spider(scrapy.Spider):
    name = 'sunny9577'
    start_urls = [
        'https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.json'
    ]

    def parse(self, response):
        ip_nodes = json.loads(response.text)['proxynova']
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip = ip_node['ip'].strip()
            port = int(ip_node['port'].strip())
            anonymous = (0, 1)['Anonymous' in ip_node['anonymity']]
            # (a,b)[condition] is another way to write a ternary expression in python; note that when the condition is True, the whole expression evaluates to b
            type_ = 3
            physical_address = ip_node['country']

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['anonymous'] = anonymous
            ip_dict['type'] = type_
            ip_dict['physical_address'] = physical_address
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

l.TheSpeedX.py

import scrapy, re
from ipool.items import IpoolItem


class ThespeedxSpider(scrapy.Spider):
    name = 'TheSpeedX'
    start_urls = [
        'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt',
        'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt',
        'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks4.txt'
    ]

    def parse(self, response):
        ip_nodes = re.findall(
            r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
            response.text)
        ips_dict_list = []  #a list of dicts, matching the format expected by mysql_api_ipool.save_ips
        for ip_node in ip_nodes:
            ip_dict = {}
            ip, port = ip_node.split(':')
            ip = ip.strip()
            port = int(port.strip())

            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ips_dict_list.append(ip_dict)
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

(3) Running the spiders

Several spiders are involved here, and running them one by one with scrapy crawl <spider_name> is tedious. scrapy offers other ways to run several or all spiders of a project at once. The following is adapted from this article.
Create a new py file under D:\spiders\scrapy\ipool\ipool, for example crawl_all.py, with the following content:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from scrapy.spiderloader import SpiderLoader

# get a CrawlerProcess instance based on the project settings
process = CrawlerProcess(get_project_settings())

# get the spiderloader object, which can then list all spider names in the project
spider_loader = SpiderLoader(get_project_settings())

# add all spiders
for spidername in spider_loader.list():
    process.crawl(spidername)

# add a single spider
#process.crawl('spider_name')

# run
process.start()

As shown above, process.crawl('spider_name') adds a single spider, and spider_loader.list() returns the names of all spiders in the project; combining the two runs everything. One more reminder: a few of the crawled sites cannot be reached directly, e.g. the raw.githubusercontent.com ones; you can drop them, or use some extra settings (a sketch of one such per-request proxy setting is shown below). If you need to run requests or scrapy through V2rayN, see Docker实践——如何查阅wiki百科及V2Ray, which covers how to use V2rayN while running crawlers.
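
If you go the "extra settings" route for the raw.githubusercontent.com sources, one option is a small extra downloader middleware that routes only those requests through a local HTTP proxy; scrapy uses request.meta['proxy'] as the proxy for a request. This is only a sketch, and 127.0.0.1:10809 is an assumed local V2rayN HTTP port, so adjust it to your own setup:

# e.g. in middlewares.py; enable it in DOWNLOADER_MIDDLEWARES with some priority such as 544
class GithubProxyMiddleware:

    def process_request(self, request, spider):
        if 'githubusercontent.com' in request.url:
            request.meta['proxy'] = 'http://127.0.0.1:10809'  # assumed local proxy address
        return None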

Finally, cd into D:\spiders\scrapy\ipool\ipool in cmd and run python crawl_all.py.

4. Validating the IPs

The original plan was to validate the ips with multiple processes on the Raspberry Pi, but that uses too much of the Pi's resources, so validation runs on the win10 machine instead.
In the custom-library directory on the win10 machine, besides log.py and mysql.py, add a spider.py with the following content:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import requests, random

headers = {
    'User-Agent':
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36'
}


def get_session(max_retries: int = 3) -> requests.Session:
    '''Take the maximum number of retries and return a requests Session object.

    :param max_retries: maximum number of retries
    :return Session object'''
    # set the retry count; counting the initial attempt, at most 4 connection attempts are made when errors occur
    try:
        session = requests.Session()
        session.mount('http://',
                      requests.adapters.HTTPAdapter(max_retries=max_retries))
        session.mount('https://',
                      requests.adapters.HTTPAdapter(max_retries=max_retries))
        return session
    except:
        raise Exception('error: MyPythonLib.spider.get_session failed ')


def multiprocess_job_spliter(process_num: int,
                             to_do_list,
                             random_shuffle: bool = True) -> dict:
    '''Given the number of processes and the full job sequence, return the jobs split into shares.

    :param process_num: number of processes (how many shares to split the job sequence into)
    :param to_do_list: the full job sequence (tuple or list), e.g. all the url strings to crawl
    :param random_shuffle: whether to shuffle the job sequence first
    :return {0: todo_list1, 1: todo_list2, ... }'''
    try:
        to_do_list = list(to_do_list)
        if random_shuffle:
            random.shuffle(to_do_list)
        total_num = len(to_do_list)
        one_share = total_num // process_num
        share_list = {}
        for i in range(0, process_num):
            if i != process_num - 1:
                share_list[i] = to_do_list[(i * one_share):((i + 1) *
                                                            one_share)]
            elif i == process_num - 1:
                share_list[i] = to_do_list[(i * one_share):]
        return share_list
    except:
        raise Exception(
            'error: MyPythonLib.spider.multiprocess_job_spliter failed ')

This file contains a lot more further on, but only the multiprocess_job_spliter function is actually used here (a small example of its output is shown below).
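
As a quick illustration of what multiprocess_job_spliter returns (the job list here is just a list of integers; in this project it is the list of (no, ip, port, grade) tuples):

from MyPythonLib import spider

shares = spider.multiprocess_job_spliter(3, list(range(10)), random_shuffle=False)
print(shares)
# {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8, 9]}  -- the last share takes the remainder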

Then put a python script somewhere convenient, for example right inside the scrapy ipool project, as D:\spiders\scrapy\ipool\ipool\update_ips.py, with the following content:

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from MyPythonLib import mysql, spider
from MyPythonLib import log as mylog
import logging, os, time, multiprocessing

headers = spider.headers


class update_ips():
    test_http_url = 'http://example.org/'
    test_https_url = 'https://example.org/'

    def __init__(self, log: logging.Logger = None):
        """:param log: 可选的log对象,若不提供,将在当前文件夹下新建logs目录、创建log文件"""
        if log == None:
            log_dir = os.path.join(os.getcwd(), 'logs')
            if not os.path.isdir(log_dir):
                os.mkdir(log_dir)
            log_file_name = 'mysql_api_ipool_log#{}.txt'.format(
                time.strftime('-%Y-%m-%d_%H_%M_%S'))
            log = mylog.get_logger(log_dir, log_file_name, 'mysql_api_ipool')
        self.log = log
        message = 'ready to update ips'
        self.log.info(message)
        self.mysql_api_ipool = mysql.mysql_api_ipool(self.log)
        self.conn = self.mysql_api_ipool.conn
        self.cur = self.conn.cursor()
        self.mysql_api_ipool.check_table()
        message = 'database ipool and table ipool checked'
        self.log.info(message)
        self.cur.execute('USE ipool;')
        self.conn.commit()

    def __del__(self):
        self.cur.close()
        message = 'update ips complete'
        self.log.info(message)

    @staticmethod
    def validate_ips_single_process(ips_list: list,
                                    process_no: int = 0,
                                    grade_minus: int = 50):
        '''Validate ips; only type, last_verify and grade are changed. IPs whose grade drops to <= 0 are deleted (by validate_ips_multiprocess).
        :This method is single-process.
        :param ips_list: list of ips to validate, in the form [(no,ip,port,grade), (no,ip,port,grade), ...]
        :param process_no: process number, mainly used to name the log file
        :param grade_minus: how many points to subtract if the proxy ip does not work'''
        global headers
        log = mylog.get_logger(
            abs_path=os.path.join(os.getcwd(), 'logs'),
            log_file_name='validate_ips-{}'.format(process_no),
            getLogger_name=str(process_no))
        message = 'received {} ips, validating...'.format(len(ips_list))
        log.info(message)
        mysql_api_ipool = mysql.mysql_api_ipool(log)
        session = spider.get_session(max_retries=1)
        for no, ip, port, grade in ips_list:
            ip_port = '{}:{}'.format(ip, port)
            proxies = {'http': ip_port, 'https': ip_port}
            http_success, https_success = True, True
            try:
                session.get(url=update_ips.test_http_url,
                            timeout=10,
                            headers=headers,
                            proxies=proxies)
            except:
                http_success = False
            try:
                session.get(url=update_ips.test_https_url,
                            timeout=10,
                            headers=headers,
                            proxies=proxies)
            except:
                https_success = False

            if http_success and not https_success:
                type_ = 0
            elif https_success and not http_success:
                type_ = 1
            elif http_success and https_success:
                type_ = 2
            elif not http_success and not https_success:
                type_ = 3
            ip_dict = {}
            ip_dict['no'] = no
            ip_dict['ip'] = ip
            ip_dict['port'] = port
            ip_dict['type'] = type_
            ip_dict['last_verify'] = time.strftime('%Y-%m-%d %H:%M:%S')
            if type_ == 3:
                grade -= grade_minus
            else:
                grade = 100
            ip_dict['grade'] = grade
            mysql_api_ipool.update_ip(ip_dict=ip_dict)

    def validate_ips_multiprocess(self,
                                  process_num: int = 5,
                                  limit: int = -1,
                                  grade_minus: int = 50):
        '''Validate ips; only type, last_verify and grade are changed. IPs whose grade is <= 0 are deleted.
        :This method is multi-process and calls validate_ips_single_process.
        :param process_num: number of processes used for validation
        :param limit: maximum number of ips to fetch for validation; -1 means fetch all
        :param grade_minus: how many points to subtract if the proxy ip does not work'''
        cur, conn, log = self.cur, self.conn, self.log
        command = 'DELETE FROM ipool WHERE grade<=0;'
        try:
            cur.execute(command)
            conn.commit()
            message = 'delete ips where grade<=0 successfully'
            log.info(message)
        except:
            conn.rollback()
            log.info('failing command: ' + command)
            raise Exception(
                'update_ips.validate_ips_multiprocess failed: delete error')
        ips_list = self.mysql_api_ipool.get_to_be_validated_ips(limit=limit)
        ips_list_dict = spider.multiprocess_job_spliter(process_num, ips_list)
        process_pool = multiprocessing.Pool(processes=process_num)
        for i in range(process_num):
            process_pool.apply_async(update_ips.validate_ips_single_process,
                                     args=(ips_list_dict[i], i, grade_minus))
        process_pool.close()
        process_pool.join()
        try:
            cur.execute(command)
            conn.commit()
            message = 'delete ips where grade<=0 successfully'
            log.info(message)
        except:
            conn.rollback()
            log.info('failing command: ' + command)
            raise Exception(
                'update_ips.validate_ips_multiprocess failed: delete error')


if __name__ == '__main__':
    update_ips().validate_ips_multiprocess(process_num=60,
                                           limit=-1,
                                           grade_minus=50)

In update_ips().validate_ips_multiprocess(process_num=60, limit=-1, grade_minus=50), process_num and limit can be adjusted to your liking; they mainly depend on the win10 machine's specs, and the mysql read/write load during validation is not very high.

Also note that validation here only touches type (http or https), last_verify (last verification time) and grade (the ip's score); if you also want to update the proxy's anonymity, physical address or response time at the same time, you can extend the code from here (a sketch of one way to measure response_time follows below).
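
For instance, to also refresh response_time you could time the request inside validate_ips_single_process with a small helper like the one below and put the result into ip_dict['response_time'] before calling update_ip. This is only a sketch and the helper name is made up:

import time, requests


def measure_response_time(session: requests.Session, url: str, proxies: dict,
                          headers: dict = None, timeout: int = 10) -> int:
    '''Return how long the request took in milliseconds, or -1 if the proxy failed.'''
    start = time.perf_counter()
    try:
        session.get(url=url, timeout=timeout, headers=headers, proxies=proxies)
    except:
        return -1
    return int((time.perf_counter() - start) * 1000)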

Finally, run the following in cmd:

cd D:\spiders\scrapy\ipool\ipool
python update_ips.py

to start the ip-validation script.

5. Daily use

The crawling and validation scripts can be run once a day, in the following order:
(1) If you need V2rayN, start V2rayN first.
(2) Run the following in cmd:

cd D:\spiders\scrapy\ipool\ipool
python crawl_all.py
python update_ips.py

and you are done.
If you want to wrap these commands in a script that runs automatically at boot, see 不同系统下的开机自动运行.

Also, remember to back up mysql regularly. When you need proxy ips, fetch them with the MyPythonLib.mysql.mysql_api_ipool.get_ips method.
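
For example, a typical way to consume the pool from another script looks like this (the target url is only a placeholder):

import random, requests
from MyPythonLib.mysql import mysql_api_ipool

ips = mysql_api_ipool.get_ips(random=True, type_=[0, 2], limit=50)  # HTTP-capable proxies
proxy = random.choice(ips)  # e.g. '1.2.3.4:8080'
proxies = {'http': 'http://' + proxy, 'https': 'http://' + proxy}
resp = requests.get('http://example.org/', proxies=proxies, timeout=10)
print(resp.status_code)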

Tags: python, mysql, scrapy
