feat: initial multiple instance support for fedi-posting
This commit is contained in:
parent
65264b1a34
commit
3cfc94b321
1 changed files with 118 additions and 14 deletions
|
@ -1,4 +1,4 @@
|
|||
from aiogram.types import Message
|
||||
from aiogram.types import Message, CallbackQuery
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.fsm.state import State, StatesGroup
|
||||
from aiogram import Router
|
||||
|
@ -99,13 +99,63 @@ async def handle_post_to_fedi(message: Message):
|
|||
if not user_cred_files:
|
||||
await message.reply('请先使用 /fauth 命令进行身份验证')
|
||||
return
|
||||
|
||||
# 从文件名中提取实例名
|
||||
cred_file = user_cred_files[0] # 假设用户只绑定一个实例
|
||||
arguments = message.text.replace('/post', '', 1).strip().split(' ')
|
||||
specified_instance = None
|
||||
visibility = None
|
||||
if len(arguments) >= 1 and arguments[0]:
|
||||
# 检查第一个参数是否是实例名
|
||||
first_arg = arguments[0]
|
||||
# 检查是否存在对应的凭据文件
|
||||
matching_files = [f for f in user_cred_files if first_arg in f]
|
||||
if matching_files:
|
||||
specified_instance = first_arg
|
||||
if len(arguments) >= 2:
|
||||
visibility = arguments[1]
|
||||
else:
|
||||
visibility = arguments[0]
|
||||
# 如果指定了实例,使用指定的实例
|
||||
if specified_instance:
|
||||
cred_file = next(f for f in user_cred_files if specified_instance in f)
|
||||
filename = os.path.basename(cred_file)
|
||||
# 格式: realbot_{instance}_{userid}_usercred.secret
|
||||
parts = filename.split('_')
|
||||
instance = '_'.join(parts[1:-2]) # 提取实例名部分
|
||||
instance = '_'.join(parts[1:-2])
|
||||
else:
|
||||
# 如果有多个实例,让用户选择
|
||||
if len(user_cred_files) > 1:
|
||||
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
|
||||
|
||||
# 提取所有实例名
|
||||
instances = []
|
||||
for cred_file in user_cred_files:
|
||||
filename = os.path.basename(cred_file)
|
||||
parts = filename.split('_')
|
||||
instance_name = '_'.join(parts[1:-2])
|
||||
instances.append(instance_name)
|
||||
|
||||
|
||||
# 创建选择按钮
|
||||
keyboard = []
|
||||
for instance_name in instances:
|
||||
keyboard.append([InlineKeyboardButton(
|
||||
text=f"{instance_name}",
|
||||
callback_data=f"post:{instance_name}:"
|
||||
)])
|
||||
|
||||
# 添加全部发送选项
|
||||
keyboard.append([InlineKeyboardButton(
|
||||
text="发送到所有实例",
|
||||
callback_data="post_instance:all"
|
||||
)])
|
||||
|
||||
reply_markup = InlineKeyboardMarkup(inline_keyboard=keyboard)
|
||||
await message.reply("请选择要发送到的实例:\n(实验性,目前还不能使用)", reply_markup=reply_markup)
|
||||
return
|
||||
else:
|
||||
# 只有一个实例,直接使用
|
||||
cred_file = user_cred_files[0]
|
||||
filename = os.path.basename(cred_file)
|
||||
parts = filename.split('_')
|
||||
instance = '_'.join(parts[1:-2])
|
||||
|
||||
mastodon = Mastodon(
|
||||
access_token=f'{cred_file}',
|
||||
|
@ -114,12 +164,6 @@ async def handle_post_to_fedi(message: Message):
|
|||
|
||||
# 发布消息到联邦网络
|
||||
try:
|
||||
arguments = message.text.replace('/post', '', 1).strip().split(' ')
|
||||
if len(arguments) >= 1 and arguments[0]:
|
||||
visibility = arguments[0] # 默认可见性为账号设置的可见性
|
||||
else:
|
||||
visibility = None
|
||||
|
||||
status_message = await message.reply('尝试发布消息到联邦网络...')
|
||||
# 处理图片附件
|
||||
media_ids = []
|
||||
|
@ -146,3 +190,63 @@ async def handle_post_to_fedi(message: Message):
|
|||
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
|
||||
except Exception as e:
|
||||
await message.reply(f'发布失败: {str(e)}')
|
||||
|
||||
|
||||
@router.callback_query(lambda c: c.data.startswith('post:'))
|
||||
async def handle_instance_selection(callback: CallbackQuery):
|
||||
"""处理实例选择回调"""
|
||||
await callback.answer()
|
||||
|
||||
data_parts = callback.data.split(':')
|
||||
|
||||
selected_instance = data_parts[1]
|
||||
|
||||
# 获取原始回复消息
|
||||
# 获取原始 /post 命令消息,然后获取它回复的消息
|
||||
post_command_message = callback.message.reply_to_message
|
||||
original_message = post_command_message.reply_to_message if post_command_message else None
|
||||
if not original_message:
|
||||
await callback.message.edit_text("错误:找不到原始消息")
|
||||
return
|
||||
|
||||
user_id = callback.from_user.id
|
||||
|
||||
mastodon = Mastodon(
|
||||
access_token=f'secrets/realbot_{selected_instance}_{user_id}_usercred.secret',
|
||||
api_base_url=f'https://{selected_instance}'
|
||||
)
|
||||
|
||||
# 发布消息到联邦网络
|
||||
try:
|
||||
status_message = await original_message.reply('尝试发布消息到联邦网络...')
|
||||
# 处理图片附件
|
||||
media_ids = []
|
||||
cb_arguments = original_message.text.replace('/post', '', 1).strip().split(' ')
|
||||
if not original_message.reply_to_message:
|
||||
await status_message.edit_text("错误:找不到要发布的消息")
|
||||
return
|
||||
|
||||
if original_message.reply_to_message.photo:
|
||||
await status_message.edit_text('正在处理图片附件...')
|
||||
# 获取最大尺寸的图片
|
||||
photo = original_message.reply_to_message.photo[-1]
|
||||
file_info = await callback.message.bot.get_file(photo.file_id)
|
||||
file_data = await callback.message.bot.download_file(file_info.file_path)
|
||||
|
||||
# 上传图片到Mastodon
|
||||
media = mastodon.media_post(file_data, mime_type='image/png')
|
||||
media_ids.append(media['id'])
|
||||
text = original_message.reply_to_message.text
|
||||
if media_ids:
|
||||
text = original_message.reply_to_message.caption
|
||||
# 发布消息
|
||||
status = mastodon.status_post(
|
||||
text,
|
||||
media_ids=media_ids if media_ids else None,
|
||||
visibility = cb_arguments[0] if len(cb_arguments) == 1 else None # 默认为账户默认可见性
|
||||
)
|
||||
status_url = status['url']
|
||||
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
|
||||
await callback.message.delete()
|
||||
except Exception as e:
|
||||
await status_message.edit_text(f'发布失败: {str(e)}')
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue