Compare commits

...

7 commits

7 changed files with 262 additions and 38 deletions

View file

@ -18,7 +18,8 @@ from core.post_to_fedi import handle_auth, handle_post_to_fedi
from core.promote import handle_promote_command
from core.repeater import MessageRepeater
from core.report_links import report_broken_links
from core.simple import handle_start_command, handle_baka, dummy_handler, handle_info_command
from core.simple import handle_start_command, handle_baka, dummy_handler, handle_info_command, handle_ping_command, \
handle_tips_command, handle_about_command
from core.actions import handle_actions, handle_reverse_actions
from core.stats import handle_stats_command
from core.middleware.stats import MessageStatsMiddleware
@ -38,10 +39,16 @@ class TelegramAdapter:
"""Register handlers with core module functions"""
# Create router
router = Router()
actions_router = Router()
repeater_router = Router()
dummy_router = Router()
# Register handlers on router
router.message(CommandStart())(handle_start_command)
router.message(Command('info'))(handle_info_command)
router.message(Command('about'))(handle_about_command)
router.message(Command('ping'))(handle_ping_command)
router.message(Command('tips'))(handle_tips_command)
# bitflip 模块
router.message(Command('bitflip'))(handle_bitflip_command)
# promote 模块
@ -51,27 +58,32 @@ class TelegramAdapter:
# fedi 模块
router.message(Command('fauth'))(handle_auth)
router.message(Command('post'))(handle_post_to_fedi)
# link 模块
router.message(Command('report_broken_links'))(report_broken_links)
router.message(F.text.contains('http') & ~F.text.contains('/report_broken_links'))(handle_links)
# unpin 模块
# 不知道为什么检测不到频道的消息被置顶这个事件,暂时认为所有的频道消息都是被置顶的
router.message(F.chat.type.in_({'group', 'supergroup'}) & F.sender_chat & (
F.sender_chat.type == 'channel') & F.is_automatic_forward)(
handle_unpin_channel_message)
# link 模块
router.message(Command('report_broken_links'))(report_broken_links)
router.message(F.text.contains('http') & ~F.text.contains('/report_broken_links'))(handle_links)
# repeater 模块
router.message(F.chat.type.in_({'group', 'supergroup'}))(MessageRepeater().handle_message)
# actions 模块
router.message(F.text.startswith('/'))(handle_actions)
router.message(F.text.startswith('\\'))(handle_reverse_actions)
repeater_router.message(F.chat.type.in_({'group', 'supergroup'}))(MessageRepeater().handle_message)
router.message(F.text == '我是笨蛋')(handle_baka)
# 捕获所有其他消息
router.message(F.chat.type.in_({'group', 'supergroup'}))(dummy_handler)
dummy_router.message(F.chat.type.in_({'group', 'supergroup'}))(dummy_handler)
# actions 模块
actions_router.message(F.chat.type.in_({'group', 'supergroup'}) & F.text.startswith('/'))(handle_actions)
actions_router.message(F.chat.type.in_({'group', 'supergroup'}) & F.text.startswith('\\'))(handle_reverse_actions)
# Include router in dispatcher
# 通用的路由
self.dp.include_router(router)
self.dp.include_router(actions_router)
# 处理联邦宇宙认证相关
self.dp.include_router(fedi_router)
self.dp.include_router(repeater_router)
self.dp.include_router(dummy_router)
def _setup_middleware(self):
"""注册中间件"""

View file

@ -6,16 +6,15 @@ from config import config
async def handle_actions(message: Message) -> None:
if not config.is_feature_enabled('actions', message.chat.id):
return
if not message.chat.type in ['group', 'supergroup']:
return
rawtext = message.text
from_user = message.from_user.mention_html(message.sender_chat.title) if message.sender_chat else message.from_user.mention_html()
replied_user = message.reply_to_message.from_user.mention_html(message.reply_to_message.sender_chat.title) if message.reply_to_message and message.reply_to_message.sender_chat else message.reply_to_message.from_user.mention_html()
# 防止识别成命令而被误触发
if rawtext.replace('/','',1).isascii() or '@' in rawtext:
print(rawtext.replace('/','',1).isascii())
return
elif " " in message.text:
from_user = message.from_user.mention_html(message.sender_chat.title) if message.sender_chat else message.from_user.mention_html()
replied_user = message.reply_to_message.from_user.mention_html(message.reply_to_message.sender_chat.title) if message.reply_to_message and message.reply_to_message.sender_chat else (message.reply_to_message.from_user.mention_html() if message.reply_to_message else None)
if " " in rawtext:
if rawtext.split(" ")[0].replace('/','',1).isascii():
return
await message.reply(f"{from_user} {rawtext.split(" ")[0].replace('/','')}{replied_user if message.reply_to_message else '自己' } {''.join(rawtext.split(" ")[1:])}",disable_web_page_preview=True)

View file

@ -9,6 +9,9 @@ from aiogram.types import Message
from config import config
whitelist_param_links = ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com','item.m.jd.com','item.jd.com','www.xiaohongshu.com']
def matches_adb_selector(url, selector):
"""Check if URL matches the given selector"""
@ -56,6 +59,16 @@ async def extend_short_urls(url):
# 如果 Location 头部没有 http 前缀,可能是相对路径
# 需要将其转换正确的链接
return urlparse(url)._replace(path=r.headers['Location']).geturl()
elif not r.status in [200,403,404,502,503]:
# 对于一些需要“正常”浏览器才能访问的链接,尝试修复
async with session.get(url, allow_redirects=False, headers={'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.7103.48 Safari/537.36'}) as r_fix:
if r_fix.status in [301, 302, 304, 307, 308] and 'Location' in r_fix.headers:
if r_fix.headers['Location'].startswith(('http://', 'https://')):
return r_fix.headers['Location']
else:
# 如果 Location 头部没有 http 前缀,可能是相对路径
# 需要将其转换正确的链接
return urlparse(url)._replace(path=r_fix.headers['Location']).geturl()
return url
def extract_tb_url_from_html(html_content):
@ -129,12 +142,27 @@ def reserve_whitelisted_params(url):
# 重新构建URL
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
elif parsed_url.hostname in ['www.iesdouyin.com','www.bilibili.com','m.bilibili.com','bilibili.com','mall.bilibili.com','space.bilibili.com','live.bilibili.com']:
if 'music.163.com' in parsed_url.hostname and 'id' not in query_params:
# 如果网易云链接没有id参数不保留任何参数
# 例如 https://music.163.com/song/12345678
new_query_params = {}
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
elif parsed_url.hostname in ['www.iesdouyin.com','www.bilibili.com','m.bilibili.com','bilibili.com','mall.bilibili.com','space.bilibili.com','live.bilibili.com','item.m.jd.com','item.jd.com','www.xiaohongshu.com']:
# 不保留任何参数
new_query_params = {}
if 'xiaohongshu.com' in parsed_url.hostname and 'xsec_token' in query_params:
# 为了保证能正常访问,小红书链接保留 xsec_token 参数
# 我是不是也应该 f**k 小红书一下
new_query_params = {'xsec_token': query_params['xsec_token']}
# 重新构建URL
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
elif parsed_url.hostname in ['chatglm.cn'] and query_params:
# 就你叫智谱啊
new_query_params = {'share_conversation_id': query_params['share_conversation_id']}
cleaned_query = urlencode(new_query_params, doseq=True)
return urlunparse(parsed_url._replace(query=cleaned_query))
return url
def transform_into_fixed_url(url):
@ -154,9 +182,7 @@ def transform_into_fixed_url(url):
async def process_url(url):
# 对于适配的网站,直接保留白名单参数并返回
if urlparse(url).hostname in ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com']:
if urlparse(url).hostname in whitelist_param_links:
final_url = reserve_whitelisted_params(url)
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
final_url = transform_into_fixed_url(final_url)
@ -165,10 +191,11 @@ async def process_url(url):
cleaned_url = remove_tracking_params(url)
# 扩展短链接
extended_url = await extend_short_urls(cleaned_url)
if urlparse(extended_url).hostname in ['chatglm.cn']:
final_url = reserve_whitelisted_params(extended_url)
return final_url
# 对于扩展短链接之后的适配的网站,直接保留白名单参数并返回
if urlparse(extended_url).hostname in ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.com', 'h5.m.goofish.com', 'music.163.com',
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com']:
if urlparse(extended_url).hostname in whitelist_param_links:
final_url = reserve_whitelisted_params(extended_url)
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
final_url = transform_into_fixed_url(final_url)
@ -204,4 +231,5 @@ async def handle_links(message: Message):
if final_urls:
await message.reply(f"{"\n".join(final_urls)}\n消息里有包含跟踪参数的链接,已经帮你转换了哦~\n\n注意:"
f"这个功能是试验性的,可能会出现链接无法访问等问题,如果出现链接没有清理干净的情况,"
f"可以将返回的结果再次发送给bot或者尝试手动清理。\n如果我没有回复链接,说明链接不需要被清理\n如果你找到了这个工具的问题,欢迎把它通过 `/report_broken_links 链接 需要去除的参数等等` 报告给开发者!")
f"可以将返回的结果再次发送给bot或者尝试手动清理。\n如果你找到了这个工具的问题,欢迎"
f"把它通过 `/report_broken_links 链接 需要去除的参数等等` 报告给开发者!")

View file

@ -1,4 +1,4 @@
from aiogram.types import Message
from aiogram.types import Message, CallbackQuery
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram import Router
@ -34,7 +34,8 @@ def check_user_cred_exists(instance: str, userid: int) -> bool:
async def handle_token(instance,mastodon, message: Message):
mastodon.log_in(
code=message.text,
to_file=f"secrets/realbot_{instance}_{message.from_user.id}_usercred.secret"
to_file=f"secrets/realbot_{instance}_{message.from_user.id}_usercred.secret",
scopes=['read:accounts', 'read:statuses', 'write:media', 'write:statuses']
)
async def handle_auth(message: Message, state: FSMContext):
@ -98,13 +99,63 @@ async def handle_post_to_fedi(message: Message):
if not user_cred_files:
await message.reply('请先使用 /fauth 命令进行身份验证')
return
arguments = message.text.replace('/post', '', 1).strip().split(' ')
specified_instance = None
visibility = None
if len(arguments) >= 1 and arguments[0]:
# 检查第一个参数是否是实例名
first_arg = arguments[0]
# 检查是否存在对应的凭据文件
matching_files = [f for f in user_cred_files if first_arg in f]
if matching_files:
specified_instance = first_arg
if len(arguments) >= 2:
visibility = arguments[1]
else:
visibility = arguments[0]
# 如果指定了实例,使用指定的实例
if specified_instance:
cred_file = next(f for f in user_cred_files if specified_instance in f)
filename = os.path.basename(cred_file)
parts = filename.split('_')
instance = '_'.join(parts[1:-2])
else:
# 如果有多个实例,让用户选择
if len(user_cred_files) > 1:
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
# 从文件名中提取实例名
cred_file = user_cred_files[0] # 假设用户只绑定一个实例
filename = os.path.basename(cred_file)
# 格式: realbot_{instance}_{userid}_usercred.secret
parts = filename.split('_')
instance = '_'.join(parts[1:-2]) # 提取实例名部分
# 提取所有实例名
instances = []
for cred_file in user_cred_files:
filename = os.path.basename(cred_file)
parts = filename.split('_')
instance_name = '_'.join(parts[1:-2])
instances.append(instance_name)
# 创建选择按钮
keyboard = []
for instance_name in instances:
keyboard.append([InlineKeyboardButton(
text=f"{instance_name}",
callback_data=f"post:{instance_name}:"
)])
# 添加全部发送选项
keyboard.append([InlineKeyboardButton(
text="发送到所有实例",
callback_data="post_instance:all"
)])
reply_markup = InlineKeyboardMarkup(inline_keyboard=keyboard)
await message.reply("请选择要发送到的实例:\n(实验性,目前还不能使用)", reply_markup=reply_markup)
return
else:
# 只有一个实例,直接使用
cred_file = user_cred_files[0]
filename = os.path.basename(cred_file)
parts = filename.split('_')
instance = '_'.join(parts[1:-2])
mastodon = Mastodon(
access_token=f'{cred_file}',
@ -113,12 +164,6 @@ async def handle_post_to_fedi(message: Message):
# 发布消息到联邦网络
try:
arguments = message.text.replace('/post', '', 1).strip().split(' ')
if len(arguments) >= 1 and arguments[0]:
visibility = arguments[0] # 默认可见性为账号设置的可见性
else:
visibility = None
status_message = await message.reply('尝试发布消息到联邦网络...')
# 处理图片附件
media_ids = []
@ -145,3 +190,63 @@ async def handle_post_to_fedi(message: Message):
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
except Exception as e:
await message.reply(f'发布失败: {str(e)}')
@router.callback_query(lambda c: c.data.startswith('post:'))
async def handle_instance_selection(callback: CallbackQuery):
"""处理实例选择回调"""
await callback.answer()
data_parts = callback.data.split(':')
selected_instance = data_parts[1]
# 获取原始回复消息
# 获取原始 /post 命令消息,然后获取它回复的消息
post_command_message = callback.message.reply_to_message
original_message = post_command_message.reply_to_message if post_command_message else None
if not original_message:
await callback.message.edit_text("错误:找不到原始消息")
return
user_id = callback.from_user.id
mastodon = Mastodon(
access_token=f'secrets/realbot_{selected_instance}_{user_id}_usercred.secret',
api_base_url=f'https://{selected_instance}'
)
# 发布消息到联邦网络
try:
status_message = await original_message.reply('尝试发布消息到联邦网络...')
# 处理图片附件
media_ids = []
cb_arguments = original_message.text.replace('/post', '', 1).strip().split(' ')
if not original_message.reply_to_message:
await status_message.edit_text("错误:找不到要发布的消息")
return
if original_message.reply_to_message.photo:
await status_message.edit_text('正在处理图片附件...')
# 获取最大尺寸的图片
photo = original_message.reply_to_message.photo[-1]
file_info = await callback.message.bot.get_file(photo.file_id)
file_data = await callback.message.bot.download_file(file_info.file_path)
# 上传图片到Mastodon
media = mastodon.media_post(file_data, mime_type='image/png')
media_ids.append(media['id'])
text = original_message.reply_to_message.text
if media_ids:
text = original_message.reply_to_message.caption
# 发布消息
status = mastodon.status_post(
text,
media_ids=media_ids if media_ids else None,
visibility = cb_arguments[0] if len(cb_arguments) == 1 else None # 默认为账户默认可见性
)
status_url = status['url']
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
await callback.message.delete()
except Exception as e:
await status_message.edit_text(f'发布失败: {str(e)}')

View file

@ -1,6 +1,10 @@
import asyncio
from aiogram import html
from aiogram.types import Message
import config
async def handle_start_command(message: Message) -> None:
"""Handle /start command"""
@ -24,6 +28,62 @@ async def handle_info_command(message: Message) -> None:
)
await message.reply(response)
async def handle_ping_command(message: Message) -> None:
"""Handle /ping command"""
import time
user_sent_time = message.date.timestamp()
bot_time_now = time.time()
time_diff = bot_time_now - user_sent_time
response = f"Pong! Time taken: {round(time_diff * 1000, 2)} milliseconds"
await message.reply(response)
async def handle_tips_command(message: Message) -> None:
"""Handle /tips command"""
tips = [
"你知道吗:其实 tips 都是废话(确信",
"如果 bot 没有回复链接,说明链接不需要被清理",
"不管如何,你今天都很棒!",
"这个 bot 暂时还跑在一台运行着 Arch Linux 的笔电上",
"/ping 命令其实显示的是 bot 到 Telegram 服务器的延迟,而不是用户到 bot 的延迟",
"bot 的链接清理功能其实大多归功于 ➗ Actually Legitimate URL Shortener Tool 规则集",
"bot 的功能可以被选择性的开启或者关闭,但是示例 bot 为了方便开发和测试,默认开启了所有功能",
"说真的,你应该去看看 @kmuav2bot",
"任何一条 tips 消息都会在一分钟后自动消失,再也不用担心消息堆积了",
]
import random
response = random.choice(tips)
tips_message = await message.reply(response)
# Delete the message after 1 minute
await asyncio.sleep(60)
await tips_message.delete()
async def handle_about_command(message: Message) -> None:
"""Handle /about command"""
import time
bot_time_start = time.time()
about_message = await message.reply('Loading...')
from dulwich.repo import Repo
git_commit_hash = Repo('.').head().decode('utf-8')[:7] # Get the first 7 characters of the commit hash
response = f"realbot@g{git_commit_hash}\n\n"
response += "孩子不懂随便写的 bot\n"
if message.chat.id == config.config.get_admin_id():
response += '\nDebug Info:\n'
import os
response += 'Python Version: ' + str(os.sys.version) + '\n'
response += 'System Info: ' + '\n' + ' '.join(str(x) for x in os.uname()) + '\n'
response += '\n这个命令比较慢dulwich 负全责(小声),'
bot_time_end = time.time()
time_diff = bot_time_end - bot_time_start
if time_diff < 1:
response += f"也就大概花了 {round(time_diff * 1000, 2)} ms..."
elif time_diff < 60:
response += f"也就大概花了 {round(time_diff, 2)} 秒..."
else:
minutes = int(time_diff // 60)
seconds = round(time_diff % 60, 2)
response += f"也就大概花了 {minutes}{seconds} 秒..."
await about_message.edit_text(response)
async def dummy_handler(message: Message) -> None:
"""A handler to catch all other messages"""
pass

View file

@ -6,6 +6,7 @@ requires-python = ">=3.13"
dependencies = [
"aiodns==3.5.0",
"aiogram==3.21.0",
"dulwich==0.24.1",
"mastodon-py==2.0.1",
"matrix-nio==0.25.2",
"python-abp==0.2.0",

19
uv.lock generated
View file

@ -197,6 +197,23 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/4e/8c/f3147f5c4b73e7550fe5f9352eaa956ae838d5c51eb58e7a25b9f3e2643b/decorator-5.2.1-py3-none-any.whl", hash = "sha256:d316bb415a2d9e2d2b3abcc4084c6502fc09240e292cd76a76afc106a1c8e04a", size = 9190, upload-time = "2025-02-24T04:41:32.565Z" },
]
[[package]]
name = "dulwich"
version = "0.24.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2b/f3/13a3425ddf04bd31f1caf3f4fa8de2352700c454cb0536ce3f4dbdc57a81/dulwich-0.24.1.tar.gz", hash = "sha256:e19fd864f10f02bb834bb86167d92dcca1c228451b04458761fc13dabd447758", size = 806136, upload-time = "2025-08-01T10:26:46.887Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5e/a5/3f4760169fea1b90df7aea88172699807af6f4f667c878de6a9ee554170f/dulwich-0.24.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1a11ec69fc6604228804ddfc32c85b22bc627eca4cf4ff3f27dbe822e6f29477", size = 1080923, upload-time = "2025-08-01T10:26:28.011Z" },
{ url = "https://files.pythonhosted.org/packages/71/d9/7aadd6318aed6f0e1242fa63bd61d80142716d13ea4e307c8b19fc61c9ae/dulwich-0.24.1-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:a9800df7238b586b4c38c00432776781bc889cf02d756dcfb8dc0ecb8fc47a33", size = 1159246, upload-time = "2025-08-01T10:26:29.487Z" },
{ url = "https://files.pythonhosted.org/packages/90/5d/df4256fe009c714e0392817df4fdc1748a901523504f58796d675fce755f/dulwich-0.24.1-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:3baab4a01aff890e2e6551ccbd33eb2a44173c897f0f027ad3aeab0fb057ec44", size = 1163646, upload-time = "2025-08-01T10:26:31.279Z" },
{ url = "https://files.pythonhosted.org/packages/8d/fe/850115d6fa7ad03756e20466ad5b72be54d1b59c1ff7d2b3c13bc4de965f/dulwich-0.24.1-cp313-cp313-win32.whl", hash = "sha256:b39689aa4d143ba1fb0a687a4eb93d2e630d2c8f940aaa6c6911e9c8dca16e6a", size = 762612, upload-time = "2025-08-01T10:26:33.223Z" },
{ url = "https://files.pythonhosted.org/packages/47/7f/f79940e0773efda2ed0e666a0ca0ae7c734fdce4f04b5b60bc5ed268b7cb/dulwich-0.24.1-cp313-cp313-win_amd64.whl", hash = "sha256:8fca9b863b939b52c5f759d292499f0d21a7bf7f8cbb9fdeb8cdd9511c5bc973", size = 779168, upload-time = "2025-08-01T10:26:35.303Z" },
{ url = "https://files.pythonhosted.org/packages/c9/bc/a2557d1b0afa5bf1e140f42f8cbca1783e43d7fa17665859c63060957952/dulwich-0.24.1-py3-none-any.whl", hash = "sha256:57cc0dc5a21059698ffa4ed9a7272f1040ec48535193df84b0ee6b16bf615676", size = 440765, upload-time = "2025-08-01T10:26:45.415Z" },
]
[[package]]
name = "frozenlist"
version = "1.7.0"
@ -624,6 +641,7 @@ source = { virtual = "." }
dependencies = [
{ name = "aiodns" },
{ name = "aiogram" },
{ name = "dulwich" },
{ name = "mastodon-py" },
{ name = "matrix-nio" },
{ name = "python-abp" },
@ -635,6 +653,7 @@ dependencies = [
requires-dist = [
{ name = "aiodns", specifier = "==3.5.0" },
{ name = "aiogram", specifier = "==3.21.0" },
{ name = "dulwich", specifier = "==0.24.1" },
{ name = "mastodon-py", specifier = "==2.0.1" },
{ name = "matrix-nio", specifier = "==0.25.2" },
{ name = "python-abp", specifier = "==0.2.0" },