import json import logging import os import uuid import aiohttp from aiogram.types import Message, CallbackQuery from aiogram.fsm.context import FSMContext from aiogram.fsm.state import State, StatesGroup from aiogram import Router from requests import session from config import config from mastodon import Mastodon router = Router() class AuthStates(StatesGroup): waiting_for_token = State() def check_secrets_folder_exists() -> bool: """ 检查 secrets 文件夹是否存在 """ return os.path.exists('secrets') and os.path.isdir('secrets') def check_client_cred_exists(instance: str) -> bool: """ 检查实例的凭据文件是否存在 """ try: with open(f'secrets/realbot_{instance}_clientcred.secret', 'r'): return True except FileNotFoundError: return False def check_user_cred_exists(instance: str, userid: int) -> bool: """ 检查用户凭据文件是否存在 """ try: with open(f'secrets/realbot_{instance}_{userid}_usercred.secret', 'r'): return True except FileNotFoundError: return False async def instance_is_misskey(instance: str) -> bool: """ 检查实例是否是 Misskey 实例 """ try: async with aiohttp.ClientSession() as client: async with client.get(f"https://{instance}/api/v1/instance", headers={"Content-Type": "application/json"}, allow_redirects=False) as r: if r.status != 200: return True else: return False # 如果没有异常,则不是 Misskey 实例 except Exception as e: logging.debug(f"检查实例 {instance} 是否为 Misskey 时发生错误: {e}") return True # 如果发生异常,则认为是 Misskey 实例 async def handle_token(instance,mastodon, message: Message): mastodon.log_in( code=message.text, to_file=f"secrets/realbot_{instance}_{message.from_user.id}_usercred.secret", scopes=['read:accounts', 'read:statuses', 'write:media', 'write:statuses'] ) async def handle_auth(message: Message, state: FSMContext): """ 处理身份验证 """ if not config.is_feature_enabled('fedi', message.chat.id): return if not message.chat.type == 'private': await message.reply('请在私聊中使用此命令') return instance = message.text.replace('/fauth', '').strip() if instance == '': await message.reply('请输入实例域名,例如:`example.com`') return auth_url = '' session_id = uuid.uuid4() if not check_client_cred_exists(instance): if not await instance_is_misskey(instance): # 如果是 Misskey 实例,使用不同的创建应用方式 try: if not check_secrets_folder_exists(): os.mkdir('secrets') Mastodon.create_app( 'realbot', api_base_url='https://{}'.format(instance), to_file='secrets/realbot_{}_clientcred.secret'.format(instance), scopes=['read:accounts', 'read:statuses','write:media','write:statuses'] ) except Exception as e: logging.warning(e) await message.reply(f'创建应用失败:{str(e)}\n请确保实例域名正确并且实例支持 Mastodon API。') return mastodon = Mastodon(client_id=f'secrets/realbot_{instance}_clientcred.secret') auth_url = mastodon.auth_request_url() else: # 如果是 Misskey 实例,使用不同的创建应用方式 try: if not check_secrets_folder_exists(): os.mkdir('secrets') auth_url = f'https://{instance}/miauth/{session_id}?name=realbot&permission=read:account,write:notes,write:drive' except Exception as e: logging.warning(e) await message.reply(f'创建应用失败:{str(e)}\n请确保实例域名正确并且实例支持 Misskey API。') return await message.reply('请在浏览器中打开链接进行身份验证:\n{}\n验证完成后,请用得到的 token 回复这条消息。\n注意:对于使用 Akkoma 的用户,可能需要验证两次。对于使用 misskey 的用户,请任意输入文本。'.format(auth_url)) # 在发送消息后设置状态 await state.update_data(instance=instance,session=session_id) await state.set_state(AuthStates.waiting_for_token) # 创建处理回复的 handler @router.message(AuthStates.waiting_for_token) async def handle_token_reply(message: Message, state: FSMContext): data = await state.get_data() instance = data.get('instance') session_id = data.get('session') if not await instance_is_misskey(instance): mastodon = Mastodon(client_id=f'secrets/realbot_{instance}_clientcred.secret') status = await message.reply('正在处理身份验证,请稍候...') await handle_token(instance,mastodon,message) await status.edit_text('身份验证成功!\n现在你可以使用 /post 命令将消息发布到联邦网络。') else: status = await message.reply('正在处理身份验证,请稍候...') misskey_check_url = f'https://{instance}/api/miauth/{session_id}/check' logging.debug(misskey_check_url) async with aiohttp.ClientSession() as client: async with client.post(misskey_check_url, data="", headers={"Accept":"*/*"},allow_redirects=False) as r: if r.status == 200: data = await r.json() if data['token']: # 保存用户凭据 with open(f'secrets/realbot_{instance}_{message.from_user.id}_usercred.secret', 'w') as f: f.write(data['token']) await status.edit_text('身份验证成功!\n现在你可以使用 /post 命令将消息发布到联邦网络。') else: await status.edit_text('身份验证失败,请确保实例域名正确并且实例支持 Misskey API。\n以下的信息可能有助于诊断问题:\n{}'.format(await r.text())) # 清除状态 await state.clear() async def handle_post_to_fedi(message: Message): """ 处理发布到联邦网络的消息 """ if not config.is_feature_enabled('fedi', message.chat.id): return if not message.reply_to_message: await message.reply('请回复要发布的消息') return user_id = message.from_user.id # 查找用户绑定的实例 import glob user_cred_pattern = f'secrets/realbot_*_{user_id}_usercred.secret' user_cred_files = glob.glob(user_cred_pattern) if not user_cred_files: await message.reply('请先使用 /fauth 命令进行身份验证') return arguments = message.text.replace('/post', '', 1).strip().split(' ') specified_instance = None visibility = None if len(arguments) >= 1 and arguments[0]: # 检查第一个参数是否是实例名 first_arg = arguments[0] # 检查是否存在对应的凭据文件 matching_files = [f for f in user_cred_files if first_arg in f] if matching_files: specified_instance = first_arg if len(arguments) >= 2: visibility = arguments[1] else: visibility = arguments[0] # 如果指定了实例,使用指定的实例 if specified_instance: cred_file = next(f for f in user_cred_files if specified_instance in f) filename = os.path.basename(cred_file) parts = filename.split('_') instance = '_'.join(parts[1:-2]) else: # 如果有多个实例,让用户选择 if len(user_cred_files) > 1: from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton # 提取所有实例名 instances = [] for cred_file in user_cred_files: filename = os.path.basename(cred_file) parts = filename.split('_') instance_name = '_'.join(parts[1:-2]) instances.append(instance_name) # 创建选择按钮 keyboard = [] for instance_name in instances: keyboard.append([InlineKeyboardButton( text=f"{instance_name}", callback_data=f"post:{instance_name}:" )]) # 添加全部发送选项 keyboard.append([InlineKeyboardButton( text="发送到所有实例", callback_data="post_instance:all" )]) reply_markup = InlineKeyboardMarkup(inline_keyboard=keyboard) await message.reply("请选择要发送到的实例:\n(实验性,目前还不能使用)", reply_markup=reply_markup) return else: # 只有一个实例,直接使用 cred_file = user_cred_files[0] filename = os.path.basename(cred_file) parts = filename.split('_') instance = '_'.join(parts[1:-2]) mastodon = Mastodon( access_token=f'{cred_file}', api_base_url=f'https://{instance}' ) # 发布消息到联邦网络 try: status_message = await message.reply('尝试发布消息到联邦网络...') is_misskey = await instance_is_misskey(instance) # 处理图片附件 media_ids = [] if message.reply_to_message.photo: await status_message.edit_text('正在处理图片附件...') # 获取最大尺寸的图片 photo = message.reply_to_message.photo[-1] file_info = await message.bot.get_file(photo.file_id) file_data = await message.bot.download_file(file_info.file_path) if not is_misskey: # 上传图片到Mastodon media = mastodon.media_post(file_data, mime_type='image/png') media_ids.append(media['id']) else: misskey_upload_drive_url = f"https://{instance}/api/drive/files/create" token = '' with open(f'secrets/realbot_{instance}_{message.from_user.id}_usercred.secret', 'r') as f: token = f.read() file_info = await message.bot.get_file(photo.file_id) file_data = await message.bot.download_file(file_info.file_path) data = aiohttp.FormData() data.add_field('i',token) data.add_field('file', file_data, filename=f"{photo.file_id}.png", content_type='image/png') #if photo.has_media_spoiler: # data.add_field('isSensitive', True) data.add_field('name', f"tg_{photo.file_id}.png") async with aiohttp.ClientSession() as client: async with client.post(misskey_upload_drive_url, allow_redirects=False, data=data) as r: if r.status == 200: media_info = await r.json() media_ids.append(media_info['id']) else: await status_message.reply(f'上传图片失败,但是仍然可以发布这条消息的文本部分。\n错误信息: {r.status} {await r.text()}') # 如果有图片附件而且图片有 caption,优先使用图片的 caption 作为文本 text = message.reply_to_message.caption or message.reply_to_message.text if not is_misskey: status = mastodon.status_post( text, media_ids=media_ids if media_ids else None, visibility = visibility ) status_url = status['url'] await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}') else: # 对于 Misskey 实例,使用不同的发布方式 misskey_create_status_url = f'https://{instance}/api/notes/create' token = '' with open(f'secrets/realbot_{instance}_{message.from_user.id}_usercred.secret', 'r') as f: token = f.read() data = {} if token and media_ids: data = json.dumps({ "visibility": visibility if visibility else "public", "text": text, "fileIds": media_ids }) elif token: data = json.dumps({ "visibility": visibility if visibility else "public", "text": text }) created_note = {} async with aiohttp.ClientSession() as client: async with client.post(misskey_create_status_url, allow_redirects=False, data=data,headers={"Authorization": f"Bearer {token}","Content-Type": "application/json"}) as r: if r.status == 200: created_note = await r.json() else: await status_message.edit_text(f'发布失败: {r.status} - {await r.text()}') return status_url = f'https://{instance}/notes/{created_note["createdNote"]["id"]}' await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}') except Exception as e: logging.warning('Error posting to fedi:', exc_info=e) await status_message.edit_text(f'发布失败: {str(e)}') @router.callback_query(lambda c: c.data.startswith('post:')) async def handle_instance_selection(callback: CallbackQuery): """处理实例选择回调""" await callback.answer() data_parts = callback.data.split(':') selected_instance = data_parts[1] # 获取原始回复消息 # 获取原始 /post 命令消息,然后获取它回复的消息 post_command_message = callback.message.reply_to_message original_message = post_command_message.reply_to_message if post_command_message else None if not original_message: await callback.message.edit_text("错误:找不到原始消息") return user_id = callback.from_user.id mastodon = Mastodon( access_token=f'secrets/realbot_{selected_instance}_{user_id}_usercred.secret', api_base_url=f'https://{selected_instance}' ) # 发布消息到联邦网络 try: status_message = await original_message.reply('尝试发布消息到联邦网络...') # 处理图片附件 media_ids = [] cb_arguments = original_message.text.replace('/post', '', 1).strip().split(' ') if not original_message.reply_to_message: await status_message.edit_text("错误:找不到要发布的消息") return if original_message.reply_to_message.photo: await status_message.edit_text('正在处理图片附件...') # 获取最大尺寸的图片 photo = original_message.reply_to_message.photo[-1] file_info = await callback.message.bot.get_file(photo.file_id) file_data = await callback.message.bot.download_file(file_info.file_path) # 上传图片到Mastodon media = mastodon.media_post(file_data, mime_type='image/png') media_ids.append(media['id']) text = original_message.reply_to_message.text if media_ids: text = original_message.reply_to_message.caption # 发布消息 status = mastodon.status_post( text, media_ids=media_ids if media_ids else None, visibility = cb_arguments[0] if len(cb_arguments) == 1 else None # 默认为账户默认可见性 ) status_url = status['url'] await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}') await callback.message.delete() except Exception as e: await status_message.edit_text(f'发布失败: {str(e)}')