Disclaimer: This project is for learning and reference only. Do not use it for any commercial or illegal purposes, and do not send requests at scale, so as not to put unnecessary load on the servers.
Preface:
Xiaohongshu is a community platform centered on product recommendations, lifestyle sharing, and product reviews. It has a large daily user base and rich content, which makes it a frequent target for data collection. However, Xiaohongshu has put real effort into anti-scraping measures: if you simply send requests with the plain requests library, you will often hit errors or get blocked as "suspected abnormal traffic".
This article walks through a Python implementation of a Xiaohongshu crawler, covering the API endpoints found via packet capture, generation of the required signature parameters, obtaining and rotating Cookies, fetching and persisting data, and downloading images.
By the end, you will be able to crawl the note lists and note details of multiple users in one run, store the data in a CSV file, and, if needed, download the images in each note to your local disk as well.
Many websites (especially mobile/H5 services) encrypt or obfuscate their APIs to prevent unauthorized bulk scraping. On Xiaohongshu, a request that is missing the required parameters typically comes back as {"success": false} or gets flagged as abnormal traffic. The key parameters we need in this article are:
- x-s, x-t: Xiaohongshu's core request signatures, used for authentication.
- xsec_token: must be carried when fetching a user's note list or a single note's details.
- Cookies such as a1 and web_session.
In short, the whole crawler can be broken down into the following stages:
1. Preparation: obtain the reverse-engineered signature file (GenXsAndCommon_56.js in this example), which contains getXs, the core function Xiaohongshu uses to generate its signatures. We will call it from Python via execjs.
2. Read blogger IDs: use pandas to read each blogger's user_id from an Excel or CSV file.
3. Crawl in a loop, fetching each blogger's note list: call the GET /api/sns/web/v1/user_posted endpoint and paginate with cursor until has_more is False or no cursor is returned, meaning there are no more notes.
4. Fetch single-note details: call the POST /api/sns/web/v1/feed endpoint to get richer data such as the note text, tags, like/collect counts, comment count, the author's IP location, and the image list.
5. Parse and persist the data.
6. Rate limiting and Cookies rotation: when success == false or errors such as 461 appear, switch Cookies, and add small delays (e.g. time.sleep(0.5)) to make the traffic look more natural.
Before using the script, install the core libraries:
pip install requests execjs loguru pandas openpyxl
and place the reverse-engineered Xiaohongshu signature JS file in the same directory as the script.
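For reference, here is a hedged sketch of what each entry in cookies_list can look like. The cookie names come from the article itself (a1 is consumed by the signing function, web_session is the login session); the values are placeholders you must replace with cookies copied from your own logged-in browser:
    # Placeholder cookies -- replace the values with ones copied from your own browser session.
    cookies_list = [
        {
            "a1": "your_a1_cookie_value",             # used by getXs to generate x-s / x-t
            "web_session": "your_web_session_value",  # session cookie of the logged-in account
        },
        # Add more cookie dicts to allow rotation when one account gets rate-limited.
    ]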
The examples below come from a crawler script that runs correctly and retrieves Xiaohongshu bloggers' note information. We will explain the implementation function by function, and at the end assemble everything into a complete, runnable script.

The update_headers function

In Xiaohongshu's APIs, x-s and x-t are the most critical signature fields. If they are missing or wrong, you get errors such as {"success": false}.
Through earlier JS reverse engineering or packet-capture analysis, we can find a function such as getXs on the page that is dedicated to generating x-s and x-t. After extracting it into a standalone JavaScript file, we can call it from Python:
def update_headers(api, data, current_cookies):
    """
    Run the JS file with execjs to generate the x-s / x-t signature.
    :param api: API path, e.g. '/api/sns/web/v1/feed' or a full path with query parameters
    :param data: request body, or the fields that take part in the signature; may be None
    :param current_cookies: the Cookies currently in use (the a1 value is extracted from them)
    :return: a dict containing 'X-s' and 'X-t'
    """
    with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
    context = execjs.compile(js_script)
    # The third argument is the a1 value taken from the cookies
    sign = context.call('getXs', api, data, current_cookies['a1'])
    return sign
The workflow of this function is simple: read the JS file, compile it with execjs, and call the getXs method with the API path, the request data, and the a1 value from the cookies. It returns a dict such as {'X-s': 'xxx', 'X-t': 123456789}. Once we have the signature, we can put it into the request headers and send the request.
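A minimal usage sketch (not part of the original script; headers_init is the base header dict defined later in the full script, and the cookie values are placeholders):
    # Hypothetical usage of update_headers: merge the signature into a copy of the base headers.
    current_cookies = {"a1": "your_a1_cookie_value", "web_session": "your_web_session_value"}
    sign = update_headers('/api/sns/web/v1/feed', None, current_cookies)
    headers = headers_init.copy()
    headers['x-s'] = sign['X-s']        # signature
    headers['x-t'] = str(sign['X-t'])   # timestamp that goes with the signature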
The main function (core loop)

For batch crawling, we usually read many user IDs (user_id) from a file such as 博主id.xlsx, then call the GET /api/sns/web/v1/user_posted endpoint for each one to get all of that user's notes.
The example below shows the key logic of the main function:
def main(file_path, file_name, cookies_list, is_download=False):
    # 1. Read the user IDs (the Excel file is expected to have a column named "用户id")
    data = pd.read_excel(file_path)  # or pd.read_csv
    user_ids = data['用户id']
    # 2. Xiaohongshu endpoint for a user's posted notes
    url = "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted"
    person_index = 1
    # 3. Iterate over every user_id and crawl the note list
    for user_id in user_ids:
        has_more = True
        print(f'Crawling posts of user #{person_index}: {user_id}')
        # 3.1 Initialize the request parameters
        params = {
            "num": "30",              # notes per page
            "cursor": "",             # pagination cursor
            "user_id": user_id,
            "image_formats": "jpg,webp,avif",
            "xsec_token": "",         # reserved field
            "xsec_source": "pc_note"
        }
        k = 0
        # 3.2 Keep paginating while has_more is True
        while has_more:
            current_cookie_index = 0
            # 3.3 On access errors, switch to the next cookie
            while current_cookie_index < len(cookies_list):
                current_cookies = cookies_list[current_cookie_index]
                # Encode the parameters and copy the base headers
                params_encoded = urllib.parse.urlencode(params)
                headers = headers_init.copy()
                # Generate x-s / x-t and write them into the headers
                sign_headers = update_headers(
                    f'/api/sns/web/v1/user_posted?{params_encoded}',  # path with query string
                    None,
                    current_cookies
                )
                headers['x-s'] = sign_headers['X-s']
                headers['x-t'] = str(sign_headers['X-t'])
                # 3.4 Send the request
                response1 = requests.get(url, headers=headers, cookies=current_cookies, params=params)
                print(response1.status_code)
                # 3.5 Status 200 with success=true means a normal response
                if response1.status_code == 200 and response1.json().get('success') == True:
                    data_json = response1.json()
                    # Parse the cursor and has_more needed for the next page
                    notes = data_json.get('data', {}).get('notes', [])
                    has_more = data_json.get('data', {}).get('has_more', False)
                    cursor = data_json.get('data', {}).get('cursor', None)
                    if cursor:
                        params['cursor'] = cursor
                        logger.info(f"cursor updated to: {cursor}")
                    else:
                        has_more = False
                        logger.info("No cursor returned, stopping pagination")
                    # 3.6 Iterate over all notes on the current page
                    for note in notes:
                        logger.info(f'Crawling post {k} of user #{person_index}')
                        k += 1
                        xsec_token = note.get('xsec_token')
                        note_id = note.get('note_id')
                        # Fetch the note details
                        note_data, status_code_result, headers_result = fetch_xiaohongshu_data(
                            note_id, xsec_token, current_cookies
                        )
                        # success=False means we were flagged for abnormal frequency: switch Cookies
                        if status_code_result == 200 and note_data.get('success') == False:
                            current_cookie_index += 1
                            print('Abnormal request frequency detected, switching to the next cookie and leaving this page')
                            k -= 1  # roll the counter back, this note was not fetched
                            break
                        else:
                            # On success, parse and save the note
                            if status_code_result == 200 and note_data.get('success') == True:
                                note_id, user_id_1 = parse_data(note_data, file_name, xsec_token)
                                # Download the note's images if requested
                                if is_download == True:
                                    download_img(note_data, user_id_1, note_id)
                            else:
                                logger.info('Request failed, skipping this note')
                                current_cookie_index += 1
                    # Exit once this page is done and there are no more pages
                    if has_more == False:
                        logger.info(f'Finished crawling the notes of user {user_id}')
                        break
                else:
                    # Request failed, retry with the next cookie
                    logger.info('------------------------------------')
                    logger.info('Request failed, switching to the next cookie')
                    logger.info('------------------------------------')
                    current_cookie_index += 1
            # Safety guard: if every cookie failed, stop paginating this user to avoid an endless retry loop
            if current_cookie_index >= len(cookies_list):
                logger.error('All cookies exhausted, giving up on this user')
                has_more = False
        person_index += 1
    logger.info("All users processed")
Notes:
- If requests become too frequent, or a cookie is judged abnormal, switch to the next cookie in cookies_list.
- Given how complex Xiaohongshu's APIs are, consider using a proxy pool in real deployments to avoid getting your IP blacklisted (see the sketch below).
- Please do not crawl at scale, so as not to put unnecessary load on the servers.
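As a hedged illustration of the proxy-pool suggestion (not part of the original script), requests lets you pass a proxies mapping per call inside main's request loop; the proxy address below is only a placeholder:
    # Hypothetical proxy usage; "http://127.0.0.1:7890" is a placeholder, not a real proxy.
    proxies = {
        "http": "http://127.0.0.1:7890",
        "https": "http://127.0.0.1:7890",
    }
    response1 = requests.get(url, headers=headers, cookies=current_cookies,
                             params=params, proxies=proxies, timeout=10)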
The fetch_xiaohongshu_data function

The user note-list endpoint returns plenty of basic information, but it is not complete. We usually also crawl the single-note detail endpoint, POST /api/sns/web/v1/feed. This endpoint takes source_note_id, xsec_token, xsec_source, etc. in the request body, and the signature has to be generated again:
def fetch_xiaohongshu_data(source_note_id, xsec_token, cookies):
    url = "https://edith.xiaohongshu.com/api/sns/web/v1/feed"
    api_endpoint = '/api/sns/web/v1/feed'
    # Request body
    data = {
        "source_note_id": source_note_id,
        "image_formats": ["jpg", "webp", "avif"],
        "extra": {"need_body_topic": "1"},
        "xsec_source": "pc_feed",
        "xsec_token": xsec_token
    }
    a1_value = cookies['a1']
    headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9",
        "cache-control": "no-cache",
        "content-type": "application/json;charset=UTF-8",
        "origin": "https://www.xiaohongshu.com",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "referer": "https://www.xiaohongshu.com/",
        "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko)",
        "x-b3-traceid": "9cbac8e2b8562aa3"
    }
    # As above, generate x-s / x-t through the JS file
    with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
    context = execjs.compile(js_script)
    sign = context.call('getXs', api_endpoint, data, a1_value)
    headers['x-s'] = sign['X-s']
    headers['x-t'] = str(sign['X-t'])
    # Send the request
    data_json = json.dumps(data, separators=(',', ':'))
    time.sleep(0.5)  # small delay to reduce the chance of being flagged as a bot
    response = requests.post(url, headers=headers, cookies=cookies, data=data_json)
    return response.json(), response.status_code, response.headers
After calling this function we get a JSON response containing the note body, image information, like/collect counts, comment count, the IP location, and so on.
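For orientation, here is a trimmed, hypothetical sketch of the response shape, inferred purely from the fields that parse_data reads below; the real payload contains many more fields and may differ:
    # Rough, assumed shape of the /api/sns/web/v1/feed response -- an illustration, not an official schema.
    example_feed_response = {
        "success": True,
        "data": {
            "items": [
                {
                    "id": "<note_id>",
                    "model_type": "note",
                    "note_card": {
                        "title": "...",
                        "desc": "...#某标签[话题]#...",
                        "time": 1712200000000,             # millisecond timestamp
                        "last_update_time": 1712200000000,
                        "ip_location": "...",
                        "interact_info": {
                            "liked_count": "1.2万",
                            "collected_count": "3456",
                            "comment_count": "789",
                            "share_count": "12"
                        },
                        "user": {"nickname": "...", "user_id": "...", "avatar": "..."},
                        "image_list": [
                            {"url_default": "...", "info_list": [{"url": "..."}]}
                        ]
                    }
                }
            ]
        }
    }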
The parse_data function

With the note details in hand, we can parse the response and save the key fields into a local CSV file. For example:
def parse_data(data, filename='博主信息', xsec_token=0):
    # CSV column names are kept in Chinese to match the original output format
    fieldnames = [
        "笔记id", "xsec_token", "笔记链接", "笔记类型", "笔记标题",
        "笔记正文", "笔记标签", "发布时间", "笔记最后更新时间", "图片链接",
        "点赞数", "收藏数", "评论数", "分享数", "用户名",
        "用户id", "用户ip", "用户头像"
    ]
    file_name = f"{filename}.csv"
    file_exists = os.path.isfile(file_name)
    with open(file_name, mode='a', newline='', encoding='utf-8-sig') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        item = data['data']['items'][0]  # the note data
        note_card = item['note_card']
        note_id = item["id"]
        note_url = f'https://www.xiaohongshu.com/explore/{note_id}?xsec_token={xsec_token}&xsec_source=pc_feed'
        note_type = item["model_type"]
        desc = note_card.get("desc", "")
        # Extract #tags (those immediately followed by the literal "[话题]")
        tags = re.findall(r"#([^#]+?)(?=\[话题\])", desc)
        tags = ", ".join(tags)
        # Interaction info
        interact_info = note_card.get("interact_info", {})
        publish_time = note_card.get("time")
        title = note_card.get("title", "")
        user_info = note_card.get("user", {})
        user_avatar = user_info.get("avatar", "")
        user_name = user_info.get('nickname', '')
        user_id = user_info.get('user_id', '')
        last_updated_time = note_card.get("last_update_time", "")
        like_count = interact_info.get("liked_count", 0)
        collect_count = interact_info.get("collected_count", 0)
        comment_count = interact_info.get("comment_count", 0)
        share_count = interact_info.get("share_count", 0)
        ip = note_card.get("ip_location", "")
        # First image of the note
        image_url = ""
        if note_card.get("image_list"):
            image_url = note_card["image_list"][0]["info_list"][0]["url"]
        # Write the row to the CSV
        writer.writerow({
            "笔记id": note_id,
            "xsec_token": xsec_token,
            "笔记链接": note_url,
            "笔记类型": note_type,
            "笔记标题": title,
            "笔记正文": desc,
            "笔记标签": tags,
            "发布时间": convert_scientific_time(publish_time),
            "笔记最后更新时间": convert_scientific_time(last_updated_time),
            "图片链接": image_url,
            "点赞数": convert_to_int(like_count),
            "收藏数": convert_to_int(collect_count),
            "评论数": convert_to_int(comment_count),
            "分享数": convert_to_int(share_count),
            "用户名": user_name,
            "用户id": user_id,
            "用户ip": ip,
            "用户头像": user_avatar
        })
    return note_id, user_id
A few small details here:
- re.findall(r"#([^#]+?)(?=\[话题\])", desc) picks out tags in the note body that start with # and are immediately followed by the literal [话题].
- The millisecond timestamps are converted to a human-readable format with datetime.fromtimestamp:
def convert_scientific_time(time_str):
    # The API returns a millisecond timestamp; convert it to a readable local time string
    timestamp = float(time_str)
    timestamp_in_seconds = timestamp / 1000
    readable_time = datetime.fromtimestamp(timestamp_in_seconds).strftime('%Y年%m月%d日 %H:%M:%S')
    return readable_time
def convert_to_int(value):
    # Counts come back as strings such as "356" or "1.2万"; expand "万" (10,000) into a number
    value = str(value)
    if '万' in value:
        value = value.replace('万', '')
        return float(value) * 10000
    else:
        return value
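A quick sanity check of the two helpers with made-up inputs (the first output depends on your local timezone):
    print(convert_scientific_time("1712200000000"))  # e.g. 2024年04月04日 ...
    print(convert_to_int("1.2万"))                    # 12000.0
    print(convert_to_int("356"))                      # "356" (returned unchanged when "万" is absent)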
The download_img function (optional)

If you also need to store the images, you can iterate over the image URLs in note_card["image_list"] when you fetch the note details and download them:
def download_img(data, user_id, note_id):
    image_list = data["data"]["items"][0]["note_card"]["image_list"]
    image_urls = [img["url_default"] for img in image_list]
    output_dir = os.path.join("images", user_id, note_id)
    os.makedirs(output_dir, exist_ok=True)
    for idx, url in enumerate(image_urls):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                file_path = os.path.join(output_dir, f"image_{idx + 1}.jpg")
                with open(file_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"Image downloaded: {file_path}")
        except Exception as e:
            logger.error(f"Image download error: {e}")
This stores all downloaded images under the images/<user_id>/<note_id> folder.
Put all of the functions above into a single Python file (e.g. xiaohongshu_spider.py). Before running it, make sure that GenXsAndCommon_56.js, 博主id.xlsx and the other required files are in place, and that you have manually updated the Cookies in cookies_list. Example:
import time
import execjs
import urllib.parse
from loguru import logger
import pandas as pd
import os
import requests
import re
import csv
from datetime import datetime
import json
import random
def base36encode(number, digits='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'):
    # Encode an integer as a lowercase base-36 string
    base36 = ""
    while number:
        number, i = divmod(number, 36)
        base36 = digits[i] + base36
    return base36.lower()

def generate_search_id():
    # Build a search_id from the current timestamp and a random value (kept for completeness; not used below)
    timestamp = int(time.time() * 1000) << 64
    random_value = int(random.uniform(0, 2147483646))
    return base36encode(timestamp + random_value)
def convert_scientific_time(time_str):
    # The API returns a millisecond timestamp; convert it to a readable local time string
    timestamp = float(time_str)
    timestamp_in_seconds = timestamp / 1000
    readable_time = datetime.fromtimestamp(timestamp_in_seconds).strftime('%Y年%m月%d日 %H:%M:%S')
    return readable_time

def convert_to_int(value):
    # Counts come back as strings such as "356" or "1.2万"; expand "万" (10,000) into a number
    value = str(value)
    if '万' in value:
        value = value.replace('万', '')
        return float(value) * 10000
    else:
        return value
def update_headers(api, data, current_cookies):
    # Generate the x-s / x-t signature by calling getXs in the reverse-engineered JS file
    with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
    context = execjs.compile(js_script)
    sign = context.call('getXs', api, data, current_cookies['a1'])
    return sign
headers_init = {
    "accept": "application/json, text/plain, */*",
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "no-cache",
    "origin": "https://www.xiaohongshu.com",
    "pragma": "no-cache",
    "priority": "u=1, i",
    "referer": "https://www.xiaohongshu.com/",
    "sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"macOS"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-site",
    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "x-b3-traceid": "9d6c7dd3a155b6a5"
}
def fetch_xiaohongshu_data(source_note_id, xsec_token, cookies):
    url = "https://edith.xiaohongshu.com/api/sns/web/v1/feed"
    api_endpoint = '/api/sns/web/v1/feed'
    # Request body
    data = {
        "source_note_id": source_note_id,
        "image_formats": ["jpg", "webp", "avif"],
        "extra": {"need_body_topic": "1"},
        "xsec_source": "pc_feed",
        "xsec_token": xsec_token
    }
    a1_value = cookies['a1']
    headers = {
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9",
        "cache-control": "no-cache",
        "content-type": "application/json;charset=UTF-8",
        "origin": "https://www.xiaohongshu.com",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "referer": "https://www.xiaohongshu.com/",
        "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "x-b3-traceid": "9cbac8e2b8562aa3"
    }
    # Generate x-s / x-t through the JS file
    with open('GenXsAndCommon_56.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
    context = execjs.compile(js_script)
    sign = context.call('getXs', api_endpoint, data, a1_value)
    headers['x-s'] = sign['X-s']
    headers['x-t'] = str(sign['X-t'])
    # Send the request
    data_json = json.dumps(data, separators=(',', ':'))
    time.sleep(0.5)  # small delay to reduce the chance of being flagged as a bot
    response = requests.post(url, headers=headers, cookies=cookies, data=data_json)
    return response.json(), response.status_code, response.headers
def parse_data(data, filename='博主信息', xsec_token=0):
    # CSV column names are kept in Chinese to match the original output format
    fieldnames = [
        "笔记id", "xsec_token", "笔记链接", "笔记类型", "笔记标题",
        "笔记正文", "笔记标签", "发布时间", "笔记最后更新时间", "图片链接",
        "点赞数", "收藏数", "评论数", "分享数", "用户名",
        "用户id", "用户ip", "用户头像"
    ]
    file_name = f"{filename}.csv"
    file_exists = os.path.isfile(file_name)
    with open(file_name, mode='a', newline='', encoding='utf-8-sig') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        item = data['data']['items'][0]
        note_card = item['note_card']
        note_id = item["id"]
        note_url = f'https://www.xiaohongshu.com/explore/{note_id}?xsec_token={xsec_token}&xsec_source=pc_feed'
        note_type = item["model_type"]
        desc = note_card.get("desc", "")
        # Extract #tags followed by the literal "[话题]"
        tags = re.findall(r"#([^#]+?)(?=\[话题\])", desc)
        tags = ", ".join(tags)
        interact_info = note_card.get("interact_info", {})
        publish_time = note_card.get("time")
        title = note_card.get("title", "")
        user_info = note_card.get("user", {})
        user_avatar = user_info.get("avatar", "")
        user_name = user_info.get('nickname', '')
        user_id = user_info.get('user_id', '')
        last_updated_time = note_card.get("last_update_time", "")
        like_count = interact_info.get("liked_count", 0)
        collect_count = interact_info.get("collected_count", 0)
        comment_count = interact_info.get("comment_count", 0)
        share_count = interact_info.get("share_count", 0)
        ip = note_card.get("ip_location", "")
        image_url = ""
        if note_card.get("image_list"):
            image_url = note_card["image_list"][0]["info_list"][0]["url"]
        writer.writerow({
            "笔记id": note_id,
            "xsec_token": xsec_token,
            "笔记链接": note_url,
            "笔记类型": note_type,
            "笔记标题": title,
            "笔记正文": desc,
            "笔记标签": tags,
            "发布时间": convert_scientific_time(publish_time),
            "笔记最后更新时间": convert_scientific_time(last_updated_time),
            "图片链接": image_url,
            "点赞数": convert_to_int(like_count),
            "收藏数": convert_to_int(collect_count),
            "评论数": convert_to_int(comment_count),
            "分享数": convert_to_int(share_count),
            "用户名": user_name,
            "用户id": user_id,
            "用户ip": ip,
            "用户头像": user_avatar
        })
    return note_id, user_id
def download_img(data, user_id, note_id):
    image_list = data["data"]["items"][0]["note_card"]["image_list"]
    image_urls = [img["url_default"] for img in image_list]
    output_dir = os.path.join("images", user_id, note_id)
    os.makedirs(output_dir, exist_ok=True)
    for idx, url in enumerate(image_urls):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                file_path = os.path.join(output_dir, f"image_{idx + 1}.jpg")
                with open(file_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"Image downloaded: {file_path}")
        except Exception as e:
            logger.error(f"Image download error: {e}")
def main(file_path, file_name, cookies_list, is_download=False):
    # Read the user IDs (the Excel file is expected to have a column named "用户id")
    data = pd.read_excel(file_path)
    user_ids = data['用户id']
    url = "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted"
    person_index = 1
    for user_id in user_ids:
        has_more = True
        print(f'Crawling posts of user #{person_index}: {user_id}')
        params = {
            "num": "30",
            "cursor": "",
            "user_id": user_id,
            "image_formats": "jpg,webp,avif",
            "xsec_token": "",
            "xsec_source": "pc_note"
        }
        k = 0
        while has_more:
            current_cookie_index = 0
            while current_cookie_index < len(cookies_list):
                current_cookies = cookies_list[current_cookie_index]
                params_encoded = urllib.parse.urlencode(params)
                headers = headers_init.copy()
                sign_headers = update_headers(
                    f'/api/sns/web/v1/user_posted?{params_encoded}', None, current_cookies
                )
                headers['x-s'] = sign_headers['X-s']
                headers['x-t'] = str(sign_headers['X-t'])
                response1 = requests.get(url, headers=headers, cookies=current_cookies, params=params)
                print(response1.status_code)
                if response1.status_code == 200 and response1.json().get('success') == True:
                    data_json = response1.json()
                    notes = data_json.get('data', {}).get('notes', [])
                    has_more = data_json.get('data', {}).get('has_more', False)
                    cursor = data_json.get('data', {}).get('cursor', None)
                    if cursor:
                        params['cursor'] = cursor
                        logger.info(f"cursor updated to: {cursor}")
                    else:
                        has_more = False
                        logger.info("No cursor returned, stopping pagination")
                    for note in notes:
                        logger.info(f'Crawling post {k} of user #{person_index}')
                        k += 1
                        xsec_token = note.get('xsec_token')
                        note_id = note.get('note_id')
                        note_data, status_code_result, headers_result = fetch_xiaohongshu_data(
                            note_id, xsec_token, current_cookies
                        )
                        if status_code_result == 200 and note_data.get('success') == False:
                            current_cookie_index += 1
                            print('Abnormal request frequency detected, switching to the next cookie and leaving this page')
                            k -= 1
                            break
                        else:
                            if status_code_result == 200 and note_data.get('success') == True:
                                note_id, user_id_1 = parse_data(note_data, file_name, xsec_token)
                                if is_download == True:
                                    download_img(note_data, user_id_1, note_id)
                            else:
                                logger.info('Request failed, skipping this note')
                                current_cookie_index += 1
                    if has_more == False:
                        logger.info(f'Finished crawling the notes of user {user_id}')
                        break
                else:
                    logger.info('------------------------------------')
                    logger.info('Request failed, switching to the next cookie')
                    logger.info('------------------------------------')
                    current_cookie_index += 1
            # Safety guard: if every cookie failed, stop paginating this user to avoid an endless retry loop
            if current_cookie_index >= len(cookies_list):
                logger.error('All cookies exhausted, giving up on this user')
                has_more = False
        person_index += 1
    logger.info("All users processed")
if __name__ == '__main__':
    file_path = '博主id.xlsx'  # Excel file holding the blogger user IDs (column "用户id")
    cookies_list = [
        # Fill in your own cookies here, e.g. {'a1': '...', 'web_session': '...'}
    ]
    is_download = True
    file_name = '博主帖子信息'  # output CSV will be 博主帖子信息.csv
    main(file_path, file_name, cookies_list, is_download)
Common issues:
- Frequent "request rate limit" errors: switch to the next cookie in cookies_list and slow the request rate down, as described above (see the sketch below).
- Garbled CSV files: write with encoding='utf-8-sig' or gb18030, depending on your local environment.
- Crawling too slowly / handling large data volumes.
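For the rate-limit issue, one small extra measure (an assumption on my part, not part of the original script) is to replace the fixed time.sleep(0.5) with a randomized delay so the request cadence looks less mechanical:
    import random
    import time

    def polite_sleep(base=0.5, jitter=1.0):
        # Sleep between `base` and `base + jitter` seconds to avoid a perfectly regular request pattern.
        time.sleep(base + random.uniform(0, jitter))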
Through the examples above, we have seen how to use Python + ExecJS to reproduce the request-signing process of Xiaohongshu's official JS and batch-crawl user note lists and note details. The key point is generating x-s and x-t correctly; without them you simply cannot get valid data back from Xiaohongshu.
Disclaimer: this example is for learning and technical exchange only. Do not use it for any commercial or illegal purpose. If any rights are infringed, please contact the author promptly.
If this article helped you, feel free to like, bookmark, or follow!
If you have further questions or want to discuss, leave a comment or send a private message.
END