python批量下载公众号历史文章(二)

阅读: 评论:0

python批量下载公众号历史文章(二)

python批量下载公众号历史文章(二)

【背景】

        在上一个版本里,实现了半自动的下载,需要通过手工进行抓包操作。通过fiddler截获历史文章raw文件保存到本地;通过python进行解析,获得文章信息(含访问url、标题、发布时间等)的列表;然后通过python调用这些url把需要的文章下载到本地;当然还有一些图片和转pdf的一些处理。

       上一版本博客链接:

       通过这几天帮助朋友处理其他问题的练习,已经学会了使用mitmproxy自动抓包,并将信息保存在数据库里。

【目标】

        这个版本的目标很简单,就是去除手工,不做太多的更新。目标有三个:

        1、使用mitmproxy,完成公众号基础信息的自动抓包。

             基础信息包括:访问“历史文章的首页”信息(包括url、headers(含cookie)等)

                                      访问“加载更多”链接的信息(包括url等)

                                      访问某篇文章的信息(包括url、公众号名称等,主要是为了获得公众号名称信息)

        2、使用mongodb把上述抓取到的基础信息包保存在数据库里。

        3、读取基础信息,拼装获取任意信息的url,将想要的文章信息保存到数据库里。

              文章信息包括:文章的访问url、文章的标题、文章的发布时间、文章发布序号(同一时间可发布多篇文章)

         PS:这个版本只是完成了文章信息保存的数据库里,并没有真正下载(不过有了下载url、headers、cookie,想下载随时可下载),具体的下载方法可以参考上一个版本。

         后续,准备做一个前端页面,增加检索功能,支持直接在线阅读,或者把文章下载到数据库里,导出pdf等功能。

        PS:随着版本的迭代,后面准备使用git进行版本管理

【思路】

        1、使用mitmporxy代替fiddler,编写插件,截获微信访问信息,保存到数据库。

              mitmproxy的使用见另一个博客实例:

              PS:本版本实现了自动在程序中启动mitmproxy,无需再手动操作

        2、通过python程序读取信息,分析“加载更多”链接的规则,拼装更多的加载更多链接url。

        3、通过python访问这些加载更多url,解析返回的json串,把解析出来的文章信息保存到数据库里。

【工具】

        1、mitmpoxy

        2、mongodb

        3、python3.7及对应的包

【编码】

        1、mitmproxy的插件编写 mitmaddons.py

# _*_ coding:utf-8 _*_"""
mitmproxy 插件,拦截微信访问数据,并插入到数据库里
"""import mitmproxy.http
from mitmproxy import ctx, http
from datetime import datetime
import jsonfrom db import CollectionOperation
from loguru import loggerclass WechatAddon:# 定义需要拦截的请求urldef __init__(self):self.url_filter = {'load_more':'=getmsg', 'getappmsgext':'?', 'appmsg_comment':'=getcomment', 'content':'?', 'home':'=home'}"""拦截url_filter中的请求参数 用q的格式作为key name 存入数据库"""@staticmethoddef _extract_wxuin(req_data):""":param req_data::return: 从cookie中解析出wxuin 作为公众号的唯一标识"""wxuin = 'UNK'if 'Cookie' in req_data['requestOptions']['headers']:cookie_dict = str_to_dict(req_data['requestOptions']['headers']['Cookie'], ';', '=')else:cookie_dict = str_to_dict(req_data['requestOptions']['headers']['cookie'], ';', '=')if 'wxuin' in cookie_dict:wxuin = cookie_dict['wxuin']return wxuindef request(self, flow: mitmproxy.http.HTTPFlow):passdef response(self, flow: mitmproxy.http.HTTPFlow):# print("url:"&#quest.url)# 截获返回数据的cookie保存在数据库里for ukey in self.url_filter:if self.url_filter[ukey] quest.url:req_data, timestamp = AnalysisFlow.format_request_quest)wxuin = self._extract_wxuin(req_data)if wxuin == 'UNK':returnkey_name = '%s.%s.req' % (wxuin, ukey)insert_helper(key_name, req_data,'req_data')if ukey == 'getappmsgext':text_dict = json.)nick_name = 'UNK'if 'nick_name' in text_dict:nick_name = text_dict['nick_name']if not nick_name == 'UNK':insert_helper(nick_name + '.nick_name', wxuin, 'req_data')elif ukey == 'home':html_text = urrent_nickname = html_text.split('var nickname = "')[1].split('" || ""')[0]insert_helper(wxuin+'.oaname', current_nickname, 'req_data')class AnalysisFlow:"""解析flow request数据"""@staticmethoddef format_request_data(request):req_data = {}req_data['protocol'] = request.schemereq_data['url'] = request.urlreq_data['requestOptions'] = {}req_data['requestOptions']['headers'] = AnalysisFlow.decode_headers(request.headers)req_data['requestOptions']['hostname'] = request.pretty_hostreq_data['requestOptions']['port'] = request.portreq_data['requestOptions']['path'] = request.pathreq_data['requestOptions']['method'] = hodreq_data['requestData'] = imestamp = int(request.timestamp_end * 1000)return (req_data, timestamp)@staticmethoddef decode_headers(headers):headers_data = {}for i in headers.fields:headers_data[str(i[0], 'utf-8')] = str(i[1], 'utf-8')if ':authority' in headers_data:headers_data.pop(':authority')return headers_datadef str_to_dict(s, join_symbol='n', split_symbol=':'):"""字符串到字典 支持自定义键值间隔符和成员间隔符:param s: 原字符串:param join_symbol: 连接符:param split_symbol: 分隔符:return: 字典"""s_list = s.split(join_symbol)data = dict()for item in s_list:item = item.strip()if item:k, v = item.split(split_symbol, 1)data[k] = v.strip()return datadef insert_helper(key, value, table):json_value = str(value).replace("'", '"')data = {'id':key,  'key':key,  'time':w(),  'value':json_value}CollectionOperation(table).insert('id', data)"""
addons = [WechatAddon()
]
"""

         2、操作自动启动mitmproxy的程序 startmitm.py

"""
启动mitmproxy
"""
from mitmproxy import proxy, options
ls.dump import DumpMasterfrom mitmaddons import WechatAddondef start_proxy():wechat_addon = WechatAddon()opts = options.Options(listen_host='0.0.0.0', listen_port=8080)pconf = fig.ProxyConfig(opts)m = DumpMaster(opts)m.server = proxy.server.ProxyServer(pconf)m.addons.add(wechat_addon)try:m.run()except KeyboardInterrupt:print('')m.shutdown()if __name__ == '__main__':start_proxy()

         3、操作mongodb的公用方法编写 db.py

# -*- coding:utf-8 -*-from pymongo import MongoClient
from config import cfg# mongodb连接
MONGODB_HOST = ('mongodb','MONGODB_HOST')
MONGODB_PORT = ('mongodb','MONGODB_PORT'))
MONGODB_NAME = ('mongodb','MONGODB_NAME')
db_client = MongoClient(MONGODB_HOST, MONGODB_PORT)
db_instance = db_client[MONGODB_NAME]class CollectionOperation:"""操作mongodb的类"""def __init__(self, table):""":param table: 实际上table在mongodb中被称为 collection 为了名称统一此处仍成为tabletable 的数据结果见data_scheme"""self.table = db_instance[table]def count(self, **kwargs):""":return:返回符合条件数据的总数"""return self.table.find(kwargs).count()def delete(self, **kwargs):""":param kwargs: 用字典表示的过滤器:return: 根据match中提供的符合信息删除文章 支持全部删除"""self.table.delete_many(kwargs)def get(self, **kwargs):""":param kwargs::return: 返回符要求的数据生成器"""data = self.table.find(kwargs)return datadef insert(self, key, data, check_exist=True):""":param data: []多个数据,或单个数据{}:param key: 更新模式下判重的依据:param check_exist:是否需要检查存在(更新模式):return: 插入一条数据或多个数据 在进行数据写入 基本上只需使用这一个API"""res = 'INSERT'if check_exist:if type(data) == dict:res = self._update_one(key, data)else:if type(data) == list:res = self._update_many(key, data)else:if type(data) == dict:self._insert_one(data)else:if type(data) == list:self._insert_many(data)return resdef _insert_one(self, data):""":param data: {}:return: 插入一条数据"""return self.table.insert_one(data).inserted_iddef _insert_many(self, data):""":param data: []:return: 插入多条数据"""self.table.insert_many(data)return len(data)def _update_one(self, key, data):""":param key: 判存字段:param data: {}:return: 更新或插入一条数据 用data中的字段更新key作为判断是否存在 存在更新 不存在就插入"""result = self.table.find_one({key: data[key]})if type(result) is dict:self.table.update_one({key: data[key]}, {'$set': data})op_result = 'UPDATE'else:self._insert_one(data)op_result = 'INSERT'return op_resultdef _update_many(self, key, data):""":param key: 判存字段:param data: []:return: 更新或插入多个数据 只要有一个数据是更新模式 返回UPDATE否则返回INSERT"""res = 'INSERT'for d in data:if self._update_one(key, d) == 'UPDATE':res = 'UPDATE'return resdef custom(self):""":return: 返回table 方便用户自定义操作"""return self.tabledef test():data = {'id':wechat, 'key':wechat,  'value':{'id':'wechat','url':'','title':'this is a test'}}col = CollectionOperation('test')col.insert('id', data)

        3、读取配置文件的方法编写 config.py

# -*- coding:utf-8 -*-import os
from configparser import ConfigParsercfg = ConfigParser()ad(os.path.join("conf","config.ini"))

        4、配置文件(config.ini)信息

[mongodb]
MONGODB_NAME = XDT
MONGODB_HOST = localhost
MONGODB_PORT = 27017

        5、读取数据库里保存的公众号“加载更多”基础信息,获取文章信息并保存到数据库里  insertarticle.py

# -*- coding:utf-8 -*-import requests
from datetime import datetimefrom db import CollectionOperation
from anajson import get_article_listdef get_load_more():db_tb = CollectionOperation('req_data').custom()db_data = db_tb.find({'id':{"$regex":"q"}})oa_info_list = []article_info_list = []for x in db_data:oa_info = {}oa_info['id'] = x['id']json_value = json.loads(x['value'])oa_info['protocol'] = json_value['protocol']oa_info['url'] = json_value['url']oa_info['headers'] = json_value['requestOptions']['headers']#oa_info['cookie'] = json_value['requestOptions']['headers']['Cookie']oa_info['wxuin'] = x['id'].split('.')[0]oa_info['oaname'] = db_tb.find_one({'id':oa_info['wxuin']+'.oaname'})['value']oa_info_list.append(oa_info)print("url:"+oa_info['url'])print(oa_info['headers'])rsps = get_response(oa_info['url'],oa_info['headers'])oa_atc_list = get_article_)for atc in oa_atc_list:atc_info = {}atc_info['url'] = atc.urlatc_info['title'] = atc.titleatc_info['pubtime'] = atc.atc_datetimeatc_info['idx_num'] = atc.idx_numatc_info['oaname'] = oa_info['oaname']atc_info['wxuin'] = oa_info['wxuin']article_info_list.append(atc_info)for oa_i in oa_info_list:oa_key = oa_i['wxuin']+".oainfo"insert_helper(oa_key,oa_i,"oa_info")for atc_i in article_info_list:atc_key = atc_i['wxuin']+"."+str(atc_i['pubtime'])+"."+str(atc_i['idx_num'])+".atcinfo"insert_helper(atc_key,atc_i,"atc_info")def insert_helper(key, value, table):json_value = str(value).replace("'", '"')data = {'id':key,  'key':key,  'time':w(),  'value':json_value}CollectionOperation(table).insert('id', data)def get_response(url,headers):(url,headers = headers)

        6. 定义解析返回的json页面的方法 anajson.py

# -*- coding:utf-8 -*-"""
解析json字符串
"""
import jsonfrom xtime import seconds_to_timedef get_article_list(resptext):"""传入保存文章列表信息的json页面,返回文章信息列表"""#print(resptext)article_list = [] # 用来保存所有文章的列表json_cont = json.loads(resptext)general_msg_list = json_cont['general_msg_list']json_list = json.loads(general_msg_list)#print(json_list['list'][0]['comm_msg_info']['datetime'])for lst in json_list['list']:atc_idx = 0 # 每个时间可以发多篇文章 为了方便后续图片命名seconds_datetime = lst['comm_msg_info']['datetime']atc_datetime = seconds_to_time(seconds_datetime)if lst['comm_msg_info']['type'] == 49: # 49为普通的图文atc_idx+=1url = lst['app_msg_ext_info']['content_url']title = lst['app_msg_ext_info']['title']atc_info = ArticleInfo(url,title,atc_idx,atc_datetime)article_list.append(atc_info)if 1 == lst['app_msg_ext_info']['is_multi']: # 一次发多篇multi_app_msg_item_list = lst['app_msg_ext_info']['multi_app_msg_item_list']for multi in multi_app_msg_item_list:atc_idx+=1url = multi['content_url']title = multi['title']mul_act_info = ArticleInfo(url,title,atc_idx,atc_datetime)article_list.append(mul_act_info)return article_listclass ArticleInfo():"""文章信息数据结构@title 文章标题@url 文章链接@atc_datetime 文章发布时间@idx_num 同一时间可能会发布多篇文章,计数用"""def __init__(self,url,title,idx_num,atc_datetime): #idx_num是为了方便保存图片命名self.url = urlself.title = titleself.idx_num = idx_numself.atc_datetime = atc_datetime

【运行方法】

        >准备工作

        1、上述文件放在同一个文件夹下,笔者是有标准的工程目录的,但不知道怎么上传到csdn,不过为了是各位看官能随时保证有一个可运行的版本(传说中的敏捷迭代),程序的包都是做过调整的,直接扔在一个目录就能执行。

        2、启动mongodb数据库,默认的用户名和密码为空,如果有配置,需要修改下db.py的程序。

            db.py 

            db_client = MongoClient(MONGODB_HOST, MONGODB_PORT)

            修改为:

            dburl = 'mongodb://' + user + ':' + pwd + '@' + MONGODB_HOST+ ':' + MONGODB_PORT

            db_client  = MongoClient(dburl)

        3、设置mitmproxy的代理并安装证书,具体方法看上一篇文章(必须安装,因为是拦截的微信app数据,无法通过快捷方式打开chrome那样)

        >运行

        1、(cmd)>python startmitm.py

        2、访问微信(手机、电脑版、模拟器均可,但是一定要设置mitm代理并安装证书),点击任一公众号,打开历史信息页面,下划至少一次加载更多(多了也没用,数据库只会保存一次,有重复 判断)。

        3、(cmd)>python insertarticle.py

        即可在数据库理查看到相应的数据了,有三个表:

        req_data存放的是公众号的基础数据,一个公众号三条记录;

        oa_info存放的是公众号的基础信息,一个公众号一条记录;(这个表,是从上个表里抽取的数据)

        atc_info存放的是所有的文章信息,很多很多

【运行效果截图】

       

 

本文发布于:2024-01-31 04:54:03,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170664804625670.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:历史文章   批量   公众   python
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23