-
Notifications
You must be signed in to change notification settings - Fork 1k
Open
Description
- 对 Bug 的描述
- 当前行为:Fastapi的插件WeRobot无法成功发送合并的新闻消息
我已经测试多次了
- 当前行为:Fastapi的插件WeRobot无法成功发送合并的新闻消息
这个是我的FastApi的WeRobot项目通过点击微信公众号菜单返回的结果,从代码中看,它是先发合并的新闻消息,当失败后,错误代码是45008的时候,转逐条发送,下面的图片就是结果
而下面这张图片是用的类似的方法,项目是thinkPHP5+EasyWechat完成的,用这个就可以正常发送合并的新闻消息
-
正确的行为:应当在微信公众号中收到合并到一条消息的,且包含多条新闻,只要不超过8条都行。
-
环境
- 平台:Windows 10
- WeRoBot 版本号: 1.13.1
description : WeRoBot: writing WeChat Offical Account Robots with fun - Python 版本:Python 3.11.3
-
复现代码或 repo 链接
from app.services.wechat_robot import robot # 引入WeRoBot实例(带DB存储token)
# 移除所有可能的默认处理器,防止产生额外的被动回复
robot.handlers = {}
robot.click_handlers = {}
#核心代码:
articles = msg_data.get("articles", [])[:8] # 限制最多8篇
print(f"尝试发送 {len(articles)} 篇图文消息")
print(f"图文消息内容: {articles}")
if articles:
# 将我们的文章格式转换为WeRobot需要的格式
formatted_articles = []
for article in articles:
# 调试输出原始图片URL
pic_url = article.get("picurl", "")
print(f"原始图片URL: {pic_url}")
# 同时保留picurl和image字段,确保至少一个能正常工作
formatted_article = {
"title": article.get("title", ""),
"description": article.get("description", ""),
"url": article.get("url", ""),
"picurl": pic_url # 微信API文档使用picurl
}
formatted_articles.append(formatted_article)
# 打印完整的格式化后数据,用于调试
print(f"格式化后的文章数据: {formatted_articles[:1]}")
try:
# 添加超时保护
async def send_articles_with_timeout():
try:
return await asyncio.wait_for(
asyncio.to_thread(robot.client.send_article_message, openid, formatted_articles),
timeout=5.0
)
except asyncio.TimeoutError:
print(f"发送图文消息超时")
return {"errcode": -1, "errmsg": "timeout"}
# 尝试合并发送
result = await send_articles_with_timeout()
print(f"通过WeRobot发送图文消息,标题: {articles[0].get('title', '')[:30]}...")
except Exception as article_err:
# 检查是否是"article size out of limit"错误
error_msg = str(article_err)
print(f"合并发送失败: {error_msg}")
#return False
if "45008" in error_msg and "article size out of limit" in error_msg:
print("检测到文章大小超限,改为逐条发送...")
success_count = 0
# 逐条发送
for i, article in enumerate(formatted_articles):
try:
# 添加超时保护
async def send_single_article_with_timeout():
try:
return await asyncio.wait_for(
asyncio.to_thread(robot.client.send_article_message, openid, [article]),
timeout=5.0
)
except asyncio.TimeoutError:
print(f"发送单篇文章超时")
return {"errcode": -1, "errmsg": "timeout"}
single_result = await send_single_article_with_timeout()
if not isinstance(single_result, dict) or single_result.get("errcode", 0) == 0:
print(f"第 {i+1}/{len(formatted_articles)} 篇发送成功: {article.get('title', '')[:15]}...")
success_count += 1
else:
print(f"第 {i+1} 篇发送失败: {single_result}")
# 防止发送过快
if i < len(formatted_articles) - 1:
await asyncio.sleep(0.5)
if i == 2: # 只发送3篇
break
except Exception as single_err:
print(f"第 {i+1} 篇发送失败: {single_err}")
if success_count > 0:
print(f"逐条发送成功 {success_count}/{len(formatted_articles)} 篇")
return True
else:
print("所有文章逐条发送均失败")
return False
else:
# 其他错误,直接报告失败
print(f"发送图文消息失败,非大小限制错误: {error_msg}")
return False
else:
print("图文消息内容为空")
return False
# 请在这里给出 bug 的复现代码。如有必要,可以创建一个复现 repo 并将链接粘贴到这里。
这个最终总是走错误:45008,然后进行逐条发送,逐条发送能成功发送3条。
- 复现步骤
1建立
# app/services/wechat_robot.py
import datetime
import time
import json
import logging
from sqlalchemy.orm import Session
from werobot import WeRoBot
from werobot.client import Client
from app.db.session import SessionLocal_share
from app.models.share_wechat_token import WechatToken
from app.core.config import (
WECHAT_TOKEN, WECHAT_APPID, WECHAT_SECRET, WECHAT_ENCODING_AES_KEY
)
from werobot.replies import ArticlesReply, Article
import requests
import traceback
from sqlalchemy import text
# ======================================================
# 初始化 WeRoBot 实例
# ======================================================
robot = WeRoBot(
token=WECHAT_TOKEN,
encoding_aes_key=WECHAT_ENCODING_AES_KEY,
app_id=WECHAT_APPID,
app_secret=WECHAT_SECRET
)
# 添加自定义函数从数据库获取token
def get_token_from_db():
"""从数据库获取token"""
try:
with SessionLocal_share() as db:
# 仅为调试目的获取表结构信息,保留原生SQL
try:
describe_result = db.execute(text("DESCRIBE share.wechat_token")).fetchall()
columns = [row[0] for row in describe_result]
print(f"wechat_token表的列名: {columns}")
except Exception as e:
print(f"获取表结构失败: {e}")
columns = []
# 使用ORM方式查询最新的token记录
token_record = db.query(WechatToken)\
.filter(WechatToken.cachename == 'wechat_jsapi_ticket')\
.order_by(WechatToken.id.desc())\
.first()
if token_record:
# 获取token值
token_value = token_record.token
datetime_value = token_record.datetime if hasattr(token_record, 'datetime') else None
# 检查token值
if not token_value:
print("数据库中的token为空")
return None
# 检查是否是PHP序列化格式
if isinstance(token_value, str) and 'a:' in token_value and '"value";' in token_value:
try:
# 尝试从PHP序列化字符串中提取token值
start_marker = '"value";s:'
start_pos = token_value.find(start_marker)
if start_pos > 0:
# 找到值长度标记
start_pos = token_value.find(':', start_pos + len(start_marker)) + 1
# 找到引号开始的位置
start_pos = token_value.find('"', start_pos) + 1
# 找到结束引号的位置
end_pos = token_value.find('"', start_pos)
# 提取token
if start_pos > 0 and end_pos > start_pos:
extracted_token = token_value[start_pos:end_pos]
print(f"从PHP序列化数据中提取到token: {extracted_token[:10]}...")
# 检查过期时间
try:
expired_marker = '"expired";i:'
exp_start_pos = token_value.find(expired_marker)
if exp_start_pos > 0:
exp_start_pos += len(expired_marker)
exp_end_pos = token_value.find(';', exp_start_pos)
if exp_end_pos > exp_start_pos:
expires_at = int(token_value[exp_start_pos:exp_end_pos])
current_time = int(time.time())
if expires_at > current_time:
return extracted_token
else:
print(f"Token已过期: {expires_at} < {current_time}")
except Exception as exp_err:
print(f"提取过期时间失败: {exp_err}")
except Exception as parse_err:
print(f"解析PHP序列化token失败: {parse_err}")
else:
# 不是PHP序列化格式的token,直接检查微信API token
# 我们现在不相信数据库的token,强制获取新的
print(f"数据库中的token可能已经过期,将获取新token")
return None
else:
print("未找到cachename为'wechat_jsapi_ticket'的记录")
# 找不到有效token
return None
except Exception as e:
print(f"从数据库获取token时出错: {e}")
traceback.print_exc()
return None
# 添加自定义函数保存token到数据库
def save_token_to_db(token, expires_in):
"""将token保存到数据库"""
try:
# 首先检查表结构(调试目的)
with SessionLocal_share() as db:
try:
# 保留原生SQL仅用于调试表结构
describe_result = db.execute(text("DESCRIBE share.wechat_token")).fetchall()
columns = [row[0] for row in describe_result]
print(f"保存token时的表结构: {columns}")
# 获取当前时间
current_time = datetime.datetime.now()
# 使用ORM检查是否存在记录
existing_record = db.query(WechatToken)\
.filter(WechatToken.cachename == 'wechat_access_token')\
.first()
if existing_record:
# 更新现有记录
existing_record.token = token
existing_record.datetime = current_time
db.commit()
print(f"更新了现有的token记录: {token[:10]}...")
else:
# 创建新记录
new_token = WechatToken(
cachename="wechat_access_token",
token=token,
datetime=current_time
)
db.add(new_token)
db.commit()
print(f"创建了新的token记录: {token[:10]}...")
return True
except Exception as table_err:
print(f"检查表结构或保存token出错: {table_err}")
traceback.print_exc()
return False
except Exception as e:
print(f"保存token到数据库时出错: {e}")
traceback.print_exc()
return False
# 包装获取access_token的方法
original_get_access_token = robot.client.get_access_token
def get_custom_access_token():
"""获取access_token的包装方法,优先从微信API获取新token"""
try:
# 强制使用原始方法获取新的token
print("正在从微信API获取新的access_token...")
token = original_get_access_token()
# 如果获取成功,保存到数据库
if token:
try:
print(f"获取到新token: {token[:10]}...")
save_token_to_db(token, 7200)
except Exception as e:
print(f"保存token到数据库失败: {e}")
else:
print("从微信API获取token失败")
# 尝试从数据库获取备用token
db_token = get_token_from_db()
if db_token:
print("使用数据库中的备用token")
return db_token
return token
except Exception as e:
print(f"自定义获取token失败: {e}")
traceback.print_exc()
# 失败时尝试从数据库获取
try:
db_token = get_token_from_db()
if db_token:
return db_token
except:
print("所有token获取方法均失败")
return None
# 替换方法
robot.client.get_access_token = get_custom_access_token
# ---------------------------
# 下面是你的消息处理器
# ---------------------------
def log_msg(prefix, message):
logging.info(f"\n=== {prefix} ===\n{message}\n")
@robot.text
def handle_text_msg(message):
content = message.content.strip()
from_user = message.source
to_user = message.target
log_msg("文本消息", f"from={from_user}, to={to_user}, content={content}")
if any(kw in content for kw in ["你好", "您好", "hello"]):
return "您好!有什么能帮到您的吗?"
elif any(kw in content for kw in ["帮助", "怎么", "使用"]):
return "请点击菜单查看相关功能,如有问题请随时咨询。"
elif any(kw in content for kw in ["最新", "消息", "通知"]):
reply = ArticlesReply(message=message)
article = Article(
title="最新消息",
description="这是最新发布的消息内容",
img="https://example.com/some_image.jpg",
url="https://example.com/news"
)
reply.add_article(article)
return reply
else:
return f"您发送的消息已收到:{content}"
@robot.image
def handle_image_msg(message):
"""处理图片消息"""
try:
# 正确获取MediaId,检查多种可能的属性名
media_id = None
if hasattr(message, 'MediaId'):
media_id = message.MediaId
elif hasattr(message, 'media_id'):
media_id = message.media_id
elif hasattr(message, 'raw') and isinstance(message.raw, dict) and 'MediaId' in message.raw:
media_id = message.raw['MediaId']
# 检查图片URL
pic_url = None
if hasattr(message, 'PicUrl'):
pic_url = message.PicUrl
elif hasattr(message, 'pic_url'):
pic_url = message.pic_url
elif hasattr(message, 'img'):
pic_url = message.img
print(f"处理图片消息: MediaID={media_id}, URL={pic_url}")
# 返回友好回复
return f"图片已收到,感谢您的分享!"
except Exception as e:
print(f"处理图片消息出错: {e}")
print(traceback.format_exc())
return "系统正在处理您的图片,请稍后..."
@robot.voice
def handle_voice_msg(message):
"""处理语音消息"""
try:
# 正确获取MediaId和Recognition
media_id = None
recognition = None
format_type = None
# 直接访问属性而不是通过raw字典
if hasattr(message, 'MediaId'):
media_id = message.MediaId
elif hasattr(message, 'media_id'):
media_id = message.media_id
if hasattr(message, 'Recognition'):
recognition = message.Recognition
elif hasattr(message, 'recognition'):
recognition = message.recognition
if hasattr(message, 'Format'):
format_type = message.Format
elif hasattr(message, 'format'):
format_type = message.format
log_msg("语音消息", f"media_id={media_id}, recognition={recognition}, format={format_type}")
# 构建回复消息
reply = "已收到您的语音"
if recognition:
reply += f",识别内容:{recognition}"
return reply
except Exception as e:
print(f"处理语音消息出错: {e}")
print(traceback.format_exc())
return "已收到您发送的语音,我们会尽快处理!"
@robot.video
def handle_video_msg(message):
log_msg("视频消息", f"media_id={message.media_id}, thumb_media_id={message.thumb_media_id}")
return "已收到您发送的视频"
@robot.shortvideo
def handle_short_video_msg(message):
log_msg("小视频消息", f"media_id={message.media_id}, thumb_media_id={message.thumb_media_id}")
return "已收到您发送的小视频"
@robot.location
def handle_location_msg(message):
log_msg("位置消息", f"label={message.label}, coords=({message.location_x}, {message.location_y})")
return f"已收到您的位置: {message.label}"
@robot.link
def handle_link_msg(message):
log_msg("链接消息", f"title={message.title}, url={message.url}")
return f"已收到您分享的链接: {message.title}"
@robot.subscribe
def handle_subscribe(message):
log_msg("用户关注事件", f"用户 {message.source} 关注了公众号")
return "感谢您关注督学云平台!"
@robot.unsubscribe
def handle_unsubscribe(message):
log_msg("用户取消关注事件", f"用户 {message.source} 取关")
return ""
@robot.key_click("MENU_KEY_NOW")
def handle_click_menu_now(message):
log_msg("菜单点击事件", f"event_key=MENU_KEY_NOW, from={message.source}")
return "这里是最新消息"
@robot.view
def handle_view_event_msg(message):
"""处理链接点击事件"""
from_user = message.source
url = message.key
log_msg("链接点击事件", f"用户={from_user}, URL={url}")
# 对于view事件,不需要回复内容,返回空字符串
return ""
@robot.location_event
def handle_location_event_msg(message):
"""处理位置上报事件"""
from_user = message.source
latitude = getattr(message, 'latitude', 0) or 0
longitude = getattr(message, 'longitude', 0) or 0
precision = getattr(message, 'precision', 0) or 0
log_msg("位置上报事件", f"用户={from_user}, 经度={longitude}, 纬度={latitude}, 精度={precision}")
# 对于location事件,不需要回复内容,返回空字符串
return ""
@robot.handler
def handle_event(message):
log_msg("收到消息", f"类型={message.type}, from={message.source}")
if message.type == 'event':
evt = (message.event or "").lower()
log_msg("收到事件", f"event={evt}, from={message.source}")
if evt == "scan":
return "您已成功扫码"
elif evt == "view":
return "" # 对于view事件不需要回复
elif evt == "location":
return "" # 对于location事件不需要回复
else:
return f"收到事件: {evt}"
elif message.type.endswith('_event'):
# 处理其他_event类型消息
evt = message.type.replace('_event', '')
log_msg("收到事件消息", f"event={evt}, from={message.source}")
if evt == "subscribe":
return "感谢您关注督学云平台!"
elif evt == "unsubscribe":
return "" # 对于unsubscribe事件不需要回复
else:
return "" # 默认不需要回复
else:
log_msg("未知消息类型", f"type={message.type}")
return "收到未知类型的消息"
- 其他信息
Metadata
Metadata
Assignees
Labels
No labels