Compare commits

..

6 commits

4 changed files with 100 additions and 22 deletions

View file

@ -9,6 +9,7 @@ async def handle_mc_status_command(message: Message):
args = message.text.replace('/mc', '').strip().split(' ')
server_type = args[0] if args else 'java'
server_address = args[1] if len(args) >= 2 else None
query_enabled = True if len(args) >= 3 and args[2] == 'query' else False
if not args:
await message.reply("Usage: /mc <java/bedrock> <server_address>\n"
"Example: /mc java play.example.com")
@ -29,22 +30,24 @@ async def handle_mc_status_command(message: Message):
if server_type == 'java':
try:
from mcstatus import JavaServer
server = JavaServer.lookup(server_address)
status = server.status()
server = await JavaServer.async_lookup(server_address)
status = await server.async_status()
query = None
s_message = f"这个 Java 服务器"
# 尝试查询服务器信息
if query_enabled:
try:
query = server.query()
query = await server.async_query()
except Exception as e:
s_message = f"_我未能成功发送 query 请求可能是因为服务器未开放对应端口。_\n这个 Java 服务器"
logging.warning('查询 Minecraft 服务器遇到了错误',e)
s_message = f"*我未能成功发送 query 请求,显示的结果可能有出入。*\n这个 Java 服务器"
if query:
s_message = f"这个 Java 服务器使用了 {query.software.brand}({query.software.version})"
s_message += f"{status.players.online}(/{status.players.max}) 人在线\n"
s_message += "延迟大约有 {:.2f} ms\n".format(status.latency)
s_message += f"服务器的 MOTD 是: ```\n{status.motd.to_minecraft()}\n```"
s_message += f"版本信息: {status.version.name} ({status.version.protocol})\n"
s_message += f"你应该使用和上面的版本相同的 Minecraft 客户端连接这个服务器。\n"
s_message += f"你应该使用和上面的版本相同的 Minecraft 客户端连接这个服务器。\n\n"
if query and query.software.plugins:
s_message += f"服务器插件: {', '.join(query.software.plugins)}\n"
if query and query.players.names:
@ -52,7 +55,7 @@ async def handle_mc_status_command(message: Message):
if status.forge_data:
s_message += f"看起来这是一个有 mod 的服务器。\n"
if status.enforces_secure_chat:
s_message += "服务器启用了消息签名,这意味着你需要调整 No Chat Reports 等类似 mod 的设置。\n"
s_message += "服务器启用了消息签名,这意味着你需要调整 No Chat Reports 等类似 mod 的设置。\n\n"
s_message += "声明这些结果均为服务器所公开的信息。查询结果仅代表bot所在的服务器对该服务器的查询结果可能与实际情况有出入。"
await status_message.edit_text(s_message, parse_mode=ParseMode.MARKDOWN)
except Exception as e:
@ -62,7 +65,7 @@ async def handle_mc_status_command(message: Message):
from mcstatus import BedrockServer
server = BedrockServer.lookup(server_address)
status = server.status()
status = await server.async_status()
# 稍微汉化一下这个状态信息
if status.gamemode == 'Survival':

View file

@ -2,6 +2,7 @@ from aiogram import BaseMiddleware
from aiogram.types import Message
from typing import Callable, Dict, Any, Awaitable
import json
from datetime import datetime, timedelta
class MessageStatsMiddleware(BaseMiddleware):
def __init__(self, stats_file: str = 'message_stats.json'):
@ -18,13 +19,19 @@ class MessageStatsMiddleware(BaseMiddleware):
if event.chat.type in ['group', 'supergroup']:
chat_id = str(event.chat.id)
user_id = str(event.from_user.id if event.from_user else 0)
current_time = datetime.now().isoformat()
# 初始化统计数据
if chat_id not in self.stats:
self.stats[chat_id] = {
'total_messages': 0,
'users': {},
'chat_title': event.chat.title
'chat_title': event.chat.title,
'messages_24h': {
'message_count': 0,
'active_users': {},
'messages': []
}
}
if user_id not in self.stats[chat_id]['users']:
@ -48,29 +55,75 @@ class MessageStatsMiddleware(BaseMiddleware):
# 更新统计
self.stats[chat_id]['total_messages'] += 1
self.stats[chat_id]['users'][user_id]['message_count'] += 1
self.stats[chat_id]['messages_24h']['message_count'] += 1
# 更新活跃用户统计
if user_id not in self.stats[chat_id]['messages_24h']['active_users']:
self.stats[chat_id]['messages_24h']['active_users'][user_id] = 0
self.stats[chat_id]['messages_24h']['active_users'][user_id] += 1
# 添加24小时消息记录
message_record = {
'user_id': user_id,
'timestamp': current_time,
'type': 'message'
}
# 羡慕、我菜统计
if event.text and any(keyword in event.text.lower() for keyword in ['xm','xmsl','羡慕','羡慕死了']):
if not self.stats[chat_id]['users'][user_id]['xm_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['xm_count'] += 1
message_record['special_type'] = 'xm'
if event.sticker and event.sticker.file_unique_id in ['AQADhhcAAs1rgFVy']:
if not self.stats[chat_id]['users'][user_id]['xm_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['xm_count'] += 1
message_record['special_type'] = 'xm'
if event.text and '我菜' in event.text:
if not self.stats[chat_id]['users'][user_id]['wocai_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
message_record['special_type'] = 'wocai'
if event.sticker and event.sticker.file_unique_id in ['AQAD6AUAAgGeUVZy']:
if not self.stats[chat_id]['users'][user_id]['wocai_count']:
self.stats[chat_id]['users'][user_id]['wocai_count'] = 0
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
message_record['special_type'] = 'wocai'
# 添加消息记录到24小时列表
self.stats[chat_id]['messages_24h']['messages'].append(message_record)
# 清理超过24小时的记录
self.cleanup_old_messages(chat_id)
# 保存统计数据
self.save_stats()
return await handler(event, data)
def cleanup_old_messages(self, chat_id: str):
"""清理超过24小时的消息记录"""
if 'messages_24h' not in self.stats[chat_id]:
return
cutoff_time = datetime.now() - timedelta(hours=24)
self.stats[chat_id]['messages_24h']['messages'] = [
msg for msg in self.stats[chat_id]['messages_24h']['messages']
if datetime.fromisoformat(msg['timestamp']) > cutoff_time
]
# 更新消息计数和活跃用户列表
messages_24h = self.stats[chat_id]['messages_24h']['messages']
self.stats[chat_id]['messages_24h']['message_count'] = len(messages_24h)
# 重新计算活跃用户字典,统计每个用户的消息数量
active_users_dict = {}
for msg in messages_24h:
user_id = msg['user_id']
active_users_dict[user_id] = active_users_dict.get(user_id, 0) + 1
self.stats[chat_id]['messages_24h']['active_users'] = active_users_dict
def load_stats(self) -> dict:
try:
with open(self.stats_file, 'r', encoding='utf-8') as f:

View file

@ -49,6 +49,7 @@ async def handle_tips_command(message: Message) -> None:
"bot 的功能可以被选择性的开启或者关闭,但是示例 bot 为了方便开发和测试,默认开启了所有功能",
"说真的,你应该去看看 @kmuav2bot",
"任何一条 tips 消息都会在一分钟后自动消失,再也不用担心消息堆积了",
"/mc 命令使用了 mcstatus 库来查询 Minecraft 服务器状态,而这个库曾经由 Dinnerbone 维护",
]
import random
response = random.choice(tips)

View file

@ -22,12 +22,19 @@ async def handle_stats_command(message: Message):
await message.reply("暂无统计数据")
return
stats_message = await message.reply("正在生成统计信息...")
# 按消息数量排序用户
sorted_users = sorted(
stats['users'].items(),
key=lambda x: x[1]['message_count'],
reverse=True
)
sorted_24h_users = sorted(
[(user_id, stats['users'][user_id]) for user_id in stats.get('messages_24h', {}).get('active_users', [])],
key=lambda x: x[1]['message_count'],
reverse=True
)
sorted_most_xm_users = sorted(
stats['users'].items(),
key=lambda x: x[1].get('xm_count',0),
@ -39,26 +46,40 @@ async def handle_stats_command(message: Message):
reverse=True
)
# 构建统计消息
text = f"📊 群组统计\n\n"
text += f"总消息数: {stats['total_messages']}\n"
text += f"活跃用户数: {len(stats['users'])}\n\n"
text += f"24小时内消息数: {stats['messages_24h']['message_count']}\n"
text += f"活跃用户数: {len(stats['users'])}\n"
text += f"24小时内活跃用户数:{len(stats['messages_24h']['active_users'])}\n\n"
text += "🏆 发言排行榜:\n"
text += "<blockquote expandable>"
for i, (user_id, user_data) in enumerate(sorted_users[:10], 1):
name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{i}. {name}: {user_data['message_count']}\n"
text += "</blockquote>\n"
text += "📈 24小时内发言排行榜:\n"
text += "<blockquote expandable>"
for i, (user_id, user_data) in enumerate(sorted_24h_users[:10], 1):
name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{i}. {name}: {stats['messages_24h']['active_users'][user_id]}\n"
text += "</blockquote>\n"
if sorted_most_xm_users and any(user_data['xm_count'] > 0 for _, user_data in sorted_most_xm_users):
text += "\n🍋 羡慕统计:\n"
text += "<blockquote expandable>"
for user_id, user_data in sorted_most_xm_users:
if user_data['xm_count'] > 0:
name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{name}: {user_data['xm_count']} 次羡慕\n"
text += "</blockquote>\n"
if sorted_most_wocai_users and any(user_data['wocai_count'] > 0 for _, user_data in sorted_most_wocai_users):
text += "\n🥬 卖菜统计:\n"
text += "<blockquote expandable>"
for user_id, user_data in sorted_most_wocai_users:
if user_data['wocai_count'] > 0:
name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{name}: {user_data['wocai_count']} 次卖菜\n"
text += "</blockquote>\n"
await message.reply(text)
await stats_message.edit_text(text)