diff --git a/adapters/matrix.py b/adapters/matrix.py index f4c260e..e69de29 100644 --- a/adapters/matrix.py +++ b/adapters/matrix.py @@ -1,154 +0,0 @@ -import asyncio -import json -import logging -import os - -from nio import AsyncClient, MatrixRoom, RoomMessageText -from nio.events.room_events import RoomMessageText -from typing import Dict, Callable - -import config - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class MatrixAdapter: - def __init__(self, homeserver: str, user_id: str, token: str, device_name: str = "MatrixBot"): - """ - Initialize the Matrix bot. - - Args: - homeserver: The Matrix homeserver URL - user_id: The bot's Matrix user ID - token: The bot's token - device_name: Device name for the bot - """ - self.homeserver = homeserver - self.user_id = user_id - self.token = token - self.device_name = device_name - self.client = AsyncClient(homeserver, user_id) - self.commands: Dict[str, Callable] = {} - - # Register event handlers - self.client.add_event_callback(self.message_callback, RoomMessageText) - - def add_command(self, command: str, handler: Callable): - """Add a command handler.""" - self.commands[command.lower()] = handler - - async def message_callback(self, room: MatrixRoom, event: RoomMessageText): - """Handle incoming messages.""" - # Ignore messages from the bot itself - if event.sender == self.user_id: - return - - # Check if message starts with command prefix - message = event.body.strip() - if not message.startswith('!'): - return - - # Parse command and arguments - parts = message[1:].split(' ', 1) - command = parts[0].lower() - args = parts[1] if len(parts) > 1 else "" - - # Execute command if it exists - if command in self.commands: - try: - response = await self.commands[command](room, event, args) - if response: - await self.send_message(room.room_id, response) - except Exception as e: - logger.error(f"Error executing command {command}: {e}") - await self.send_message(room.room_id, f"Error executing command: {str(e)}") - else: - await self.send_message(room.room_id, f"Unknown command: {command}") - - async def send_message(self, room_id: str, message: str): - """Send a message to a room.""" - await self.client.room_send( - room_id=room_id, - message_type="m.room.message", - content={ - "msgtype": "m.text", - "body": message - } - ) - - async def start(self): - """Start the bot.""" - logger.info("Starting Matrix bot...") - - # Login - #response = await self.client.login(token=self.token, device_name=self.device_name) - self.client.access_token = self.token - self.client.device_id = "REALBOT" - - logger.info(f"Logged in as {self.user_id}") - - # Sync and listen for events - await self.client.sync_forever(timeout=30000) - - async def stop(self): - """Stop the bot.""" - logger.info("Stopping Matrix bot...") - await self.client.logout() - await self.client.close() - - -# Example command handlers -async def hello_command(room: MatrixRoom, event: RoomMessageText, args: str) -> str: - """Handle !hello command.""" - return f"Hello {event.sender}!" - - -async def echo_command(room: MatrixRoom, event: RoomMessageText, args: str) -> str: - """Handle !echo command.""" - if not args: - return "Usage: !echo " - return f"Echo: {args}" - - -async def help_command(room: MatrixRoom, event: RoomMessageText, args: str) -> str: - """Handle !help command.""" - help_text = """ -Available commands: -- !hello - Say hello -- !echo - Echo a message -- !help - Show this help message - """ - return help_text.strip() - - -async def main(): - """Main function to run the bot.""" - # 从环境变量或配置文件中获取值 - matrix_config = config.Config().get_config_value('matrix', {}) - homeserver = matrix_config.get('homeserver', "https://matrix.org") - user_id = matrix_config.get('user_id') - token = os.getenv("MATRIX_BOT_TOKEN") - - # Create bot instance - bot = MatrixAdapter(homeserver, user_id, token) - - # Register commands - bot.add_command("hello", hello_command) - bot.add_command("echo", echo_command) - bot.add_command("help", help_command) - - try: - print("Starting Matrix bot...") - await bot.start() - except KeyboardInterrupt: - logger.info("Bot interrupted by user") - except Exception as e: - logger.error(f"Bot error: {e}") - finally: - await bot.stop() - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/config.example.yaml b/config.example.yaml index 17242bd..e6d17a5 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -6,14 +6,6 @@ admin: 123456789 # 这部分仅限于汇报链接跟踪参数去除的问题 dev: 616760897 -start_telegram_bot: true -also_start_matrix_bot: false - -matrix: - homeserver: "https://matrix.org" - user_id: "@your_bot:matrix.org" - # Token 请使用登录后生成的 Access Token,一般可以在客户端的设置页面找到,然后设置到 MATRIX_BOT_TOKEN 环境变量中 - # 如果你想使用一个新的 token,可以使用 helpers/matrix_login.py 脚本生成一个新的 token # global features settings diff --git a/config.py b/config.py index a4d312c..1b4b184 100644 --- a/config.py +++ b/config.py @@ -27,28 +27,6 @@ class Config: """Get developer user ID""" return self.config_data.get('dev') - def get_config_value(self, key: str, default: Any = None) -> Any: - """ - Get a configuration value by key - - Args: - key: Configuration key to retrieve - default: Default value if key is not found - - Returns: - The configuration value or default if not found - """ - keys = key.split('.') - value = self.config_data - - for k in keys: - if isinstance(value, dict) and k in value: - value = value[k] - else: - return default - - return value - def is_feature_enabled(self, feature_name: str, chat_id: Optional[int] = None) -> bool: """ Check if a feature is enabled for a specific chat or globally diff --git a/core/inline.py b/core/inline.py index 6e62bb7..9c48751 100644 --- a/core/inline.py +++ b/core/inline.py @@ -133,58 +133,6 @@ async def handle_inline_query(query: InlineQuery): ), ], cache_time=0) return - # 如果查询以 "是什么歌" 结尾,则尝试根据关键词获取歌曲名称 - if query_text.endswith("是什么歌"): - keywords = query_text[:-4].strip() - from helpers.songs import get_song_by_partial_match, get_song_link - # 尝试根据关键词获取歌曲名称 - song_name = get_song_by_partial_match(keywords) - song_link = get_song_link(song_name) if song_name else None - if song_name: - await query.answer(results=[ - InlineQueryResultArticle( - id="1", - title=f"我感觉你应该在找 {song_name}", - input_message_content=InputTextMessageContent( - message_text=f"你是不是在找:{song_name}\n{song_link}\n如果不是,可能你需要[在网络上搜索](https://search.bilibili.com/all?keyword={keywords})", - parse_mode=ParseMode.MARKDOWN - ), - description=f"根据关键词 '{keywords}' 找到的歌曲" - ) - ], cache_time=0) - return - else: - from helpers.songs import fetch_from_b23_api - # 如果没有在本地找到歌曲,则尝试从 Bilibili API 获取 - result = await fetch_from_b23_api(keywords) - if result: - song_name, song_link = result - await query.answer(results=[ - InlineQueryResultArticle( - id="1", - title=f"我感觉你应该在找 {song_name}", - input_message_content=InputTextMessageContent( - message_text=f"你是不是在找:{song_name}\n{song_link}\n如果不是,可能你需要[在网络上搜索](https://search.bilibili.com/all?keyword={keywords})", - parse_mode=ParseMode.MARKDOWN - ), - description=f"根据关键词 '{keywords}' 找到的歌曲" - ) - ], cache_time=0) - return - # 如果还是没有找到,则返回一个默认的结果 - else: - await query.answer(results=[ - InlineQueryResultArticle( - id="1", - title=f"抱歉,数据库中没有搜索到 '{keywords}' 的歌曲", - input_message_content=InputTextMessageContent( - message_text=f"可能你需要[在网络上搜索](https://search.bilibili.com/all?keyword={keywords})", - parse_mode=ParseMode.MARKDOWN - ), - description=f"或许你应该尝试在网上搜索" - ) - ], cache_time=0) - return # 如果没有匹配到任何内容,则返回一个默认的结果 await query.answer(results=[ InlineQueryResultArticle( diff --git a/core/repeater.py b/core/repeater.py index 1ea6178..c056aa6 100644 --- a/core/repeater.py +++ b/core/repeater.py @@ -41,11 +41,7 @@ class MessageRepeater: content not in self.repeated_messages[chat_id]): # Mark as repeated and send the message self.repeated_messages[chat_id].add(content) - # if the message replies to another message, copy it with the reply_to_message_id - if message.reply_to_message: - await message.copy_to(chat_id, reply_to_message_id=message.reply_to_message.message_id) - else: - await message.copy_to(chat_id) + await message.copy_to(chat_id) self.last_messages[chat_id] = content diff --git a/helpers/matrix_login.py b/helpers/matrix_login.py deleted file mode 100644 index 5241645..0000000 --- a/helpers/matrix_login.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python3 -import asyncio -import getpass -import json -import os -import sys -import aiofiles -from nio import AsyncClient, LoginResponse - -CONFIG_FILE = "credentials.json" -# Check out main() below to see how it's done. -def write_details_to_disk(resp: LoginResponse, homeserver) -> None: - """Writes the required login details to disk so we can log in later without - using a password. - - Arguments: - resp {LoginResponse} -- the successful client login response. - homeserver -- URL of homeserver, e.g. "https://matrix.example.org" - """ - # open the config file in write-mode - with open(CONFIG_FILE, "w") as f: - # write the login details to disk - json.dump( - { - "homeserver": homeserver, # e.g. "https://matrix.example.org" - "user_id": resp.user_id, # e.g. "@user:example.org" - "device_id": resp.device_id, # device ID, 10 uppercase letters - "access_token": resp.access_token, # cryptogr. access token - }, - f, - ) - -async def main() -> None: - # If there are no previously-saved credentials, we'll use the password - if not os.path.exists(CONFIG_FILE): - print( - "First time use. Did not find credential file. Asking for " - "homeserver, user, and password to create credential file." - ) - homeserver = "https://matrix.example.org" - homeserver = input(f"Enter your homeserver URL: [{homeserver}] ") - if not (homeserver.startswith("https://") or homeserver.startswith("http://")): - homeserver = "https://" + homeserver - user_id = "@user:example.org" - user_id = input(f"Enter your full user ID: [{user_id}] ") - device_name = "matrix-nio" - device_name = input(f"Choose a name for this device: [{device_name}] ") - client = AsyncClient(homeserver, user_id) - pw = getpass.getpass() - resp = await client.login(pw, device_name=device_name) - # check that we logged in successfully - if isinstance(resp, LoginResponse): - write_details_to_disk(resp, homeserver) - else: - print(f'homeserver = "{homeserver}"; user = "{user_id}"') - print(f"Failed to log in: {resp}") - sys.exit(1) - print( - "登录成功,凭据已经保存到 " + CONFIG_FILE + " 文件中" - ) - # Otherwise the config file exists, so we'll use the stored credentials - else: - print("你已经登录过了") - -asyncio.run(main()) diff --git a/helpers/songs.py b/helpers/songs.py deleted file mode 100644 index edf7259..0000000 --- a/helpers/songs.py +++ /dev/null @@ -1,56 +0,0 @@ -# 一个暂时性的办法用来存储歌曲信息 -import aiohttp - -songs = { - "将军的小曲,三太阳的小曲": "你若三冬 - 阿悠悠", - "全斗焕的小曲,光州跑男的小曲,打成一片的小曲,无限制格斗的小曲,重拳的小曲,光州的小曲": "Shake and Sway", - "牛姐的养老保险,美国版难忘今宵,圣诞要你命": "All I Want for Christmas Is You - Mariah Carey", -} - -song_links = { - "你若三冬 - 阿悠悠": "https://www.bilibili.com/video/BV1wAdhYBEVg", - "Shake and Sway": "https://www.bilibili.com/video/av113101403850151", - "All I Want for Christmas Is You - Mariah Carey": "https://www.bilibili.com/video/BV1VJ411b7ah", -} - -def get_song_name(key): - """根据关键词获取歌曲名称""" - return songs.get(key) - -def get_song_link(key): - """根据歌曲名称获取歌曲链接""" - return song_links.get(key) - -def get_song_by_partial_match(partial_key): - """根据部分匹配获取歌曲名称""" - for key, value in songs.items(): - if partial_key in key: - return value - return None - -async def fetch_from_b23_api(song_name): - """从 Bilibili API 获取歌曲信息""" - resp = None - async with aiohttp.ClientSession() as session: - # 先访问 bilibili.com 获取 cookies - async with session.get('https://bilibili.com', headers={"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0"}) as response: - pass - - # 使用获取的 cookies 请求搜索 API - params = {'keyword': song_name} - async with session.get( - 'https://api.bilibili.com/x/web-interface/search/all/v2', - params=params, - headers={ - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0" - } - ) as response: - resp = await response.json() - if resp and resp.get('data'): - # 假设我们只取第一个视频的结果 - videos = next((item for item in resp['data']['result'] if item.get('result_type') == 'video'), None) - first_result = videos['data'][0] - title = first_result.get('title').replace('', '').replace('', '') # 清理标题中的 HTML 标签 - link = first_result.get('arcurl') - return title, link - return None \ No newline at end of file diff --git a/main.py b/main.py index 3531f7c..ad5bfa1 100644 --- a/main.py +++ b/main.py @@ -2,7 +2,6 @@ import asyncio import logging import sys -import config from adapters.tg import TelegramAdapter @@ -10,19 +9,9 @@ async def main(): """Main entry point""" logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) - tasks = [] # Initialize and start Telegram adapter - if config.Config().get_config_value('start_telegram_bot', True): - tg_adapter = TelegramAdapter() - tasks.append(tg_adapter.start()) - if config.Config().get_config_value('also_start_matrix_bot', False): - import adapters.matrix as matrix_bot - # Initialize and start Matrix bot if configured - tasks.append(matrix_bot.main()) - if tasks: - await asyncio.gather(*tasks, return_exceptions=True) - else: - logging.error("No bot is configured to start. Please check your configuration.") + tg_adapter = TelegramAdapter() + await tg_adapter.start() if __name__ == "__main__":