Compare commits

..

6 commits

4 changed files with 100 additions and 22 deletions

View file

@ -9,6 +9,7 @@ async def handle_mc_status_command(message: Message):
args = message.text.replace('/mc', '').strip().split(' ') args = message.text.replace('/mc', '').strip().split(' ')
server_type = args[0] if args else 'java' server_type = args[0] if args else 'java'
server_address = args[1] if len(args) >= 2 else None server_address = args[1] if len(args) >= 2 else None
query_enabled = True if len(args) >= 3 and args[2] == 'query' else False
if not args: if not args:
await message.reply("Usage: /mc <java/bedrock> <server_address>\n" await message.reply("Usage: /mc <java/bedrock> <server_address>\n"
"Example: /mc java play.example.com") "Example: /mc java play.example.com")
@ -29,22 +30,24 @@ async def handle_mc_status_command(message: Message):
if server_type == 'java': if server_type == 'java':
try: try:
from mcstatus import JavaServer from mcstatus import JavaServer
server = JavaServer.lookup(server_address) server = await JavaServer.async_lookup(server_address)
status = server.status() status = await server.async_status()
query = None query = None
s_message = f"这个 Java 服务器"
# 尝试查询服务器信息 # 尝试查询服务器信息
try: if query_enabled:
query = server.query() try:
except Exception as e: query = await server.async_query()
logging.warning('查询 Minecraft 服务器遇到了错误',e) except Exception as e:
s_message = f"*我未能成功发送 query 请求,显示的结果可能有出入。*\n这个 Java 服务器" s_message = f"_我未能成功发送 query 请求可能是因为服务器未开放对应端口。_\n这个 Java 服务器"
logging.warning('查询 Minecraft 服务器遇到了错误',e)
if query: if query:
s_message = f"这个 Java 服务器使用了 {query.software.brand}({query.software.version})" s_message = f"这个 Java 服务器使用了 {query.software.brand}({query.software.version})"
s_message += f"{status.players.online}(/{status.players.max}) 人在线\n" s_message += f"{status.players.online}(/{status.players.max}) 人在线\n"
s_message += "延迟大约有 {:.2f} ms\n".format(status.latency) s_message += "延迟大约有 {:.2f} ms\n".format(status.latency)
s_message += f"服务器的 MOTD 是: ```\n{status.motd.to_minecraft()}\n```" s_message += f"服务器的 MOTD 是: ```\n{status.motd.to_minecraft()}\n```"
s_message += f"版本信息: {status.version.name} ({status.version.protocol})\n" s_message += f"版本信息: {status.version.name} ({status.version.protocol})\n"
s_message += f"你应该使用和上面的版本相同的 Minecraft 客户端连接这个服务器。\n" s_message += f"你应该使用和上面的版本相同的 Minecraft 客户端连接这个服务器。\n\n"
if query and query.software.plugins: if query and query.software.plugins:
s_message += f"服务器插件: {', '.join(query.software.plugins)}\n" s_message += f"服务器插件: {', '.join(query.software.plugins)}\n"
if query and query.players.names: if query and query.players.names:
@ -52,7 +55,7 @@ async def handle_mc_status_command(message: Message):
if status.forge_data: if status.forge_data:
s_message += f"看起来这是一个有 mod 的服务器。\n" s_message += f"看起来这是一个有 mod 的服务器。\n"
if status.enforces_secure_chat: if status.enforces_secure_chat:
s_message += "服务器启用了消息签名,这意味着你需要调整 No Chat Reports 等类似 mod 的设置。\n" s_message += "服务器启用了消息签名,这意味着你需要调整 No Chat Reports 等类似 mod 的设置。\n\n"
s_message += "声明这些结果均为服务器所公开的信息。查询结果仅代表bot所在的服务器对该服务器的查询结果可能与实际情况有出入。" s_message += "声明这些结果均为服务器所公开的信息。查询结果仅代表bot所在的服务器对该服务器的查询结果可能与实际情况有出入。"
await status_message.edit_text(s_message, parse_mode=ParseMode.MARKDOWN) await status_message.edit_text(s_message, parse_mode=ParseMode.MARKDOWN)
except Exception as e: except Exception as e:
@ -62,7 +65,7 @@ async def handle_mc_status_command(message: Message):
from mcstatus import BedrockServer from mcstatus import BedrockServer
server = BedrockServer.lookup(server_address) server = BedrockServer.lookup(server_address)
status = server.status() status = await server.async_status()
# 稍微汉化一下这个状态信息 # 稍微汉化一下这个状态信息
if status.gamemode == 'Survival': if status.gamemode == 'Survival':

View file

@ -2,6 +2,7 @@ from aiogram import BaseMiddleware
from aiogram.types import Message from aiogram.types import Message
from typing import Callable, Dict, Any, Awaitable from typing import Callable, Dict, Any, Awaitable
import json import json
from datetime import datetime, timedelta
class MessageStatsMiddleware(BaseMiddleware): class MessageStatsMiddleware(BaseMiddleware):
def __init__(self, stats_file: str = 'message_stats.json'): def __init__(self, stats_file: str = 'message_stats.json'):
@ -18,13 +19,19 @@ class MessageStatsMiddleware(BaseMiddleware):
if event.chat.type in ['group', 'supergroup']: if event.chat.type in ['group', 'supergroup']:
chat_id = str(event.chat.id) chat_id = str(event.chat.id)
user_id = str(event.from_user.id if event.from_user else 0) user_id = str(event.from_user.id if event.from_user else 0)
current_time = datetime.now().isoformat()
# 初始化统计数据 # 初始化统计数据
if chat_id not in self.stats: if chat_id not in self.stats:
self.stats[chat_id] = { self.stats[chat_id] = {
'total_messages': 0, 'total_messages': 0,
'users': {}, 'users': {},
'chat_title': event.chat.title 'chat_title': event.chat.title,
'messages_24h': {
'message_count': 0,
'active_users': {},
'messages': []
}
} }
if user_id not in self.stats[chat_id]['users']: if user_id not in self.stats[chat_id]['users']:
@ -48,29 +55,75 @@ class MessageStatsMiddleware(BaseMiddleware):
# 更新统计 # 更新统计
self.stats[chat_id]['total_messages'] += 1 self.stats[chat_id]['total_messages'] += 1
self.stats[chat_id]['users'][user_id]['message_count'] += 1 self.stats[chat_id]['users'][user_id]['message_count'] += 1
self.stats[chat_id]['messages_24h']['message_count'] += 1
# 更新活跃用户统计
if user_id not in self.stats[chat_id]['messages_24h']['active_users']:
self.stats[chat_id]['messages_24h']['active_users'][user_id] = 0
self.stats[chat_id]['messages_24h']['active_users'][user_id] += 1
# 添加24小时消息记录
message_record = {
'user_id': user_id,
'timestamp': current_time,
'type': 'message'
}
# 羡慕、我菜统计 # 羡慕、我菜统计
if event.text and any(keyword in event.text.lower() for keyword in ['xm','xmsl','羡慕','羡慕死了']): if event.text and any(keyword in event.text.lower() for keyword in ['xm','xmsl','羡慕','羡慕死了']):
if not self.stats[chat_id]['users'][user_id]['xm_count']: if not self.stats[chat_id]['users'][user_id]['xm_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0 self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['xm_count'] += 1 self.stats[chat_id]['users'][user_id]['xm_count'] += 1
message_record['special_type'] = 'xm'
if event.sticker and event.sticker.file_unique_id in ['AQADhhcAAs1rgFVy']: if event.sticker and event.sticker.file_unique_id in ['AQADhhcAAs1rgFVy']:
if not self.stats[chat_id]['users'][user_id]['xm_count']: if not self.stats[chat_id]['users'][user_id]['xm_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0 self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['xm_count'] += 1 self.stats[chat_id]['users'][user_id]['xm_count'] += 1
message_record['special_type'] = 'xm'
if event.text and '我菜' in event.text: if event.text and '我菜' in event.text:
if not self.stats[chat_id]['users'][user_id]['wocai_count']: if not self.stats[chat_id]['users'][user_id]['wocai_count']:
self.stats[chat_id]['users'][user_id]['xm_count'] = 0 self.stats[chat_id]['users'][user_id]['xm_count'] = 0
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1 self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
message_record['special_type'] = 'wocai'
if event.sticker and event.sticker.file_unique_id in ['AQAD6AUAAgGeUVZy']: if event.sticker and event.sticker.file_unique_id in ['AQAD6AUAAgGeUVZy']:
if not self.stats[chat_id]['users'][user_id]['wocai_count']: if not self.stats[chat_id]['users'][user_id]['wocai_count']:
self.stats[chat_id]['users'][user_id]['wocai_count'] = 0 self.stats[chat_id]['users'][user_id]['wocai_count'] = 0
self.stats[chat_id]['users'][user_id]['wocai_count'] += 1 self.stats[chat_id]['users'][user_id]['wocai_count'] += 1
message_record['special_type'] = 'wocai'
# 添加消息记录到24小时列表
self.stats[chat_id]['messages_24h']['messages'].append(message_record)
# 清理超过24小时的记录
self.cleanup_old_messages(chat_id)
# 保存统计数据 # 保存统计数据
self.save_stats() self.save_stats()
return await handler(event, data) return await handler(event, data)
def cleanup_old_messages(self, chat_id: str):
"""清理超过24小时的消息记录"""
if 'messages_24h' not in self.stats[chat_id]:
return
cutoff_time = datetime.now() - timedelta(hours=24)
self.stats[chat_id]['messages_24h']['messages'] = [
msg for msg in self.stats[chat_id]['messages_24h']['messages']
if datetime.fromisoformat(msg['timestamp']) > cutoff_time
]
# 更新消息计数和活跃用户列表
messages_24h = self.stats[chat_id]['messages_24h']['messages']
self.stats[chat_id]['messages_24h']['message_count'] = len(messages_24h)
# 重新计算活跃用户字典,统计每个用户的消息数量
active_users_dict = {}
for msg in messages_24h:
user_id = msg['user_id']
active_users_dict[user_id] = active_users_dict.get(user_id, 0) + 1
self.stats[chat_id]['messages_24h']['active_users'] = active_users_dict
def load_stats(self) -> dict: def load_stats(self) -> dict:
try: try:
with open(self.stats_file, 'r', encoding='utf-8') as f: with open(self.stats_file, 'r', encoding='utf-8') as f:

View file

@ -49,6 +49,7 @@ async def handle_tips_command(message: Message) -> None:
"bot 的功能可以被选择性的开启或者关闭,但是示例 bot 为了方便开发和测试,默认开启了所有功能", "bot 的功能可以被选择性的开启或者关闭,但是示例 bot 为了方便开发和测试,默认开启了所有功能",
"说真的,你应该去看看 @kmuav2bot", "说真的,你应该去看看 @kmuav2bot",
"任何一条 tips 消息都会在一分钟后自动消失,再也不用担心消息堆积了", "任何一条 tips 消息都会在一分钟后自动消失,再也不用担心消息堆积了",
"/mc 命令使用了 mcstatus 库来查询 Minecraft 服务器状态,而这个库曾经由 Dinnerbone 维护",
] ]
import random import random
response = random.choice(tips) response = random.choice(tips)

View file

@ -22,12 +22,19 @@ async def handle_stats_command(message: Message):
await message.reply("暂无统计数据") await message.reply("暂无统计数据")
return return
stats_message = await message.reply("正在生成统计信息...")
# 按消息数量排序用户 # 按消息数量排序用户
sorted_users = sorted( sorted_users = sorted(
stats['users'].items(), stats['users'].items(),
key=lambda x: x[1]['message_count'], key=lambda x: x[1]['message_count'],
reverse=True reverse=True
) )
sorted_24h_users = sorted(
[(user_id, stats['users'][user_id]) for user_id in stats.get('messages_24h', {}).get('active_users', [])],
key=lambda x: x[1]['message_count'],
reverse=True
)
sorted_most_xm_users = sorted( sorted_most_xm_users = sorted(
stats['users'].items(), stats['users'].items(),
key=lambda x: x[1].get('xm_count',0), key=lambda x: x[1].get('xm_count',0),
@ -39,26 +46,40 @@ async def handle_stats_command(message: Message):
reverse=True reverse=True
) )
# 构建统计消息 # 构建统计消息
text = f"📊 群组统计\n\n" text = f"📊 群组统计\n\n"
text += f"总消息数: {stats['total_messages']}\n" text += f"总消息数: {stats['total_messages']}\n"
text += f"活跃用户数: {len(stats['users'])}\n\n" text += f"24小时内消息数: {stats['messages_24h']['message_count']}\n"
text += f"活跃用户数: {len(stats['users'])}\n"
text += f"24小时内活跃用户数:{len(stats['messages_24h']['active_users'])}\n\n"
text += "🏆 发言排行榜:\n" text += "🏆 发言排行榜:\n"
text += "<blockquote expandable>"
for i, (user_id, user_data) in enumerate(sorted_users[:10], 1): for i, (user_id, user_data) in enumerate(sorted_users[:10], 1):
name = user_data['name'] or user_data['username'] or str(user_id) name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{i}. {name}: {user_data['message_count']}\n" text += f"{i}. {name}: {user_data['message_count']}\n"
text += "</blockquote>\n"
text += "📈 24小时内发言排行榜:\n"
text += "<blockquote expandable>"
for i, (user_id, user_data) in enumerate(sorted_24h_users[:10], 1):
name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{i}. {name}: {stats['messages_24h']['active_users'][user_id]}\n"
text += "</blockquote>\n"
if sorted_most_xm_users and any(user_data['xm_count'] > 0 for _, user_data in sorted_most_xm_users): if sorted_most_xm_users and any(user_data['xm_count'] > 0 for _, user_data in sorted_most_xm_users):
text += "\n🍋 羡慕统计:\n" text += "\n🍋 羡慕统计:\n"
for user_id, user_data in sorted_most_xm_users: text += "<blockquote expandable>"
if user_data['xm_count'] > 0: for user_id, user_data in sorted_most_xm_users:
name = user_data['name'] or user_data['username'] or str(user_id) if user_data['xm_count'] > 0:
text += f"{name}: {user_data['xm_count']} 次羡慕\n" name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{name}: {user_data['xm_count']} 次羡慕\n"
text += "</blockquote>\n"
if sorted_most_wocai_users and any(user_data['wocai_count'] > 0 for _, user_data in sorted_most_wocai_users): if sorted_most_wocai_users and any(user_data['wocai_count'] > 0 for _, user_data in sorted_most_wocai_users):
text += "\n🥬 卖菜统计:\n" text += "\n🥬 卖菜统计:\n"
for user_id, user_data in sorted_most_wocai_users: text += "<blockquote expandable>"
if user_data['wocai_count'] > 0: for user_id, user_data in sorted_most_wocai_users:
name = user_data['name'] or user_data['username'] or str(user_id) if user_data['wocai_count'] > 0:
text += f"{name}: {user_data['wocai_count']} 次卖菜\n" name = user_data['name'] or user_data['username'] or str(user_id)
text += f"{name}: {user_data['wocai_count']} 次卖菜\n"
text += "</blockquote>\n"
await message.reply(text) await stats_message.edit_text(text)