Compare commits

..

No commits in common. "ac67cdef7ddbd214fd1459a2fceeb966815dd7d3" and "6e50dc4364026325841cdb9dad9daa2151a5ac3e" have entirely different histories.

5 changed files with 37 additions and 175 deletions

View file

@ -2,18 +2,14 @@ from aiogram.types import Message
from config import config
import logging
async def handle_actions(message: Message) -> None:
if not config.is_feature_enabled('actions', message.chat.id):
logging.debug(f"收到了命中 / 开头的的消息,但是 actions 功能未启用,跳过处理")
return
rawtext = message.text
logging.debug(f"收到了命中 / 开头的消息: {rawtext}")
# 防止识别成命令而被误触发
import re
if re.match("^[a-zA-Z]+$", rawtext.replace('/','',1)) or '@' in rawtext:
logging.debug(f"{rawtext} 看起来是一条命令,跳过处理")
if rawtext.replace('/','',1).isascii() or '@' in rawtext:
return
from_user = message.from_user.mention_html(message.sender_chat.title) if message.sender_chat else message.from_user.mention_html()
@ -29,7 +25,5 @@ async def handle_reverse_actions(message: Message) -> None:
from_user = message.from_user.mention_html(message.sender_chat.title) if message.sender_chat else message.from_user.mention_html()
replied_user = message.reply_to_message.from_user.mention_html(message.reply_to_message.sender_chat.title) if message.reply_to_message and message.reply_to_message.sender_chat else message.reply_to_message.from_user.mention_html()
if not config.is_feature_enabled('actions', message.chat.id):
logging.debug(f"收到了命中 \\ 开头的的消息,但是 actions 功能未启用,跳过处理")
return
logging.debug(f"收到了命中 \\ 开头的消息: {message.text}")
await message.reply(f"{from_user}{replied_user if message.reply_to_message else '自己'} {message.text.replace('\\','')}了!",disable_web_page_preview=True)

View file

@ -1,5 +1,3 @@
import logging
import aiohttp
import re
import html
@ -183,7 +181,6 @@ def transform_into_fixed_url(url):
return url
async def process_url(url):
logging.debug('发现链接,正在尝试清理')
# 对于适配的网站,直接保留白名单参数并返回
if urlparse(url).hostname in whitelist_param_links:
final_url = reserve_whitelisted_params(url)
@ -236,7 +233,3 @@ async def handle_links(message: Message):
f"这个功能是试验性的,可能会出现链接无法访问等问题,如果出现链接没有清理干净的情况,"
f"可以将返回的结果再次发送给bot或者尝试手动清理。\n如果你找到了这个工具的问题,欢迎"
f"把它通过 `/report_broken_links 链接 需要去除的参数等等` 报告给开发者!")
else:
no_link_message = await message.reply("我没有发现可以处理的链接。\n此消息将会在 5 秒后被删除。", disable_web_page_preview=True)
await asyncio.sleep(5)
await no_link_message.delete()

View file

@ -1,10 +1,8 @@
import logging
import random
from aiogram import BaseMiddleware, Router
from aiogram import BaseMiddleware
from aiogram.types import Message
from typing import Dict, Optional, Callable, Awaitable, Any
router = Router()
class RikkiMiddleware(BaseMiddleware):
def __init__(self, target_user_id: str = "5545347637"):
@ -73,11 +71,10 @@ class RikkiMiddleware(BaseMiddleware):
if event.chat.type in ['group', 'supergroup'] and user_id == self.target_user_id:
# 更新几率
self.update_probability(user_id, event.text)
logging.debug("当前欠打的几率是{}".format(self.get_user_probability(user_id)))
if event.text and event.text.startswith('/打') and event.reply_to_message and str(event.reply_to_message.from_user.id) == self.target_user_id:
self.update_probability(user_id, event.text, hit_by_others=True)
logging.debug("当前欠打的几率是{}".format(self.get_user_probability(user_id)))
if self.get_user_probability(user_id) >= 80.0:
await event.reply("泥欠打了")

View file

@ -1,14 +1,7 @@
import json
import logging
import os
import uuid
import aiohttp
from aiogram.types import Message, CallbackQuery
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram import Router
from requests import session
from config import config
from mastodon import Mastodon
@ -18,12 +11,6 @@ router = Router()
class AuthStates(StatesGroup):
waiting_for_token = State()
def check_secrets_folder_exists() -> bool:
"""
检查 secrets 文件夹是否存在
"""
return os.path.exists('secrets') and os.path.isdir('secrets')
def check_client_cred_exists(instance: str) -> bool:
"""
检查实例的凭据文件是否存在
@ -44,22 +31,6 @@ def check_user_cred_exists(instance: str, userid: int) -> bool:
except FileNotFoundError:
return False
async def instance_is_misskey(instance: str) -> bool:
"""
检查实例是否是 Misskey 实例
"""
try:
async with aiohttp.ClientSession() as client:
async with client.post(f"https://{instance}/api/v1/instance", headers={"Content-Type": "application/json"},
allow_redirects=False) as r:
if r.status != 200:
return True
else:
return False # 如果没有异常,则不是 Misskey 实例
except Exception as e:
logging.debug(f"检查实例 {instance} 是否为 Misskey 时发生错误: {e}")
return True # 如果发生异常,则认为是 Misskey 实例
async def handle_token(instance,mastodon, message: Message):
mastodon.log_in(
code=message.text,
@ -80,40 +51,18 @@ async def handle_auth(message: Message, state: FSMContext):
if instance == '':
await message.reply('请输入实例域名,例如:`example.com`')
return
auth_url = ''
session_id = uuid.uuid4()
if not check_client_cred_exists(instance):
if not await instance_is_misskey(instance):
# 如果是 Misskey 实例,使用不同的创建应用方式
try:
if not check_secrets_folder_exists():
os.mkdir('secrets')
Mastodon.create_app(
'realbot',
api_base_url='https://{}'.format(instance),
to_file='secrets/realbot_{}_clientcred.secret'.format(instance),
scopes=['read:accounts', 'read:statuses','write:media','write:statuses']
)
except Exception as e:
logging.warning(e)
await message.reply(f'创建应用失败:{str(e)}\n请确保实例域名正确并且实例支持 Mastodon API。')
return
mastodon = Mastodon(client_id=f'secrets/realbot_{instance}_clientcred.secret')
auth_url = mastodon.auth_request_url()
else:
# 如果是 Misskey 实例,使用不同的创建应用方式
try:
if not check_secrets_folder_exists():
os.mkdir('secrets')
auth_url = f'https://{instance}/miauth/{session_id}?name=realbot&permission=read:account,write:notes,write:drive'
except Exception as e:
logging.warning(e)
await message.reply(f'创建应用失败:{str(e)}\n请确保实例域名正确并且实例支持 Misskey API。')
return
await message.reply('请在浏览器中打开链接进行身份验证:\n{}\n验证完成后,请用得到的 token 回复这条消息。\n注意:对于使用 Akkoma 的用户,可能需要验证两次。对于使用 misskey 的用户,请任意输入文本。'.format(auth_url))
Mastodon.create_app(
'realbot',
api_base_url='https://{}'.format(instance),
to_file='secrets/realbot_{}_clientcred.secret'.format(instance),
scopes=['read:accounts', 'read:statuses','write:media','write:statuses']
)
mastodon = Mastodon(client_id=f'secrets/realbot_{instance}_clientcred.secret', )
auth_url = mastodon.auth_request_url()
await message.reply('请在浏览器中打开链接进行身份验证:\n{}\n验证完成后,请用得到的 token 回复这条消息'.format(auth_url))
# 在发送消息后设置状态
await state.update_data(instance=instance,session=session_id)
await state.update_data(instance=instance)
await state.set_state(AuthStates.waiting_for_token)
# 创建处理回复的 handler
@ -121,27 +70,10 @@ async def handle_auth(message: Message, state: FSMContext):
async def handle_token_reply(message: Message, state: FSMContext):
data = await state.get_data()
instance = data.get('instance')
session_id = data.get('session')
if not await instance_is_misskey(instance):
mastodon = Mastodon(client_id=f'secrets/realbot_{instance}_clientcred.secret')
status = await message.reply('正在处理身份验证,请稍候...')
await handle_token(instance,mastodon,message)
await status.edit_text('身份验证成功!\n现在你可以使用 /post 命令将消息发布到联邦网络。')
else:
status = await message.reply('正在处理身份验证,请稍候...')
misskey_check_url = f'https://{instance}/api/miauth/{session_id}/check'
logging.debug(misskey_check_url)
async with aiohttp.ClientSession() as client:
async with client.post(misskey_check_url, data="", headers={"Accept":"*/*"},allow_redirects=False) as r:
if r.status == 200:
data = await r.json()
if data['token']:
# 保存用户凭据
with open(f'secrets/realbot_{instance}_{message.from_user.id}_usercred.secret', 'w') as f:
f.write(data['token'])
await status.edit_text('身份验证成功!\n现在你可以使用 /post 命令将消息发布到联邦网络。')
else:
await status.edit_text('身份验证失败,请确保实例域名正确并且实例支持 Misskey API。\n以下的信息可能有助于诊断问题:\n{}'.format(await r.text()))
mastodon = Mastodon(client_id=f'secrets/realbot_{instance}_clientcred.secret')
status = await message.reply('正在处理身份验证,请稍候...')
await handle_token(instance,mastodon,message)
await status.edit_text('身份验证成功!\n现在你可以使用 /post 命令将消息发布到联邦网络。')
# 清除状态
await state.clear()
@ -158,6 +90,7 @@ async def handle_post_to_fedi(message: Message):
user_id = message.from_user.id
# 查找用户绑定的实例
import os
import glob
user_cred_pattern = f'secrets/realbot_*_{user_id}_usercred.secret'
@ -232,7 +165,6 @@ async def handle_post_to_fedi(message: Message):
# 发布消息到联邦网络
try:
status_message = await message.reply('尝试发布消息到联邦网络...')
is_misskey = await instance_is_misskey(instance)
# 处理图片附件
media_ids = []
if message.reply_to_message.photo:
@ -241,73 +173,23 @@ async def handle_post_to_fedi(message: Message):
photo = message.reply_to_message.photo[-1]
file_info = await message.bot.get_file(photo.file_id)
file_data = await message.bot.download_file(file_info.file_path)
if not is_misskey:
# 上传图片到Mastodon
media = mastodon.media_post(file_data, mime_type='image/png')
media_ids.append(media['id'])
else:
misskey_upload_drive_url = f"https://{instance}/api/drive/files/create"
token = ''
with open(f'secrets/realbot_{instance}_{message.from_user.id}_usercred.secret', 'r') as f:
token = f.read()
file_info = await message.bot.get_file(photo.file_id)
file_data = await message.bot.download_file(file_info.file_path)
data = aiohttp.FormData()
data.add_field('i',token)
data.add_field('file', file_data, filename=f"{photo.file_id}.png", content_type='image/png')
#if photo.has_media_spoiler:
# data.add_field('isSensitive', True)
data.add_field('name', f"tg_{photo.file_id}.png")
async with aiohttp.ClientSession() as client:
async with client.post(misskey_upload_drive_url, allow_redirects=False, data=data) as r:
if r.status == 200:
media_info = await r.json()
media_ids.append(media_info['id'])
else:
await status_message.reply(f'上传图片失败,但是仍然可以发布这条消息的文本部分。\n错误信息: {r.status} {await r.text()}')
# 如果有图片附件而且图片有 caption优先使用图片的 caption 作为文本
text = message.reply_to_message.caption or message.reply_to_message.text
if not is_misskey:
status = mastodon.status_post(
text,
media_ids=media_ids if media_ids else None,
visibility = visibility
)
status_url = status['url']
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
else:
# 对于 Misskey 实例,使用不同的发布方式
misskey_create_status_url = f'https://{instance}/api/notes/create'
token = ''
with open(f'secrets/realbot_{instance}_{message.from_user.id}_usercred.secret', 'r') as f:
token = f.read()
data = {}
if token and media_ids:
data = json.dumps({
"visibility": visibility if visibility else "public",
"text": text,
"fileIds": media_ids
})
elif token:
data = json.dumps({
"visibility": visibility if visibility else "public",
"text": text
})
created_note = {}
async with aiohttp.ClientSession() as client:
async with client.post(misskey_create_status_url, allow_redirects=False, data=data,headers={"Authorization": f"Bearer {token}","Content-Type": "application/json"}) as r:
if r.status == 200:
created_note = await r.json()
else:
await status_message.edit_text(f'发布失败: {r.status} - {await r.text()}')
return
status_url = f'https://{instance}/notes/{created_note["createdNote"]["id"]}'
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
# 上传图片到Mastodon
media = mastodon.media_post(file_data, mime_type='image/png')
media_ids.append(media['id'])
text = message.reply_to_message.text
if media_ids:
text = message.reply_to_message.caption
# 发布消息
status = mastodon.status_post(
text,
media_ids=media_ids if media_ids else None,
visibility = visibility
)
status_url = status['url']
await status_message.edit_text(f'消息已成功发布到联邦网络!\n{status_url}')
except Exception as e:
logging.warning('Error posting to fedi:', exc_info=e)
await status_message.edit_text(f'发布失败: {str(e)}')
await message.reply(f'发布失败: {str(e)}')
@router.callback_query(lambda c: c.data.startswith('post:'))

View file

@ -1,5 +1,3 @@
import logging
from aiogram.types import Message
from config import config
@ -8,7 +6,6 @@ from config import config
async def handle_unpin_channel_message(message: Message):
"""Handle unpinning messages from linked channels without a specific hashtag"""
if not config.is_feature_enabled('unpin', message.chat.id):
logging.debug('发现了频道试图置顶消息,但未启用 unpin 功能,跳过处理')
return
try:
regex_pattern = config.get_feature_config('unpin', message.chat.id)['regex']
@ -16,11 +13,10 @@ async def handle_unpin_channel_message(message: Message):
if regex_pattern:
import re
if re.search(regex_pattern, message.text or message.caption or ""):
logging.debug(f"发现了频道试图置顶消息,但消息匹配了正则表达式{regex_pattern},跳过取消置顶")
# Message matches regex, don't unpin
return
# Either no regex pattern or message doesn't match, proceed to unpin
logging.debug('正在尝试取消频道消息的置顶')
print("trying to unpin the message")
await message.unpin()
except Exception as e:
logging.error('Error unpinning message:', e)
print('Error unpinning message:', e)