Mrli
别装作很努力,
因为结局不会陪你演戏。
Contacts:
QQ博客园

开源项目代码阅读记录

2022/05/29 代码阅读
Word count: 15,047 | Reading time: 68min

爬虫系列

抢购、秒杀

taobao_seckill

淘宝秒杀物品:分为selenium版和request网络请求版,微小的区别在于,api先请求的是购物车的物品信息,selenium则是勾选选择框。

流程大致都相同:①keep_wait等待临近抢购时间----->②将需要的信息缓存住,不再刷新(比如要抢购的物品信息、cookies…)----->③进行结算(/点击按钮)----->④提交订单(/点击按钮)----->⑤支付订单(/点击按钮);

  • 使用browsercookie来管理浏览器cookies——从浏览器提取保存的cookies的工具。它是一个很有用的爬虫工具,通过加载你浏览器的cookies到一个cookiejar对象里面,让你轻松下载需要登录的网页内容。

    1
    2
    >>> cj = browsercookie.chrome()	# browsercookie.load() 在不知道或者不关心浏览器时使用
    >>> r = requests.get(url, cookies=cj)

    一般情况下,作为开发者的我们都是将cookies手动写入或者是传入程序的,而browsercookie用途,我想可以是将需要传入cookies的程序打包给不会提取Cookies的用户使用。(类似通过selenium登陆后,拿取cookies再发送网络请求,这边是让/借助用户自己的浏览器来拿取cookies)

  • 使用tkinter搭建简单的GUI:点击“开始”的按钮后会将密码框和抢购设置时间框的内容传入,作为实例化ChromeDriverSpider的参数,并运行实例的sec_kill方法开始执行抢购

  • 配置chromeDriver启动配置项

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def build_chrome_options(self):
    """配置启动项"""
    chrome_options = webdriver.ChromeOptions()
    chrome_options.accept_untrusted_certs = True
    chrome_options.assume_untrusted_cert_issuer = True
    arguments = ['--no-sandbox', '--disable-impl-side-painting', '--disable-setuid-sandbox', '--disable-seccomp-filter-sandbox',
    '--disable-breakpad', '--disable-client-side-phishing-detection', '--disable-cast',
    '--disable-cast-streaming-hw-encoding', '--disable-cloud-import', '--disable-popup-blocking',
    '--ignore-certificate-errors', '--disable-session-crashed-bubble', '--disable-ipv6',
    '--allow-http-screen-capture', '--start-maximized']
    for arg in arguments:
    chrome_options.add_argument(arg)
    chrome_options.add_argument(f'--user-agent={choice(get_useragent_data())}')
    return chrome_options

taobao_api

selenium和api结合:登陆逻辑使用了selenium,刷新购物车和下单调用了手机淘宝taobao_api,自动付款也使用了selenium,能自动模拟触摸输入密码付款,接口签名sign生成方法参考get_sign_val。

  • 同步阿里云时间服务器

    1
    2
    3
    4
    5
    #一次性设置
    sudo sntp -sS ntp.aliyun.com
    #永久设置
    sudo systemsetup -setnetworktimeserver ntp.aliyun.com
    sudo systemsetup -setusingnetworktime on
  • 展示QRCode

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
       # 找到二维码图片的链接
    img = WebDriverWait(browser, 2, 0.1).until(
    EC.presence_of_element_located((By.XPATH, "//div[@id='J_QRCodeImg']/img"))).get_attribute('src')
    # 获得图片字节流内容
    img_content = requests.get(img, timeout=5).content
    # 保存到本地图片中
    file_name = 'taobao_qrcode.png'
    fp = open(file_name, 'wb')
    fp.write(img_content)
    fp.close()
    # 将其解码成链接, pyzbar为二维码和条形码库 ==> 个人觉得保存后可以直接转成PIL的Image然后img.show()
    from pyzbar.pyzbar import decode
    barcode_url = ''
    barcodes = decode(Image.open(file_name))
    for barcode in barcodes:
    # 解析获得二维码后的内容;如果是条形码,则返回条形码数
    barcode_url = barcode.data.decode("utf-8")
    # 生成二维码并通过控制台打印
    qr = qrcode.QRCode()
    qr.add_data(barcode_url)
    qr.print_ascii(invert=True)

    注:ZBar是一个开源软件套件,用于从各种来源(如视频流、图像文件和原始强度传感器)读取条形码;pyzbar 是通过 Python23接口,使用 ZBar 库读取一维条形码和QR码 。=>读取条形码读取二维码

jd_seckill

  1. 预约商品
  2. 秒杀抢购商品: ①访问商品的抢购链接(用于设置cookie等—>②访问抢购订单结算页面---->③提交抢购(秒杀)订单 [_get_seckill_order_data生成提交抢购订单所需的请求体参数、获取秒杀初始化信息(包括:地址,发票,token)]
  • 从UAList中通过random.choice(USER_AGENTS)随机UA

  • 使用内置configparser模块解析config.ini

  • 多进程池:

    1
    2
    3
    4
    5
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(work_count) as pool:
    for i in range(work_count):
    pool.submit(self.seckill)
  • 秒杀时间比较

    1
    2
    3
    4
    5
    6
    7
    8
    buy_time = datetime.strptime(global_config.getRaw('config','buy_time'), "%Y-%m-%d %H:%M:%S.%f"
    now_time = datetime.now
    while True:
    if now_time() >= buy_time:
    logger.info('时间到达,开始执行……')
    break
    else:
    time.sleep(self.sleep_interval)
  • 获取京东服务器时间https://api.m.jd.com/client.action?functionId=queryMaterialProducts&client=wh5

jd-assistant★

jd_seckill的优化进阶版本

  • 普通商品购买主要流程:(1)清空购物车 --> (2)添加商品到购物车 --> (3)提交订单

  • 预约抢购商品特点:

    1. 需要提前点击预约
    2. 大部分此类商品在预约后自动加入购物车,但是无法勾选✓,也无法️进入到结算页面
    3. 到了抢购的时间点后将商品加入购物车,此时才能勾选并下单
  • 普通商品:能加入购物车🛒,然后进入购物车结算下单.

  • 抢购商品:需提提前预约,开始抢购时有“立即抢购”按钮🔘,点击按钮后商品加入购物车,然后结算下单.

⭐️抢购受多种因素影响:网速、账号质量、运气等等,仅供娱乐,认真就输了.

  • deprecated、check_login装饰器:传入了self参数,并通过self实例调用了类的其他成员方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def check_login(func):
    """用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆"""
    @functools.wraps(func)
    def new_func(self, *args, **kwargs):
    if not self.is_login:
    logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
    self.login_by_QRcode()
    return func(self, *args, **kwargs)
    return new_func
  • 密码加密——pycryptodome

    1
    2
    3
    4
    5
    6
    7
    8
    from Crypto.PublicKey import RSA
    from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5

    def encrypt_pwd(password, public_key=RSA_PUBLIC_KEY):
    rsa_key = RSA.importKey(public_key)
    encryptor = Cipher_pkcs1_v1_5.new(rsa_key)
    cipher = b64encode(encryptor.encrypt(password.encode('utf-8')))
    return cipher.decode('utf-8')
  • 提供JS脚本文件来获取商品的信息

    1
    2
    3
    4
    5
    6
    // 以下代码在订单结算页面的开发者工具Console中执行,用于获取必要的参数
    var eid = $('#eid').val();
    var fp = $('#fp').val();
    var trackId = getTakId();
    var riskControl = $('#riskControl').val();
    console.log(`eid = ${eid}\nfp = ${fp}\ntrack_id = ${trackId}\nrisk_control = ${riskControl}`);
  • cookies验证-通过读取本地保存的cookies时要先验证下cookies是否有效:requests库的使用方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    def _validate_cookies(self):
    """验证cookies是否有效(是否登陆)
    通过访问用户订单列表页进行判断:若未登录,将会重定向到登陆页面。
    :return: cookies是否有效 True/False
    """
    url = 'https://order.jd.com/center/list.action'
    payload = {
    'rid': str(int(time.time() * 1000)),
    }
    try:
    resp = self.sess.get(url=url, params=payload, allow_redirects=False)
    if resp.status_code == requests.codes.OK:
    return True
    except Exception as e:
    logger.error(e)
    # 创建新的session
    self.sess = requests.session()
    return False
  • 二维码登录:下载二维码_get_QRcode–>询问获取二维码扫描状态_get_QRcode_ticket—>验证二维码信息(_validate_QRcode_ticket)–>设置is_login=True和save_cookies(二维码有效期约120s,浏览器大约2s发送一个请求询问扫描状态)——看代码是assistant参考了2018年的**jd-autobuy**

  • 扫描二维码:写入图片和打开图片

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def save_image(resp, image_file):
    with open(image_file, 'wb') as f:
    # 从网络请求中下载内容
    for chunk in resp.iter_content(chunk_size=1024):
    f.write(chunk)

    def open_image(image_file):
    if os.name == "nt":
    os.system('start ' + image_file) # for Windows
    else:
    if os.uname()[0] == "Linux":
    if "deepin" in os.uname()[2]:
    os.system("deepin-image-viewer " + image_file) # for deepin
    else:
    os.system("eog " + image_file) # for Linux
    else:
    os.system("open " + image_file) # for Mac

    注:判断操作系统还可以借助内置的platform库platform.system().lower()得到[linux、windows、mac]

▲cookies和二维码结合使用:

  1. 如果登陆成功则吧cookies持久化
  2. 使用二维码登陆前,先加载验证之前持久化的cookies是否有效,如果有效则无需再次扫码登陆(在QRCode_login的起始位置判断validation中修改的is_login是否为True)

jd-seckill-maotai

  • 主要功能

    • 登陆京东商城(www.jd.com
      • 用京东APP扫码给出的二维码
    • 预约茅台
      • 定时自动预约
    • 秒杀预约后等待抢购
      • 定时开始自动抢购

特别声明:

  • 本仓库发布的jd_seckill_maotai项目中涉及的任何脚本,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。
  • 本项目内所有资源文件,禁止任何公众号、自媒体进行任何形式的转载、发布。
  • ChinaVolvocars 对任何脚本问题概不负责,包括但不限于由任何脚本错误导致的任何损失或损害.
  • 请勿将jd_seckill_maotai项目的任何内容用于商业或非法目的,否则后果自负。
  • 如果任何单位或个人认为该项目的脚本可能涉嫌侵犯其权利,则应及时通知并提供身份证明,所有权证明,我们将在收到认证文件后删除相关脚本。
  • 您必须在下载后的24小时内从计算机或手机中完全删除以上内容。
  • 本项目遵循GPL-3.0 License协议,如果本特别声明与GPL-3.0 License协议有冲突之处,以本特别声明为准。
  • 您使用或者复制了本仓库且本人制作的任何代码或项目,则视为已接受此声明,请仔细阅读
    您在本声明未发出之时点使用或者复制了本仓库且本人制作的任何代码或项目且此时还在使用,则视为已接受此声明,请仔细阅读

根据12月14日以来抢茅台的日志分析,大胆推断再接再厉返回Json消息中resultCode小白信用的关系。
这里主要分析出现频率最高的9001690008

案例 小白信用 90016 90008 抢到耗时
张三 63.8 59.63% 40.37% 暂未抢到
李四 92.9 72.05% 27.94% 4天
王五 99.6 75.70% 24.29% 暂未抢到
赵六 103.4 91.02% 8.9% 2天

风控放行后才会进行抢购,这时候用的应该是水库计数模型,假设无法一次性拿到所有数据的情况下来尽量的做到抢购成功用户的均匀分布,这样就和概率相关了。

抢购结果确认

抢购是否成功通常在程序开始的一分钟内可见分晓!
搜索日志,出现“抢购成功,订单号xxxxx",代表成功抢到了,务必半小时内支付订单!程序暂时不支持自动停止,需要手动STOP!
若两分钟还未抢购成功,基本上就是没抢到!程序暂时不支持自动停止,需要手动STOP!

  • 在程序开始运行后,会检测本地时间与京东服务器时间,输出的差值为【本地时间-京东服务器时间】,即-50为本地时间比京东服务器时间慢50ms。 本代码的执行的抢购时间以本地电脑/服务器时间为准

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    def __init__(self, ...):
    # 初始化误差
    self.diff_time = self.local_time() - self.jd_time()
    # ....
    while True:
    # 本地时间减去与京东的时间差(ms),能够将时间误差提升到0.1秒附近, 具体精度依赖获取京东服务器时间的网络时间损耗
    if self.local_time() - self.diff_time >= self.buy_time_ms:
    logger.info('时间到达,开始执行……')
    break
    else:
    time.sleep(self.sleep_interval)

    补充:Python的时间戳默认都是秒级别的,如果要ms级别的,则要乘以1000

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    """
    通过time,以元组(struct_time)为核心实现时间戳和格式化时间字符串的相互转换。
    """
    # 获得10位秒级时间戳
    seconds = time.time()
    # 获得13位毫秒级时间戳==>java默认13位
    millis = int(round(time.time() * 1000))
    # 同样,time下的mktime也需要*1000
    time.mktime(self.buy_time.timetuple()) * 1000.0
    """
    通过datetime,以datetime类实例对象为核心实现时间戳和格式化时间字符串的相互转换。
    """
    from datetime import datetime
    # 获得当前时间
    now = datetime.now()
    timestamp = datetime.timestamp(now)
    print("时间戳 =", timestamp)
    # 也是10位s级
    >>> 1648622589.624272

jd_mask

此tool只单独支持预约-抢购-成功后直接提交订单的商品,如[口罩],只提供学习参考用途.

Ⓜ️阅读了多份京东抢购的代码后,发现基本都是从一个模板里不断优化发展出来的,反应了开源的意义,也反应了京东部分细节的修改,同时也能得到一个告示:在实现相同功能前,不妨多看看、借鉴下以往可行的代码。

Automatic_ticket_purchase

大麦网抢购流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
graph LR
A[开始] --> B(登陆)
B --> C{是否利用cookies登陆}
C --> |否| D[页面登陆]
C --> |是| E[登陆验证]
D --> E
E --> G{是否登陆成功}
G --> |是| F[获取购票必要信息]
G --> |否|ED
F --> L[检测抢票现在购票状态]
L --> M{目标票可进行抢购}
M --> |即将开抢|L
M --> |缺货登记|ED((结束))
M --> |立刻购买|N{抢购}

跟限时、定时秒杀不同的是,大麦网的抢购信息是靠不断刷新的,需要通过不断检测页面内容,判断目前票的可否购买情况。如果刷新为可购买则进行抢购购买。

  • 点击购买是对指定接口发送请求,并更新cookies(cookies嫌少不嫌多):step2_click_buy_now

  • 提交订单:step3_submit_order

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    response = self.session.post('https://buy.damai.cn/multi/trans/createOrder',
    headers=headers,
    params=params,
    data=submit_order_info,
    cookies=self.login_cookies)
    buy_status = json.loads(response.text)
    if buy_status.get('success') is True and buy_status.get('module').get('alipayOrderId'):
    print('-' * 10, '抢票成功, 请前往 大麦网->我的大麦->交易中心->订单管理 确认订单', '-' * 10)
    print('alipayOrderId: ', buy_status.get('module').get('alipayOrderId'))
    print('支付宝支付链接: ', buy_status.get('module').get('alipayWapCashierUrl'))

NikeRobot

  • pdb调试断点:==>实际上Pycharm的调试更加好用

    1
    2
    if DEBUG:
    pdb.set_trace()
  • 将类成员属性转成字典形式: NikeLoginParam().__dict__

  • 自己管理线程:创建继承Thread的执行类,重写run方法,在main中进行创建多个实例MyThread().start(),并join住。

hpv4g

  • 是否使用代理:init_ip_proxy_pool

  • 多线程参数设置: python3.8中ThreadPoolExecutor的默认worker:max_workers = min(32, os.cpu_count() + 4)

  • 线程池:ThreadPoolExecutor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

    with ThreadPoolExecutor(max_workers=max_workers) as t:
    fs = [t.submit(sec_kill_task, miao_miao, params[i % _params_len],
    None if not _ip_proxys_len else {
    'http': None if (index := i % _ip_proxys_len) == 0 else ip_proxys[index]}) for i in
    range(max_workers + 5)]
    # 120S后结束任务, 有一个完成则完成
    wait(fs, 120, return_when=FIRST_COMPLETED)
    global KILL_FLAG
    KILL_FLAG = True
    print('>>>>>>>>>>>>>>>>>本轮未成功秒杀到疫苗<<<<<<<<<<<<<<<<<<<')
  • 缓存配置——非默认参数的装饰器:考虑提前缓存疫苗列表 秒杀开始后跳过查询列表等操作 直接调用秒杀接口

  • 获取服务器当前时间戳

  • 直接设置logging中的log

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import logging
    # 设置配置
    logging.basicConfig(handlers=[logging.FileHandler(filename=LOG_NAME,
    encoding='utf-8', mode='a+')],
    format='%(asctime)s %(message)s',
    level=getattr(logging, args.log))
    # 使用
    logging.info("xxx")
    logging.debug("yyy")
  • argueparser使用:

    • action参数: parser.add_argument('-reload_cache', action='store_true', help='刷新--region_code疫苗列表本地缓存'),出现reload_cache参数则为true,所以默认为false

    • parser.add_argument('--log', default='WARNING', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],help='日志级别 默认WARNING')

    • add_argument中的type实际上是个类型转化器,如type=int,如果无法转成int则报错,因此可以借此参数对输入进行约束

      1
      2
      3
      4
      5
      def _valid_int_type(i):
      valid_int = int(i)
      if valid_int < 1:
      raise argparse.ArgumentTypeError(f'invalid int argument:{i}')
      return valid_int
  • requests使用:

    • raise_for_status状态码非成功时抛出异常

    • 自定义_get(url, params=None, error_exit=True, **kwargs)方法,添加参数:error_exit:返回4XX 5XX错误时 是否退出

    • cookies除了放在requests.get(cookies=xxxCookieDict),还可以作为字符串设置在header[“cookie”]中(无s),如self._headers["cookie"]="k1=v1;_xxhm_=%7B%2x%7D;k3=wxapptoken:v3",通过;来分割

    • 1
      2
      # disable ssl warnings
      requests.packages.urllib3.disable_warnings()

抢购型代码总结:

  • 实现主要分为selenium(自动测试工具)模拟点击;api模拟网络请求

  • 登陆之后会对cookies进行持久化,使用pickle的好处是无法明文看懂、复制

  • 可以通过持久化的cookies来登陆,但会检测cookies是否仍然有效

  • 定时抢购会等待sleep到临近时间(while True+sleep(time)的方式 or sleepUntil+while try),然后再进行多次抢购请求(如果抢购的2分钟内没抢到,大多没希望了,只好等待下次);如果是等待页面刷新的,则不断请求页面,等到结果改变。

    • 注意:等待间隔不要超过时间阈值,如180s前检查,如果不符合不能让其等待190s
  • 推送工具:提供一个自己写的

    config.ini

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    [pusher]
    pusher_type = pushplus
    push_title = 飞鱼秒杀

    # https://sct.ftqq.com/
    [serverchan]
    sec_key = SCU35113Te369cebc21f6e483c03fffc400c4c5c05bdad63995c32

    # https://open.dingtalk.com/document/group/custom-robot-access
    [dingding]
    access_token =
    secret =

    # http://www.pushplus.plus/
    [pushplus]
    pushplus_token = fedcf6b08f6f4aaeb2948b7d7010eb93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import base64
import hashlib
import hmac
import os.path
import time
import urllib.parse
from functools import reduce
from pathlib import Path

import requests

__author__ = 'Mrli'

CONFIG_INI_FILENAME = "push_config.ini"
CONFIG_INI_PATH = Path(__file__).resolve().parent.with_name("push_config.ini")

REMIND_MSG = """
[pusher]
pusher_type = pushplus

[serverchan]
sec_key =

[dingding]
access_token =
secret =

[pushplus]
pushplus_token =
"""


class PusherException(Exception):
def __init__(self, message) -> None:
super().__init__(message)


class IPushUtil:
def push(self, content: str, title: str = "") -> bool:
pass


class PushplusPush(IPushUtil):
def __init__(self, push_title, options: dict):
self.pushplus_token = options.get("pushplus_token")
self.push_title = push_title

def push(self, content: str, title: str = "") -> bool:
d = {
"token": self.pushplus_token,
"template": "markdown",
"title": "{push_title}-{title}".format(push_title=self.push_title, title=title),
"content": content
}
res = requests.post("http://www.pushplus.plus/send", data=d)
if not (200 <= res.json().get("code") < 300):
print(res.json())
return 200 <= res.json().get("code") < 300


class ServerChanPush(IPushUtil):
def __init__(self, push_title, options: dict):
self.sec_key = options.get("sec_key")
self.push_title = push_title

def push(self, content: str, title: str = "") -> bool:
data = {
'text': "{push_title}-{title}".format(push_title=self.push_title, title=title),
'desp': content
}
res = requests.post(url='https://sc.ftqq.com/{}.send'.format(self.sec_key), data=data)
if not (res.json().get("errmsg") == "success"):
print(res.json())
return res.json().get("errmsg") == "success"


class DingDingPush(IPushUtil):
URL = "https://oapi.dingtalk.com/robot/send"

def __init__(self, push_title, options: dict):
self.access_token = options.get("access_token")
self.secret = options.get("secret")
self.target_url = self.get_url()
self.push_title = push_title

def get_url(self):
timestamp = round(time.time() * 1000)
secret_enc = bytes(self.secret, encoding="utf-8")
string_to_sign = "{}\n{}".format(timestamp, self.secret)
string_to_sign_enc = bytes(string_to_sign, encoding="utf-8")
hmac_code = hmac.new(
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return self.URL + "?access_token={access_token}&timestamp={timestamp}&sign={sign}".format(
access_token=self.access_token, timestamp=timestamp, sign=sign)

def push(self, content: str, title: str = "") -> bool:
msg = self.gen_markdown_msg(title, content)
return self.send(msg)

def send(self, message):
resp = requests.post(self.target_url, json=message)
return resp.json()

@staticmethod
def gen_text_msg(content, at=None, at_all=False):
if at is None:
at = []
return {
"msgtype": "text",
"text": {"content": content},
"at": {"atMobiles": at, "isAtAll": at_all},
}

def gen_markdown_msg(self, title, text, at=None, at_all=False):
def generateText():
res = ""
# 最顶行显示标题
res += "# " + "{}-".format(self.push_title) + title + "\n"
# 内容
res += text
# at对象
res += reduce(lambda x, y: x + "@" + y, at, "")
return res

return {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": generateText()
},
"at": {"atMobiles": at, "isAtAll": at_all},
}


class Pusher:
def __init__(self, logger=None):
if logger:
self.cout = logger.info
else:
self.cout = print
self._pusher = self.init()

def init(self):
"""
实例化pusher
:return:
"""
from configparser import RawConfigParser
cp = RawConfigParser()
if not os.path.exists(CONFIG_INI_PATH):
raise PusherException(
"请创建{filename}配置文件\npusher配置信息如下:\n{msg}".format(filename=CONFIG_INI_FILENAME, msg=REMIND_MSG))
cp.read(CONFIG_INI_PATH, encoding="utf8")
pusher_type = cp.get("pusher", "pusher_type").lower()
push_title = cp.get("pusher", "push_title")
# 是否使用了pusher
if not pusher_type:
self.cout("初始化Pusher: 当前未配置Pusher, 如果需要推送功能, 则在{filename}".format(filename=CONFIG_INI_FILENAME))
return None

generator_info = dict(cp.items(pusher_type))
# 检查pusher配置
if pusher_type and not self._valid(generator_info):
raise PusherException("{}_pusher配置错误,不能为空~".format(pusher_type))

if pusher_type == "serverchan":
return ServerChanPush(push_title, generator_info)
elif pusher_type == "dingding":
return DingDingPush(push_title, generator_info)
elif pusher_type == "pushplus":
return PushplusPush(push_title, generator_info)
else:
raise PusherException("不可知pusher类型~")

@staticmethod
def _valid(config_dict: dict):
"""
判断字典值是否为空
:param dict:
:return:
"""
for v in config_dict.values():
if not v:
return False
return True

def push(self, content: str, title: str = "") -> bool:
if not self._pusher:
self.cout("当前未配置pusher, 消息无法发送")
return False
return self._pusher.push(content, title)

weiboSpider

可以连续爬取一个多个新浪微博用户(如胡歌迪丽热巴郭碧婷)的数据,并将结果信息写入文件数据库

  • ①允许用户指定自定义配置文件,否则使用默认配置文件;②对配置文件进行校验,并对非法项进行提示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if __name__ == '__main__':
config = _get_config()
config_util.validate_config(config)

def _get_config():
"""获取config.json数据"""
src = os.path.split(
os.path.realpath(__file__))[0] + os.sep + 'config_sample.json'
config_path = os.getcwd() + os.sep + 'config.json'
if FLAGS.config_path:
config_path = FLAGS.config_path

def validate_config(config):
"""验证配置是否正确"""

# 验证filter、pic_download、video_download
argument_list = ['filter', 'pic_download', 'video_download']
for argument in argument_list:
if config[argument] != 0 and config[argument] != 1:
logger.warning(u'%s值应为0或1,请重新输入', config[argument])
sys.exit()

# 验证since_date
since_date = config['since_date']
if (not _is_date(str(since_date))) and (not isinstance(since_date, int)):
logger.warning(u'since_date值应为yyyy-mm-dd形式或整数,请重新输入')
sys.exit()

proxy_pool

爬虫代理IP池项目,主要功能为定时采集网上发布的免费代理验证入库,定时验证入库的代理保证代理的可用性,提供API和CLI两种使用方式。同时你也可以扩展代理源以增加代理池IP的质量和数量。

模块组成:获取功能、存储功能、校验功能、接口管理

⭐️程序主要是启动了startServer的API接口服务、startScheduler定时服务

  • startServer->runFlask:是向外提供了通过proxyHandler来获得Redis中的proxy数据
  • startScheduler->sche.add_task(__runProxyFetch)、sche.add_task(__runProxyCheck)
    • __runProxyFetch:proxy_fetcher.run()->proxy_queue->Checker("raw", proxy_queue)获得各个代理网站的代理信息后,进行校验,校验成功则入库
    • __runProxyCheck:proxy in proxy_handler.getAll()->proxy_queue->Checker("use", proxy_queue):通过proxy_handler拿到库里所有现存的数据后,进行有效性校验,无效的则删除,有效的则更新信息

作为存储功能的接口proxyHandler,也是两个API服务与定时服务的中介。程序也是通过存储功能,将核心的两个功能:定时抓取的proxy数据提供proxy数据给用户使用成功联系在了一起

  • 通过元类实现单例模式:ConfigHandler,其可以在任意模块中以c = ConfigHandler()的形式获得,而不是ConfigHandler.getInstance()

  • @LazyProperty懒加载属性的装饰器: 只有用到时才会加载并将值注入到__dict__、加载一次后值就不再变化、;讲解可见:https://www.jianshu.com/p/708dc26f9b92——描述符or修饰符实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    class LazyProperty(object):
    # 在被注解类方法被解释器运行的时候就会创建LazyProperty实例并返回
    def __init__(self, func):
    self.func = func
    """通过python描述符来实现"""
    def __get__(self, instance, owner):
    if instance is None:
    return self
    else:
    # 会将结果值通过setattr方法存入到instance对象实例的__dict__中
    value = self.func(instance)
    setattr(instance, self.func.__name__, value)
    return value
    class ConfigHandler(withMetaclass(Singleton)):
    # 返回一个LazyProperty实例
    @LazyProperty
    def serverHost(self):
    return os.environ.get("HOST", setting.HOST)
    c = ConfigHandler()
    # 会触发ConfigHandler.__dict__["serverHost"], 然后接而触发LazyProperty的__get__,value = self.func(instance)会得到真正serverHost函数的值后将其设置在ConfigHandler instance对象的__dict__中,由于对象的__dict__["serverHost"]=value优先级高于类的__dict__["serverHost"]=LazyProperty()对象,因此之后调用得到的是value结果
    print(c.serverHost)

    __get__只有访问类属性的时候才会生效,这边是通过setattr将serverHost设置成了ConfigHandler的类属性

  • 封装了一个请求工具类WebRequest:

    • 增加了异常处理的功能
    • 增加了日志功能
    • 请求头会得到随机UA
    • 设置重试
  • 使用click创建子命令:

    1. 得到一个click_group
    1
    2
    3
    4
    5
    6
    CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])

    @click.group(context_settings=CONTEXT_SETTINGS)
    @click.version_option(version=VERSION)
    def cli():
    """ProxyPool cli工具"""
    1. 指定group下的子命令
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @cli.command(name="server")
    # 还可以设置参数: @click.option('--count', default=1, help='Number of greetings.') --> def server(count)
    def server():
    """ 启动api服务 """
    click.echo(BANNER)
    startServer()

    if __name__ == '__main__':
    cli()

    然后通过bash脚本同时开启两个进程

    1
    2
    3
    #!/usr/bin/env bash
    python proxyPool.py server &
    python proxyPool.py schedule
  • DbClient DB工厂类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    from urllib.parse import urlparse

    class DbClient(withMetaclass(Singleton)):
    def __init__(self, db_conn):
    self.parseDbConn(db_conn)
    self.__initDbClient()

    @classmethod
    def parseDbConn(cls, db_conn):
    # 可以关注一下urlparse的使用
    db_conf = urlparse(db_conn)
    cls.db_type = db_conf.scheme.upper().strip()
    # _NetlocResultMixinBase对象来获取想要的参数
    cls.db_host = db_conf.hostname
    cls.db_port = db_conf.port
    cls.db_user = db_conf.username
    cls.db_pwd = db_conf.password
    cls.db_name = db_conf.path[1:]
    return cls

    def __initDbClient(self):
    """
    init DB Client
    :return:
    """
    __type = None
    if "SSDB" == self.db_type:
    __type = "ssdbClient"
    elif "REDIS" == self.db_type:
    __type = "redisClient"
    else:
    pass
    assert __type, 'type error, Not support DB type: {}'.format(self.db_type)
    self.client = getattr(__import__(__type), "%sClient" % self.db_type.title())(host=self.db_host,
    port=self.db_port,
    username=self.db_user,
    password=self.db_pwd,
    db=self.db_name)
  • python中使用redis

    Redis中代理存放的结构为hash:hash 是一个键值(key=>value)对集合;是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。

    本项目中应用:key为ip:port, value为代理属性的字典;

    • python与创建连接
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # pip install redis
    from redis import Redis, BlockingConnectionPool
    conn_dict = {
    "host": "49.235.118.244",
    "port": 6379,
    "db": 0
    }
    r = Redis(connection_pool=BlockingConnectionPool(decode_responses=True,
    timeout=5,
    socket_timeout=15,
    **conn_dict)
    )

    添加:set、zset为add、list为push、string和hash为set

    参考:Redis基本概念及使用https://www.runoob.com/redis/redis-sorted-sets.html、★Python redis 使用介绍

    redis

  • 继承重写logging.logger,可选参数为name, level=DEBUG, stream=True, file=True,让每个功能函数都能生成单独的日志文件,并进行了可选控制。

    相比单例,日志精度更细,但也使用起来也更麻烦,需要考虑什么地方需要。

  • 提供"扩展代理"接口

    1. ProxyFetcher类中添加自定义的获取代理的静态方法, 该方法需要以生成器(yield)形式返回host:ip格式的代理

    2. 添加好方法后,修改setting.py文件中的PROXY_FETCHER项下添加自定义方法的名字:

      1
      2
      3
      4
      5
      6
      PROXY_FETCHER = [
      "freeProxy01",
      "freeProxy02",
      # ....
      "freeProxyCustom1" # # 确保名字和你添加方法名字一致
      ]

      schedule 进程会每隔一段时间抓取一次代理,下次抓取时会自动识别调用你定义的方法。

    实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
self.log.info("ProxyFetch : start")

# 从配置中拿执行函数
for fetch_source in self.conf.fetchers:
# 判断ProxyFetcher中是否有定义、是否可调用
fetcher = getattr(ProxyFetcher, fetch_source, None)
if not fetcher:
self.log.error("ProxyFetch - {func}: class method not exists!".format(func=fetch_source))
continue
if not callable(fetcher):
self.log.error("ProxyFetch - {func}: must be class method".format(func=fetch_source))
continue
thread_list.append(_ThreadFetcher(fetch_source, proxy_dict))

for thread in thread_list:
thread.setDaemon(True)
thread.start()
for thread in thread_list:
thread.join()
self.log.info("ProxyFetch - all complete!")
  • Cpython(默认安装的都是Cpython)中Dict和list、tuple都是线程安全

    • 以装饰器的形式将过滤器将入到容器中

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52

      class ProxyValidator(withMetaclass(Singleton)):
      pre_validator = []
      http_validator = []
      https_validator = []

      @classmethod
      def addPreValidator(cls, func):
      cls.pre_validator.append(func)
      return func

      # 实际上执行了 formatValidator=ProxyValidator.addPreValidator(formatValidator)
      # 由于addPreValidator返回了func, 所以formatValidator还是原来的addPreValidator, 但在类定义的时候ProxyValidator.pre_validator添加了formatValidator方法
      @ProxyValidator.addPreValidator
      def formatValidator(proxy):
      """检查代理格式"""
      verify_regex = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}"
      _proxy = findall(verify_regex, proxy)
      return True if len(_proxy) == 1 and _proxy[0] == proxy else False


      class DoValidator(object):
      """ 校验执行器 """
      @classmethod
      def validator(cls, proxy):
      """
      校验入口
      Args:
      proxy: Proxy Object
      Returns:
      Proxy Object
      """
      http_r = cls.httpValidator(proxy)
      https_r = False if not http_r else cls.httpsValidator(proxy)

      proxy.check_count += 1
      proxy.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
      proxy.last_status = True if http_r else False
      if http_r:
      if proxy.fail_count > 0:
      proxy.fail_count -= 1
      proxy.https = True if https_r else False
      else:
      proxy.fail_count += 1
      return proxy

      @classmethod
      def preValidator(cls, proxy):
      for func in ProxyValidator.pre_validator:
      if not func(proxy):
      return False
      return True
  • starter-banner:启动横幅

    • reflection: No、adjustment: cewnter、Stretch: Yes、width: 80
    • 还不错的font:
      • 5lineoblique——好看
      • banner3——清楚
      • bell——抽象
      • big——清晰
      • bigchief——等高线版本、艺术
      • block——块状
      • bulbhead——可爱
      • larry3d——立体3d
      • ogre——清晰
      • puffy——清晰+一点可爱
      • slant——清晰+斜体
  • 定时器框架apschedule配置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    scheduler = BlockingScheduler(logger=scheduler_log, timezone=timezone)

    scheduler.add_job(__runProxyCheck, 'interval', minutes=2, id="proxy_check", name="proxy检查")

    executors = {
    # job_defaults中的max_instances也受限于max_workers, 所以要大于max_instances;此外max_workers也决定了同时能处理几个同时发生的task
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
    }
    job_defaults = {
    # 合并将所有这些错过的执行合并为一个, 默认为True。 如果是定时的存储任务的话,参数肯定不同,不能合并所以得手动设置False
    # 像本项目每隔一段时间抓取到的数据也不太一样,所以无法直接当作一次错误任务合并
    'coalesce': False,
    # 默认情况下,每个作业只允许同时运行一个实例。这意味着,如果作业即将运行,但前一次运行尚未完成,则认为最近一次运行失败。通过在添加作业时使用关键字参数,可以设置调度程序允许同时运行的特定作业的最大实例数。默认为1
    'max_instances': 10,
    # 框架会检查每个错过的执行时间,如果当前还在misfire_grace_time时间内,则会重新尝试执行任务,设高点就可以避免任务被漏掉执行。默认为1
    # "misfire_grace_time": 5 该项目未使用,而是采用了多任务实例来规避任务错过执行==>即官方给出两种方案中的另一种。任务错过信息:Run time of job "say (trigger: interval[0:00:02])" was missed by 0:00:03.010383
    }

    scheduler.configure(executors=executors, job_defaults=job_defaults, timezone=timezone)
    scheduler.start()

    job_defaults参数含义见官方文档https://blog.csdn.net/weixin_44301439/article/details/124062178

    注: 经过测试,在add_task中的func如果起了多个线程,其执行不受限于sche的配置

  • Python中如果只是使用全局变量则不需要用global声明(因为变量搜寻会由内往外),但是如果需要修改则需要用global声明,否则无法找到相应变量

  • 生成器:使用了yield关键字的函数就是生成器,生成器是一类特殊的迭代器。

    作用:

    • 处理大量数据:生成器一次返回一个结果,而不是一次返回所有结果。比如sum([i for i in range(10000000000000)])会卡机;sum(i for i in range(10000000000000))则不会
    • 代码更加简洁:可以减少变量、空间
    • 迭代器本身的作用

    yield关键字有两点作用:

    保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起;可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)
    将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def __runProxyFetch():     
    for proxy in proxy_fetcher.run():
    proxy_queue.put(proxy)

    class Fetcher(object):
    name = "fetcher"
    def run(self):
    # ...
    # 相比使用生成推导式 return [p for p in proxy_dict.values() if DoValidator.preValidator(p.proxy)], 使用yield生成器可以节省空间
    for _ in proxy_dict.values():
    if DoValidator.preValidator(_.proxy):
    yield _
  • 应用部署:

    ①对apk换源;②设置时区

    1
    2
    3
    4
    5
    6
    7
    8
    FROM python:3.6-alpine
    # ..
    # apk repository
    RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
    # timezone
    RUN apk add -U tzdata && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && apk del tzdata
    # ...
    ENTRYPOINT [ "sh", "start.sh" ]

    docker-compose.yml: 镜像还没编译好的情况。(如果自己改了功能并启用的话,需要用这种;或者自己发布镜像后用后一种)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    version: '2'
    services:
    proxy_pool:
    build: .
    container_name: proxy_pool
    ports:
    - "5010:5010"
    links:
    - proxy_redis
    environment:
    DB_CONN: "redis://@proxy_redis:6379/0"
    proxy_redis:
    image: "redis"
    container_name: proxy_redis

    docker-compose.yml:别人镜像已经编译好并上传

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    version: "3"
    services:
    redis:
    image: redis
    expose:
    - 6379
    web:
    restart: always
    image: jhao104/proxy_pool
    environment:
    - DB_CONN=redis://redis:6379/0
    ports:
    - "5010:5010"
    depends_on:
    - redis

scheduler的逻辑

proxy_pool时序图

项目目录结构默写:

  • settings: 配置文件
  • main:启动文件
  • api:提供获取proxy数据接口
  • handler:
    • loggerHandler:日志类
    • configHandler:单例的配置接口类
    • ProxyHandler: Proxy CRUD操作类
  • fetcher: 代理数据获取类
  • db:
    • dbClinet: 存储功能接口类
    • redisClient:存储功能实现类
  • helper
    • scheduler: 定时任务的定义与启动类
    • validator: proxy有效性校验类
    • check: 具体执行校验逻辑类
    • proxy: 获取的proxy数据封装类
  • utils:
    • lazyProperty: 懒加载描述器
    • singleton: 单例管理器类
    • six: python2与python3兼容类
    • webRequest: 网络请求封装类

极验滑块验证码破解与研究(三):滑块缺口识别

首先我们需要准备两张图片:1. 带缺口的背景图;2. 与之对应的接口图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-
from pathlib import Path

import PIL
import cv2
import numpy as np


def imshow(img, winname='test', delay=0):
"""cv2展示图片"""
cv2.imshow(winname, img)
cv2.waitKey(delay)
cv2.destroyAllWindows()


def pil_to_cv2(img):
"""
pil转cv2图片
:param img: pil图像, <type 'PIL.JpegImagePlugin.JpegImageFile'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
img = cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
return img


def bytes_to_cv2(img):
"""
二进制图片转cv2
:param img: 二进制图片数据, <type 'bytes'>
:return: cv2图像, <type 'numpy.ndarray'>
"""
# 将图片字节码bytes, 转换成一维的numpy数组到缓存中
img_buffer_np = np.frombuffer(img, dtype=np.uint8)
# 从指定的内存缓存中读取一维numpy数据, 并把数据转换(解码)成图像矩阵格式
img_np = cv2.imdecode(img_buffer_np, 1)
return img_np


def cv2_open(img, flag=None):
"""
统一输出图片格式为cv2图像, <type 'numpy.ndarray'>
:param img: <type 'bytes'/'numpy.ndarray'/'str'/'Path'/'PIL.JpegImagePlugin.JpegImageFile'>
:param flag: 颜色空间转换类型, default: None
eg: cv2.COLOR_BGR2GRAY(灰度图)
:return: cv2图像, <numpy.ndarray>
"""
if isinstance(img, bytes):
img = bytes_to_cv2(img)
elif isinstance(img, (str, Path)):
img = cv2.imread(str(img))
elif isinstance(img, np.ndarray):
img = img
elif isinstance(img, PIL.Image):
img = pil_to_cv2(img)
else:
raise ValueError(f'输入的图片类型无法解析: {type(img)}')
if flag is not None:
img = cv2.cvtColor(img, flag)
return img


def get_distance(bg, tp, im_show=False, save_path=None):
"""
:param bg: 背景图路径或Path对象或图片二进制
eg: 'assets/bg.jpg'
Path('assets/bg.jpg')
:param tp: 缺口图路径或Path对象或图片二进制
eg: 'assets/tp.jpg'
Path('assets/tp.jpg')
:param im_show: 是否显示结果, <type 'bool'>; default: False
:param save_path: 保存路径, <type 'str'/'Path'>; default: None
:return: 缺口位置
"""
# 读取图片
bg_gray = cv2_open(bg, flag=cv2.COLOR_BGR2GRAY)
tp_gray = cv2_open(tp, flag=cv2.COLOR_BGR2GRAY)
# 边缘检测
tp_gray = cv2.Canny(tp_gray, 255, 255)
bg_gray = cv2.Canny(bg_gray, 255, 255)
# 目标匹配
result = cv2.matchTemplate(bg_gray, tp_gray, cv2.TM_CCOEFF_NORMED)
# 解析匹配结果
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)

distance = max_loc[0]
if save_path or im_show:
# 需要绘制的方框高度和宽度
tp_height, tp_width = tp_gray.shape[:2]
# 矩形左上角点位置
x, y = max_loc
# 矩形右下角点位置
_x, _y = x + tp_width, y + tp_height
# 绘制矩形
bg_img = cv2_open(bg)
cv2.rectangle(bg_img, (x, y), (_x, _y), (0, 0, 255), 2)
# 保存缺口识别结果到背景图
if save_path:
save_path = Path(save_path).resolve()
save_path = save_path.parent / f"{save_path.stem}.{distance}{save_path.suffix}"
save_path = save_path.__str__()
cv2.imwrite(save_path, bg_img)
# 显示缺口识别结果
if im_show:
imshow(bg_img)
return distance


if __name__ == '__main__':
d = get_distance(
bg='assets/bg.jpg',
tp='assets/tp.png',
im_show=True,
save_path='assets/bg.jpg'
)
print(d)

类似代码:lxBook

相关文章:

  1. 极验滑块验证码破解与研究(一):AST还原混淆JS
  2. 极验滑块验证码破解与研究(二):缺口图片还原
  3. 极验滑块验证码破解与研究(三):滑块缺口识别
  4. 极验滑块验证码破解与研究(四):滑块轨迹构造
  5. 极验滑块验证码破解与研究(五):请求分析及加密参数破解

机器学习

cardRecorder

通过pyautoGUI实现的斗地主记牌器,提供了tkinter的界面

  • 使用pyAutoGui找到类似的图片

    1
    2
    3
    4
    5
    6
    7
    8
       # 进行截图
    img = pyautogui.screenshot(region=myPos)
    # 遍历所有排类型
    for i in myCardsNum.keys():
    # 进行定位, 找到pics\\m1.png在当前图片img里的位置
    result = pyautogui.locateAll(needleImage='pics\\m' + i + '.png', haystackImage=img, confidence=myConfidence)

    myCardsNum[i] = cardsFilter(list(result), myFilter)

    pyautoGUI图片识别功能介绍:

    PyAutoGUI可以截取屏幕截图(截图),将其保存到文件中,并在屏幕中查找图像(定位)。

    • 截图
    1
    2
    3
    4
    5
    # 截全屏
    pg.screenshot() # 只会保存到内存,不会缓存到本地
    pg.screenshot('data/screenshot.jpg') # 指定路径,则会保存到本地
    # 按照指定区域截屏, region(x坐标,y坐标,宽度,高度),坐标为指定截图的左上角坐标
    pg.screenshot(imageFilename='data/screenshot.jpeg', region=(300, 300, 500, 500))
    • 图片定位
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 在屏幕中匹配图片,并返回位置相关数据,匹配不到则返回 None
    locate_1 = pg.locateOnScreen(image='data/test.png')
    print(locate_1) # Box(left=177, top=614, width=43, height=37)
    print(locate_1.left) # 177 可以通过 .left 等属性获取相应的值
    print(locate_1[0]) # 177 也可以通过下标获取相应的值
    # “模糊”匹配 -- 可以使用参数 confidence 来指定模糊程度,默认是 1,范围是 0 - 1(当然,太低就没意义了), 使用该参数要求已经安装好 openCV
    # 这里的路径关键字是 image,和上面的 imageFilename 不同,所以建议不要写关键字,直接写路径就好,防止写错
    locate_2 = pg.locateOnScreen(image='data/test.png', confidence=0.9)
    # 返回匹配到的图片的中心位置坐标
    center_point_1 = pg.center(locate_1)
    print(center_point_1) # Point(x=198, y=632)

    注:屏幕截图功能需要Pillow模块、置信度需要cv

    参考:PyAutoGui 图片识别+定位+截图函数文档pyautogui 的截图及图片匹配pyautogui 文档(五):截图及定位功能

  • tkinter使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # 变量使用
    num_8 = tkinter.StringVar() # 8
    num_8.set(strCards[8])

    root = tkinter.Tk()
    # 显示
    x_start = 20
    y_start = 5
    x_add = 30
    x_dif = 2
    y_dif = 25
    # 标签
    tkinter.Label(root, text='大', font=('', 14), width=1).place(x=x_start, y=y_start)
    # 输入框
    tkinter.Entry(root, textvariable=num_dw, font=('', 14), width=1).place(x=x_start + x_dif, y=y_start + y_dif)
    # 进行循环等待事件发生
    root.main_loop()

    布局

    • pack():将控件放置在父控件内之前,规划此控件在区块内的位置。
    • grid():将控件放置在父控件内之前,规划此控件为一个网格类型的架构。
    • place():将控件放置在父控件内的特定(指定)位置。

wiki_zh_word2vec

利用Python构建Wiki中文语料词向量模型试验——找到给定词相似的词语

代码涉及:开发环境准备、数据的获取、数据的预处理、模型构建和模型测试四大内容,对应的是实现模型构建的五个步骤。

  1. 开发环境准备: Python + gensim (NLP神器库) +jieba + OpenCC(中文繁体替换成简体)

  2. 数据获取: 到wiki官网下载中文语料、将XML的Wiki数据转换为text格式,使用到了gensim.corpora中的WikiCorpus函数

  3. Wiki数据预处理:①使用OpenCC进行繁简转换;②jieba分词

  4. Word2Vec模型训练:分好词的文档即可进行word2vec词向量模型的训练了。

    1
    2
    3
    4
    5
    6
    7
    # 训练skip-gram模型
    model = Word2Vec(LineSentence(inp), size=400, window=5, min_count=5,
    workers=multiprocessing.cpu_count())

    # 保存模型
    model.save(outp1)
    model.wv.save_word2vec_format(outp2, binary=False)

    代码运行完成后得到如下四个文件,其中wiki.zh.text.model是建好的模型,wiki.zh.text.vector是词向量。

  5. 模型测试:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    model = gensim.models.Word2Vec.load(fdir + 'wiki.zh.text.model')
    word = model.most_similar(u"足球")
    for t in word:
    print (t[0],t[1])
    """
    足球运动 0.531202495098
    国际足球 0.510423064232
    足球比赛 0.499910503626
    国家足球队 0.488919615746
    排球 0.488108128309
    男体子篮球0.482638418674
    体育 0.47221454978
    """

附:什么是词向量? ——词向量技术是将词转化成为稠密向量,并且对于相似的词,其对应的词向量也相近。

自然语言处理任务中,首先需要考虑词如何在计算机中表示。通常,有两种表示方式:

  • one-hot representation(离散表示)
    • 传统的基于规则或基于统计的自然语义处理方法将单词看作一个原子符号
    • 把每个词表示为一个长向量。这个向量的维度是词表大小,向量中只有一个维度的值为1,其余维度为0,这个维度就代表了当前的词。
    • one-hot representation相当于给每个词分配一个id,这就导致这种表示方式不能展示词与词之间的关系。另外,one-hot representation将会导致特征空间非常大,但也带来一个好处,就是在高维空间中,很多应用任务线性可分。
  • distribution representation(分布式表示)
    • word embedding指的是将词转化成一种分布式表示,又称词向量。分布式表示将词表示成一个定长的连续的稠密向量。
    • 分布式表示优点:
      1. 词之间存在相似关系:是词之间存在“距离”概念,这对很多自然语言处理的任务非常有帮助。
      2. 包含更多信息:词向量能够包含更多信息,并且每一维都有特定的含义。在采用one-hot特征时,可以对特征向量进行删减,词向量则不能。

系统设计

Python 自制屏幕翻译工具

程序的功能主要由3个模块组成

  1. screenshot:截图

    使用Pillow库

    1
    2
    3
    4
    5
    6
    7
    # 安装PIL依赖: pip install pillow

    # Step1: 从剪切板读取图片
    img = ImageGrab.grabclipboard()
    # 保存到本地 or 内存中
    image_result = './temp.png'
    img.save(image_result)
  2. recognition:将截图中的问题识别成文字

    使用easyocr或者pytesseract

    easyocr:

    • 编程使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import easyocr
    import requests

    # 运行的过程中会安装所需要的模型文件, 也可以自己下载后放到对应路径下
    # Windows:C:\Users\用户名.EasyOCR\model
    # Linux:~/ .EasyOCR / model
    reader = easyocr.Reader(['ch_sim'])

    # 使用GPU,看是否会有提示 CUDA not available
    # reader = easyocr.Reader(['ch_sim'])

    # 支持同时多个语言, 虽然可以一次性识别许多种语言,
    # 但并非所有语言都可以一起用,通常是公共语言和一个特殊语种可以一起识别,相互兼容,比如英语和日语。
    # reader = easyocr.Reader(['ch_sim','en'])

    # 识别
    resp = requests.get('https://images.zsxq.com/Fsl4plilMdpd_C8gXVJT1mKJDui9?e=1656604799&token=kIxbL07-8jAj8w1n4s9zv64FuZZNEATmlU_Vm6zD:Pg0990rjMMQUAlf2bJlNL9lmLSU=')
    result = reader.readtext(resp.content)
    print(result)
    • 命令行运行: easyocr -l ch_sim en -f test.png --detail=1 --gpu=True

    图片加载函数readtext接收三种参数

    • 文件路径
    • 图片的np_array
    • 字节流对象
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    from PIL import Image

    import easyocr
    from PIL import ImageGrab

    # 从剪切板中获得图片
    r = easyocr.Reader(["ch_sim"])

    img = ImageGrab.grabclipboard()
    print(img, type(img))

    # 参数为: file path or numpy-array or a byte stream object
    # 1: file path
    img_path = "ttt.png"
    img.save(img_path)
    reg_result = r.readtext(img_path)

    # 2: numpy-array
    import numpy as np
    np_img = np.array(img)
    reg_result = r.readtext(np_img)
    print(reg_result)

    # 3: a byte stream object
    import requests
    resp = requests.get('https://images.zsxq.com/Fsl4plilMdpd_C8gXVJT1mKJDui9?e=1656604799&token=kIxbL07-8jAj8w1n4s9zv64FuZZNEATmlU_Vm6zD:Pg0990rjMMQUAlf2bJlNL9lmLSU=')
    reg_result = r.readtext(resp.content)
    print(reg_result)
  3. Translation:将文字翻译

    可以使用网上的有道翻译API、或者直接使用Google提供的翻译库googletrans

    1
    2
    3
    4
    5
    # Google翻译
    translator = Translator(service_urls=['translate.google.cn'])

    # 翻译成中文
    content_chinese = translator.translate(content_eng, src='en', dest='zh-cn').text
  4. GUI:将翻译后的结果展示

    使用 Python 自带的 GUI tkinker,将识别后的中文显示出来。

    1
    2
    3
    4
    5
    6
    # 初始化
    root = Tk()
    root.withdraw()

    # 显示翻译后的结果,以对话框的形式
    tkinter.messagebox.showinfo('翻译结果', content_chinese)

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import easyocr
from PIL import ImageGrab, Image

# r = easyocr.Reader(["ch_sim"])
r = easyocr.Reader(["en"])

def screenShot():
"""1. 从剪切板中获得图片"""
img = ImageGrab.grabclipboard()
print(img, type(img))
return img


def identify(img: Image):
"""2. 识别文字"""
import numpy as np
np_img = np.array(img)
reg_result = r.readtext(np_img)
rs = sorted(reg_result, key=lambda x: x[2], reverse=True)[0][1]
return rs


def translate(rs: str):
"""3. 翻译:googletrans 库"""
from googletrans import Translator
# Google翻译
translator = Translator(service_urls=['translate.google.cn'])
# 翻译成中文
translated_str = translator.translate(rs, src='en', dest='zh-cn').text
return translated_str


def showText(s: str):
"""4. tk展示"""
import tkinter as tk
from tkinter.messagebox import showinfo # 弹窗库

# 找到主应用
root = tk.Tk()
# 隐藏主界面
root.withdraw()
print(s)
showinfo("翻译结果", s)


if __name__ == '__main__':
img = screenShot()
# 识别出来的文字
rs = identify(img)
translated_str = translate(rs)
showText(translated_str)

system-design-primer

go-package-plantuml

  • go中的argueparser:"import github.com/jessevdk/go-flags"。除此之外,还有flag、pflag来处理输入参数

id-maker

health_checker

浙大的登陆加密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Python转义实现
def _rsa_encrypt(self, password_str, e_str, M_str):
# 将str转化为ascii的字节
password_bytes = bytes(password_str, 'ascii')
# 大端填充
password_int = int.from_bytes(password_bytes, 'big')
# 将其按照16进制的形式转成10进制
e_int = int(e_str, 16)
M_int = int(M_str, 16)
# password_int^e%M
result_int = pow(password_int, e_int, M_int)
# 10进制转成16进制str, 并 rjust() 返回一个原字符串右对齐,并使用空格填充至长度 width 的新字符串。
# "hello".rjust(8, "0") ==> 000hello
return hex(result_int)[2:].rjust(128, '0')

Js加密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
RSAUtils.encryptedString = function(key, s) {
var a = [];
// s为反向的password, 如321321aaa
var sl = s.length;
var i = 0;
while (i < sl) {
a[i] = s.charCodeAt(i);
i++;
}
// 补足0
while (a.length % key.chunkSize != 0) {
a[i++] = 0;
}
// a ==> 321312aaa000000 的 ascii字符数组
var al = a.length;
var result = "";
var j, k, block;
for (i = 0; i < al; i += key.chunkSize) {
block = new BigInt();
j = 0;
for (k = i; k < i + key.chunkSize; ++j) {
block.digits[j] = a[k++];
block.digits[j] += a[k++] << 8;
}
var crypt = key.barrett.powMod(block, key.e);

var text = key.radix == 16 ? RSAUtils.biToHex(crypt) : RSAUtils.biToString(crypt, key.radix);
result += text + " ";
}
return result.substring(0, result.length - 1); // Remove last space.
};
  • new BigInt();得到的是digits.len=130的int数组

cookiecutter-golang

快速生成Go项目的cookiecutter模板

dddd_trainer

  • Cli命令使用fire库:fire是python中用于生成命令行界面(Command Line Interfaces, CLIs)的工具,不需要做任何额外的工作,只需要从主模块中调用fire.Fire(),它会自动将你的代码转化为CLI,Fire()的参数可以说任何的python对象

    特点:所有的子命令都能以函数、类等形式书写,流程和参数获取都比较清晰;传参可以自动被解析成Python类型:字符串、元组、列表、字典

    ★使用指南: https://blog.csdn.net/qq_17550379/article/details/79943740

  • ONNXRuntime: 通常我们在训练模型时可以使用很多不同的框架,比如有的同学喜欢用 Pytorch,有的同学喜欢使用 TensorFLow,也有的喜欢 MXNet,以及深度学习最开始流行的 Caffe等等,这样不同的训练框架就导致了产生不同的模型结果包,在模型进行部署推理时就需要不同的依赖库,而且同一个框架比如tensorflow 不同的版本之间的差异较大, 为了解决这个混乱问题,LF AI 这个组织联合 Facebook, MicroSoft等公司制定了机器学习模型的标准,这个标准叫做ONNX, Open Neural Network Exchage所有其他框架产生的模型包 (.pth, .pb) 都可以转换成这个标准格式,转换成这个标准格式后,就可以使用统一的 ONNX Runtime等工具进行统一部署。

  • Loguru — 最强大的 Python 日志记录器: 强大的全局日志对象,具有开箱即用的特性、美观的输出,支持f-string、支持在主进程和线程中捕获异常

    1
    2
    logger.add("file.log", rotation="12:00" , format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="INFO")
    logger.info("That's it, beautiful and simple logging!")

NJUPT-API

南京邮电大学网站接口, Python模块

  • @login_required 装饰器: 由于有些操作必须登录后才会有正确结果,所以相当于在执行前做了一层校验(至于抛出异常or转向登录就看装饰器中怎么写了)—> 由于是封装成了package库,所以在这一层需要进行限制。

    1
    2
    3
    4
    5
    6
    7
    8
    def login_required(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
    if not self.verified:
    self.login_by_sso()
    return func(self, *args, **kwargs)

    return wrapper
  • setuptools使用

  • 模块暴露:

    1
    2
    3
    4
    5
    6
    7
    8
    # njupt/__init__.py
    from njupt.card import Card # noqa
    from njupt.zhengfang import Zhengfang # noqa
    from njupt.library import Library # noqa
    from njupt.runningman import RunningMan # noqa
    from njupt.sso import SSO # noqa
    # 表示njupt模块提供以下5个接口
    __all__ = ['SSO', 'Card', 'Zhengfang', 'Library', 'RunningMan']
  • 验证码识别

    字符型验证码:把图片去噪、分割、归类。收集了足够多的样本,然后对新的图片也分割。将分隔后的每个字符与样本集匹配,选概率最大的字符,将组合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55

    class CardCaptcha:
    """
    一卡通系统的验证码工具类
    """

    def __init__(self, im):
    """
    :param im: PIL.image对象
    """
    self.im = im.convert('L')
    self.image_pre_process()

    def image_pre_process(self):
    # 二值化, 去除杂色点
    threshold = 200
    for x in range(self.im.width):
    for y in range(self.im.height):
    pix = self.im.getpixel((x, y))
    if pix > threshold:
    self.im.putpixel((x, y), BLACK)
    else:
    self.im.putpixel((x, y), WHITE)

    def handle_split_image(self):
    # 切割验证码,返回包含五个字符图像的列表
    y_min, y_max = 0, 82
    # 字符位置大致固定([7,49], [49, 91], ...)
    split_lines = [7, 49, 91, 133, 175, 217]
    ims = [self.im.crop([u, y_min, v, y_max]) for u, v in zip(split_lines[:-1], split_lines[1:])]
    return ims

    def crack(self):
    result = []
    # 装载训练数据集
    with open(os.path.join(current_dir, 'image_data.json'), 'rb') as f:
    image_data = json.load(f)
    for letter in self.handle_split_image():
    # build_vector将图片转成一维向量: for pixel in image.getdata():
    letter_vector = build_vector(letter)
    guess = []
    # 在模型数据中, 找一个汉明距离最小(最像的结果)
    for image in image_data:
    # x为key, 当前匹配的字符; y为向量
    for x, y in image.items():
    if len(y) != 0:
    # 计算两个一维向量的汉明距(向量只包含0,1时):(只要元素值不等, 距离就+1)
    guess.append((distance_hanmming(y, letter_vector), x))
    guess.sort()
    # 找出距离最小的, 即最匹配的字符
    result.append(guess[0][1])
    return ''.join(result)

    def __str__(self):
    return self.crack()

    注:PIL中image.convert()函数使用

    1. 模式“1”:为二值图像,非黑即白。但是它每个像素用8个bit表示,0表示黑,255表示白。
    2. 模式“L”: 为灰色图像,它的每个像素用8个bit表示,0表示黑,255表示白,其他数字表示不同的灰度。在PIL中,从模式“RGB”转换为“L”模式是按照下面的公式转换的:L = R * 299/1000 + G * 587/1000+ B * 114/1000
    3. 模式“P”为8位彩色图像、模式“RGBA”(alpha透明度)为32位彩色图像、模式“CMYK”为32位彩色图像
  • 自制打码工具

    1. captcha_getter.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    # 根据图片自己输入识别结果, 将图片保存下来, 图片名为正确的打码结果
    class CaptchaGUI:
    BASE_URL = "http://yktapp.njupt.edu.cn:8070"
    CAPTCHA_URL = BASE_URL + "/Login/GetValidateCode"
    LOGIN_URL = BASE_URL + "/Login/LoginBySnoQuery"
    def __init__(self):
    self.s = requests.session()
    self.r = self.s.get(self.CAPTCHA_URL)
    self.im = Image.open(BytesIO(self.r.content))
    self.root = tkinter.Tk()
    self.tkimg = ImageTk.PhotoImage(self.im)
    self.imgLabel = tkinter.Label(self.root, image=self.tkimg)
    self.imgLabel.pack()
    self.message = tkinter.Entry(self.root)
    self.message.pack()
    self.root.bind('<Return>', self.judge_and_save)
    self.root.mainloop()

    def judge_and_save(self, event):
    # 输出输入框的内容
    captcha_value = self.message.get()
    print(captcha_value)
    data = {
    "sno": self.account,
    "pwd": base64.b64encode(self.pwd.encode("utf8")),
    "ValiCode": captcha_value,
    "remember": 1,
    "uclass": 1,
    "zqcode": "",
    "json": True,
    }
    # 只有输入正确的才保存
    r = self.s.post(self.LOGIN_URL, data=data)
    if r.json()["IsSucceed"]:
    print("成功!")
    with open("captchas/{}.gif".format(captcha_value), 'wb+') as f:
    f.write(self.r.content)
    else:
    print(r.json())
    print("验证码输错了")
    # 刷新, 进行打码下一张
    self.r = self.s.get(self.CAPTCHA_URL)
    self.im = Image.open(BytesIO(self.r.content))
    # 刷新图片框
    self.tkimg = ImageTk.PhotoImage(self.im)
    self.imgLabel.config(image=self.tkimg)
    # ▲. 将输入框清空
    self.message.delete(0, 'end')
    1. captcha_gen.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49

    def spilt2chars():
    """
    分割已有的数据为字符并保存
    """
    try:
    shutil.rmtree('captcha_chars')
    except FileNotFoundError:
    pass
    os.mkdir("captcha_chars")
    values = "1234567890"
    # 每个字符创建对应的文件夹
    for value in values:
    os.mkdir('captcha_chars/{}'.format(value))

    file_names = os.listdir('captchas')
    # 遍历所有字符文件夹
    for file_name in file_names:
    if not os.path.isdir('captchas/{}'.format(file_name)) and file_name != '.DS_Store':
    # 获得打码img的图片名称(正确结果)
    values = file_name[:4]
    im = Image.open('captchas/{}'.format(file_name))
    captcha = CardCaptcha(im)
    # handle_split_image为切割后的四个字符图片, values为3fgc的值
    for im_part, value in zip(captcha.handle_split_image(), values):
    # 进行md5随机取名保存
    m = hashlib.md5()
    m.update("{}{}".format(time.time(), value).encode('utf8'))
    # captcha_chars/3/asdz.png
    im_part.save("captcha_chars/{}/{}.png".format(value, m.hexdigest()))

    def generate_models():
    iconset = list('0123456789')
    # 将图像数据转为向量数据并保存
    image_data = []
    for letter in iconset:
    try:
    # 遍历字符3下面所有的图片
    for img in os.listdir('captcha_chars/{}/'.format(letter)):
    if img != "Thumbs.db" and img != ".DS_Store":
    # 得到其图片向量
    temp = build_vector(Image.open("captcha_chars/{}/{}".format(letter, img)))
    # models_list中添加[{1: [0,1,1...]}]
    image_data.append({letter: temp})
    except FileNotFoundError:
    print(letter)
    image_data = [ {1: [0,1,1,...]}, {1: [1,0,...]}, ..., {2: [1,1,1,...]}]
    with open('image_data.json', 'w') as f:
    json.dump(image_data, f)

    附:找到最合适的旋转角度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24

    def rotate_img(image):
    """
    根据图像在x轴方向投影大小确定字符的摆放方向
    -45~+45循环, 当x轴上投影得到的白色像素数量最小,此时就是正确的角度
    :param image: PIL.Image object
    :return: rotated Image object
    """
    min_count = 1000
    final_angle = 0
    for angle in range(-45, 45):
    x_count = 0
    ti = image.rotate(angle, expand=True)
    for x in range(ti.width):
    # 当前y列只要有一个像素为white, 那么就+1结束, 进行下一列判断
    for y in range(ti.height):
    if ti.getpixel((x, y)) == WHITE:
    x_count += 1
    break
    if x_count < min_count:
    min_count = x_count
    final_angle = angle
    image = image.rotate(final_angle, expand=False)
    return image

Rss-Monitor

一个通用的RSS订阅通知器,和**RSSHub**一起食用效果更好哦。
RSS(Really Simple Syndication,简易信息聚合)是一种描述和同步网站内容的格式你可以认为是一种定制个性化推送信息的服务。它能够解决你漫无目的的浏览网页的问题。

  • 动态导入模块

  • 获得Rss链接信息——feedparser

    feedparser是一个Python的Feed解析库,可以处理RSS ,CDF,Atom 。使用它我们可从任何 RSS 或 Atom 订阅源得到标题、链接和文章的条目了。

    标准的item格式:

    1
    2
    3
    4
    5
    6
    7
    8
    <item>
    <title><![CDATA[厦门公交车放火案死者名单公布<br/>警方公布嫌犯犯罪证据]]></title>
    <link>http://www.infzm.com/content/91404</link>
    <description><![CDATA[6月11日下午,厦门BRT公交车放火案47名死亡者名单公布。厦门政府新闻办6月10日发布消息称,有证据表明,陈水总携带汽油上了闽DY7396公交车。且有多名幸存者指认其在车上纵火,致使整部车引起猛烈燃烧。经笔迹鉴定,陈水总6月7日致妻、女的两封绝笔书系陈水总本人所写。]]></description>
    <category>南方周末-热点新闻</category>
    <author>infzm</author>
    <pubDate>2013-06-11 11:24:32</pubDate>
    </item>

    feedparser模块api

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import feedparser
    d=feedparser.parse('http://feed.cnblogs.com/blog/sitehome/rss')
    # d.feed为数据字典, 与entries相比一个是dict一个是list, 其次entries中是更加凝练点的数据
    # 输出字典中的键值有哪些,一共有10中如下:
    ['feed', 'status', 'version', 'encoding', 'bozo', 'headers', 'href', 'namespaces', 'entries', 'bozo_exception']
    {'feed': {}, 'encoding': u'utf-8', 'bozo': 1, 'version': u'', 'namespaces': {}, 'entries': [], 'bozo_exception': SAXParseException('no element found',)}
    # d.channel下的属性为channel中中的字段,比如title、link、description、generator、webMaster、lastBuildDate
    >>>d['feed']['title']
    '博客园_首页'
    >>>d.feed.title #通过属性的方式访问
    '博客园_首页'
    >>>d.feed.subtitle
    '代码改变世界'
    >>>d.feed.link
    'uuid:1b90fd0c-6080-4ea5-86b1-b87c64b95d69;id=4466'
    d.entries #该属性类型为列表,表示一组文章的条目
    >>>type(d.entries) #类型为列表, 为item标签中的内容
    <class 'list'>
    >>>len(d.entries) #一共20篇文章
    20

    每个RSS和Atom订阅源都包含一个标题(d.feed.title)和一组文章条目(d.entries)

    通常每个文章条目都有一段摘要(d.entries[i].summary),或者是包含了条目中实际文本的描述性标签(d.entries[i].description)

  • 自定义添加Rss链接->https://docs.rsshub.app/joinus/quick-start.html、
    添加浙江工商大学教务处和学校首页通知

    除了按照Rsshub来自定义Rss订阅源以外,还可以使用 Huginn、Feed43等工具,如用Feed43为教务处网站制作RSS

    1. 暴露路由链接:在 /lib/router.js (opens new window)里添加路由

    2. 编写数据源脚本:在 /lib/routes/ (opens new window)中的路由对应路径下创建新的 js 脚本:

      1. 使用 got 请求 HTML 数据:
      2. 使用 cheerio 解析返回的 HTML:
      3. 使用 map 遍历数组,解析出每一个 item 的结果 ——对数据进行进一步处理,生成符合 RSS 规范的对象,把获取的标题、链接、描述、发布时间等数据赋值给 ctx.state.data, 生成 RSS 源

      生成 RSS 源原理:获取到的数据赋给 ctx.state.data, 然后数据会经过 template.js (opens new window)中间件处理,最后传到 /lib/views/rss.art (opens new window)来生成最后的 RSS 结果,每个字段的含义如下:https://docs.rsshub.app/joinus/quick-start.html#ti-jiao-xin-de-rsshub-gui-ze-bian-xie-jiao-ben

    3.添加脚本文档: <Route author="HenryQW" example="/github/issue/DIYgod/RSSHub" path="/github/issue/:user/:repo" :paramsDesc="['用户名', '仓库名']" />

ddt-sharp-shooter

使用到的库: screeninfo、pillow、ddddocr、pynput

  • pynput: 控制和监视输入设备;类似的有PyHook3(监视键鼠)、pywin32 (模拟键鼠)
  • ddddocr:识别验证码,这边用来识别数字
  1. 进程间通信。 Tkinter界面开启mainloop进程,其中又开辟出一个子线程来侦听其他进程发送的数据消息,然后通过tk.Text控件来展示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import multiprocessing
    import threading
    import time
    import tkinter
    # 声明全局对象及类型
    _tk: tkinter.Tk
    _text: tkinter.Text
    _terminate = False
    _queue: multiprocessing.Queue

    def update_text():
    """子线程侦听进程消息"""
    while not _terminate:
    if not _queue.empty():
    text = _queue.get(False)
    append_text(text)
    else:
    time.sleep(1)

    def append_text(text):
    _text.config(state='normal')
    _text.insert('end', f'\n{text}')
    _text.see('end')
    _text.config(state='disabled')

    def run(gui_queue):
    global _tk, _text, _queue, _screen_size
    _queue = gui_queue
    # ...
    threading.Thread(target=update_text).start()

    _tk.mainloop()
  2. py2app setup,在macos下创建python应用, python setup.py py2app

  3. 对纯数字识别结果进行清洗

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    def recognize_digits(image: bytes):
    ocr = ddddocr.DdddOcr(show_ad=False)
    result = ocr.classification(image)
    return wash_digits(result)

    def wash_digits(digits: str):
    """由于不会出现非数字, 所以对易识别错误的字符进行替换"""
    washed = digits \
    .replace('g', '9').replace('q', '9') \
    .replace('l', '1').replace('i', '1') \
    .replace('z', '2') \
    .replace('o', '0')
    return re.sub(r'\D', '0', washed)

文章记录

Github优秀的文章项目

Go好用的三方库

Author: Mrli

Link: https://nymrli.top/2022/03/29/开源项目代码阅读记录/

Copyright: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.

< PreviousPost
PlantUML作图
NextPost >
Python进阶
CATALOG
  1. 1. 爬虫系列
    1. 1.1. 抢购、秒杀
      1. 1.1.1. taobao_seckill
      2. 1.1.2. taobao_api
      3. 1.1.3. jd_seckill
      4. 1.1.4. jd-assistant★
      5. 1.1.5. jd-seckill-maotai
    2. 1.2. 主要功能
      1. 1.2.1. jd_mask
      2. 1.2.2. Automatic_ticket_purchase
      3. 1.2.3. NikeRobot
      4. 1.2.4. hpv4g★
      5. 1.2.5. 抢购型代码总结:
    3. 1.3. weiboSpider
    4. 1.4. proxy_pool★
      1. 1.4.1. scheduler的逻辑
      2. 1.4.2. 项目目录结构默写:
    5. 1.5. 极验滑块验证码破解与研究(三):滑块缺口识别
  2. 2. 机器学习
    1. 2.1. cardRecorder
    2. 2.2. wiki_zh_word2vec
  3. 3. 系统设计
    1. 3.1. Python 自制屏幕翻译工具
    2. 3.2. system-design-primer
    3. 3.3. go-package-plantuml
    4. 3.4. id-maker
    5. 3.5. health_checker
    6. 3.6. cookiecutter-golang
    7. 3.7. dddd_trainer
    8. 3.8. NJUPT-API
    9. 3.9. Rss-Monitor
    10. 3.10. ddt-sharp-shooter
  4. 4. 文章记录
    1. 4.1. Github优秀的文章项目
    2. 4.2. Go好用的三方库