feat: handle some links directly and add report links features

This commit is contained in:
草师傅 2025-08-01 14:27:55 +08:00
parent fe9da1a819
commit c9762da447
5 changed files with 71 additions and 15 deletions

View file

@ -17,6 +17,7 @@ from core.link import handle_links
from core.post_to_fedi import handle_auth, handle_post_to_fedi
from core.promote import handle_promote_command
from core.repeater import MessageRepeater
from core.report_links import report_broken_links
from core.simple import handle_start_command, handle_baka, dummy_handler, handle_info_command
from core.actions import handle_actions, handle_reverse_actions
from core.stats import handle_stats_command
@ -55,15 +56,15 @@ class TelegramAdapter:
router.message(F.chat.type.in_({'group', 'supergroup'}) & F.sender_chat & (
F.sender_chat.type == 'channel') & F.is_automatic_forward)(
handle_unpin_channel_message)
# link 模块
router.message(Command('report_broken_links'))(report_broken_links)
router.message(F.text.contains('http') & ~F.text.contains('/report_broken_links'))(handle_links)
# repeater 模块
router.message(F.chat.type.in_({'group', 'supergroup'}))(MessageRepeater().handle_message)
# actions 模块
router.message(F.text.startswith('/'))(handle_actions)
router.message(F.text.startswith('\\'))(handle_reverse_actions)
router.message(F.text == '我是笨蛋')(handle_baka)
# link 模块
router.message(F.text.contains('http'))(handle_links)
# repeater 模块
router.message(F.chat.type.in_({'group', 'supergroup'}))(MessageRepeater().handle_message)
# 捕获所有其他消息
router.message(F.chat.type.in_({'group', 'supergroup'}))(dummy_handler)

View file

@ -1,6 +1,12 @@
# 管理员对应的 Telegram 用户 ID
# 你可以通过 /info 命令获取你的用户 ID
admin: 123456789
# 开发者对应的 Telegram 用户 ID
# 如果你希望 fork 这个项目并进行开发,你可以在这里更改成你的用户 ID用户不推荐更改这部分
# 这部分仅限于汇报链接跟踪参数去除的问题
dev: 616760897
# global features settings
features:

View file

@ -23,6 +23,10 @@ class Config:
"""Get admin user ID"""
return self.config_data.get('admin')
def get_developer_id(self) -> Optional[int]:
"""Get developer user ID"""
return self.config_data.get('dev')
def is_feature_enabled(self, feature_name: str, chat_id: Optional[int] = None) -> bool:
"""
Check if a feature is enabled for a specific chat or globally

View file

@ -1,5 +1,4 @@
import aiohttp
import requests
import re
import html
import asyncio
@ -44,7 +43,6 @@ async def extend_short_urls(url):
""" 扩展短链接 """
async with aiohttp.ClientSession() as session:
async with session.get(url,allow_redirects=False) as r:
if 'tb.cn' in urlparse(url).hostname:
# 淘宝短链接特殊处理
html_content = await r.text()
@ -52,7 +50,7 @@ async def extend_short_urls(url):
if not url:
return url
if r.status in [301, 302, 304, 307, 308] and 'Location' in r.headers:
if 'http' in r.headers['Location']:
if r.headers['Location'].startswith(('http://', 'https://')):
return r.headers['Location']
else:
# 如果 Location 头部没有 http 前缀,可能是相对路径
@ -131,7 +129,7 @@ def reserve_whitelisted_params(url):
# 重新构建URL
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
elif parsed_url.hostname in ['www.bilibili.com','m.bilibili.com','bilibili.com','mall.bilibili.com','space.bilibili.com','live.bilibili.com']:
elif parsed_url.hostname in ['www.iesdouyin.com','www.bilibili.com','m.bilibili.com','bilibili.com','mall.bilibili.com','space.bilibili.com','live.bilibili.com']:
# 不保留任何参数
new_query_params = {}
# 重新构建URL
@ -149,21 +147,33 @@ def transform_into_fixed_url(url):
if parsed_url.hostname in ['bilibili.com', 'm.bilibili.com']:
# 把 bilibili 的链接转换为桌面端的 www.bilibili.com
return urlunparse(parsed_url._replace(netloc='www.bilibili.com'))
if parsed_url.hostname in ['www.iesdouyin.com']:
# 把抖音分享链接转换为正常的 www.douyin.com
return urlunparse(parsed_url._replace(netloc='www.douyin.com'))
return url
async def process_url(url):
# 首先清理跟踪参数
# 对于适配的网站,直接保留白名单参数并返回
if urlparse(url).hostname in ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com']:
final_url = reserve_whitelisted_params(url)
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
final_url = transform_into_fixed_url(final_url)
return final_url
# 对于其它的网站,首先清理跟踪参数
cleaned_url = remove_tracking_params(url)
# 扩展短链接
extended_url = await extend_short_urls(cleaned_url)
# 对于一些网站,只保留白名单中的参数
if urlparse(extended_url).hostname in ['item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
# 对于扩展短链接之后的适配的网站,直接保留白名单参数并返回
if urlparse(extended_url).hostname in ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com']:
final_url = reserve_whitelisted_params(extended_url)
if urlparse(extended_url).hostname in ['bilibili.com', 'm.bilibili.com']:
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
final_url = transform_into_fixed_url(final_url)
elif urlparse(extended_url).hostname in ['x.com', 'twitter.com']:
return final_url
if urlparse(extended_url).hostname in ['x.com', 'twitter.com']:
# 对于 Twitter 链接,转换为 fixupx.com
removed_tracking_url = remove_tracking_params(extended_url)
final_url = transform_into_fixed_url(removed_tracking_url)
@ -192,4 +202,6 @@ async def handle_links(message: Message):
# 回复处理后的链接
if final_urls:
await message.reply(f"{"\n".join(final_urls)}\n消息里有包含跟踪参数的链接,已经帮你转换了哦")
await message.reply(f"{"\n".join(final_urls)}\n消息里有包含跟踪参数的链接,已经帮你转换了哦~\n\n注意:"
f"这个功能是试验性的,可能会出现链接无法访问等问题,如果出现链接没有清理干净的情况,"
f"可以将返回的结果再次发送给bot或者尝试手动清理。\n如果你找到了这个工具的问题,欢迎把它通过 `/report_broken_links 链接` 报告给开发者!")

33
core/report_links.py Normal file
View file

@ -0,0 +1,33 @@
import re
from aiogram.types import Message
from config import config
async def report_broken_links(message: Message):
if not config.is_feature_enabled('link', message.chat.id):
return
# 获取被回复的消息中的链接
links = []
# 链接正则表达式
url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
text = message.text or message.caption
# Extract URLs from message text
if text:
links = re.findall(url_pattern, text)
if not links:
await message.reply("没有找到链接。请提供链接以及希望得到的清理结果。格式最好是 `/report_broken_links 链接 描述文本`。")
return
# 处理报告逻辑(例如,保存到数据库或发送给开发者)
report_content = f"用户 {message.from_user.full_name} ({message.from_user.id}) 报告了以下链接的问题:\n"
report_content += "\n".join(links) + "\n"
report_content += f"描述:{text.split(' ')[2] if ' ' in text else text}\n"
# 将 report_content 发送到开发者
developer_id = config.get_developer_id() # 从配置获取开发者ID
await message.bot.send_message(chat_id=developer_id, text=report_content)
await message.reply("感谢您的报告,我们会尽快处理!")