Python, Web Scraping and Deep Learning (10): Building an ipool with Scrapy + MySQL
Steps / Table of contents:
1. Running the MySQL database and preparation
2. Custom library
3. Creating, modifying and running the Scrapy project
(1) Other configuration
(2) Spider configuration
a.a2u.py
b.clarketm.py
c.cool_proxy.py
d.free_proxy_list.py
e.ip3366.py
f.ipaddress.py
g.jiangxianli.py
h.kuaidaili.py
i.pubproxy.py
j.rmccurdy.py
k.sunny9577.py
l.TheSpeedX.py
(3) Running the spiders
4. Validating the IPs
5. Daily use
This article was first published on my personal blog at https://lisper517.top/index.php/archives/48/ ; please credit the source when reposting.
The goal of this article is to build your own proxy IP pool, i.e. an "ipool", with scrapy + mysql. Since it is both a recap of the previous posts and an extension of them, it is fairly long; my hope is that it can still be followed step by step on its own.
This article was written on September 11, 2022. The MySQL database runs on a Raspberry Pi and everything else runs on a Windows 10 machine; the editor used is VS Code.
Write scrapers recklessly and you may find yourself in legal trouble. When running spiders, do not eat up too much of a server's bandwidth, or you may bring trouble on yourself.
My abilities are limited and I am not a CS professional, so there are bound to be oversights or code that breaks good practice; please point them out in the comments.
1. Running the MySQL database and preparation
I install MySQL on a Raspberry Pi with Docker, which requires docker and docker-compose to be installed first; see Docker Introduction (1) for installing them on the Pi. You do not have to run MySQL on a Raspberry Pi, you do not have to use Docker, and you could even swap MySQL for another database such as Redis (in which case the later .py files will need adjusting); the only real requirement is a running database. The runoob tutorial covers other ways to install and run MySQL.
On the Raspberry Pi, run:
mkdir -p /docker/mysql/data/mysql
mkdir -p /docker/mysql/conf/mysql
mkdir -p /docker/mysql/backup/mysql
docker pull mysql:8.0.29
nano /docker/mysql/docker-compose.yml
Write the following into the docker-compose.yml file:
version: "3.9"
services:
mysql:
image: mysql:8.0.29
environment:
MYSQL_ROOT_PASSWORD: mysqlpasswd
ports:
- "53306:3306"
command:
- mysqld
- --character-set-server=utf8mb4
volumes:
- /docker/mysql/data/mysql:/var/lib/mysql
- /docker/mysql/conf/mysql:/etc/mysql/conf.d
- /docker/mysql/backup/mysql:/backup
logging:
driver: syslog
restart: always
Remember to change MYSQL_ROOT_PASSWORD: mysqlpasswd to the password you want to set, and you can also change 53306 to a port of your choosing (the default 3306 is not recommended). The /docker/mysql/backup/mysql directory is mounted so that data can easily be exported from and backed up out of the dockerized MySQL.
Then run:
cd /docker/mysql
docker-compose config
docker-compose up -d
Next, step into the container to check that MySQL is running properly:
docker exec -it mysql-mysql-1 bash # with an older docker-compose, replace every - in this command with _
mysql -uroot -p
# enter the password
SHOW DATABASES;
Just confirm that MySQL is running normally. So that the commands that create the database and table are kept on record, they will be written into the Python files.
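Since /backup inside the container maps to /docker/mysql/backup/mysql on the host, an occasional dump can be taken roughly like this (a sketch; adjust the container name and password to yours):
docker exec mysql-mysql-1 sh -c 'mysqldump -uroot -pmysqlpasswd --all-databases > /backup/all_databases.sql'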
Then prepare the environment on the Windows 10 machine: install Python 3, install Scrapy and the other libraries, create a folder for the custom library, and create a folder for the Scrapy project.
Installing Python 3 on Windows 10 is not covered here. Once it is installed, run pip install Scrapy in cmd. Other libraries you may need are requests (HTTP requests), bs4 (parsing responses), pymysql (connecting to MySQL), lxml (an HTML parser), cryptography (password encryption when connecting to MySQL) and selenium (browser automation); install them with pip install requests bs4 pymysql lxml cryptography selenium (not all of them are used in this article, but you will need them sooner or later while learning web scraping). If pip fails, keep rerunning the command until it succeeds, or search for how to switch pip to a mirror. Finally, create the folders D:\PythonLib\MyPythonLib and D:\spiders\scrapy, or pick other paths of your own.
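With pymysql installed, a quick sanity check that the Windows machine can reach the MySQL instance on the Pi could look like this (a sketch; the host address is a placeholder, so substitute your Pi's address, mapped port and password):
import pymysql

# placeholder connection details: use your Pi's LAN address, the mapped port and your root password
conn = pymysql.connect(host='192.168.1.100', port=53306, user='root', passwd='mysqlpasswd')
with conn.cursor() as cur:
    cur.execute('SELECT VERSION();')
    print(cur.fetchone())  # e.g. ('8.0.29',)
conn.close()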
2. Custom library
In D:\PythonLib\MyPythonLib, create an empty __init__.py file. Then go to the folder where your Python installation keeps third-party packages (on Windows 10 this is usually the site-packages directory), for example C:\Program Files\Python310\Lib\site-packages, create a file there named PythonLib.pth, and write a single line into it: D:\PythonLib. A quick way to confirm that Python picks the path up is shown below.
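A minimal check from any Python prompt (assuming the empty __init__.py is already in place):
import sys, MyPythonLib

# D:\PythonLib should show up in sys.path if the .pth file was read
print([p for p in sys.path if 'PythonLib' in p])
print(MyPythonLib.__file__)  # should point at D:\PythonLib\MyPythonLib\__init__.py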
Next, create the custom library files; this article mainly needs helpers related to logging and MySQL. Create D:\PythonLib\MyPythonLib\log.py and write:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging, time, os
def get_logger(abs_path: str,
log_file_name: str = '',
getLogger_name: str = ''):
    '''Take an absolute path and a log file name, and return a logger object.
    :param abs_path: absolute path where the log file will be stored
    :param log_file_name: log file name; if empty, a default name is used
    :param getLogger_name: name for the logger object; if the same program writes several log files at once, these names should not be the same
    :return logger object'''
try:
formatter = logging.Formatter(
'%(lineno)d : %(asctime)s : %(levelname)s : %(funcName)s : %(message)s'
)
if log_file_name == '':
log_file_name = 'log#{}.txt'.format(time.strftime('-%Y-%m-%d'))
fileHandler = logging.FileHandler(
(os.path.join(abs_path, log_file_name)),
mode='w',
encoding='utf-8')
fileHandler.setFormatter(formatter)
if getLogger_name == '':
getLogger_name = 'logger'
log = logging.getLogger(getLogger_name)
log.setLevel(logging.DEBUG)
log.addHandler(fileHandler)
return log
except:
raise Exception('error: MyPythonLib.log.get_logger failed ')
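A brief usage sketch of get_logger (the directory and file names here are only examples):
import os
from MyPythonLib import log as mylog

log_dir = os.path.join(os.getcwd(), 'logs')   # example directory
os.makedirs(log_dir, exist_ok=True)
logger = mylog.get_logger(log_dir, 'demo_log.txt', 'demo')
logger.info('logger is working')              # written into logs\demo_log.txt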
Create D:\PythonLib\MyPythonLib\mysql.py and write the following (remember to change the host, port and password in get_conn to your own MySQL settings):
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql, os, logging, time
from MyPythonLib import log as mylog
def get_conn(database_name: str = 'mysql'):
    '''Return a MySQL connection object for the given database name.
    :param database_name: database name; the mysql database is used if not specified
    :return Connection'''
try:
conn = pymysql.connect(host='127.0.0.1',
port=53306,
user='root',
passwd='mysqlpasswd',
db=database_name,
charset='utf8')
return conn
except:
raise Exception('MyPythonLib.mysql.get_conn failed ')
def str_into_mysql(string: str) -> str:
    '''Take a string and convert it into a form that can be stored in MySQL.
    The string is meant to be used as a column value, e.g. the string in
    "SELECT * FROM table_name WHERE column_name='string';"; if it contains
    any of the characters \\ , ' or " , it must be processed by this function.
    :param string: the target string
    :return a string that can be stored in MySQL
    '''
try:
string = string.replace('\\', '\\\\')
string = string.replace("'", "\\'")
string = string.replace('"', '\\"')
return string
except:
raise Exception('MyPythonLib.mysql.str_into_mysql failed ')
def table_exists(cur, table_name: str) -> bool:
    '''Check whether a table exists in the current database.
    :param cur: cursor object
    :param table_name: table name
    :return True if the table exists, False otherwise
    '''
try:
cur.execute("SHOW TABLES;")
results = cur.fetchall()
for table in results:
table = table[0]
if table == table_name:
return True
return False
except:
raise Exception('MyPythonLib.mysql.table_exists failed ')
class mysql_api_ipool():
"""通过这个接口实现提交爬取的ip、取出ip的操作。"""
conn = get_conn()
    cur = conn.cursor()  # this cursor is shared by the whole class, for MySQL operations that do not need an instance
def __init__(self, log: logging.Logger = None):
""":param log: 可选的log对象,若不提供,将在当前文件夹下新建logs目录、创建log文件"""
if log == None:
log_dir = os.path.join(os.getcwd(), 'logs')
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
log_file_name = 'mysql_api_ipool_log#{}.txt'.format(
time.strftime('-%Y-%m-%d_%H_%M_%S'))
log = mylog.get_logger(log_dir, log_file_name, 'mysql_api_ipool')
self.log = log
message = 'mysql_api_ipool object initialized'
self.log.info(message)
        self.cur = mysql_api_ipool.conn.cursor()  # the instance's own cursor
mysql_api_ipool.check_table()
message = 'database ipool and table ipool checked'
self.log.info(message)
def __del__(self):
self.cur.close()
message = 'mysql_api_ipool object deleted'
self.log.info(message)
@staticmethod
def check_table():
"""检查一下mysql中是否已经建好了表,若表不存在则新建表。
:这里并没有检查:若表存在,表的格式是否正确。
数据库的各列含义与默认值:
:no,自增主键;
:ip,代理ip("0.0.0.0");
:port,代理ip端口(0);
:anonymous,是否为高匿,0为否,1为是,3为其他(3);
:type,0为HTTP,1为HTTPS,2为都可,3为其他(3);
:physical_address,代理ip的地理位置("unknown");
:response_time,响应时间,单位为ms(-1);
:last_verify,上次验证该代理ip的时间("1000-01-01 00:00:00");
:grade,该代理ip的分数。每验证成功一次会加分,失败则扣分,扣到0会被删除,在爬取ip时不会使用到(100);
:created,该行创建时间。"""
cur, conn = mysql_api_ipool.cur, mysql_api_ipool.conn
command = '''
CREATE TABLE IF NOT EXISTS ipool (
no BIGINT UNSIGNED AUTO_INCREMENT,
ip VARCHAR(50) NOT NULL DEFAULT "0.0.0.0" COMMENT "IP address",
port SMALLINT UNSIGNED NOT NULL DEFAULT 0 COMMENT "port",
anonymous BIT(2) NOT NULL DEFAULT 3 COMMENT "whether ip is anonymous",
type BIT(2) NOT NULL DEFAULT 3 COMMENT "HTTP or HTTPS or both or others",
physical_address VARCHAR(100) NOT NULL DEFAULT "unknown" COMMENT "where is the server",
        response_time MEDIUMINT NOT NULL DEFAULT -1 COMMENT "response_time in milliseconds",
last_verify DATETIME NOT NULL DEFAULT '1000-01-01 00:00:00' COMMENT "last verify time",
grade TINYINT UNSIGNED NOT NULL DEFAULT 100 COMMENT "grade of ip, used for validation",
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (no));'''
command = command.replace('\n', '')
command = command.replace(' ', '')
try:
cur.execute("CREATE DATABASE IF NOT EXISTS ipool;")
cur.execute("USE ipool;")
cur.execute(command)
conn.commit()
except:
conn.rollback()
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.check_table failed')
def save_ips(self, ips_dict_list: list):
"""存入多个ip(以字典的列表形式提交)。注意这里只是对不存在的数据添加默认值,没有验证数据类型等。
:param ips_dict_list: ip字典的列表,每个ip是一个字典。"""
cur, conn, log = self.cur, mysql_api_ipool.conn, self.log
if len(ips_dict_list) == 0:
message = 'no ip submitted'
log.info(message)
return
command = '''INSERT INTO ipool (ip,port,anonymous,type,physical_address,response_time,last_verify) VALUES '''
for ip_dict in ips_dict_list:
ip, port, anonymous, type_, physical_address, response_time, last_verify = "0.0.0.0", 0, 3, 3, 'unknown', -1, "1000-01-01 00:00:00"
if 'ip' in ip_dict.keys():
ip = ip_dict['ip']
if 'port' in ip_dict.keys():
port = ip_dict['port']
if 'anonymous' in ip_dict.keys():
anonymous = ip_dict['anonymous']
if 'type' in ip_dict.keys():
type_ = ip_dict['type']
if 'physical_address' in ip_dict.keys():
physical_address = ip_dict['physical_address']
physical_address = str_into_mysql(physical_address)
if 'response_time' in ip_dict.keys():
response_time = ip_dict['response_time']
if 'last_verify' in ip_dict.keys():
last_verify = ip_dict['last_verify']
command += '("{}",{},{},{},"{}",{},"{}"),'.format(
ip, port, anonymous, type_, physical_address, response_time,
last_verify)
message = 'trying to save ip:port={}:{}'.format(ip, port)
log.info(message)
command = command[:-1] + ';'
try:
cur.execute(command)
conn.commit()
message = 'saving ips successfully'
log.info(message)
except:
conn.rollback()
            log.info('failed command: ' + command)
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.save_ips failed')
def check_repeat(self):
"""检查爬到的数据里有没有重复的,只检查ip和port都相同的。
如果有重复的,保留no最小的一条记录。"""
cur, conn, log = mysql_api_ipool.cur, mysql_api_ipool.conn, self.log
cur.execute('USE ipool;')
cur.execute(
'SELECT ip,port FROM ipool GROUP BY ip,port HAVING COUNT(*)>1;')
repeated_items = cur.fetchall()
if len(repeated_items) == 0:
message = 'no repeated ip:port in ipool'
log.info(message)
return
else:
message = 'found repeated ip:port {} kinds'.format(
len(repeated_items))
log.info(message)
command = '''
DELETE FROM ipool WHERE no IN (SELECT no FROM
(SELECT no FROM ipool WHERE
(ip,port) IN (SELECT ip,port FROM ipool GROUP BY ip,port HAVING COUNT(*)>1)
AND no NOT IN (SELECT MIN(no) FROM ipool GROUP BY ip,port HAVING COUNT(*)>1)) AS a);'''
command = command.replace('\n', '')
command = command.replace(' ', '')
try:
cur.execute(command)
conn.commit()
message = 'repeated ip:port deleted successfully. '
log.info(message)
except:
conn.rollback()
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.check_repeat failed')
@staticmethod
def get_ips(random: bool = False,
anonymous: list = [0, 1, 2, 3],
type_: list = [0, 1, 2, 3],
response_time: int = 30000,
last_verify_interval: str = '48:00:00',
grade: int = 100,
limit: int = 1000) -> tuple:
"""根据要求返回多个ip,只返回ip和port。
:param random: 是否随机返回ip
:param anonymous: 以列表形式指定匿名性
:param type_: 对HTTP还是HTTPS或其他代理
:param response_time: 响应时间在多少ms之内
:param last_verify_interval: 用于指定上次验证的时间距现在不超过多久,若为None,则是不限定验证时间
:param grade: ip的分数必须 >= grade
:param limit: 最多返回几条代理ip
:return tuple('ip:port','ip:port', ...)"""
cur = mysql_api_ipool.cur
cur.execute('USE ipool;')
anonymous, type_ = tuple(anonymous), tuple(type_)
if last_verify_interval == None:
last_verify_interval = '99999999:00:00'
command = '''
SELECT ip,port FROM ipool
WHERE anonymous IN {} AND type IN {}
AND response_time BETWEEN 0 AND {} AND (NOW()-last_verify)<"{}"
AND grade >= {}
ORDER BY response_time,last_verify DESC LIMIT {};
'''.format(anonymous, type_, response_time, last_verify_interval,
grade, limit)
if random:
command = '''
SELECT ip,port FROM (SELECT ip,port FROM ipool
WHERE anonymous IN {} AND type IN {}
AND response_time BETWEEN 0 AND {} AND (NOW()-last_verify)<"{}"
AND grade >= {}
) AS a ORDER BY RAND() LIMIT {};
'''.format(anonymous, type_, response_time, last_verify_interval,
grade, limit)
command = command.replace('\n', '')
command = command.replace(' ', '')
try:
cur.execute(command)
except:
raise Exception('MyPythonLib.mysql.mysql_api_ipool.get_ips failed')
ips = cur.fetchall()
ips = tuple(
[ips[i][0] + ':' + str(ips[i][1]) for i in range(len(ips))])
return ips
def get_to_be_validated_ips(self,
last_verify_interval: str = '48:00:00',
limit: int = -1):
"""返回需要验证的ip,只返回no、ip和port。
:param last_verify_interval: 用于指定上次验证的时间距现在大于多久
:param limit: 最多返回几条代理ip,为-1时全部取出
:return tuple((no,ip,port,garde), (no,ip,port,garde), ...)"""
cur, log = self.cur, self.log
cur.execute('USE ipool;')
command = '''
SELECT no,ip,port,grade FROM ipool
WHERE (NOW()-last_verify)>"{}"
ORDER BY last_verify,no LIMIT {};
'''.format(last_verify_interval, limit)
command = command.replace('\n', '')
command = command.replace(' ', '')
if limit == -1:
command = command.replace(' LIMIT -1', '')
try:
cur.execute(command)
except:
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.get_to_be_validated_ips failed'
)
message = 'get {} to_be_validated ips'.format(limit)
log.info(message)
return cur.fetchall()
def update_ip(self, ip_dict: dict):
        '''Update a row according to ip_dict. ip_dict must contain no, ip and port, plus at least one column to change; columns not mentioned are left unchanged.'''
cur, conn, log = self.cur, mysql_api_ipool.conn, self.log
try:
no, ip, port = ip_dict['no'], ip_dict['ip'], ip_dict['port']
except:
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.update_ip failed: ip_dict must include no,ip,port'
)
if len(ip_dict) <= 3:
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.update_ip failed: at least 1 column must be updated'
)
cur.execute('USE ipool;')
command = 'SELECT no,ip,port FROM ipool WHERE no={};'.format(no)
cur.execute(command)
results = cur.fetchall()
if len(results) == 0:
message = 'ip no.{} not exists'.format(no)
log.warning(message)
return
if ip != results[0][1] or port != results[0][2]:
            message = 'ip no.{} ip:port not right: {}:{}, received {}:{}, update given up'.format(
no, results[0][1], results[0][2], ip, port)
log.error(message)
return
command = '''UPDATE ipool SET '''
if 'anonymous' in ip_dict.keys():
command += 'anonymous={},'.format(ip_dict['anonymous'])
if 'type' in ip_dict.keys():
command += 'type={},'.format(ip_dict['type'])
if 'physical_address' in ip_dict.keys():
command += 'physical_address="{}",'.format(
str_into_mysql(ip_dict['physical_address']))
if 'response_time' in ip_dict.keys():
command += 'response_time={},'.format(ip_dict['response_time'])
if 'last_verify' in ip_dict.keys():
command += 'last_verify="{}",'.format(ip_dict['last_verify'])
if 'grade' in ip_dict.keys():
command += 'grade={},'.format(ip_dict['grade'])
command = command[:-1] + ' WHERE no={};'.format(no)
try:
cur.execute(command)
conn.commit()
message = 'update ip no.{} successfully'.format(no)
log.info(message)
except:
conn.rollback()
            log.info('failed command: ' + command)
raise Exception(
'MyPythonLib.mysql.mysql_api_ipool.update_ip failed')
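Outside of Scrapy, the class can also be used on its own; a minimal usage sketch (the IP below is made up):
from MyPythonLib.mysql import mysql_api_ipool

api = mysql_api_ipool()                          # creates logs/ and checks the ipool database and table
api.save_ips([{'ip': '1.2.3.4', 'port': 8080},
              {'ip': '1.2.3.4', 'port': 8080}])  # duplicate on purpose
api.check_repeat()                               # keeps only the row with the smallest no
# get_ips only returns IPs that passed validation recently, so this stays empty until section 4 has run
print(mysql_api_ipool.get_ips(limit=10))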
3. Creating, modifying and running the Scrapy project
Twelve websites that provide free proxy IPs are used this time, listed below:
http://www.ip3366.net/free/
https://www.kuaidaili.com
https://ip.jiangxianli.com/
https://www.cool-proxy.net/
https://free-proxy-list.net
https://www.ipaddress.com/proxy-list/
https://www.proxynova.com/proxy-server-list/
http://pubproxy.com/api/proxy?limit=5&format=txt&type=http&level=anonymous&last_check=60&no_country=CN
https://www.rmccurdy.com/.scripts/proxy/good.txt
https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.json
https://raw.githubusercontent.com/a2u/free-proxy-list/master/free-proxy-list.txt
https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list.txt
https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt
Of these, https://www.kuaidaili.com has a huge number of pages (its earliest data is from 2014, more than 4000 pages in total), but crawling it with scrapy gets your IP banned, so only the first few pages (5, in the spider below) are crawled.
In addition, there are 3 more websites that provide proxy IPs:
http://proxy-list.org/english/
https://www.proxynova.com/proxy-server-list/
http://spys.one/en/anonymous-proxy-list/
These three sites generate their proxy tables with JavaScript; the simplest way to scrape them is Selenium, but they do not offer many proxies, and proxynova can be replaced by https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.json, so they are skipped here.
(1) Other configuration
In cmd, run the following:
D:
cd D:\spiders\scrapy
scrapy startproject ipool
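scrapy startproject generates the standard project layout, roughly as follows (contents may vary slightly between Scrapy versions):
D:\spiders\scrapy\ipool
    scrapy.cfg
    ipool\
        __init__.py
        items.py
        middlewares.py
        pipelines.py
        settings.py
        spiders\
            __init__.py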
Open D:\spiders\scrapy\ipool\ipool in File Explorer and modify the following 4 files. Change settings.py to:
# Scrapy settings for ipool project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
LOG_LEVEL = 'ERROR' # only print error messages
BOT_NAME = 'ipool'
SPIDER_MODULES = ['ipool.spiders']
NEWSPIDER_MODULE = 'ipool.spiders'
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = 'ipool (+http://www.yourdomain.com)'
# Obey robots.txt rules
ROBOTSTXT_OBEY = False # do not obey robots.txt
# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32
# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
#DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
#COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
# 'Accept-Language': 'en',
#}
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
# 'ipool.middlewares.IpoolSpiderMiddleware': 543,
#}
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
'ipool.middlewares.IpoolDownloaderMiddleware': 543,
}
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
#}
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {'ipool.pipelines.MysqlPipeline': 1}
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = 'httpcache'
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
pipelines.py becomes:
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from MyPythonLib.mysql import mysql_api_ipool
import os
class MysqlPipeline:
def open_spider(self, spider):
self.mysql_api_ipool = mysql_api_ipool()
def process_item(self, item, spider):
self.mysql_api_ipool.save_ips(item['ips_dict_list'])
return item
def close_spider(self, spider):
self.mysql_api_ipool.check_repeat()
middlewares.py becomes:
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
from scrapy import signals
# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter
# spider middleware
class IpoolSpiderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(self, response, spider):
# Called for each response that goes through the spider
# middleware and into the spider.
# Should return None or raise an exception.
return None
def process_spider_output(self, response, result, spider):
# Called with the results returned from the Spider, after
# it has processed the response.
# Must return an iterable of Request, or item objects.
for i in result:
yield i
def process_spider_exception(self, response, exception, spider):
# Called when a spider or process_spider_input() method
# (from other spider middleware) raises an exception.
# Should return either None or an iterable of Request or item objects.
pass
def process_start_requests(self, start_requests, spider):
# Called with the start requests of the spider, and works
# similarly to the process_spider_output() method, except
# that it doesn’t have a response associated.
# Must return only requests (not items).
for r in start_requests:
yield r
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
import random
# downloader middleware
class IpoolDownloaderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the downloader middleware does not modify the
# passed objects.
UA_list = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.33',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:104.0) Gecko/20100101 Firefox/104.0'
]
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
    # intercepts requests that raised no exception; used here for User-Agent spoofing
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.
# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
request.headers['User-Agent'] = random.choice(self.UA_list)
return None
    # intercepts every response; can be used to scrape dynamically loaded data
def process_response(self, request, response, spider):
# Called with the response returned from the downloader.
# Must either;
# - return a Response object
# - return a Request object
# - or raise IgnoreRequest
return response
    # intercepts requests that raised an exception; the place to set a proxy IP
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.
# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
items.py becomes:
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
import scrapy
class IpoolItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
ips_dict_list = scrapy.Field()
(2) Spider configuration
Twelve websites are to be crawled, so twelve spiders are needed. Several of the sites actually share almost the same data format, so these spiders could be merged and trimmed further; a possible refactor is sketched right after this paragraph. All the spiders go into the D:\spiders\scrapy\ipool\ipool\spiders directory.
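For example, the plain-text sources (a2u, clarketm, pubproxy, rmccurdy, TheSpeedX) all reduce to the same regex extraction, so they could share a base class along the lines of this hypothetical sketch (the rest of the article keeps one file per site):
import scrapy, re
from ipool.items import IpoolItem

class PlainTextProxySpider(scrapy.Spider):
    """Base class for sources that are plain 'ip:port' text; subclasses only set name and start_urls."""
    ip_port_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}')

    def parse(self, response):
        ips_dict_list = []  # same format as expected by mysql_api_ipool.save_ips
        for ip_port in self.ip_port_re.findall(response.text):
            ip, port = ip_port.split(':')
            ips_dict_list.append({'ip': ip.strip(), 'port': int(port)})
        item = IpoolItem()
        item['ips_dict_list'] = ips_dict_list
        yield item

class A2uAltSpider(PlainTextProxySpider):
    name = 'a2u_alt'  # hypothetical name, so it does not clash with the a2u spider below
    start_urls = [
        'https://raw.githubusercontent.com/a2u/free-proxy-list/master/free-proxy-list.txt'
    ]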
a.a2u.py
import scrapy, re
from ipool.items import IpoolItem
class A2uSpider(scrapy.Spider):
name = 'a2u'
start_urls = [
'https://raw.githubusercontent.com/a2u/free-proxy-list/master/free-proxy-list.txt'
]
def parse(self, response):
ip_nodes = re.findall(
r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
response.text)
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip, port = ip_node.split(':')
port = int(port)
ip_dict['ip'] = ip
ip_dict['port'] = port
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
b.clarketm.py
import scrapy, re
from ipool.items import IpoolItem
class ClarketmSpider(scrapy.Spider):
name = 'clarketm'
start_urls = [
'https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list.txt'
]
def parse(self, response):
ip_nodes = re.findall(
r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
response.text)
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip, port = ip_node.split(':')
ip = ip.strip()
port = int(port.strip())
ip_dict['ip'] = ip
ip_dict['port'] = port
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
c.cool_proxy.py
import scrapy, json, time
from ipool.items import IpoolItem
class CoolProxySpider(scrapy.Spider):
name = 'cool_proxy'
start_urls = ['https://www.cool-proxy.net/proxies.json']
def parse(self, response):
ip_nodes = json.loads(response.text.replace('\'', '\"'))
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip = ip_node['ip'].strip()
port = int(ip_node['port'])
anonymous = (0, 1)[1 == ip_node['anonymous']]
type_ = 3
physical_address = ip_node['country_name'].strip()
response_time = int(1000 * float(ip_node['response_time_average']))
last_verify = float(ip_node['update_time'])
last_verify = time.localtime(last_verify)
last_verify = time.strftime('%Y-%m-%d %H:%M:%S', last_verify)
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ip_dict['response_time'] = response_time
ip_dict['last_verify'] = last_verify
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
d.free_proxy_list.py
import scrapy, time
from ipool.items import IpoolItem
class FreeProxyListSpider(scrapy.Spider):
name = 'free_proxy_list'
start_urls = ['https://free-proxy-list.net/']
def parse(self, response):
ip_nodes = response.xpath(
r'//*[@id="list"]/div/div[2]/div/table/tbody/tr')
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip = ip_node.xpath('./td[1]/text()').extract()[0].strip()
port = int(ip_node.xpath('./td[2]/text()').extract()[0].strip())
anonymous = (
0,
1)['anonymous' in ip_node.xpath('./td[5]/text()').extract()[0]]
            # (a,b)[condition] is another way of writing a ternary expression in Python; note that when the condition is true, the expression evaluates to b
type_ = (0,
1)['yes' in ip_node.xpath('./td[7]/text()').extract()[0]]
physical_address = ip_node.xpath(
'./td[4]/text()').extract()[0].strip()
response_time = 1000
last_verify = time.strftime('%Y-%m-%d %H:%M:%S')
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ip_dict['response_time'] = response_time
ip_dict['last_verify'] = last_verify
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
e.ip3366.py
import scrapy
from ipool.items import IpoolItem
class Ip3366Spider(scrapy.Spider):
name = 'ip3366'
    #allowed_domains = ['www.ip3366.net']  # allowed domains; URLs outside these domains will not be crawled
start_urls = [
'http://www.ip3366.net/free/', 'http://www.ip3366.net/free/?stype=2'
    ]  # list of start URLs
base_url = 'http://www.ip3366.net/free/'
def parse(self, response):
ip_nodes = response.xpath(r'//*[@id="list"]/table/tbody/tr')
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip = ip_node.xpath(r'./td[1]/text()').extract()[0].strip()
port = int(ip_node.xpath(r'./td[2]/text()').extract()[0].strip())
anonymous = (
0, 1)['高匿' in ip_node.xpath(r'./td[3]/text()').extract()[0]]
            # (a,b)[condition] is another way of writing a ternary expression in Python; note that when the condition is true, the expression evaluates to b
type_ = (
0, 1)['HTTPS' in ip_node.xpath(r'./td[4]/text()').extract()[0]]
physical_address = ip_node.xpath(
r'./td[5]/text()').extract()[0].strip()
response_time = 1000 * int(
ip_node.xpath(r'./td[6]/text()').extract()[0].strip().replace(
'秒', ''))
response_time = (response_time, 250)[response_time == 0]
            # many proxies on this site report a latency of 0, which is not credible, so those are all changed to 250, i.e. 0.25 s
last_verify = ip_node.xpath(
r'./td[7]/text()').extract()[0].strip().replace('/', '-')
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ip_dict['response_time'] = response_time
ip_dict['last_verify'] = last_verify
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
next_url = response.xpath(
r'//*[@id="listnav"]/ul/a[text()="下一页"]/@href').extract()
if next_url != None and len(next_url) != 0:
next_url = Ip3366Spider.base_url + next_url[0]
yield scrapy.Request(url=next_url,
                                 callback=self.parse)  # the callback is this method itself, i.e. pages are parsed recursively
    # this method is called when the spider closes
def closed(self, spider):
pass
f.ipaddress.py
import scrapy
from ipool.items import IpoolItem
class IpaddressSpider(scrapy.Spider):
name = 'ipaddress'
start_urls = ['https://www.ipaddress.com/proxy-list/']
def parse(self, response):
ip_nodes = response.xpath(r'/html/body/div[1]/main/table/tbody/tr')
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip = ip_node.xpath('./td[1]/a[1]/text()').extract()[0].strip()
port = int(
ip_node.xpath('./td[1]/text()').extract()[0].strip().replace(
'"', '').replace(':', ''))
anonymous = (
0,
1)['anonymous' in ip_node.xpath('./td[2]/text()').extract()[0]]
            # (a,b)[condition] is another way of writing a ternary expression in Python; note that when the condition is true, the expression evaluates to b
type_ = 3
physical_address = ip_node.xpath(
'./td[3]/text()').extract()[0].strip()[3:]
response_time = 1000
last_verify = ip_node.xpath(
'./td[4]/text()').extract()[0].strip() + ':00'
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ip_dict['response_time'] = response_time
ip_dict['last_verify'] = last_verify
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
g.jiangxianli.py
import scrapy, json
from ipool.items import IpoolItem
class JiangxianliSpider(scrapy.Spider):
name = 'jiangxianli'
start_urls = ['https://ip.jiangxianli.com/api/proxy_ips?page=1']
def parse(self, response):
ip_nodes = json.loads(response.text)['data']['data']
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for i in range(len(ip_nodes)):
ip_dict = {}
ip = ip_nodes[i]['ip'].strip()
port = int(ip_nodes[i]['port'].strip())
anonymous = (0, 1)[2 == ip_nodes[i]['anonymity']]
type_ = (0, 1)['https' == ip_nodes[i]['protocol']]
physical_address = ip_nodes[i]['ip_address'].strip()
response_time = ip_nodes[i]['speed']
last_verify = ip_nodes[i]['validated_at'].strip()
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ip_dict['response_time'] = response_time
ip_dict['last_verify'] = last_verify
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
next_url = json.loads(response.text)['data']['next_page_url']
if next_url != None:
yield scrapy.Request(url=next_url, callback=self.parse)
h.kuaidaili.py
import scrapy
from ipool.items import IpoolItem
class KuaidailiSpider(scrapy.Spider):
name = 'kuaidaili'
#allowed_domains = ['www.kuaidaili.com']
start_urls = [
'https://free.kuaidaili.com/free/inha/1/',
'https://free.kuaidaili.com/free/intr/1/'
]
base_url = 'https://free.kuaidaili.com'
custom_settings = {
'CONCURRENT_REQUESTS': 1,
'DOWNLOAD_DELAY': 10,
'CONCURRENT_REQUESTS_PER_DOMAIN': 1
}
def parse(self, response):
ip_nodes = response.xpath(r'//*[@id="list"]/table/tbody/tr')
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip = ip_node.xpath(r'./td[1]/text()').extract()[0].strip()
port = int(ip_node.xpath(r'./td[2]/text()').extract()[0].strip())
anonymous = (
0, 1)['高匿' in ip_node.xpath(r'./td[3]/text()').extract()[0]]
            # (a,b)[condition] is another way of writing a ternary expression in Python; note that when the condition is true, the expression evaluates to b
type_ = (
0, 1)['HTTPS' in ip_node.xpath(r'./td[4]/text()').extract()[0]]
physical_address = ip_node.xpath(
r'./td[5]/text()').extract()[0].strip()
response_time = 1000 * float(
ip_node.xpath(r'./td[6]/text()').extract()[0].strip().replace(
'秒', ''))
last_verify = ip_node.xpath(r'./td[7]/text()').extract()[0].strip()
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ip_dict['response_time'] = response_time
ip_dict['last_verify'] = last_verify
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
page_num = response.url.split('/')[-2]
next_url = response.url.replace(page_num, str(int(page_num) + 1))
other_urls_list = response.xpath(
r'//*[@id="listnav"]/ul/li/a/@href').extract()
other_urls_list = [
KuaidailiSpider.base_url + other_urls_list[i]
for i in range(len(other_urls_list))
]
if int(page_num) < 5 and next_url in other_urls_list:
yield scrapy.Request(url=next_url, callback=self.parse)
custom_settings specifies per-spider settings; here it throttles the crawl rate, and only the first 5 pages are crawled.
i.pubproxy.py
import scrapy, re
from ipool.items import IpoolItem
class PubproxySpider(scrapy.Spider):
name = 'pubproxy'
start_urls = [
'http://pubproxy.com/api/proxy?limit=5&format=txt&type=http&level=anonymous&last_check=60&no_country=CN'
]
def parse(self, response):
ip_nodes = re.findall(
r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
response.text)
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip, port = ip_node.split(':')
port = int(port)
ip_dict['ip'] = ip
ip_dict['port'] = port
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
j.rmccurdy.py
import scrapy, re
from ipool.items import IpoolItem
class RmccurdySpider(scrapy.Spider):
name = 'rmccurdy'
start_urls = ['https://www.rmccurdy.com/.scripts/proxy/good.txt']
def parse(self, response):
ip_nodes = re.findall(
r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
response.text)
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip, port = ip_node.split(':')
port = int(port)
ip_dict['ip'] = ip
ip_dict['port'] = port
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
k.sunny9577.py
import scrapy, json
from ipool.items import IpoolItem
class Sunny9577Spider(scrapy.Spider):
name = 'sunny9577'
start_urls = [
'https://raw.githubusercontent.com/sunny9577/proxy-scraper/master/proxies.json'
]
def parse(self, response):
ip_nodes = json.loads(response.text)['proxynova']
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip = ip_node['ip'].strip()
port = int(ip_node['port'].strip())
anonymous = (0, 1)['Anonymous' in ip_node['anonymity']]
            # (a,b)[condition] is another way of writing a ternary expression in Python; note that when the condition is true, the expression evaluates to b
type_ = 3
physical_address = ip_node['country']
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['anonymous'] = anonymous
ip_dict['type'] = type_
ip_dict['physical_address'] = physical_address
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
l.TheSpeedX.py
import scrapy, re
from ipool.items import IpoolItem
class ThespeedxSpider(scrapy.Spider):
name = 'TheSpeedX'
start_urls = [
'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt',
'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks5.txt',
'https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/socks4.txt'
]
def parse(self, response):
ip_nodes = re.findall(
r'[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}\.[1-2]{0,1}\d{0,2}:\d{1,5}',
response.text)
        ips_dict_list = []  # a list of dicts, matching the format expected by mysql_api_ipool.save_ips
for ip_node in ip_nodes:
ip_dict = {}
ip, port = ip_node.split(':')
ip = ip.strip()
port = int(port.strip())
ip_dict['ip'] = ip
ip_dict['port'] = port
ips_dict_list.append(ip_dict)
item = IpoolItem()
item['ips_dict_list'] = ips_dict_list
yield item
(3) Running the spiders
With this many spiders, running them one at a time with scrapy crawl <spider name> is tedious. Scrapy offers another way to run several spiders, or all of them, in one project at once. The following draws on this article.
Create a new .py file under D:\spiders\scrapy\ipool\ipool, for example crawl_all.py, with the following content:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from scrapy.spiderloader import SpiderLoader
# get a CrawlerProcess instance based on the project settings
process = CrawlerProcess(get_project_settings())
# get a SpiderLoader object, which can list the names of all spiders in the project
spider_loader = SpiderLoader(get_project_settings())
# add all spiders
for spidername in spider_loader.list():
process.crawl(spidername)
# add a single spider
#process.crawl('spider name')
# run
process.start()
As shown above, a single spider is added with process.crawl('spider name'), while spider_loader.list() returns the names of every spider in the project; combined, the two run all of the spiders. One more reminder: several of the sites scraped here cannot be reached directly, for example the raw.githubusercontent.com ones; you can remove them or add some extra configuration. If you need to use V2rayN together with requests or scrapy, see Docker in Practice: How to Browse Wikipedia and V2Ray, which covers running spiders through V2rayN.
Finally, cd into D:\spiders\scrapy\ipool\ipool in cmd and run python crawl_all.py.
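After a run, a quick check in the mysql client from section 1 shows whether rows actually arrived (the count will obviously differ from machine to machine):
USE ipool;
SELECT COUNT(*) FROM ipool;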
4. Validating the IPs
I originally planned to validate the IPs with multiple processes on the Raspberry Pi, but that consumed too many of the Pi's resources, so validation is done on the Windows 10 machine instead.
In the custom library directory on the Windows 10 machine, besides log.py and mysql.py, add a spider.py with the following content:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import requests, random
headers = {
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36'
}
def get_session(max_retries: int = 3) -> requests.Session:
    '''Take the maximum number of retries and return a requests Session object.
    :param max_retries: maximum number of retries
    :return Session object'''
    # set the retry count; counting the initial attempt, a failing connection is tried at most 4 times with the default of 3 retries
try:
session = requests.Session()
session.mount('http://',
requests.adapters.HTTPAdapter(max_retries=max_retries))
session.mount('https://',
requests.adapters.HTTPAdapter(max_retries=max_retries))
return session
except:
raise Exception('error: MyPythonLib.spider.get_session failed ')
def multiprocess_job_spliter(process_num: int,
to_do_list,
random_shuffle: bool = True) -> dict:
    '''Given a number of processes and the full task sequence, return the tasks split into shares.
    :param process_num: number of processes (how many shares to split the task sequence into)
    :param to_do_list: the full task sequence (tuple or list), e.g. all URL strings to be crawled
    :param random_shuffle: whether to shuffle the task sequence first
    :return {0: todo_list1, 1: todo_list2, ... }'''
try:
to_do_list = list(to_do_list)
if random_shuffle:
random.shuffle(to_do_list)
total_num = len(to_do_list)
one_share = total_num // process_num
share_list = {}
for i in range(0, process_num):
if i != process_num - 1:
share_list[i] = to_do_list[(i * one_share):((i + 1) *
one_share)]
elif i == process_num - 1:
share_list[i] = to_do_list[(i * one_share):]
return share_list
except:
raise Exception(
'error: MyPythonLib.spider.multiprocess_job_spliter failed ')
The full file contains a lot more after this, but only the multiprocess_job_spliter function is actually used here.
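A quick illustration of what multiprocess_job_spliter returns, using a made-up task list:
from MyPythonLib import spider

tasks = [('no', 'ip', port, 100) for port in range(10)]  # 10 dummy (no, ip, port, grade) tuples
shares = spider.multiprocess_job_spliter(3, tasks, random_shuffle=False)
print(len(shares[0]), len(shares[1]), len(shares[2]))    # 3 3 4: the last share receives the remainder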
Then put a Python script somewhere convenient, for example inside the Scrapy ipool project as D:\spiders\scrapy\ipool\ipool\update_ips.py, with the following content:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from MyPythonLib import mysql, spider
from MyPythonLib import log as mylog
import logging, os, time, multiprocessing
headers = spider.headers
class update_ips():
test_http_url = 'http://example.org/'
test_https_url = 'https://example.org/'
def __init__(self, log: logging.Logger = None):
""":param log: 可选的log对象,若不提供,将在当前文件夹下新建logs目录、创建log文件"""
if log == None:
log_dir = os.path.join(os.getcwd(), 'logs')
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
log_file_name = 'mysql_api_ipool_log#{}.txt'.format(
time.strftime('-%Y-%m-%d_%H_%M_%S'))
log = mylog.get_logger(log_dir, log_file_name, 'mysql_api_ipool')
self.log = log
message = 'ready to update ips'
self.log.info(message)
self.mysql_api_ipool = mysql.mysql_api_ipool(self.log)
self.conn = self.mysql_api_ipool.conn
self.cur = self.conn.cursor()
self.mysql_api_ipool.check_table()
message = 'database ipool and table ipool checked'
self.log.info(message)
self.cur.execute('USE ipool;')
self.conn.commit()
def __del__(self):
self.cur.close()
message = 'update ips complete'
self.log.info(message)
@staticmethod
def validate_ips_single_process(ips_list: list,
process_no: int = 0,
grade_minus: int = 50):
        '''Verify IPs; only type, last_verify and grade are changed. An IP whose grade reaches <= 0 will be deleted.
        This method is single-process.
        :param ips_list: list of IPs to verify, in the format [(no,ip,port,grade), (no,ip,port,grade), ...]
        :param process_no: process number, mainly used to name the log file
        :param grade_minus: score deducted when the proxy IP cannot be used'''
global headers
log = mylog.get_logger(
abs_path=os.path.join(os.getcwd(), 'logs'),
log_file_name='validate_ips-{}'.format(process_no),
getLogger_name=str(process_no))
message = 'received {} ips, validating...'.format(len(ips_list))
log.info(message)
mysql_api_ipool = mysql.mysql_api_ipool(log)
session = spider.get_session(max_retries=1)
for no, ip, port, grade in ips_list:
ip_port = '{}:{}'.format(ip, port)
proxies = {'http': ip_port, 'https': ip_port}
http_success, https_success = True, True
try:
session.get(url=update_ips.test_http_url,
timeout=10,
headers=headers,
proxies=proxies)
except:
http_success = False
try:
session.get(url=update_ips.test_https_url,
timeout=10,
headers=headers,
proxies=proxies)
except:
https_success = False
if http_success and not https_success:
type_ = 0
elif https_success and not http_success:
type_ = 1
elif http_success and https_success:
type_ = 2
elif not http_success and not https_success:
type_ = 3
ip_dict = {}
ip_dict['no'] = no
ip_dict['ip'] = ip
ip_dict['port'] = port
ip_dict['type'] = type_
ip_dict['last_verify'] = time.strftime('%Y-%m-%d %H:%M:%S')
if type_ == 3:
grade -= grade_minus
else:
grade = 100
ip_dict['grade'] = grade
mysql_api_ipool.update_ip(ip_dict=ip_dict)
def validate_ips_multiprocess(self,
process_num: int = 5,
limit: int = -1,
grade_minus: int = 50):
        '''Verify IPs; only type, last_verify and grade are changed. An IP whose grade is <= 0 will be deleted.
        This method is multi-process and delegates to validate_ips_single_process.
        :param process_num: number of processes used for verification
        :param limit: maximum number of IPs to fetch for verification; -1 means all of them
        :param grade_minus: score deducted when the proxy IP cannot be used'''
cur, conn, log = self.cur, self.conn, self.log
command = 'DELETE FROM ipool WHERE grade<=0;'
try:
cur.execute(command)
conn.commit()
message = 'delete ips where grade<=0 successfully'
log.info(message)
except:
conn.rollback()
            log.info('failed command: ' + command)
raise Exception(
'update_ips.validate_ips_multiprocess failed: delete error')
ips_list = self.mysql_api_ipool.get_to_be_validated_ips(limit=limit)
ips_list_dict = spider.multiprocess_job_spliter(process_num, ips_list)
process_pool = multiprocessing.Pool(processes=process_num)
for i in range(process_num):
process_pool.apply_async(update_ips.validate_ips_single_process,
args=(ips_list_dict[i], i, grade_minus))
process_pool.close()
process_pool.join()
try:
cur.execute(command)
conn.commit()
message = 'delete ips where grade<=0 successfully'
log.info(message)
except:
conn.rollback()
            log.info('failed command: ' + command)
raise Exception(
'update_ips.validate_ips_multiprocess failed: delete error')
if __name__ == '__main__':
update_ips().validate_ips_multiprocess(process_num=60,
limit=-1,
grade_minus=50)
In update_ips().validate_ips_multiprocess(process_num=60, limit=-1, grade_minus=50), process_num and limit can be adjusted to match the Windows 10 machine's specs; the MySQL read/write rate during validation is not very high.
Also note that validation here only changes type (HTTP or HTTPS), last_verify (time of the last check) and grade (the IP's score); if you also want to refresh a proxy's anonymity, physical address and response time, you can extend this code yourself.
Finally, in cmd, run:
cd D:\spiders\scrapy\ipool\ipool
python update_ips.py
to start the IP-validation script.
5. Daily use
The scraping and validation scripts can be run once a day, in this order:
(1) If V2rayN is needed, start V2rayN first.
(2) Run the following commands in cmd:
cd D:\spiders\scrapy\ipool\ipool
python crawl_all.py
python update_ips.py
and that is it.
If you want to wrap these commands in a script that runs automatically at boot, see Auto-run at Boot on Different Systems.
Also, remember to back up MySQL regularly. When you need proxy IPs, fetch them with the MyPythonLib.mysql.mysql_api_ipool.get_ips method, for example as in the sketch below.
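A minimal sketch of pulling proxies from the pool and using one with requests (the target URL is just a placeholder):
import random, requests
from MyPythonLib.mysql import mysql_api_ipool

# fetch up to 20 proxies that support HTTPS (or both) and were verified within the last 48 hours
proxies_list = mysql_api_ipool.get_ips(random=True, type_=[1, 2], limit=20)
if proxies_list:
    ip_port = random.choice(proxies_list)           # 'ip:port'
    proxies = {'http': ip_port, 'https': ip_port}
    resp = requests.get('https://example.org/', proxies=proxies, timeout=10)
    print(resp.status_code)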