# bot/core/link.py

import aiohttp
import re
import html
import asyncio
from urllib.parse import urlparse, urljoin, parse_qs, urlencode, urlunparse
from abp.filters import parse_filterlist
from aiogram.types import Message
from config import config

# Hosts for which only an explicit whitelist of query parameters is kept.
whitelist_param_links = [
    'www.iesdouyin.com', 'item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
    'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
    'space.bilibili.com', 'live.bilibili.com', 'item.m.jd.com', 'item.jd.com', 'www.xiaohongshu.com',
]

def matches_adb_selector(url, selector):
    """Check if URL matches the given selector"""
    if selector['type'] == 'url-pattern':
        pattern = selector['value']
        # Convert AdBlock pattern to regex
        # ||domain/* becomes ^https?://[^/]*domain.*
        # domain/* becomes .*domain.*
        if pattern.startswith('||'):
            domain_pattern = pattern[2:]
            # Escape special regex chars except * which we'll convert to .*
            domain_pattern = re.escape(domain_pattern).replace(r'\*', '.*')
            regex_pattern = f"^https?://[^/]*{domain_pattern}"
        else:
            # Escape special regex chars except * which we'll convert to .*
            regex_pattern = re.escape(pattern).replace(r'\*', '.*')
        return bool(re.search(regex_pattern, url))
    return False
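
# Example (illustrative): for the rule pattern '||example.com/*', the code above
# builds the regex ^https?://[^/]*example\.com/.* , which matches
# 'https://sub.example.com/page' but not 'https://example.org/page'.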

def should_remove_param(url, filter_rule):
    """Check if parameter should be removed based on filter rule"""
    if filter_rule.action == 'allow':
        return False  # Allowlist rules prevent removal
    if filter_rule.selector:
        return matches_adb_selector(url, filter_rule.selector)
    return True  # No selector means apply to all URLs
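
# Example (illustrative): a generic rule like '$removeparam=utm_source' applies to
# every URL, while a scoped rule like '||example.com/*$removeparam=ref' only fires
# when the URL matches the selector pattern.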

async def extend_short_urls(url):
    """Expand short URLs by following redirects."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url, allow_redirects=False) as r:
            if 'tb.cn' in (urlparse(url).hostname or ''):
                # Special handling for Taobao short links: the target URL is
                # embedded in the HTML body rather than a Location header.
                html_content = await r.text()
                extracted = extract_tb_url_from_html(html_content)
                if not extracted:
                    return url
                url = extracted
            if r.status in [301, 302, 304, 307, 308] and 'Location' in r.headers:
                if r.headers['Location'].startswith(('http://', 'https://')):
                    return r.headers['Location']
                else:
                    # If the Location header has no http(s) prefix it is likely a
                    # relative path, so resolve it against the original URL.
                    return urljoin(url, r.headers['Location'])
            elif r.status not in [200, 403, 404, 502, 503]:
                # Some links are only reachable with a "normal" browser; retry with a browser user-agent.
                async with session.get(url, allow_redirects=False, headers={'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.7103.48 Safari/537.36'}) as r_fix:
                    if r_fix.status in [301, 302, 304, 307, 308] and 'Location' in r_fix.headers:
                        if r_fix.headers['Location'].startswith(('http://', 'https://')):
                            return r_fix.headers['Location']
                        else:
                            # Resolve a relative Location against the original URL.
                            return urljoin(url, r_fix.headers['Location'])
    return url
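
# Example (illustrative): await extend_short_urls('https://b23.tv/abc123') would
# typically follow a single 302 redirect and return the full bilibili.com URL
# the short link points to.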

def extract_tb_url_from_html(html_content):
    # Match the pattern var url = '...' with a regular expression
    pattern = r"var url = ['\"]([^'\"]+)['\"]"
    match = re.search(pattern, html_content)
    if match:
        url = match.group(1)
        # Decode HTML entities
        decoded_url = html.unescape(url)
        return decoded_url
    return None
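
# Example (illustrative): given a page containing
#     <script>var url = 'https://item.taobao.com/item.htm?id=123&amp;spm=x';</script>
# this returns 'https://item.taobao.com/item.htm?id=123&spm=x' after entity decoding.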

def remove_tracking_params(url):
    """Remove tracking parameters."""
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    # Collect filter-list rules that carry a removeparam option
    tracking_rules = []
    with open('assets/LegitimateURLShortener.txt', 'r', encoding='utf-8') as f:
        for line in parse_filterlist(f):
            if hasattr(line, 'options') and line.options:
                for option in line.options:
                    if option[0] == 'removeparam':
                        tracking_rules.append(line)
                        break  # Only add rule once even if multiple removeparam options
    for rule in tracking_rules:
        if not should_remove_param(url, rule):
            continue
        for option in rule.options or []:
            if option[0] == 'removeparam':
                param_pattern = option[1]
                if param_pattern is True:  # Remove all params
                    query_params.clear()
                    break
                elif isinstance(param_pattern, str):
                    # Handle regex patterns
                    if param_pattern.startswith('/') and param_pattern.endswith('/'):
                        regex_pattern = param_pattern[1:-1]
                        params_to_remove = [
                            param for param in query_params.keys()
                            if re.search(regex_pattern, param)
                        ]
                    else:
                        # Exact match
                        params_to_remove = [param_pattern] if param_pattern in query_params else []
                    for param in params_to_remove:
                        query_params.pop(param, None)
    # Reconstruct URL
    new_query = urlencode(query_params, doseq=True)
    return urlunparse(parsed_url._replace(query=new_query))
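
# Example (illustrative): assuming the bundled assets/LegitimateURLShortener.txt
# contains a rule such as '$removeparam=utm_source', calling
#     remove_tracking_params('https://example.com/a?utm_source=x&id=1')
# would return 'https://example.com/a?id=1'.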

def reserve_whitelisted_params(url):
    """Keep only the whitelisted parameters."""
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if parsed_url.hostname in ['item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com']:
        if 'id' in query_params:
            # Build new query_params keeping only the id parameter
            new_query_params = {'id': query_params['id']}
            # Rebuild the URL
            cleaned_query = urlencode(new_query_params, doseq=True)
            return urlunparse(parsed_url._replace(query=cleaned_query))
        if 'music.163.com' in parsed_url.hostname and 'id' not in query_params:
            # If a NetEase Cloud Music link has no id parameter, keep no parameters at all,
            # e.g. https://music.163.com/song/12345678
            new_query_params = {}
            cleaned_query = urlencode(new_query_params, doseq=True)
            return urlunparse(parsed_url._replace(query=cleaned_query))
    elif parsed_url.hostname in ['www.iesdouyin.com', 'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com', 'space.bilibili.com', 'live.bilibili.com', 'item.m.jd.com', 'item.jd.com', 'www.xiaohongshu.com']:
        # Keep no parameters
        new_query_params = {}
        if 'xiaohongshu.com' in parsed_url.hostname and 'xsec_token' in query_params:
            # Keep the xsec_token parameter on xiaohongshu links so they stay accessible
            # (maybe I should f**k xiaohongshu too)
            new_query_params = {'xsec_token': query_params['xsec_token']}
        # Rebuild the URL
        cleaned_query = urlencode(new_query_params, doseq=True)
        return urlunparse(parsed_url._replace(query=cleaned_query))
    elif parsed_url.hostname in ['chatglm.cn'] and 'share_conversation_id' in query_params:
        # So you're the one called Zhipu
        new_query_params = {'share_conversation_id': query_params['share_conversation_id']}
        cleaned_query = urlencode(new_query_params, doseq=True)
        return urlunparse(parsed_url._replace(query=cleaned_query))
    return url
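
# Example (illustrative):
#     reserve_whitelisted_params('https://item.taobao.com/item.htm?id=42&spm=a21bo')
# returns 'https://item.taobao.com/item.htm?id=42', keeping only the whitelisted id.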

def transform_into_fixed_url(url):
    """Transform into a URL with fixed link previews."""
    parsed_url = urlparse(url)
    if parsed_url.hostname in ['x.com', 'twitter.com']:
        # Rewrite Twitter links to fixupx.com
        return urlunparse(parsed_url._replace(netloc='i.fixupx.com'))
    if parsed_url.hostname in ['bilibili.com', 'm.bilibili.com']:
        # Rewrite bilibili links to the desktop www.bilibili.com
        return urlunparse(parsed_url._replace(netloc='www.bilibili.com'))
    if parsed_url.hostname in ['www.iesdouyin.com']:
        # Rewrite douyin share links to the normal www.douyin.com
        return urlunparse(parsed_url._replace(netloc='www.douyin.com'))
    return url
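
# Example (illustrative):
#     transform_into_fixed_url('https://x.com/user/status/1')
# returns 'https://i.fixupx.com/user/status/1', whose link preview renders properly.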

async def process_url(url):
    # For supported sites, keep only whitelisted parameters and return directly
    if urlparse(url).hostname in whitelist_param_links:
        final_url = reserve_whitelisted_params(url)
        if urlparse(final_url).hostname in ['www.iesdouyin.com', 'bilibili.com', 'm.bilibili.com']:
            final_url = transform_into_fixed_url(final_url)
        return final_url
    # For other sites, first strip tracking parameters
    cleaned_url = remove_tracking_params(url)
    # Expand short URLs
    extended_url = await extend_short_urls(cleaned_url)
    if urlparse(extended_url).hostname in ['chatglm.cn']:
        final_url = reserve_whitelisted_params(extended_url)
        return final_url
    # If expansion landed on a supported site, keep only whitelisted parameters and return
    if urlparse(extended_url).hostname in whitelist_param_links:
        final_url = reserve_whitelisted_params(extended_url)
        if urlparse(final_url).hostname in ['www.iesdouyin.com', 'bilibili.com', 'm.bilibili.com']:
            final_url = transform_into_fixed_url(final_url)
        return final_url
    if urlparse(extended_url).hostname in ['x.com', 'twitter.com']:
        # Rewrite Twitter links to fixupx.com
        removed_tracking_url = remove_tracking_params(extended_url)
        final_url = transform_into_fixed_url(removed_tracking_url)
    else:
        # For any other link, just strip tracking parameters again
        final_url = remove_tracking_params(extended_url)
    if url != final_url:
        return final_url
    return None
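
# Pipeline sketch (illustrative): a link like 'https://b23.tv/xyz?share_source=copy_web'
# is stripped of tracking params, expanded to its bilibili.com target, reduced to
# whitelisted params, and normalized to www.bilibili.com. On the general path, a
# return value of None means the URL came through unchanged.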

async def handle_links(message: Message):
    if not config.is_feature_enabled('link', message.chat.id):
        return
    # URL regex pattern
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    text = message.text or message.caption
    # Extract URLs from message text
    if text:
        urls = re.findall(url_pattern, text)
        if not urls:
            return
        final_urls = await asyncio.gather(*[process_url(url) for url in urls])
        # Filter out None values
        final_urls = [url for url in final_urls if url is not None]
        # Reply with the processed links
        if final_urls:
            joined_urls = '\n'.join(final_urls)
            await message.reply(
                f"{joined_urls}\n"
                "The message contained links with tracking parameters, so they have been converted for you~\n\n"
                "Note: this feature is experimental, so issues such as inaccessible links may occur. "
                "If a link was not fully cleaned, you can send the result back to the bot or try cleaning it manually.\n"
                "If you find a problem with this tool, feel free to report it to the developer via "
                "`/report_broken_links <link> <parameters to remove> etc.`!"
            )