bot/core/link.py
2025-07-30 21:40:15 +08:00

109 lines
No EOL
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import re
import html
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from abp.filters import parse_filterlist
from aiogram.types import Message
from config import config
def extend_short_urls(url):
    """Expand a shortened URL by one hop.

    For tb.cn (Taobao) short links the real target is embedded in the
    landing page's JavaScript rather than sent as a redirect, so the
    page body is scraped. For ordinary short links a single redirect
    hop is resolved via the ``Location`` header. Returns the expanded
    URL, or the input unchanged when nothing can be expanded.
    """
    # hostname is None for URLs without a netloc; guard against
    # `'tb.cn' in None` raising TypeError.
    hostname = urlparse(url).hostname or ''
    if 'tb.cn' in hostname:
        # Taobao serves an interstitial page instead of an HTTP redirect.
        r = requests.get(url, timeout=10)
        real_url = extract_tb_url_from_html(r.text)
        return real_url if real_url else url
    # allow_redirects=False is essential: with the default behavior the
    # 3xx response is consumed internally and the final status is 200,
    # so the Location header below would never be observable.
    r = requests.get(url, allow_redirects=False, timeout=10)
    # 304 (Not Modified) carries no Location header and is excluded.
    if r.status_code in (301, 302, 303, 307, 308):
        return r.headers.get('Location', url)
    return url
def extract_tb_url_from_html(html_content):
    """Pull the destination URL out of a tb.cn interstitial page.

    The page embeds the target as ``var url = '...'`` inside a script
    block. Returns the HTML-entity-decoded URL, or None when no such
    assignment is present.
    """
    found = re.search(r"var url = ['\"]([^'\"]+)['\"]", html_content)
    if found is None:
        return None
    # The embedded URL may contain entities such as &amp; — decode them.
    return html.unescape(found.group(1))
def remove_tracking_params(url):
    """Strip known tracking query parameters from *url*.

    Tracking parameter names are taken from the ``removeparam`` options
    of the Adblock-style "Legitimate URL Shortener" filter list shipped
    in assets/. The list is parsed once and memoized on the function
    object — the original re-opened and re-parsed the whole file for
    every URL, which is needlessly slow for a per-message handler.
    """
    tracking_params = getattr(remove_tracking_params, '_params', None)
    if tracking_params is None:
        tracking_params = []
        with open('assets/LegitimateURLShortener.txt', 'r', encoding='utf-8') as f:
            for line in parse_filterlist(f):
                # Rules expose (name, value) option tuples; only
                # 'removeparam' options name a parameter to delete.
                if getattr(line, 'options', None):
                    for name, value in line.options:
                        if name == 'removeparam':
                            tracking_params.append(value)
        remove_tracking_params._params = tracking_params
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    # NOTE(review): removeparam values are matched literally here, as in
    # the original; regex-style `/.../` removeparam rules are not handled.
    for param in tracking_params:
        query_params.pop(param, None)
    # Rebuild the URL without the tracking parameters.
    cleaned_query = urlencode(query_params, doseq=True)
    return urlunparse(parsed_url._replace(query=cleaned_query))
def reserve_whitelisted_params(url):
    """Keep only whitelisted query parameters for known hosts.

    Taobao/Tmall/Goofish/NetEase-Music item pages keep just their ``id``
    parameter; the listed Bilibili hosts have every parameter dropped.
    URLs for any other host (or Taobao-family URLs without an ``id``)
    pass through unchanged.
    """
    parts = urlparse(url)
    host = parts.hostname
    id_only_hosts = ('item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com')
    strip_all_hosts = ('mall.bilibili.com', 'space.bilibili.com', 'live.bilibili.com')
    if host in id_only_hosts:
        params = parse_qs(parts.query)
        if 'id' in params:
            # Rebuild the query string from the id parameter alone.
            kept = urlencode({'id': params['id']}, doseq=True)
            return urlunparse(parts._replace(query=kept))
    elif host in strip_all_hosts:
        # No parameters survive for these hosts.
        return urlunparse(parts._replace(query=''))
    return url
def transform_into_fixed_url(url):
    """Rewrite URLs whose link previews are broken in Telegram.

    Twitter/X links are pointed at the i.fixupx.com mirror; all other
    URLs are returned unchanged.
    """
    parsed_url = urlparse(url)
    if parsed_url.hostname in ('x.com', 'twitter.com'):
        # BUG FIX: ParseResult has no 'hostname' tuple field — it is a
        # read-only property — so _replace(hostname=...) raised
        # ValueError on every matching URL. 'netloc' is the replaceable
        # component that carries the host.
        return urlunparse(parsed_url._replace(netloc='i.fixupx.com'))
    return url
async def handle_links(message: Message):
    """Clean every URL found in *message* and reply with each result.

    Does nothing when the 'link' feature is disabled for the chat or
    the message has no text. Each URL goes through the pipeline:
    strip tracking params -> expand short link -> keep whitelisted
    params -> rewrite host for working link previews.
    """
    if not config.is_feature_enabled('link', message.chat.id):
        return
    if not message.text:
        return
    # Pattern matching http/https URLs in free-form message text.
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    for url in re.findall(url_pattern, message.text):
        cleaned = remove_tracking_params(url)
        expanded = extend_short_urls(cleaned)
        whitelisted = reserve_whitelisted_params(expanded)
        fixed = transform_into_fixed_url(whitelisted)
        await message.reply(f"清理完成:\n{fixed}")