Skip to content

Fastapi的插件WeRobot无法成功发送合并的新闻消息 #818

@gaoheran

Description

@gaoheran
  • 对 Bug 的描述
    • 当前行为:在 FastAPI 项目中使用 WeRoBot 时,无法成功发送合并为一条的多图文消息
      我已经测试多次了

下图是我的 FastAPI + WeRoBot 项目在点击微信公众号菜单后返回的结果。从代码可以看出,程序先尝试以一条消息发送合并的多图文消息;发送失败且错误码为 45008 时,改为逐条发送。
Image
而下图是用类似方法、基于 ThinkPHP5 + EasyWeChat 实现的项目的结果,它可以正常发送合并的多图文消息。
Image

  • 正确的行为:应在微信公众号中收到合并为一条的图文消息,其中包含多篇文章(不超过 8 篇即可)。

  • 环境

    • 平台:Windows 10
    • WeRoBot 版本号: 1.13.1
      description : WeRoBot: writing WeChat Offical Account Robots with fun
    • Python 版本:Python 3.11.3
  • 复现代码或 repo 链接

from app.services.wechat_robot import robot  # 引入WeRoBot实例(带DB存储token)

# Remove every default handler that could still produce an extra passive reply.
# NOTE(review): a passive reply to a menu CLICK event (e.g. an ArticlesReply
# returned from a click handler) is, per WeChat's passive-reply docs, able to
# carry several articles in a single message; clearing click_handlers removes
# that route and leaves only the customer-service API used below — confirm.
robot.handlers = {}
robot.click_handlers = {}


# Core code.
# NOTE(review): this is a fragment of an async function whose definition is not
# shown here (it uses `await` and `return`); `msg_data` and `openid` come from it.
# NOTE(review): WeRoBot's Client.send_article_message posts a customer-service
# "news" message; WeChat's customer-service message docs state that such a
# message may contain at most 1 article and that more returns errcode 45008.
# If that still holds, the merged attempt below fails whenever there is more
# than one article, which matches the behaviour reported in this issue.
articles = msg_data.get("articles", [])[:8]  # keep at most 8 articles
print(f"尝试发送 {len(articles)} 篇图文消息")
print(f"图文消息内容: {articles}")

if articles:
    # Convert our article dicts into the dict shape passed to WeRoBot
    formatted_articles = []
    for article in articles:
        # Debug output of the raw picture URL
        pic_url = article.get("picurl", "")
        print(f"原始图片URL: {pic_url}")
        
        # Only title/description/url/picurl are forwarded; no "image" key is set.
        formatted_article = {
            "title": article.get("title", ""),
            "description": article.get("description", ""),
            "url": article.get("url", ""),
            "picurl": pic_url       # the WeChat API docs use the key "picurl"
        }
        formatted_articles.append(formatted_article)
    
    # Debug: print the first formatted article only
    print(f"格式化后的文章数据: {formatted_articles[:1]}")
    
    try:
        # Timeout guard around the blocking WeRoBot call.
        # NOTE(review): asyncio.wait_for only cancels the await; the worker
        # thread started by asyncio.to_thread keeps running, so a "timed out"
        # request may still be delivered to the user.
        async def send_articles_with_timeout():
            try:
                return await asyncio.wait_for(
                    asyncio.to_thread(robot.client.send_article_message, openid, formatted_articles),
                    timeout=5.0
                )
            except asyncio.TimeoutError:
                print(f"发送图文消息超时")
                return {"errcode": -1, "errmsg": "timeout"}
        
        # Try sending all articles as one merged message.
        # NOTE(review): `result` is never inspected, so the timeout sentinel
        # {"errcode": -1} is logged as a success below, and this success path
        # does not return a value inside the visible fragment.
        result = await send_articles_with_timeout()
        print(f"通过WeRobot发送图文消息,标题: {articles[0].get('title', '')[:30]}...")
    except Exception as article_err:
        # The 45008 check relies on the API error being raised as an exception
        # whose text contains the errcode and errmsg.
        error_msg = str(article_err)
        print(f"合并发送失败: {error_msg}")
        #return False
        
        if "45008" in error_msg and "article size out of limit" in error_msg:
            print("检测到文章大小超限,改为逐条发送...")
            success_count = 0
            
            # Fallback: send the articles one message at a time
            for i, article in enumerate(formatted_articles):
                try:
                    # Same timeout guard, for a single article. The closure
                    # captures `article`, but it is awaited within the same
                    # iteration, so late binding is not an issue here.
                    async def send_single_article_with_timeout():
                        try:
                            return await asyncio.wait_for(
                                asyncio.to_thread(robot.client.send_article_message, openid, [article]),
                                timeout=5.0
                            )
                        except asyncio.TimeoutError:
                            print(f"发送单篇文章超时")
                            return {"errcode": -1, "errmsg": "timeout"}
                    
                    single_result = await send_single_article_with_timeout()
                    # Non-dict results and errcode 0 count as success; the
                    # timeout sentinel (errcode -1) counts as a failure.
                    if not isinstance(single_result, dict) or single_result.get("errcode", 0) == 0:
                        print(f"第 {i+1}/{len(formatted_articles)} 篇发送成功: {article.get('title', '')[:15]}...")
                        success_count += 1
                    else:
                        print(f"第 {i+1} 篇发送失败: {single_result}")
                    
                    # Throttle successive sends
                    if i < len(formatted_articles) - 1:
                        await asyncio.sleep(0.5)
                    if i == 2: # deliberately stop after the first 3 articles (this is why only 3 arrive)
                        break
                except Exception as single_err:
                    print(f"第 {i+1} 篇发送失败: {single_err}")
            
            # NOTE(review): the denominator is the full article count even
            # though the loop above stops after 3 articles.
            if success_count > 0:
                print(f"逐条发送成功 {success_count}/{len(formatted_articles)} 篇")
                return True
            else:
                print("所有文章逐条发送均失败")
                return False
        else:
            # Any other error: report failure directly
            print(f"发送图文消息失败,非大小限制错误: {error_msg}")
            return False
else:
    print("图文消息内容为空")
    return False
# 请在这里给出 bug 的复现代码。如有必要,可以创建一个复现 repo 并将链接粘贴到这里。

这段代码最终总是遇到错误 45008,然后改为逐条发送;逐条发送能成功发出 3 条(代码中的 `if i == 2: break` 限制了只发送前 3 篇)。

  • 复现步骤
    1. 建立以下文件:
# app/services/wechat_robot.py

import datetime
import json
import logging
import re
import time
import traceback

import requests
from sqlalchemy import text
from sqlalchemy.orm import Session
from werobot import WeRoBot
from werobot.client import Client
from werobot.replies import ArticlesReply, Article

from app.db.session import SessionLocal_share
from app.models.share_wechat_token import WechatToken
from app.core.config import (
    WECHAT_TOKEN, WECHAT_APPID, WECHAT_SECRET, WECHAT_ENCODING_AES_KEY
)

# ======================================================
# Shared WeRoBot instance, configured from the app settings
# ======================================================
robot = WeRoBot(
    app_id=WECHAT_APPID,
    app_secret=WECHAT_SECRET,
    token=WECHAT_TOKEN,
    encoding_aes_key=WECHAT_ENCODING_AES_KEY,
)

# Custom helpers: read a fallback token from the database
def _extract_php_cached_token(raw, now=None):
    """Return the unexpired token embedded in a PHP-serialized cache payload.

    The payload is expected to contain ``"value";s:<len>:"<token>"`` and
    ``"expired";i:<unix-ts>;`` entries.

    Args:
        raw: the serialized string read from the database.
        now: current Unix time; defaults to ``time.time()``.

    Returns:
        The token when it is non-empty and its ``expired`` timestamp lies in
        the future; otherwise None (missing value, missing/expired timestamp).
    """
    value_match = re.search(r'"value";s:\d+:"([^"]*)"', raw)
    if value_match is None or not value_match.group(1):
        return None
    extracted_token = value_match.group(1)
    print(f"从PHP序列化数据中提取到token: {extracted_token[:10]}...")

    expired_match = re.search(r'"expired";i:(-?\d+);', raw)
    if expired_match is None:
        # Without an expiry we cannot tell whether the token is still valid
        return None
    expires_at = int(expired_match.group(1))
    current_time = int(time.time() if now is None else now)
    if expires_at > current_time:
        return extracted_token
    print(f"Token已过期: {expires_at} < {current_time}")
    return None


def get_token_from_db():
    """Read a still-valid fallback token from the ``share.wechat_token`` table.

    Takes the newest row (highest ``id``) whose ``cachename`` is
    ``'wechat_jsapi_ticket'``. Only PHP-serialized payloads are trusted; their
    embedded value is returned while its ``expired`` timestamp is in the future.

    NOTE(review): this reads cachename 'wechat_jsapi_ticket', whereas
    save_token_to_db() writes 'wechat_access_token'. A JS-API ticket is not an
    access token — confirm this row really holds an access token.

    Returns:
        The token string, or None when nothing usable is found or any error
        occurs (errors are printed, never raised).
    """
    try:
        with SessionLocal_share() as db:
            # Debug only: print the table's column names (MySQL DESCRIBE)
            try:
                describe_result = db.execute(text("DESCRIBE share.wechat_token")).fetchall()
                columns = [row[0] for row in describe_result]
                print(f"wechat_token表的列名: {columns}")
            except Exception as e:
                print(f"获取表结构失败: {e}")

            token_record = (
                db.query(WechatToken)
                .filter(WechatToken.cachename == 'wechat_jsapi_ticket')
                .order_by(WechatToken.id.desc())
                .first()
            )
            if not token_record:
                print("未找到cachename为'wechat_jsapi_ticket'的记录")
                return None

            token_value = token_record.token
            if not token_value:
                print("数据库中的token为空")
                return None

            if isinstance(token_value, str) and 'a:' in token_value and '"value";' in token_value:
                return _extract_php_cached_token(token_value)

            # Not a PHP-serialized payload: raw DB tokens are deliberately not
            # trusted, so the caller fetches a fresh one instead.
            print("数据库中的token可能已经过期,将获取新token")
            return None
    except Exception as e:
        print(f"从数据库获取token时出错: {e}")
        traceback.print_exc()
        return None

# Custom helper: persist the access token to the database
def save_token_to_db(token, expires_in):
    """Insert or update the ``wechat_access_token`` row of ``share.wechat_token``.

    Args:
        token: access token string to store.
        expires_in: token lifetime in seconds. NOTE(review): not persisted —
            only ``token`` and the save time are written; confirm whether the
            table has a column for it.

    Returns:
        True when the row was committed, False if a database error occurred
        (the error and traceback are printed, never raised).
    """
    try:
        with SessionLocal_share() as db:
            # Debug-only column dump. Kept in its own try so that a failing
            # DESCRIBE (MySQL-specific) cannot abort the actual save.
            try:
                describe_result = db.execute(text("DESCRIBE share.wechat_token")).fetchall()
                columns = [row[0] for row in describe_result]
                print(f"保存token时的表结构: {columns}")
            except Exception as describe_err:
                print(f"获取表结构失败: {describe_err}")

            try:
                current_time = datetime.datetime.now()
                existing_record = db.query(WechatToken)\
                    .filter(WechatToken.cachename == 'wechat_access_token')\
                    .first()

                if existing_record:
                    existing_record.token = token
                    existing_record.datetime = current_time
                    db.commit()
                    print(f"更新了现有的token记录: {token[:10]}...")
                else:
                    db.add(WechatToken(
                        cachename="wechat_access_token",
                        token=token,
                        datetime=current_time,
                    ))
                    db.commit()
                    print(f"创建了新的token记录: {token[:10]}...")
                return True
            except Exception as table_err:
                # Discard the failed unit of work explicitly
                db.rollback()
                print(f"检查表结构或保存token出错: {table_err}")
                traceback.print_exc()
                return False
    except Exception as e:
        print(f"保存token到数据库时出错: {e}")
        traceback.print_exc()
        return False

# Wrap WeRoBot's access-token getter; keep a reference to the original bound
# method before it is replaced at the bottom of this block.
original_get_access_token = robot.client.get_access_token

# Last token successfully written by get_custom_access_token(). WeRoBot's
# Client.get_access_token is expected to return its in-memory cached token
# until it nears expiry, so without this every API call re-saved the same token.
_last_persisted_token = None


def get_custom_access_token():
    """Return an access token, mirroring newly seen tokens to the database.

    Calls WeRoBot's original ``get_access_token``. When that yields a token
    different from the last one persisted, it is saved via
    ``save_token_to_db``. If the original getter returns a falsy value or
    raises, a token from ``get_token_from_db`` is used when one is available.

    Returns:
        The token string; otherwise the original getter's falsy result (when it
        returned one) or None (when it raised) if no fallback token exists.
    """
    global _last_persisted_token
    try:
        print("正在从微信API获取新的access_token...")
        token = original_get_access_token()

        if token:
            # Persist only when the token changed, not on every API request
            if token != _last_persisted_token:
                try:
                    print(f"获取到新token: {token[:10]}...")
                    if save_token_to_db(token, 7200):
                        _last_persisted_token = token
                except Exception as e:
                    print(f"保存token到数据库失败: {e}")
        else:
            print("从微信API获取token失败")
            db_token = get_token_from_db()
            if db_token:
                print("使用数据库中的备用token")
                return db_token

        return token
    except Exception as e:
        print(f"自定义获取token失败: {e}")
        traceback.print_exc()
        try:
            db_token = get_token_from_db()
        except Exception:
            db_token = None
        if db_token:
            return db_token
        print("所有token获取方法均失败")
        return None


# Install the wrapper on the shared client instance
robot.client.get_access_token = get_custom_access_token

# ---------------------------
# Message handlers
# ---------------------------

def log_msg(prefix, message):
    """Log *message* at INFO level beneath a ``=== prefix ===`` banner line."""
    logging.info("\n=== %s ===\n%s\n", prefix, message)

@robot.text
def handle_text_msg(message):
    """Answer a text message by simple keyword matching.

    Greeting keywords get a greeting, help keywords get a pointer to the menu,
    news keywords get a one-article ArticlesReply, and anything else is echoed
    back in an acknowledgement.
    """
    text_in = message.content.strip()
    sender, receiver = message.source, message.target
    log_msg("文本消息", f"from={sender}, to={receiver}, content={text_in}")

    def mentions(*keywords):
        # True when any keyword occurs as a substring of the message text
        return any(word in text_in for word in keywords)

    if mentions("你好", "您好", "hello"):
        return "您好!有什么能帮到您的吗?"
    if mentions("帮助", "怎么", "使用"):
        return "请点击菜单查看相关功能,如有问题请随时咨询。"
    if mentions("最新", "消息", "通知"):
        news = ArticlesReply(message=message)
        news.add_article(Article(
            title="最新消息",
            description="这是最新发布的消息内容",
            img="https://example.com/some_image.jpg",
            url="https://example.com/news",
        ))
        return news
    return f"您发送的消息已收到:{text_in}"

@robot.image
def handle_image_msg(message):
    """Acknowledge an image message after printing its media id and picture URL.

    Both values are looked up under several candidate attribute names. On any
    error the traceback is printed and a "still processing" reply is returned.
    """
    def lookup(*names):
        # (found, value) for the first candidate attribute present on message
        for name in names:
            if hasattr(message, name):
                return True, getattr(message, name)
        return False, None

    try:
        found, media_id = lookup('MediaId', 'media_id')
        if not found:
            if hasattr(message, 'raw') and isinstance(message.raw, dict) and 'MediaId' in message.raw:
                media_id = message.raw['MediaId']

        _, pic_url = lookup('PicUrl', 'pic_url', 'img')

        print(f"处理图片消息: MediaID={media_id}, URL={pic_url}")

        # Friendly acknowledgement
        return "图片已收到,感谢您的分享!"
    except Exception as err:
        print(f"处理图片消息出错: {err}")
        print(traceback.format_exc())
        return "系统正在处理您的图片,请稍后..."

@robot.voice
def handle_voice_msg(message):
    """Acknowledge a voice message, echoing its speech-recognition text if any.

    Media id, recognition text and audio format are each read from the first
    matching candidate attribute (defaulting to None) and logged. On any error
    the traceback is printed and a generic acknowledgement is returned.
    """
    def first_attr(*names):
        # Value of the first candidate attribute present on message, else None
        for name in names:
            if hasattr(message, name):
                return getattr(message, name)
        return None

    try:
        media_id = first_attr('MediaId', 'media_id')
        recognition = first_attr('Recognition', 'recognition')
        format_type = first_attr('Format', 'format')

        log_msg("语音消息", f"media_id={media_id}, recognition={recognition}, format={format_type}")

        suffix = f",识别内容:{recognition}" if recognition else ""
        return "已收到您的语音" + suffix
    except Exception as err:
        print(f"处理语音消息出错: {err}")
        print(traceback.format_exc())
        return "已收到您发送的语音,我们会尽快处理!"

@robot.video
def handle_video_msg(message):
    """Log a video message's media ids and send a fixed acknowledgement."""
    media, thumb = message.media_id, message.thumb_media_id
    log_msg("视频消息", f"media_id={media}, thumb_media_id={thumb}")
    return "已收到您发送的视频"

@robot.shortvideo
def handle_short_video_msg(message):
    """Log a short-video message's media ids and send a fixed acknowledgement."""
    media, thumb = message.media_id, message.thumb_media_id
    log_msg("小视频消息", f"media_id={media}, thumb_media_id={thumb}")
    return "已收到您发送的小视频"

@robot.location
def handle_location_msg(message):
    """Log a location message and reply with its label."""
    place = message.label
    x, y = message.location_x, message.location_y
    log_msg("位置消息", f"label={place}, coords=({x}, {y})")
    return f"已收到您的位置: {place}"

@robot.link
def handle_link_msg(message):
    """Log a shared link and reply with its title."""
    title, link = message.title, message.url
    log_msg("链接消息", f"title={title}, url={link}")
    return f"已收到您分享的链接: {title}"

@robot.subscribe
def handle_subscribe(message):
    """Log a follow (subscribe) event and send the welcome text."""
    follower = message.source
    log_msg("用户关注事件", f"用户 {follower} 关注了公众号")
    return "感谢您关注督学云平台!"

@robot.unsubscribe
def handle_unsubscribe(message):
    """Log an unfollow (unsubscribe) event; the reply is an empty string."""
    leaver = message.source
    log_msg("用户取消关注事件", f"用户 {leaver} 取关")
    return ""

@robot.key_click("MENU_KEY_NOW")
def handle_click_menu_now(message):
    """Reply with a fixed text when the MENU_KEY_NOW menu item is clicked.

    NOTE(review): WeChat's passive-reply docs allow a multi-article news reply
    to a menu click, so returning an ArticlesReply here may be the way to get a
    single merged news message — confirm before relying on it.
    """
    menu_key = "MENU_KEY_NOW"
    log_msg("菜单点击事件", f"event_key={menu_key}, from={message.source}")
    return "这里是最新消息"

@robot.view
def handle_view_event_msg(message):
    """Log a menu link (view) click; the reply is an empty string."""
    clicker, target_url = message.source, message.key
    log_msg("链接点击事件", f"用户={clicker}, URL={target_url}")
    return ""

@robot.location_event
def handle_location_event_msg(message):
    """Log a reported-location event; the reply is an empty string.

    Missing or falsy latitude/longitude/precision values are logged as 0.
    """
    reporter = message.source
    lat, lng, prec = (
        getattr(message, name, 0) or 0
        for name in ("latitude", "longitude", "precision")
    )
    log_msg("位置上报事件", f"用户={reporter}, 经度={lng}, 纬度={lat}, 精度={prec}")
    return ""

@robot.handler
def handle_event(message):
    """Catch-all handler registered with ``@robot.handler``.

    Event messages arrive with types named ``<event>_event`` (e.g.
    ``scan_event``): a scan gets a confirmation, a subscribe gets the welcome
    text, and every other event gets an empty reply. Non-event messages get a
    generic "unknown type" reply.
    """
    msg_type = message.type
    log_msg("收到消息", f"类型={msg_type}, from={message.source}")

    if msg_type == 'event':
        # Defensive only: WeRoBot normally renames raw event messages to
        # "<event>_event" types, which are handled in the next branch.
        evt = (getattr(message, "event", None) or "").lower()
        log_msg("收到事件", f"event={evt}, from={message.source}")
        if evt == "scan":
            return "您已成功扫码"
        if evt in ("view", "location"):
            return ""  # no reply needed for these events
        return f"收到事件: {evt}"

    if msg_type.endswith('_event'):
        evt = msg_type[:-len('_event')]
        log_msg("收到事件消息", f"event={evt}, from={message.source}")
        if evt == "subscribe":
            return "感谢您关注督学云平台!"
        if evt == "scan":
            # Previously only reachable through the 'event' branch above,
            # so scan events never received this confirmation.
            return "您已成功扫码"
        return ""  # unsubscribe, view, location, click, ...: no reply

    log_msg("未知消息类型", f"type={msg_type}")
    return "收到未知类型的消息"
  • 其他信息

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions