init commit

This commit is contained in:
草师傅 2025-07-30 21:40:15 +08:00
commit 7ec4ee6c6f
19 changed files with 7155 additions and 0 deletions

23
core/actions.py Normal file
View file

@ -0,0 +1,23 @@
from aiogram.types import Message
from config import config
async def handle_actions(message: Message) -> None:
if not config.is_feature_enabled('actions', message.chat.id):
return
rawtext = message.text
# 防止识别成命令而被误触发
if rawtext.replace('/','',1).isalpha() or '@' in rawtext:
return
elif " " in message.text:
if rawtext.split(" ")[0].replace('/','',1).isalpha():
return
await message.reply(f"{message.from_user.mention_html()} {rawtext.split(" ")[0].replace('/','')}{message.reply_to_message.from_user.mention_html() if message.reply_to_message else '自己' } {''.join(rawtext.split(" ")[1:])}",disable_web_page_preview=True)
else:
await message.reply(f"{message.from_user.mention_html()} {message.text.replace('/','')}{message.reply_to_message.from_user.mention_html() if message.reply_to_message else '自己'}",disable_web_page_preview=True)
async def handle_reverse_actions(message: Message) -> None:
if not config.is_feature_enabled('actions', message.chat.id):
return
await message.reply(f"{message.from_user.mention_html()}{message.reply_to_message.from_user.mention_html()} {message.text.replace('\\','')}了!",disable_web_page_preview=True)

42
core/bitflip.py Normal file
View file

@ -0,0 +1,42 @@
from aiogram.types import Message
from config import config
def bitflip(text: str) -> str:
"""将文本中的0和1进行互换遇到 0.x 就 1-0.x"""
import re
def replace_func(match):
value = match.group()
try:
num = float(value)
if 0 < num < 1:
return str(1 - num)
if num == 1.0:
return str(0)
if num == 0.0:
return str(1)
else:
return value
except ValueError:
return value
flipped_text = re.sub(r'\d*\.?\d+', replace_func, text)
return flipped_text
async def handle_bitflip_command(message: Message) -> None:
if not config.is_feature_enabled('bitflip', message.chat.id):
return
"""获取回复的消息文本"""
if not message.reply_to_message or not message.reply_to_message.text:
await message.reply("请回复一条消息进行0/1翻转")
return
original_text = message.reply_to_message.text.replace("","").replace("","")
if "0.5" in original_text:
await message.reply_to_message.reply(f"确实,{original_text}")
return
flipped_text = bitflip(original_text)
await message.reply_to_message.reply(f"错误的,{flipped_text}")

109
core/link.py Normal file
View file

@ -0,0 +1,109 @@
import requests
import re
import html
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from abp.filters import parse_filterlist
from aiogram.types import Message
from config import config
def extend_short_urls(url):
""" 扩展短链接 """
r = requests.get(url)
if 'tb.cn' in urlparse(url).hostname:
# 淘宝短链接特殊处理
html_content = r.text
url = extract_tb_url_from_html(html_content)
if not url:
return url
if r.status_code != 200:
return url
elif r.status_code in [301,302,304,307,308]:
return r.headers['Location']
return url
def extract_tb_url_from_html(html_content):
# 使用正则表达式匹配 var url = '...' 的模式
pattern = r"var url = ['\"]([^'\"]+)['\"]"
match = re.search(pattern, html_content)
if match:
url = match.group(1)
# 解码HTML实体
decoded_url = html.unescape(url)
return decoded_url
return None
def remove_tracking_params(url):
""" 移除跟踪参数 """
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
tracking_params = []
with open('assets/LegitimateURLShortener.txt','r', encoding='utf-8') as f:
for line in parse_filterlist(f):
if hasattr(line, 'options') and line.options:
for option in line.options:
if option[0] == 'removeparam':
tracking_params.append(option[1])
for param in tracking_params:
query_params.pop(param, None)
# Rebuild the URL without tracking parameters
cleaned_query = urlencode(query_params, doseq=True)
cleaned_url = urlunparse(parsed_url._replace(query=cleaned_query))
return cleaned_url
def reserve_whitelisted_params(url):
""" 保留白名单中的参数 """
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
if parsed_url.hostname in ['item.taobao.com','detail.tmall.com','h5.m.goofish.com','music.163.com']:
if 'id' in query_params:
# 只保留id参数创建新的query_params
new_query_params = {'id': query_params['id']}
# 重新构建URL
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
elif parsed_url.hostname in ['mall.bilibili.com','space.bilibili.com','live.bilibili.com']:
# 只保留spm_id_from参数创建新的query_params
new_query_params = {}
# 重新构建URL
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
return url
def transform_into_fixed_url(url):
""" 转换为修复了链接预览的链接 """
parsed_url = urlparse(url)
if parsed_url.hostname in ['x.com', 'twitter.com']:
# 把 twitter 的链接转换为 fixupx.com
return urlunparse(parsed_url._replace(hostname='i.fixupx.com'))
return url
async def handle_links(message: Message):
if not config.is_feature_enabled('link', message.chat.id):
return
# URL regex pattern
url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
# Extract URLs from message text
if message.text:
urls = re.findall(url_pattern, message.text)
for url in urls:
# Process each URL with your functions
cleaned_url = remove_tracking_params(url)
extended_url = extend_short_urls(cleaned_url)
only_wl_params_url = reserve_whitelisted_params(extended_url)
#untracked_url = remove_tracking_params(only_wl_params_url)
fixed_url = transform_into_fixed_url(only_wl_params_url)
# Do something with the processed URL
await message.reply(f"清理完成:\n{fixed_url}")

53
core/middleware/stats.py Normal file
View file

@ -0,0 +1,53 @@
from aiogram import BaseMiddleware
from aiogram.types import Message
from typing import Callable, Dict, Any, Awaitable
import json
class MessageStatsMiddleware(BaseMiddleware):
def __init__(self, stats_file: str = 'message_stats.json'):
self.stats_file = stats_file
self.stats = self.load_stats()
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Message,
data: Dict[str, Any]
) -> Any:
# 只统计群组消息
if event.chat.type in ['group', 'supergroup']:
chat_id = str(event.chat.id)
user_id = str(event.from_user.id if event.from_user else 0)
# 初始化统计数据
if chat_id not in self.stats:
self.stats[chat_id] = {
'total_messages': 0,
'users': {},
'chat_title': event.chat.title
}
if user_id not in self.stats[chat_id]['users']:
self.stats[chat_id]['users'][user_id] = {
'message_count': 0,
'username': event.from_user.username if event.from_user else 'Unknown',
'first_name': event.from_user.first_name if event.from_user else 'Unknown'
}
# 更新统计
self.stats[chat_id]['total_messages'] += 1
self.stats[chat_id]['users'][user_id]['message_count'] += 1
self.save_stats()
return await handler(event, data)
def load_stats(self) -> dict:
try:
with open(self.stats_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def save_stats(self):
with open(self.stats_file, 'w', encoding='utf-8') as f:
json.dump(self.stats, f, ensure_ascii=False, indent=2)

52
core/repeater.py Normal file
View file

@ -0,0 +1,52 @@
import aiogram.types
from collections import defaultdict
import time
from config import config
class MessageRepeater:
def __init__(self, message_expiry_seconds=3600): # 1 hour default
self.message_counts = defaultdict(lambda: defaultdict(int))
self.repeated_messages = defaultdict(set)
self.message_timestamps = defaultdict(lambda: defaultdict(float))
self.expiry_seconds = message_expiry_seconds
async def handle_message(self, message: aiogram.types.Message):
"""Handle incoming messages and repeat when a threshold is met"""
chat_id = message.chat.id
text = message.text
if not config.is_feature_enabled('repeater', message.chat.id):
return
if not text:
return
# Clean expired messages
self._clean_expired_messages(chat_id)
# Increment message count
self.message_counts[chat_id][text] += 1
self.message_timestamps[chat_id][text] = time.time()
# If a message appears twice and hasn't been repeated yet
if (self.message_counts[chat_id][text] >= 2 and
text not in self.repeated_messages[chat_id]):
# Mark as repeated and send the message
self.repeated_messages[chat_id].add(text)
await message.answer(text)
def _clean_expired_messages(self, chat_id: int):
"""Remove messages older than expiry_seconds"""
current_time = time.time()
expired_messages = []
for text, timestamp in self.message_timestamps[chat_id].items():
if current_time - timestamp > self.expiry_seconds:
expired_messages.append(text)
for text in expired_messages:
del self.message_counts[chat_id][text]
del self.message_timestamps[chat_id][text]
self.repeated_messages[chat_id].discard(text)

29
core/simple.py Normal file
View file

@ -0,0 +1,29 @@
from aiogram import html
from aiogram.types import Message
async def handle_start_command(message: Message) -> None:
"""Handle /start command"""
await message.answer(f"Hello, {html.bold(message.from_user.full_name)}!")
async def handle_baka(message: Message) -> None:
await message.reply(f"你是笨蛋")
async def handle_info_command(message: Message) -> None:
"""Handle /info command"""
user = message.from_user
chat = message.chat
response = (
f"User Info:\n"
f"Name: {html.bold(user.full_name)}\n"
f"Username: @{user.username if user.username else 'N/A'}\n"
f"User ID: {user.id}\n\n"
f"Chat Info:\n"
f"Chat Title: {html.bold(chat.title)}\n"
f"Chat ID: {chat.id}\n"
)
await message.reply(response)
async def dummy_handler(message: Message) -> None:
"""A handler to catch all other messages"""
pass

44
core/stats.py Normal file
View file

@ -0,0 +1,44 @@
from aiogram.types import Message
import json
from config import config
async def handle_stats_command(message: Message):
"""处理统计命令"""
if not config.is_feature_enabled('stats', message.chat.id):
return
if message.chat.type not in ['group', 'supergroup']:
await message.reply("此命令仅在群组中可用")
return
chat_id = str(message.chat.id)
try:
with open('message_stats.json', 'r', encoding='utf-8') as f:
stats = json.load(f).get(chat_id)
except (FileNotFoundError, json.JSONDecodeError):
stats = {}.get(chat_id)
print(stats)
if not stats:
await message.reply("暂无统计数据")
return
# 按消息数量排序用户
sorted_users = sorted(
stats['users'].items(),
key=lambda x: x[1]['message_count'],
reverse=True
)
# 构建统计消息
text = f"📊 群组统计\n\n"
text += f"总消息数: {stats['total_messages']}\n"
text += f"活跃用户数: {len(stats['users'])}\n\n"
text += "🏆 发言排行榜:\n"
for i, (user_id, user_data) in enumerate(sorted_users[:10], 1):
name = user_data['first_name'] or user_data['username'] or str(user_id)
text += f"{i}. {name}: {user_data['message_count']}\n"
await message.reply(text)

23
core/unpin.py Normal file
View file

@ -0,0 +1,23 @@
from aiogram.types import Message
from config import config
async def handle_unpin_channel_message(message: Message):
"""Handle unpinning messages from linked channels without a specific hashtag"""
if not config.is_feature_enabled('unpin', message.chat.id):
return
try:
regex_pattern = config.get_feature_config('unpin', message.chat.id)['regex']
print(regex_pattern)
# If a regex pattern exists, check if the message matches
if regex_pattern:
import re
if re.search(regex_pattern, message.text or message.caption or ""):
# Message matches regex, don't unpin
return
# Either no regex pattern or message doesn't match, proceed to unpin
print("trying to unpin the message")
await message.unpin()
except Exception as e:
print('Error unpinning message:', e)