"""Telegram handlers that link a user to a Mastodon-compatible instance and
cross-post replied-to Telegram messages to it.

Credentials are stored as files under ``secrets/``:

* ``realbot_{instance}_clientcred.secret`` - the app registration per instance
* ``realbot_{instance}_{userid}_usercred.secret`` - the access token per user
"""
import asyncio
import glob
import io
import os
import re
from functools import partial
from typing import Dict, NamedTuple, Optional, Tuple

from aiogram import Router
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import (
    CallbackQuery,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    Message,
)
from mastodon import Mastodon

from config import config

router = Router()

# OAuth scopes used for app registration, the authorization URL and log-in.
# They must agree, otherwise the server can reject the authorization request.
_SCOPES = ('read:accounts', 'read:statuses', 'write:media', 'write:statuses')

# Visibility keywords accepted by the Mastodon API; never treated as instances.
_VISIBILITIES = frozenset({'public', 'unlisted', 'private', 'direct'})

# Allowlist for user-supplied instance names: host characters plus an optional
# port. Path separators are excluded because the name is embedded in file
# paths and in the base URL.
_INSTANCE_RE = re.compile(r'[\w.-]+(?::\d{1,5})?')

_CALLBACK_PREFIX = 'post:'
# Callback payload meaning "every linked instance"; '*' cannot be a host name.
_ALL_INSTANCES = '*'
# Upper bound on remembered instance-selection prompts (oldest dropped first).
_MAX_PENDING_POSTS = 256


class AuthStates(StatesGroup):
    """FSM states of the /fauth authorization dialogue."""

    waiting_for_token = State()


class _PendingPost(NamedTuple):
    """A /post request that is waiting for the user to pick an instance."""

    user_id: int
    command: Message  # the /post message; its reply_to_message is the content
    visibility: Optional[str]


# Keyed by (chat id, prompt message id). Telegram does not deliver nested
# ``reply_to_message`` objects, so the callback handler cannot recover the
# content from the prompt message and has to look it up here instead.
_pending_posts: Dict[Tuple[int, int], _PendingPost] = {}


def _client_cred_path(instance: str) -> str:
    """Return the path of the app-registration file for ``instance``."""
    return f'secrets/realbot_{instance}_clientcred.secret'


def _user_cred_path(instance: str, userid: int) -> str:
    """Return the path of the access-token file for ``userid`` on ``instance``."""
    return f'secrets/realbot_{instance}_{userid}_usercred.secret'


def check_client_cred_exists(instance: str) -> bool:
    """Return True if the client credential file for ``instance`` can be opened."""
    try:
        with open(_client_cred_path(instance), 'r'):
            return True
    except FileNotFoundError:
        return False


def check_user_cred_exists(instance: str, userid: int) -> bool:
    """Return True if the user credential file for ``userid`` can be opened."""
    try:
        with open(_user_cred_path(instance, userid), 'r'):
            return True
    except FileNotFoundError:
        return False


async def _run_blocking(func, *args):
    """Run the synchronous ``func(*args)`` in the default executor.

    Mastodon.py is a synchronous client; calling it directly inside a handler
    would stall the event loop (and every other chat) for the whole request.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


def _list_user_instances(user_id: int) -> Dict[str, str]:
    """Map each instance ``user_id`` has linked to its credential file path."""
    pattern = f'secrets/realbot_*_{user_id}_usercred.secret'
    linked: Dict[str, str] = {}
    for path in sorted(glob.glob(pattern)):
        # File name layout: realbot_{instance}_{userid}_usercred.secret; the
        # instance itself may contain underscores, hence the slice.
        pieces = os.path.basename(path).split('_')
        linked['_'.join(pieces[1:-2])] = path
    return linked


def _resolve_instance(arg: str, instances: Dict[str, str]) -> Optional[str]:
    """Return the linked instance that ``arg`` refers to, or None.

    An exact name wins. Visibility keywords are never taken for an instance.
    Otherwise the first instance whose name contains ``arg`` is used, which
    keeps abbreviations working without matching against the file path.
    """
    if arg in instances:
        return arg
    if arg.lower() in _VISIBILITIES:
        return None
    return next((name for name in instances if arg in name), None)


def _prepare_auth_url(instance: str) -> str:
    """Register the app on ``instance`` if needed and return the OAuth URL.

    Synchronous; intended to be called through ``_run_blocking``.
    """
    cred_path = _client_cred_path(instance)
    if not check_client_cred_exists(instance):
        Mastodon.create_app(
            'realbot',
            api_base_url=f'https://{instance}',
            to_file=cred_path,
            scopes=list(_SCOPES),
        )
    client = Mastodon(client_id=cred_path)
    # Request exactly the scopes the app was registered with; the library
    # default is broader than what this app registers.
    return client.auth_request_url(scopes=list(_SCOPES))


def _post_status_blocking(cred_file: str, instance: str, text: Optional[str],
                          photo: Optional[bytes],
                          visibility: Optional[str]) -> str:
    """Publish one status on ``instance`` and return its URL.

    Synchronous; intended to be called through ``_run_blocking``. Raises
    whatever Mastodon.py raises (including ``ValueError`` for an invalid
    ``visibility``).
    """
    client = Mastodon(access_token=cred_file, api_base_url=f'https://{instance}')
    media_ids = None
    if photo is not None:
        # Telegram delivers compressed photos as JPEG.
        media = client.media_post(io.BytesIO(photo), mime_type='image/jpeg')
        media_ids = [media['id']]
    status = client.status_post(text, media_ids=media_ids, visibility=visibility)
    return status['url']


async def _load_content(bot, source: Message,
                        progress: Message) -> Tuple[Optional[str], Optional[bytes]]:
    """Extract the text and the largest photo (if any) from ``source``.

    ``progress`` is edited to tell the user that the photo is being fetched.
    Raises ``ValueError`` when the message has neither text nor a photo.
    """
    photo_bytes = None
    if source.photo:
        await progress.edit_text('正在处理图片附件...')
        largest = source.photo[-1]  # sizes are listed smallest to largest
        file_info = await bot.get_file(largest.file_id)
        buffer = await bot.download_file(file_info.file_path)
        photo_bytes = buffer.read()
    # Media messages carry their text in ``caption`` rather than ``text``.
    text = source.text or source.caption
    if not text and photo_bytes is None:
        raise ValueError('没有可发布的文本或图片')
    return text, photo_bytes


def _remember_pending(prompt: Message, pending: _PendingPost) -> None:
    """Store ``pending`` under the prompt message, evicting the oldest entries."""
    while len(_pending_posts) >= _MAX_PENDING_POSTS:
        _pending_posts.pop(next(iter(_pending_posts)))
    _pending_posts[(prompt.chat.id, prompt.message_id)] = pending


async def handle_token(instance, mastodon, message: Message):
    """Exchange the authorization code in ``message`` for an access token.

    The token is written to the user's credential file for ``instance``.
    Errors raised by ``mastodon.log_in`` propagate to the caller.
    """
    await _run_blocking(partial(
        mastodon.log_in,
        code=(message.text or '').strip(),
        to_file=_user_cred_path(instance, message.from_user.id),
        scopes=list(_SCOPES),
    ))


async def handle_auth(message: Message, state: FSMContext):
    """Handle ``/fauth <instance>``: start the OAuth flow in a private chat."""
    if not config.is_feature_enabled('fedi', message.chat.id):
        return
    if message.chat.type != 'private':
        await message.reply('请在私聊中使用此命令')
        return

    # Split on whitespace so "/fauth@botname example.com" works as well.
    pieces = (message.text or '').split(maxsplit=1)
    instance = pieces[1].strip() if len(pieces) > 1 else ''
    # Accept a pasted URL by dropping the scheme and any trailing slash.
    instance = re.sub(r'^https?://', '', instance).rstrip('/')
    if instance == '':
        await message.reply('请输入实例域名,例如:`example.com`')
        return
    if not _INSTANCE_RE.fullmatch(instance):
        # The value ends up in a file path and a URL, so reject anything that
        # is not a plain host name.
        await message.reply('实例域名无效,请输入类似 `example.com` 的域名')
        return

    try:
        auth_url = await _run_blocking(_prepare_auth_url, instance)
    except Exception as e:
        await message.reply(f'无法连接到实例 {instance}: {e}')
        return

    await message.reply(
        f'请在浏览器中打开链接进行身份验证:\n{auth_url}\n验证完成后,请用得到的 token 回复这条消息'
    )
    # Enter the waiting state only after the prompt has been sent.
    await state.update_data(instance=instance)
    await state.set_state(AuthStates.waiting_for_token)


# Handles the user's reply that carries the authorization code.
@router.message(AuthStates.waiting_for_token)
async def handle_token_reply(message: Message, state: FSMContext):
    """Finish the OAuth flow with the code the user sent back."""
    data = await state.get_data()
    instance = data.get('instance')
    if not instance:
        # State without an instance cannot be completed; start over.
        await state.clear()
        await message.reply('请先使用 /fauth 命令进行身份验证')
        return
    if not message.text:
        await message.reply('请以文本形式回复得到的 token')
        return

    status = await message.reply('正在处理身份验证,请稍候...')
    try:
        mastodon = await _run_blocking(
            partial(Mastodon, client_id=_client_cred_path(instance))
        )
        await handle_token(instance, mastodon, message)
    except Exception as e:
        # Keep the state so the user can simply send the code again.
        await status.edit_text(f'身份验证失败: {e}\n请重新发送 token,或使用 /fauth 重新开始。')
        return

    await status.edit_text('身份验证成功!\n现在你可以使用 /post 命令将消息发布到联邦网络。')
    await state.clear()


async def handle_post_to_fedi(message: Message):
    """Handle ``/post [instance] [visibility]`` sent as a reply to a message.

    With one linked instance (or an explicit one) the replied-to message is
    published right away. With several linked instances and none specified,
    an inline keyboard asks the user to choose; the choice is handled by
    ``handle_instance_selection``.
    """
    if not config.is_feature_enabled('fedi', message.chat.id):
        return
    source = message.reply_to_message
    if not source:
        await message.reply('请回复要发布的消息')
        return

    user_id = message.from_user.id
    instances = _list_user_instances(user_id)
    if not instances:
        await message.reply('请先使用 /fauth 命令进行身份验证')
        return

    # Drop the command token itself ("/post" or "/post@botname").
    arguments = (message.text or '').split()[1:]
    instance = None
    visibility = None
    if arguments:
        instance = _resolve_instance(arguments[0], instances)
        if instance is None:
            visibility = arguments[0]
        elif len(arguments) >= 2:
            visibility = arguments[1]

    if instance is None:
        if len(instances) > 1:
            keyboard = [
                [InlineKeyboardButton(
                    text=f"{name}",
                    callback_data=f"{_CALLBACK_PREFIX}{name}:",
                )]
                for name in instances
            ]
            keyboard.append([InlineKeyboardButton(
                text="发送到所有实例",
                callback_data=f"{_CALLBACK_PREFIX}{_ALL_INSTANCES}:",
            )])
            prompt = await message.reply(
                "请选择要发送到的实例:",
                reply_markup=InlineKeyboardMarkup(inline_keyboard=keyboard),
            )
            _remember_pending(prompt, _PendingPost(user_id, message, visibility))
            return
        # Exactly one linked instance: use it.
        instance = next(iter(instances))

    cred_file = instances[instance]
    try:
        status_message = await message.reply('尝试发布消息到联邦网络...')
        text, photo = await _load_content(message.bot, source, status_message)
        status_url = await _run_blocking(
            _post_status_blocking, cred_file, instance, text, photo, visibility
        )
        await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
    except Exception as e:
        await message.reply(f'发布失败: {str(e)}')


@router.callback_query(
    lambda c: bool(c.data) and c.data.startswith(_CALLBACK_PREFIX)
)
async def handle_instance_selection(callback: CallbackQuery):
    """Publish a pending /post request to the instance picked on the keyboard."""
    prompt = callback.message
    key = (prompt.chat.id, prompt.message_id) if prompt else None
    pending = _pending_posts.get(key) if key else None
    if pending is None or pending.command.reply_to_message is None:
        await callback.answer()
        if prompt:
            await prompt.edit_text("错误:找不到原始消息")
        return
    if callback.from_user.id != pending.user_id:
        # In group chats anybody can tap the button; only the requester may.
        await callback.answer('只有发起 /post 的用户可以选择实例', show_alert=True)
        return
    # Claim the request before the first await so a double tap cannot post twice.
    del _pending_posts[key]
    await callback.answer()

    # Payload layout: "post:{instance}:"; the instance may itself contain ':'.
    selected = callback.data[len(_CALLBACK_PREFIX):].rstrip(':')
    instances = _list_user_instances(pending.user_id)
    if selected == _ALL_INSTANCES:
        targets = instances
    elif selected in instances:
        targets = {selected: instances[selected]}
    else:
        targets = {}
    if not targets:
        await prompt.edit_text('请先使用 /fauth 命令进行身份验证')
        return

    command = pending.command
    status_message = await command.reply('尝试发布消息到联邦网络...')
    urls = []
    failures = []
    try:
        text, photo = await _load_content(
            command.bot, command.reply_to_message, status_message
        )
    except Exception as e:
        failures.append(str(e))
    else:
        for instance, cred_file in targets.items():
            try:
                urls.append(await _run_blocking(
                    _post_status_blocking,
                    cred_file, instance, text, photo, pending.visibility,
                ))
            except Exception as e:
                # Name the instance only when several were attempted.
                failures.append(f'{instance}: {e}' if len(targets) > 1 else str(e))

    report = []
    if urls:
        report.append('消息已成功发布到联邦网络!\n' + '\n'.join(urls))
    report.extend(f'发布失败: {reason}' for reason in failures)
    await status_message.edit_text('\n'.join(report))

    if urls:
        await prompt.delete()
    else:
        # Nothing was published: keep the request so the user can retry.
        _pending_posts[key] = pending