Skip to content

Commit

Permalink
✨ feat(controller): add option to hide long text in tagger response
Browse files Browse the repository at this point in the history
Added `hidden_long_text` parameter to `tagger` method to control
the display of long text responses. Updated relevant method calls
to include the new parameter.
  • Loading branch information
sudoskys committed Sep 28, 2024
1 parent d79f794 commit b384abc
Showing 1 changed file with 76 additions and 35 deletions.
111 changes: 76 additions & 35 deletions app/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from io import BytesIO

import telegramify_markdown
from PIL import Image
from asgiref.sync import sync_to_async
from loguru import logger
from novelai_python.tool.image_metadata import ImageMetadata, ImageVerifier
Expand Down Expand Up @@ -45,22 +46,39 @@ async def download(self, file):
downloaded_file = await self.bot.download_file(_file_info.file_path)
return downloaded_file

async def tagger(self, file, hidden_long_text=False) -> str:
raw_file_data = await self.download(file=file)
if raw_file_data is None:
return "🥛 Not An image"
if isinstance(raw_file_data, bytes):
file_data = BytesIO(raw_file_data)
async def read_a111(self, file: BytesIO):
try:
file.seek(0)
with Image.open(file) as img:
print(img.info)
parameter = img.info.get("parameters", None)
if not parameter:
raise Exception("Empty Parameter")
except Exception as e:
logger.debug(f"Error {e}")
return []
else:
file_data = raw_file_data
result = await pipeline_tag(trace_id="test", content=file_data)
content = [
f"**🥽 AnimeScore: {result.anime_score}**",
"**🔍 Infer Tags**",
]
return [f"📦 Prompt: \n>{parameter}\n"]

async def read_comfyui(self, file: BytesIO):
try:
file.seek(0)
with Image.open(file) as img:
print(img.info)
parameter = img.info.get("prompt", None)
if not parameter:
raise Exception("Empty Parameter")
except Exception as e:
logger.debug(f"Error {e}")
return []
else:
return [f"**📦 Comfyui:** \n```{parameter}```"]

async def read_novelai(self, file: BytesIO):
message = []
try:
file_data.seek(0)
meta_data = ImageMetadata.load_image(file_data)
file.seek(0)
meta_data = ImageMetadata.load_image(file)
read_prompt = meta_data.Description
read_model = meta_data.used_model
rq_type = meta_data.Comment.request_type
Expand All @@ -72,34 +90,57 @@ async def tagger(self, file, hidden_long_text=False) -> str:
if meta_data.Comment.reference_strength:
mode += "+VibeTransfer"
except Exception as e:
logger.info(f"Empty metadata {e}")
content.append(f"```{result.anime_tags}```")
logger.debug(f"Empty metadata {e}")
return []
else:
if hidden_long_text:
content.append(f"\n>{result.anime_tags}\n")
else:
content.append(f"```{result.anime_tags}```")
if read_prompt:
content.append(f"**📦 Prompt:** `{read_prompt}`")
if read_model:
content.append(f"**📦 Model:** `{read_model.value}`")
if meta_data.Source:
content.append(f"**📦 Source:** `{meta_data.Source}`")
content.append(f"**📦 Mode:** `{mode}`")
message.extend(
[
f"**📦 Prompt:** `{read_prompt}`",
f"**📦 Model:** `{read_model.value}`",
f"**📦 Source:** `{meta_data.Source}`",
]
)
try:
is_novelai = False
has_latent = False
file_data.seek(0)
is_novelai, has_latent = ImageVerifier().verify(file_data)
file.seek(0)
is_novelai, has_latent = ImageVerifier().verify(file)
except Exception:
logger.debug("Not NovelAI")
else:
if not is_novelai:
content.append("**🧊 Not Signed by NovelAI**")
if is_novelai:
message.append("**🧊 Signed by NovelAI**")
if has_latent:
content.append("**🧊 Find Latent Space**")
message.append("**🧊 Find Latent Space**")
return message

async def tagger(self, file, hidden_long_text=False) -> str:
raw_file_data = await self.download(file=file)
if raw_file_data is None:
return "🥛 Not An image"
if isinstance(raw_file_data, bytes):
file_data = BytesIO(raw_file_data)
else:
file_data = raw_file_data
result = await pipeline_tag(trace_id="test", content=file_data)
infer_message = [
f"**🥽 AnimeScore: {result.anime_score}**",
"**🔍 Infer Tags**",
]
novelai_message = await self.read_novelai(file=file_data)
comfyui_message = await self.read_comfyui(file=file_data)
a111_message = await self.read_a111(file=file_data)
# 只能选一个有内容的
read_message = next(
filter(lambda msg: msg, [novelai_message, comfyui_message, a111_message]),
None,
)
if read_message and hidden_long_text:
infer_message.append(f"\n>{result.anime_tags}\n")
else:
infer_message.append(f"```{result.anime_tags}```")
if result.characters:
content.append(f"**🌟 Characters:** `{','.join(result.characters)}`")
infer_message.append(f"**🌟 Characters:** `{','.join(result.characters)}`")
read_message = read_message or ["🥛 No Metadata"]
content = infer_message + read_message
prompt = telegramify_markdown.convert("\n".join(content))
file_data.close()
return prompt
Expand Down

0 comments on commit b384abc

Please sign in to comment.