Skip to content

Commit

Permalink
add statistics and plot
Browse files Browse the repository at this point in the history
  • Loading branch information
MeetWq committed Aug 13, 2024
1 parent 44919c0 commit 6030a71
Show file tree
Hide file tree
Showing 10 changed files with 937 additions and 27 deletions.
3 changes: 2 additions & 1 deletion nonebot_plugin_memes_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class MemeListImageConfig(BaseModel):
text_template: str = "{keywords}"
add_category_icon: bool = True
label_new_timedelta: timedelta = timedelta(days=30)
label_hot_frequency: int = 24
label_hot_threshold: int = 21
label_hot_days: int = 7


class Config(BaseModel):
Expand Down
1 change: 0 additions & 1 deletion nonebot_plugin_memes_api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def search(
meme_names = process.extract(
meme_name, self.__meme_names.keys(), limit=limit, score_cutoff=score_cutoff
)
logger.debug(meme_names)
result: dict[str, MemeInfo] = {}
for name, _, _ in meme_names:
for meme in self.__meme_names[name]:
Expand Down
2 changes: 1 addition & 1 deletion nonebot_plugin_memes_api/matchers/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from . import info, manage, search, help, command # noqa
from . import info, manage, search, help, command, statistics # noqa
11 changes: 7 additions & 4 deletions nonebot_plugin_memes_api/matchers/help.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import hashlib
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from itertools import chain

from nonebot_plugin_alconna import Image, Text, on_alconna
Expand Down Expand Up @@ -47,18 +47,21 @@ async def _(user_id: UserId, session: EventSession):
memes = sorted(memes, key=lambda meme: meme.date_modified, reverse=sort_reverse)

label_new_timedelta = list_image_config.label_new_timedelta
label_hot_frequency = list_image_config.label_hot_frequency
label_hot_threshold = list_image_config.label_hot_threshold
label_hot_days = list_image_config.label_hot_days

meme_generation_keys = await get_meme_generation_keys(
session, SessionIdType.GLOBAL, timedelta(days=1)
session,
SessionIdType.GLOBAL,
time_start=datetime.now(timezone.utc) - timedelta(days=label_hot_days),
)

meme_list: list[MemeKeyWithProperties] = []
for meme in memes:
labels = []
if datetime.now() - meme.date_created < label_new_timedelta:
labels.append("new")
if meme_generation_keys.count(meme.key) >= label_hot_frequency:
if meme_generation_keys.count(meme.key) >= label_hot_threshold:
labels.append("hot")
disabled = not meme_manager.check(user_id, meme.key)
meme_list.append(
Expand Down
218 changes: 218 additions & 0 deletions nonebot_plugin_memes_api/matchers/statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
from datetime import datetime, timedelta
from typing import Any, Optional, Union

from dateutil.relativedelta import relativedelta
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import (
Alconna,
AlconnaQuery,
Args,
Option,
Query,
UniMessage,
on_alconna,
store_true,
)
from nonebot_plugin_session import EventSession, SessionIdType

from ..plot import plot_duration_counts, plot_key_and_duration_counts
from ..recorder import get_meme_generation_records, get_meme_generation_times
from ..utils import add_timezone
from .utils import find_meme

statistics_matcher = on_alconna(
Alconna(
"表情调用统计",
Args["meme_name?", str],
Option("-g|--global", default=False, action=store_true, help_text="全局统计"),
Option("--my", default=False, action=store_true, help_text="我的"),
Option(
"-t|--type",
Args["type", ["day", "week", "month", "year", "24h", "7d", "30d", "1y"]],
help_text="统计类型",
),
),
aliases={"表情使用统计"},
block=True,
priority=11,
use_cmd_start=True,
)


def wrapper(
slot: Union[int, str], content: Optional[str], context: dict[str, Any]
) -> str:
if slot == "my" and content:
return "--my"
elif slot == "global" and content:
return "--global"
elif slot == "type" and content:
if content in ["日", "24小时", "1天"]:
return "--type 24h"
elif content in ["本日", "今日"]:
return "--type day"
elif content in ["周", "一周", "7天"]:
return "--type 7d"
elif content in ["本周"]:
return "--type week"
elif content in ["月", "30天"]:
return "--type 30d"
elif content in ["本月", "月度"]:
return "--type month"
elif content in ["年", "一年"]:
return "--type 1y"
elif content in ["本年", "年度"]:
return "--type year"
return ""


pattern_my = r"(?P<my>我的)"
pattern_type = r"(?P<type>日|24小时|1天|本日|今日|周|一周|7天|本周|月|30天|本月|月度|年|一年|本年|年度)" # noqa E501
pattern_global = r"(?P<global>全局)"
pattern_cmd = r"表情(?:调用|使用)统计"

statistics_matcher.shortcut(
rf"{pattern_my}{pattern_cmd}",
prefix=True,
wrapper=wrapper,
arguments=["{my}"],
).shortcut(
rf"{pattern_global}{pattern_cmd}",
prefix=True,
wrapper=wrapper,
arguments=["{global}"],
).shortcut(
rf"{pattern_my}{pattern_global}{pattern_cmd}",
prefix=True,
wrapper=wrapper,
arguments=["{my}", "{global}"],
).shortcut(
rf"{pattern_my}?{pattern_global}?{pattern_type}{pattern_cmd}",
prefix=True,
wrapper=wrapper,
arguments=["{my}", "{global}", "{type}"],
)


@statistics_matcher.handle()
async def _(
matcher: Matcher,
session: EventSession,
meme_name: Optional[str] = None,
query_global: Query[bool] = AlconnaQuery("global.value", False),
query_my: Query[bool] = AlconnaQuery("my.value", False),
query_type: Query[str] = AlconnaQuery("type", "24h"),
):
meme = await find_meme(matcher, meme_name) if meme_name else None

is_my = query_my.result
is_global = query_global.result
type = query_type.result

if is_my and is_global:
id_type = SessionIdType.USER
elif is_my:
id_type = SessionIdType.GROUP_USER
elif is_global:
id_type = SessionIdType.GLOBAL
else:
id_type = SessionIdType.GROUP

now = datetime.now().astimezone()
if type == "24h":
start = now - timedelta(days=1)
td = timedelta(hours=1)
fmt = "%H:%M"
humanized = "24小时"
elif type == "day":
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
td = timedelta(hours=1)
fmt = "%H:%M"
humanized = "本日"
elif type == "7d":
start = now - timedelta(days=7)
td = timedelta(days=1)
fmt = "%m/%d"
humanized = "7天"
elif type == "week":
start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(
days=now.weekday()
)
td = timedelta(days=1)
fmt = "%a"
humanized = "本周"
elif type == "30d":
start = now - timedelta(days=30)
td = timedelta(days=1)
fmt = "%m/%d"
humanized = "30天"
elif type == "month":
start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
td = timedelta(days=1)
fmt = "%m/%d"
humanized = "本月"
elif type == "1y":
start = now - relativedelta(years=1)
td = relativedelta(months=1)
fmt = "%y/%m"
humanized = "一年"
else:
start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
td = relativedelta(months=1)
fmt = "%b"
humanized = "本年"

if meme:
meme_times = await get_meme_generation_times(
session, id_type, meme_key=meme.key, time_start=start
)
meme_keys = [meme.key] * len(meme_times)
else:
meme_records = await get_meme_generation_records(
session, id_type, time_start=start
)
meme_times = [record.time for record in meme_records]
meme_keys = [record.meme_key for record in meme_records]

if not meme_times:
await matcher.finish("暂时没有表情调用记录")

meme_times = [add_timezone(time) for time in meme_times]
meme_times.sort()

def fmt_time(time: datetime) -> str:
if type in ["24h", "7d", "30d", "1y"]:
return (time + td).strftime(fmt)
return time.strftime(fmt)

duration_counts: dict[str, int] = {}
stop = start + td
count = 0
key = fmt_time(start)
for time in meme_times:
while time >= stop:
duration_counts[key] = count
key = fmt_time(stop)
stop += td
count = 0
count += 1
duration_counts[key] = count
while stop <= now:
key = fmt_time(stop)
stop += td
duration_counts[key] = 0

key_counts: dict[str, int] = {}
for key in meme_keys:
key_counts[key] = key_counts.get(key, 0) + 1

if meme:
title = (
f"表情“{meme.key}{humanized}调用统计"
f"(总调用次数为 {key_counts.get(meme.key, 0)})"
)
output = await plot_duration_counts(duration_counts, title)
else:
title = f"{humanized}表情调用统计"
output = await plot_key_and_duration_counts(key_counts, duration_counts, title)
await UniMessage.image(raw=output).send()
76 changes: 76 additions & 0 deletions nonebot_plugin_memes_api/plot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from io import BytesIO

import matplotlib
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from matplotlib.font_manager import fontManager
from matplotlib.ticker import MaxNLocator
from nonebot.utils import run_sync

matplotlib.use("agg")
fallback_fonts = [
"PingFang SC",
"Hiragino Sans GB",
"Microsoft YaHei",
"Source Han Sans SC",
"Noto Sans SC",
"Noto Sans CJK SC",
"WenQuanYi Micro Hei",
]
for fontfamily in fallback_fonts.copy():
try:
fontManager.findfont(fontfamily, fallback_to_default=False)
except ValueError:
fallback_fonts.remove(fontfamily)
matplotlib.rcParams["font.family"] = fallback_fonts


@run_sync
def plot_key_and_duration_counts(
key_counts: dict[str, int], duration_counts: dict[str, int], title: str
) -> BytesIO:
up_x = list(key_counts.keys())
up_y = list(key_counts.values())
low_x = list(duration_counts.keys())
low_y = list(duration_counts.values())
up_height = len(up_x) * 0.3
low_height = 3
fig_width = 8
fig, axs = plt.subplots(
nrows=2,
figsize=(fig_width, up_height + low_height),
height_ratios=[up_height, low_height],
)
up: Axes = axs[0]
up.barh(up_x, up_y, height=0.6)
up.xaxis.set_major_locator(MaxNLocator(integer=True))
low: Axes = axs[1]
low.plot(low_x, low_y, marker="o")
if len(low_x) > 24:
low.set_xticks(low_x[::3])
elif len(low_x) > 12:
low.set_xticks(low_x[::2])
low.yaxis.set_major_locator(MaxNLocator(integer=True))
fig.suptitle(title)
fig.tight_layout()
output = BytesIO()
fig.savefig(output)
return output


@run_sync
def plot_duration_counts(duration_counts: dict[str, int], title: str) -> BytesIO:
x = list(duration_counts.keys())
y = list(duration_counts.values())
fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(x, y, marker="o")
if len(x) > 24:
ax.set_xticks(x[::3])
elif len(x) > 12:
ax.set_xticks(x[::2])
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
fig.suptitle(title)
fig.tight_layout()
output = BytesIO()
fig.savefig(output)
return output
Loading

0 comments on commit 6030a71

Please sign in to comment.