Compare commits

...

5 commits

4 changed files with 75 additions and 24 deletions

View file

@ -22,7 +22,7 @@ from core.promote import handle_promote_command
from core.repeater import MessageRepeater
from core.report_links import report_broken_links
from core.simple import handle_start_command, handle_baka, dummy_handler, handle_info_command, handle_ping_command, \
handle_tips_command, handle_about_command
handle_tips_command, handle_about_command, handle_nexusmods_id
from core.actions import handle_actions, handle_reverse_actions
from core.stats import handle_stats_command
from core.middleware.stats import MessageStatsMiddleware
@ -77,6 +77,7 @@ class TelegramAdapter:
handle_unpin_channel_message)
# repeater 模块
repeater_router.message(F.chat.type.in_({'group', 'supergroup'}))(MessageRepeater().handle_message)
router.message(F.text.regexp(r'(n|N) ?网尾号 ?[0-9]*'))(handle_nexusmods_id)
router.message(F.text == '我是笨蛋')(handle_baka)
router.inline_query()(handle_inline_query)
# 捕获所有其他消息

View file

@ -2,6 +2,8 @@ from aiogram.enums import ParseMode
from aiogram.types import InlineQuery, InlineQueryResultArticle, InputTextMessageContent
from aiogram.utils.formatting import Text, ExpandableBlockQuote
from core.link import clean_link_in_text
async def handle_inline_query(query: InlineQuery):
"""
@ -11,7 +13,7 @@ async def handle_inline_query(query: InlineQuery):
"""
print(f"Received inline query")
query_text = query.query
if query_text == "":
if not query_text:
await query.answer(results=[
InlineQueryResultArticle(
id="1",
@ -26,7 +28,7 @@ async def handle_inline_query(query: InlineQuery):
return
if query_text.startswith("search"):
search_query = query_text.replace("search", "").strip()
search_query = query_text.replace("search", "",1).strip()
if search_query:
await query.answer(results=[
InlineQueryResultArticle(
@ -63,7 +65,7 @@ async def handle_inline_query(query: InlineQuery):
return
if query_text.startswith("pg"):
text = query_text.replace("pg", "").strip()
text = query_text.replace("pg", "",1).strip()
import pangu
text = pangu.spacing_text(text)
await query.answer(results=[
@ -183,6 +185,34 @@ async def handle_inline_query(query: InlineQuery):
)
], cache_time=0)
return
if "http" in query_text:
# 实现清理 URL 的功能
cleaned_links = await clean_link_in_text(query_text)
if cleaned_links:
result = '\n\n'.join(cleaned_links)
await query.answer(results=[
InlineQueryResultArticle(
id="1",
title="清理后的链接",
input_message_content=InputTextMessageContent(
message_text=Text(ExpandableBlockQuote(result)).as_markdown(),
parse_mode=ParseMode.MARKDOWN
),
description=f"发送清理后的链接:{result}"
)
], cache_time=0)
else:
await query.answer(results=[
InlineQueryResultArticle(
id="1",
title="似乎没有链接需要被清理",
input_message_content=InputTextMessageContent(
message_text=query_text,
parse_mode=None
),
description="发送原始文本")
], cache_time=0)
return
if query_text.startswith("将军"):
# fallback support for users who forget the colon
if not query_text.startswith('将军:'):
@ -223,7 +253,9 @@ async def handle_inline_query(query: InlineQuery):
else:
from helpers.songs import fetch_from_b23_api
# 如果没有在本地找到歌曲,则尝试从 Bilibili API 获取
result = await fetch_from_b23_api(keywords)
#result = await fetch_from_b23_api(keywords)
result = None
# 因为 B 站的搜索 API 经常失效,所以这里暂时注释掉
if result:
song_name, song_link = result
await query.answer(results=[

View file

@ -14,11 +14,13 @@ from config import config
whitelist_param_links = ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com','item.m.jd.com','item.jd.com',
'www.xiaohongshu.com','zhuanlan.zhihu.com','www.baidu.com','www.youtube.com',
'www.xiaohongshu.com','zhuanlan.zhihu.com','www.baidu.com','m.youtube.com','www.youtube.com',
'music.youtube.com','youtu.be']
has_self_redirection_links = ['www.cnbeta.com.tw','m.cnbeta.com.tw','www.landiannews.com', 'www.bilibili.com']
has_better_alternative_links = ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com', 'youtu.be','m.youtube.com','x.com', 'twitter.com']
def matches_adb_selector(url, selector):
"""Check if URL matches the given selector"""
if selector['type'] == 'url-pattern':
@ -182,7 +184,7 @@ def reserve_whitelisted_params(url):
# 重新构建URL
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
elif parsed_url.hostname in ['www.baidu.com','www.youtube.com','music.youtube.com','youtu.be']:
elif parsed_url.hostname in ['www.baidu.com','m.youtube.com','www.youtube.com','music.youtube.com','youtu.be']:
new_query_params = {}
if parsed_url.hostname == 'www.baidu.com' and 'wd' in query_params:
# 百度搜索链接保留 wd 参数
@ -206,7 +208,7 @@ def reserve_whitelisted_params(url):
return url
def transform_into_fixed_url(url):
""" 转换为修复了链接预览的链接 """
""" 转换为修复的链接 """
parsed_url = urlparse(url)
if parsed_url.hostname in ['x.com', 'twitter.com']:
@ -218,6 +220,9 @@ def transform_into_fixed_url(url):
if parsed_url.hostname in ['www.iesdouyin.com']:
# 把抖音分享链接转换为正常的 www.douyin.com
return urlunparse(parsed_url._replace(netloc='www.douyin.com'))
if parsed_url.hostname in ['youtu.be','m.youtube.com']:
# 把 youtu.be 和 m.youtube.com 的链接转换为 www.youtube.com
return urlunparse(parsed_url._replace(netloc='www.youtube.com'))
return url
async def process_url(url):
@ -228,7 +233,7 @@ async def process_url(url):
# 对于适配的网站,直接保留白名单参数并返回
if urlparse(url).hostname in whitelist_param_links:
final_url = reserve_whitelisted_params(url)
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
if urlparse(final_url).hostname in has_better_alternative_links:
final_url = transform_into_fixed_url(final_url)
if url != final_url:
return final_url
@ -246,12 +251,14 @@ async def process_url(url):
# 对于扩展短链接之后的适配的网站,直接保留白名单参数并返回
if urlparse(extended_url).hostname in whitelist_param_links:
final_url = reserve_whitelisted_params(extended_url)
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
if urlparse(final_url).hostname in has_better_alternative_links:
final_url = transform_into_fixed_url(final_url)
if url != final_url:
return final_url
if urlparse(extended_url).hostname in ['x.com', 'twitter.com']:
# 对于 Twitter 链接,转换为 fixupx.com
else:
# 链接没有变化,直接返回 None避免重复处理
return None
if urlparse(extended_url).hostname in has_better_alternative_links:
removed_tracking_url = remove_tracking_params(extended_url)
final_url = transform_into_fixed_url(removed_tracking_url)
else:
@ -261,23 +268,29 @@ async def process_url(url):
return final_url
return None
async def clean_link_in_text(text):
# URL regex pattern
url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
urls = re.findall(url_pattern, text)
if not urls:
return None
final_urls = await asyncio.gather(*[process_url(url) for url in urls])
# Filter out None values
final_urls = [url for url in final_urls if url is not None]
return final_urls
async def handle_links(message: Message):
if not config.is_feature_enabled('link', message.chat.id):
return
# URL regex pattern
url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
text = message.text or message.caption
# Extract URLs from message text
if text:
urls = re.findall(url_pattern, text)
if not urls:
return
final_urls = await asyncio.gather(*[process_url(url) for url in urls])
# Filter out None values
final_urls = [url for url in final_urls if url is not None]
final_urls = await clean_link_in_text(text)
# 回复处理后的链接
if final_urls:
await message.reply(f"<blockquote expandable>{"\n\n".join(final_urls)}\n</blockquote>\n消息里有包含跟踪参数的链接,已经帮你转换了哦~\n\n"
f"注意:这个功能是试验性的,可能会出现问题。"
f"\n如果你找到了问题,欢迎"
f"把它通过 <code>/report_broken_links 链接 需要去除的参数等等</code> 报告给开发者!")
await message.reply(
f"<blockquote expandable>{"\n\n".join(final_urls)}\n</blockquote>\n消息里有包含跟踪参数的链接,已经帮你转换了哦~\n\n"
f"注意:这个功能是试验性的,可能会出现问题。"
f"\n如果你找到了问题,欢迎"
f"把它通过 <code>/report_broken_links 链接 需要去除的参数等等</code> 报告给开发者!")

View file

@ -88,6 +88,11 @@ async def handle_about_command(message: Message) -> None:
response += f"也就大概花了 {minutes}{seconds} 秒..."
await about_message.edit_text(response)
async def handle_nexusmods_id(message: Message) -> None:
"""输入n网尾号xxxx之后主动返回星露谷物语的对应mod"""
nexusmods_id = message.text.replace(" ","").replace("N网尾号","").replace("n网尾号","")
await message.reply(f"https://www.nexusmods.com/stardewvalley/mods/{nexusmods_id}")
async def dummy_handler(message: Message) -> None:
"""A handler to catch all other messages"""
pass