步骤/目录:
1.创建文件夹及添加路径
2.自建库的编写
    (1)log.py
    (2)mysql.py
    (3)spider.py
    (4)interaction.py

本文首发于个人博客https://lisper517.top/index.php/archives/45/,转载请注明出处。
本文的目的是讲解自建库的编写,并初步添加一些函数。
本文写作日期为2022年9月6日。使用的平台为win10,编辑器为VS code。

在python的日常使用中,我们常常需要复用一些代码,有时可以把这些代码写成函数,存在自建库中备用。本文将演示如何自建python库,并添加一些基础的函数到自建库中。

1.创建文件夹及添加路径

在一个合适的地方新建一个文件夹,用于存储自己写的库。本例中,该文件夹为 D:\PythonLib\MyPythonLib (以后所有自建库都放在这个文件夹里),注意这个文件夹要创建两层。然后新建一个文件: D:\PythonLib\MyPythonLib\__init__.py 。linux下也是同样的操作。

然后把这个库添加到python的库路径中。找到python的根目录里存放库的文件夹,比如 C:\Program Files\Python310\Lib\site-packages (linux系统中一般在 /usr/lib/python3/dist-packages ),在这里新建一个.pth文件,比如 PythonLibs.pth ,写上自建库文件夹的位置,即 D:\PythonLib ,linux系统可以写上 /root/PythonLib/MyPythonLib

2.自建库的编写

然后就是编写自建库,在编写自建库时应该慎重,有些地方宁可写复杂一点,也要使代码简单易懂、便于维护;在处理异常时也要更加谨慎,避免有些时候是这些自建函数出现问题,却半天排查不到;注释要详尽,确保第一次用的人也能看懂。
有如下的几个函数可以写成自建库:

(1)log.py

import logging
import time
import os


def get_logger(abs_path: str,
               log_file_name: str = '',
               getLogger_name: str = ''):
    '''传入绝对路径和log文件名,返回log对象。
    
    :param abs_path: log文件存储的绝对路径
    :param log_file_name: log文件名,若log文件名为空,则使用默认的文件名
    :param getLogger_name: logger对象名,如果在同一个程序中需要同时写多个log文件,建议这个参数不要相同
    :return log对象'''
    try:
        formatter = logging.Formatter(
            '%(lineno)d : %(asctime)s : %(levelname)s : %(funcName)s : %(message)s'
        )

        if log_file_name == '':
            log_file_name = 'log#{}.txt'.format(time.strftime('-%Y-%m-%d'))
        fileHandler = logging.FileHandler(
            (os.path.join(abs_path, log_file_name)),
            mode='w',
            encoding='utf-8')
        fileHandler.setFormatter(formatter)

        if getLogger_name == '':
            getLogger_name = 'logger'
        log = logging.getLogger(getLogger_name)
        log.setLevel(logging.DEBUG)
        log.addHandler(fileHandler)
        return log
    except:
        raise Exception('error: MyPythonLib.log.get_logger failed ')

(2)mysql.py

注意把mysql连接的服务器ip,端口,用户密码改成自己的。

import pymysql


def get_conn(database_name: str = 'mysql'):
    '''指定数据库名,返回mysql连接对象。

    :param database_name: 数据库名,不指定时默认用mysql库
    :return conn'''
    try:
        conn = pymysql.connect(host='127.0.0.1',
                               port=53306,
                               user='root',
                               passwd='mysqlpasswd',
                               db=database_name,
                               charset='utf8')
        return conn
    except:
        raise Exception('error: MyPythonLib.mysql.get_conn failed ')


def str_into_mysql(string: str) -> str:
    '''输入一个字符串,将其转换为可以在mysql中储存的形式。

    这个字符串的使用位置是列值处,比如 "SELECT * FROM 表名 WHERE 列名='字符串';" 
    中的字符串,若包含 \\ 、 ' 、 " 这三个字符,就需要用本函数处理。

    :param string: 目标字符串
    :return mysql可以识别的字符串
    '''
    try:
        string = string.replace('\\', '\\\\')
        string = string.replace("'", "\\'")
        string = string.replace('"', '\\"')
        return string
    except:
        raise Exception('error: MyPythonLib.mysql.str_into_mysql failed ')


def table_exists(cur, table_name: str) -> bool:
    '''判断表在当前库中是否存在。
    
    :param cur: 游标对象
    :param table_name: 表名
    :return 存在则返回True,否则返回False
    '''
    try:
        cur.execute("SHOW TABLES;")
        tables = cur.fetchall()
        for table in tables:
            table = table[0]
            if table == table_name:
                return True
        return False
    except:
        raise Exception('error: MyPythonLib.mysql.table_exists failed ')

(3)spider.py

import requests
import random


def get_session(max_retries: int = 3) -> requests.Session:
    '''接受最大重连数,返回一个requests会话对象

    :param max_retries: 最大重连次数
    :return 会话对象'''
    # 设置重连次数。包括开头,若出现异常,最多尝试连接4次
    try:
        session = requests.Session()
        session.mount('http://',
                      requests.adapters.HTTPAdapter(max_retries=max_retries))
        session.mount('https://',
                      requests.adapters.HTTPAdapter(max_retries=max_retries))
        return session
    except:
        raise Exception('error: MyPythonLib.spider.get_session failed ')


def multiprocess_job_spliter(process_num: int,
                             to_do_list,
                             random_shuffle: bool = True) -> dict:
    '''给出进程数、总任务序列,返回分好的任务列表。

    :param process_num: 进程数(需要将总任务序列分为多少份)
    :param to_do_list: 总任务序列(元组或列表),比如待爬取的所有url字符串
    :param random_shuffle: 是否打乱总任务序列的顺序
    :return {0: todo_list1, 1: todo_list2, ... }'''
    try:
        to_do_list = list(to_do_list)
        if random_shuffle:
            random.shuffle(to_do_list)
        total_num = len(to_do_list)
        one_share = total_num // process_num
        share_list = {}
        for i in range(0, process_num):
            if i != process_num - 1:
                share_list[i] = to_do_list[(i * one_share):((i + 1) *
                                                            one_share)]
            elif i == process_num - 1:
                share_list[i] = to_do_list[(i * one_share):]
        return share_list
    except:
        raise Exception(
            'error: MyPythonLib.spider.multiprocess_job_spliter failed ')

(4)interaction.py

一些命令行下交互、打印输出的小函数。

import time
import os


def print_progress(start_time: float,
                   progress: int,
                   total: int,
                   msg: str,
                   cls=False):
    '''图形化打印进度及附加信息(清空屏幕)。

    :param start_time: 开始时间,由time.time()得到的float
    :param progress: 整数,表示目前进度
    :param total: 整数,表示总量
    :param msg: 附加信息
    :param cls: 是否清空屏幕(windows下)'''
    try:
        percent = 100. * progress / total
        x = int(2 * percent / 5)
        duration = time.time() - start_time
        h, m, s = duration // 3600, duration // 60 % 60, duration % 60
        time_format = '{:0>2.0f}:{:0>2.0f}:{:0>2.0f}'.format(h, m, s)
        expect_time = duration * total / progress
        h, m, s = expect_time // 3600, expect_time // 60 % 60, expect_time % 60
        expect_time_format = '{:0>2.0f}:{:0>2.0f}:{:0>2.0f}'.format(h, m, s)
        print('\r已运行{:.2f}% '.format(percent), 
              str(progress) + '/' + str(total), 
              time_format + '/' + expect_time_format + ' [' + '#' * x + '.' * (40 - x) + ']',
              msg,
              end='')
        if cls:
            os.system('cls')
    except:
        raise Exception(
            'error: MyPythonLib.interaction.print_progress failed ')

标签: none

添加新评论