Compare commits

...

2 commits

4 changed files with 91 additions and 12 deletions

1
.gitignore vendored
View file

@ -3,3 +3,4 @@
secrets/
message_stats.json
config.yaml
rikki_data.json

View file

@ -15,7 +15,7 @@ whitelist_param_links = ['www.iesdouyin.com','item.taobao.com', 'detail.tmall.co
'www.bilibili.com', 'm.bilibili.com', 'bilibili.com', 'mall.bilibili.com',
'space.bilibili.com', 'live.bilibili.com','item.m.jd.com','item.jd.com','www.xiaohongshu.com']
has_self_redirection_links = ['www.cnbeta.com.tw','m.cnbeta.com.tw','www.landiannews.com']
has_self_redirection_links = ['www.cnbeta.com.tw','m.cnbeta.com.tw','www.landiannews.com', 'www.bilibili.com']
def matches_adb_selector(url, selector):
"""Check if URL matches the given selector"""
@ -187,7 +187,7 @@ def transform_into_fixed_url(url):
async def process_url(url):
logging.debug('发现链接,正在尝试清理')
if urlparse(url).hostname in has_self_redirection_links and not urlparse(url).params:
# 对于有自我纠正的重定向而且不携带任何跟踪参数的链接,直接返回
# 对于有自我纠正的重定向而且不携带任何参数的链接,直接返回
return None
# 对于适配的网站,直接保留白名单参数并返回
if urlparse(url).hostname in whitelist_param_links:
@ -202,12 +202,14 @@ async def process_url(url):
extended_url = await extend_short_urls(cleaned_url)
if urlparse(extended_url).hostname in ['chatglm.cn']:
final_url = reserve_whitelisted_params(extended_url)
if url != final_url:
return final_url
# 对于扩展短链接之后的适配的网站,直接保留白名单参数并返回
if urlparse(extended_url).hostname in whitelist_param_links:
final_url = reserve_whitelisted_params(extended_url)
if urlparse(final_url).hostname in ['www.iesdouyin.com','bilibili.com', 'm.bilibili.com']:
final_url = transform_into_fixed_url(final_url)
if url != final_url:
return final_url
if urlparse(extended_url).hostname in ['x.com', 'twitter.com']:
# 对于 Twitter 链接,转换为 fixupx.com
@ -232,10 +234,8 @@ async def handle_links(message: Message):
if not urls:
return
final_urls = await asyncio.gather(*[process_url(url) for url in urls])
# Filter out None values
final_urls = [url for url in final_urls if url is not None]
# 回复处理后的链接
if final_urls:
await message.reply(f"{"\n".join(final_urls)}\n消息里有包含跟踪参数的链接,已经帮你转换了哦~\n\n注意:"

View file

@ -1,5 +1,7 @@
import logging
import time
import os
import json
from collections import deque, defaultdict
from dataclasses import dataclass
@ -24,10 +26,29 @@ class UserMetrics:
neutral_count: int = 0 # 中性发言数(用于水群频率)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'cai_count': self.cai_count,
'xm_count': self.xm_count,
'nsfw_count': self.nsfw_count,
'antisocial_count': self.antisocial_count,
'total_count': self.total_count,
'neutral_count': self.neutral_count
}
@classmethod
def from_dict(cls, data: Dict) -> 'UserMetrics':
"""从字典创建实例"""
return cls(**data)
class RikkiMiddleware(BaseMiddleware):
def __init__(self, target_user_id: str = "5545347637"):
# 存储每个用户的触发几率初始值在40-50%之间
def __init__(self, target_user_id: str = "5545347637", data_file: str = "rikki_data.json"):
# 存储每个用户的触发几率
self.user_probabilities: Dict[str, float] = {}
self.data_file = data_file
# 触发关键词
self.xm_keywords = ["", "羡慕", "xm", "xmsl", "羡慕死了", "我菜"]
self.cai_keywords = ["", "菜了", "菜死了", "我菜", "废物"]
@ -58,6 +79,44 @@ class RikkiMiddleware(BaseMiddleware):
self.has_sent_warning: Dict[str, bool] = defaultdict(bool)
# 加载持久化数据
self.load_data()
def load_data(self) -> None:
"""从JSON文件加载数据"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# 加载用户指标
if 'user_metrics' in data:
for user_id, metrics_data in data['user_metrics'].items():
self.user_metrics[user_id] = UserMetrics.from_dict(metrics_data)
# 加载警告状态
if 'has_sent_warning' in data:
self.has_sent_warning.update(data['has_sent_warning'])
except (json.JSONDecodeError, KeyError) as e:
logging.warning(f"加载数据文件失败: {e}")
def save_data(self,user_id:str = '5545347637') -> None:
"""保存数据到JSON文件"""
try:
data = {
'hit_prob': self.calculate_qianda_score(user_id),
'user_metrics': {user_id: metrics.to_dict()
for user_id, metrics in self.user_metrics.items()},
'has_sent_warning': dict(self.has_sent_warning)
}
with open(self.data_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logging.error(f"保存数据文件失败: {e}")
def record_message(self, user_id: str) -> None:
"""记录用户发送消息的时间"""
current_time = time.time()
@ -189,6 +248,8 @@ class RikkiMiddleware(BaseMiddleware):
score = self.calculate_qianda_score(user_id)
logging.debug("当前欠打的几率是{}".format(score))
self.save_data()
return await handler(event, data)
def get_user_status(self, user_id: str) -> str:

View file

@ -1,6 +1,23 @@
import json
from aiogram.types import Message
from core.middleware.rikki import RikkiMiddleware
async def handle_query_hit_command(message: Message) -> None:
hit_status = RikkiMiddleware().get_user_status("5545347637")
await message.reply(hit_status)
hit_status = ''
with open('rikki_data.json', 'r', encoding='utf-8') as f:
hit_status = json.load(f)
_id = str(message.from_user.id)
user_data = hit_status['user_metrics'].get('5545347637', {
"cai_count": 0,
"xm_count": 0,
"nsfw_count": 0,
"antisocial_count": 0,
"total_count": 0,
"neutral_count": 0
})
hit_prob = hit_status.get('hit_prob', 0.0)
formatted_message = f"欠打度: {hit_prob:.2f}%\n卖菜: {user_data['cai_count']}, 羡慕: {user_data['xm_count']}, NSFW: {user_data['nsfw_count']}, 反社会: {user_data['antisocial_count']}, 中性: {user_data['neutral_count']}\n总发言: {user_data['total_count']}"
await message.reply(formatted_message)