136 lines
No EOL
5.9 KiB
Python
136 lines
No EOL
5.9 KiB
Python
from aiogram import BaseMiddleware
|
|
from aiogram.types import Message
|
|
from typing import Callable, Dict, Any, Awaitable
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
|
|
class MessageStatsMiddleware(BaseMiddleware):
|
|
def __init__(self, stats_file: str = 'message_stats.json'):
|
|
self.stats_file = stats_file
|
|
self.stats = self.load_stats()
|
|
|
|
async def __call__(
|
|
self,
|
|
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
|
|
event: Message,
|
|
data: Dict[str, Any]
|
|
) -> Any:
|
|
# 只统计群组消息
|
|
if event.chat.type in ['group', 'supergroup']:
|
|
chat_id = str(event.chat.id)
|
|
user_id = str(event.from_user.id if event.from_user else 0)
|
|
current_time = datetime.now().isoformat()
|
|
|
|
# 初始化统计数据
|
|
if chat_id not in self.stats:
|
|
self.stats[chat_id] = {
|
|
'total_messages': 0,
|
|
'users': {},
|
|
'chat_title': event.chat.title,
|
|
'messages_24h': {
|
|
'message_count': 0,
|
|
'active_users': {},
|
|
'messages': []
|
|
}
|
|
}
|
|
|
|
if user_id not in self.stats[chat_id]['users']:
|
|
name = 'Unknown'
|
|
if event.sender_chat:
|
|
if event.sender_chat.type in ['group','supergroup']:
|
|
# 如果是频道/群组匿名管理员消息,使用频道名称
|
|
name = f"{event.sender_chat.title} [admin]"
|
|
# 如果是频道/群组匿名管理员消息,使用频道名称
|
|
name = f"{event.sender_chat.title} [channel]"
|
|
elif event.from_user:
|
|
name = event.from_user.full_name
|
|
self.stats[chat_id]['users'][user_id] = {
|
|
'message_count': 0,
|
|
'xm_count': 0,
|
|
'wocai_count': 0,
|
|
'username': event.from_user.username if event.from_user else 'Unknown',
|
|
'name': name
|
|
}
|
|
|
|
# 更新统计
|
|
self.stats[chat_id]['total_messages'] += 1
|
|
self.stats[chat_id]['users'][user_id]['message_count'] += 1
|
|
self.stats[chat_id]['messages_24h']['message_count'] += 1
|
|
# 更新活跃用户统计
|
|
if user_id not in self.stats[chat_id]['messages_24h']['active_users']:
|
|
self.stats[chat_id]['messages_24h']['active_users'][user_id] = 0
|
|
self.stats[chat_id]['messages_24h']['active_users'][user_id] += 1
|
|
|
|
# 添加24小时消息记录
|
|
message_record = {
|
|
'user_id': user_id,
|
|
'timestamp': current_time,
|
|
'type': 'message'
|
|
}
|
|
|
|
# 羡慕、我菜统计
|
|
if event.text and any(keyword in event.text.lower() for keyword in ['xm','xmsl','羡慕','羡慕死了']):
|
|
if not self.stats[chat_id]['users'][user_id]['xm_count']:
|
|
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
|
|
self.stats[chat_id]['users'][user_id]['xm_count'] += 1
|
|
message_record['special_type'] = 'xm'
|
|
|
|
if event.sticker and event.sticker.file_unique_id in ['AQADhhcAAs1rgFVy']:
|
|
if not self.stats[chat_id]['users'][user_id]['xm_count']:
|
|
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
|
|
self.stats[chat_id]['users'][user_id]['xm_count'] += 1
|
|
message_record['special_type'] = 'xm'
|
|
|
|
if event.text and '我菜' in event.text:
|
|
if not self.stats[chat_id]['users'][user_id]['wocai_count']:
|
|
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
|
|
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
|
|
message_record['special_type'] = 'wocai'
|
|
if event.sticker and event.sticker.file_unique_id in ['AQAD6AUAAgGeUVZy']:
|
|
if not self.stats[chat_id]['users'][user_id]['wocai_count']:
|
|
self.stats[chat_id]['users'][user_id]['wocai_count'] = 0
|
|
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
|
|
message_record['special_type'] = 'wocai'
|
|
|
|
# 添加消息记录到24小时列表
|
|
self.stats[chat_id]['messages_24h']['messages'].append(message_record)
|
|
|
|
# 清理超过24小时的记录
|
|
self.cleanup_old_messages(chat_id)
|
|
|
|
# 保存统计数据
|
|
self.save_stats()
|
|
|
|
return await handler(event, data)
|
|
|
|
def cleanup_old_messages(self, chat_id: str):
|
|
"""清理超过24小时的消息记录"""
|
|
if 'messages_24h' not in self.stats[chat_id]:
|
|
return
|
|
|
|
cutoff_time = datetime.now() - timedelta(hours=24)
|
|
self.stats[chat_id]['messages_24h']['messages'] = [
|
|
msg for msg in self.stats[chat_id]['messages_24h']['messages']
|
|
if datetime.fromisoformat(msg['timestamp']) > cutoff_time
|
|
]
|
|
|
|
# 更新消息计数和活跃用户列表
|
|
messages_24h = self.stats[chat_id]['messages_24h']['messages']
|
|
self.stats[chat_id]['messages_24h']['message_count'] = len(messages_24h)
|
|
# 重新计算活跃用户字典,统计每个用户的消息数量
|
|
active_users_dict = {}
|
|
for msg in messages_24h:
|
|
user_id = msg['user_id']
|
|
active_users_dict[user_id] = active_users_dict.get(user_id, 0) + 1
|
|
self.stats[chat_id]['messages_24h']['active_users'] = active_users_dict
|
|
|
|
def load_stats(self) -> dict:
|
|
try:
|
|
with open(self.stats_file, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return {}
|
|
|
|
def save_stats(self):
|
|
with open(self.stats_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.stats, f, ensure_ascii=False, indent=2) |