bot/core/middleware/stats.py

136 lines
No EOL
5.9 KiB
Python

from aiogram import BaseMiddleware
from aiogram.types import Message
from typing import Callable, Dict, Any, Awaitable
import json
from datetime import datetime, timedelta
class MessageStatsMiddleware(BaseMiddleware):
def __init__(self, stats_file: str = 'message_stats.json'):
self.stats_file = stats_file
self.stats = self.load_stats()
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Message,
data: Dict[str, Any]
) -> Any:
# 只统计群组消息
if event.chat.type in ['group', 'supergroup']:
chat_id = str(event.chat.id)
user_id = str(event.from_user.id if event.from_user else 0)
current_time = datetime.now().isoformat()
# 初始化统计数据
if chat_id not in self.stats:
self.stats[chat_id] = {
'total_messages': 0,
'users': {},
'chat_title': event.chat.title,
'messages_24h': {
'message_count': 0,
'active_users': {},
'messages': []
}
}
if user_id not in self.stats[chat_id]['users']:
name = 'Unknown'
if event.sender_chat:
if event.sender_chat.type in ['group','supergroup']:
# 如果是频道/群组匿名管理员消息,使用频道名称
name = f"{event.sender_chat.title} [admin]"
# 如果是频道/群组匿名管理员消息,使用频道名称
name = f"{event.sender_chat.title} [channel]"
elif event.from_user:
name = event.from_user.full_name
self.stats[chat_id]['users'][user_id] = {
'message_count': 0,
'xm_count': 0,
'wocai_count': 0,
'username': event.from_user.username if event.from_user else 'Unknown',
'name': name
}
# 更新统计
self.stats[chat_id]['total_messages'] += 1
self.stats[chat_id]['users'][user_id]['message_count'] += 1
self.stats[chat_id]['messages_24h']['message_count'] += 1
# 更新活跃用户统计
if user_id not in self.stats[chat_id]['messages_24h']['active_users']:
self.stats[chat_id]['messages_24h']['active_users'][user_id] = 0
self.stats[chat_id]['messages_24h']['active_users'][user_id] += 1
# 添加24小时消息记录
message_record = {
'user_id': user_id,
'timestamp': current_time,
'type': 'message'
}
# 羡慕、我菜统计
if event.text and any(keyword in event.text.lower() for keyword in ['xm','xmsl','羡慕','羡慕死了']):
if not self.stats[chat_id]['users'][user_id]['xm_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['xm_count'] += 1
message_record['special_type'] = 'xm'
if event.sticker and event.sticker.file_unique_id in ['AQADhhcAAs1rgFVy']:
if not self.stats[chat_id]['users'][user_id]['xm_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['xm_count'] += 1
message_record['special_type'] = 'xm'
if event.text and '我菜' in event.text:
if not self.stats[chat_id]['users'][user_id]['wocai_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
message_record['special_type'] = 'wocai'
if event.sticker and event.sticker.file_unique_id in ['AQAD6AUAAgGeUVZy']:
if not self.stats[chat_id]['users'][user_id]['wocai_count']:
self.stats[chat_id]['users'][user_id]['wocai_count'] = 0
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
message_record['special_type'] = 'wocai'
# 添加消息记录到24小时列表
self.stats[chat_id]['messages_24h']['messages'].append(message_record)
# 清理超过24小时的记录
self.cleanup_old_messages(chat_id)
# 保存统计数据
self.save_stats()
return await handler(event, data)
def cleanup_old_messages(self, chat_id: str):
"""清理超过24小时的消息记录"""
if 'messages_24h' not in self.stats[chat_id]:
return
cutoff_time = datetime.now() - timedelta(hours=24)
self.stats[chat_id]['messages_24h']['messages'] = [
msg for msg in self.stats[chat_id]['messages_24h']['messages']
if datetime.fromisoformat(msg['timestamp']) > cutoff_time
]
# 更新消息计数和活跃用户列表
messages_24h = self.stats[chat_id]['messages_24h']['messages']
self.stats[chat_id]['messages_24h']['message_count'] = len(messages_24h)
# 重新计算活跃用户字典,统计每个用户的消息数量
active_users_dict = {}
for msg in messages_24h:
user_id = msg['user_id']
active_users_dict[user_id] = active_users_dict.get(user_id, 0) + 1
self.stats[chat_id]['messages_24h']['active_users'] = active_users_dict
def load_stats(self) -> dict:
try:
with open(self.stats_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def save_stats(self):
with open(self.stats_file, 'w', encoding='utf-8') as f:
json.dump(self.stats, f, ensure_ascii=False, indent=2)