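"""PromptCompass.py

Prompt Compass: a tool for navigating LLMs and prompts for computational
social science and digital humanities research.

A Streamlit app; with a standard install it is typically launched with
`streamlit run PromptCompass.py`.

Repository: https://github.com/ErikBorra/PromptCompass
"""
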
import json
import streamlit as st
import streamlit_ext as ste
import os
import time
import gc
import pandas as pd
from dotenv import load_dotenv
from langchain.chains import LLMChain # import LangChain libraries
from langchain.llms import OpenAI # import OpenAI model
from langchain.chat_models import ChatOpenAI # import OpenAI chat model
from langchain.callbacks import get_openai_callback # import OpenAI callbacks
from langchain.prompts import PromptTemplate # import PromptTemplate
from langchain.llms import HuggingFacePipeline # import HuggingFacePipeline
from langchain_anthropic import ChatAnthropic # import Anthropic model
import torch
# pip install git+https://github.com/huggingface/transformers
from transformers import AutoTokenizer, pipeline, AutoModelForSeq2SeqLM, AutoModelForCausalLM, StoppingCriteria, StoppingCriteriaList


def main():
    load_dotenv(".env")

    pipe = None
    open_ai_key = None
    claude_api_key = None
    uploaded_file = None

    # import CSS (computational social science) tasks and prompts
    with open('prompts.json') as f:
        promptlib = json.load(f)

    # hide Streamlit's default menu and footer
    hide_default_format = """
        <style>
        #MainMenu {visibility: hidden;}
        footer {visibility: hidden;}
        </style>
    """
    st.markdown(hide_default_format, unsafe_allow_html=True)

    # title
    st.title("Prompt Compass")
    st.subheader(
        "A Tool for Navigating LLMs and Prompts for Computational Social Science and Digital Humanities Research")

    # add links to the repo and DOI
    st.markdown(
        '''
        [![Repo](https://badgen.net/badge/icon/GitHub?icon=github&label)](https://github.com/ErikBorra/PromptCompass)
        [![DOI](https://zenodo.org/badge/649855474.svg)](https://zenodo.org/badge/latestdoi/649855474)
        ''', unsafe_allow_html=True)

    # load available models
    model_with_names = [
        model for model in promptlib['models'] if model['name']]

    # create input area for model selection
    input_values = {}
    input_values['model'] = st.selectbox('Select a model', model_with_names,
                                         format_func=lambda x: x['name'])

    # if there is no previous state, set the first model as the default
    if not st.session_state.get('previous_model'):
        st.session_state['previous_model'] = model_with_names[0]['name']

    st.caption(f"Model info: [{input_values['model']['name']}]({input_values['model']['resource']})" + (
        f". {input_values['model']['comment']}" if 'comment' in input_values['model'] else ""))

    # ask for an OpenAI key if no key is set in .env
    if input_values['model']['resource'] in ["https://platform.openai.com/docs/models/gpt-3-5", "https://platform.openai.com/docs/models/gpt-4", "https://platform.openai.com/docs/models/gpt-4o", "https://platform.openai.com/docs/models/gpt-4o-mini"]:
        # load the OpenAI API key from the environment variable
        if os.getenv("OPENAI_API_KEY") is None or os.getenv("OPENAI_API_KEY") == "":
            open_ai_key = st.text_input("OpenAI API Key", "")
        else:
            open_ai_key = os.getenv("OPENAI_API_KEY")
    # ask for a Claude key if no key is set in .env
    if input_values['model']['resource'] in ["https://docs.anthropic.com/en/docs/about-claude/models"]:
        # load the Claude API key from the environment variable
        if os.getenv("CLAUDE_API_KEY") is None or os.getenv("CLAUDE_API_KEY") == "":
            claude_api_key = st.text_input("Claude API Key", "")
        else:
            claude_api_key = os.getenv("CLAUDE_API_KEY")

    # set default values
    do_sample = False
    temperature = 0.001
    top_p = -1
    max_new_tokens = -1

    with st.expander("Advanced settings"):
        if input_values['model']['resource'] not in ["https://platform.openai.com/docs/models/gpt-3-5", "https://platform.openai.com/docs/models/gpt-4", "https://platform.openai.com/docs/models/gpt-4o", "https://platform.openai.com/docs/models/gpt-4o-mini"]:
            st.markdown(
                """
                **Set Maximum Length**: Determines the maximum number of tokens of the **generated** text. A token is approximately four characters, although this depends on the model.
                A value of -1 means the parameter will not be specified.
                """
            )
            max_new_tokens = st.number_input(
                'Maximum Length', value=256, min_value=-1, step=1)
            st.markdown(
                """
                **Set do_sample**: Controls how the model generates text. If do_sample=True, the model uses a probabilistic approach, where the likelihood of each word being chosen depends on its predicted probability; use the parameters below to further control its behavior. If do_sample=False, the model uses a deterministic approach and always chooses the most likely next word.
                """
            )
            do_sample = st.radio(
                'Set do_sample',
                ('False', 'True')
            )
        st.markdown(
            """
            **Temperature**: Controls the randomness of the model's responses.
            Lower values (closer to 0.0) make the output more deterministic, while higher values (closer to 2.0) make it more diverse.
            A value of -1 means the parameter will not be specified.
            """
        )
        temperature = st.number_input(
            'Set Temperature', min_value=-1.0, max_value=2.0, value=0.001, format="%.3f")
        st.markdown(
            """
            **Top P**: Also known as "nucleus sampling", an alternative to temperature for controlling the randomness of the model's responses.
            It essentially trims the less likely options from the model's distribution of possible responses. Possible values lie between 0.0 and 1.0.
            A value of -1 means the parameter will not be specified. Only applies if do_sample=True.
            """
        )
        top_p = st.number_input('Set Top-P', min_value=-1.0,
                                max_value=1.0, value=-1.0)

    # check for correct values
    allgood = True

    # set model kwargs
    model_kwargs = {}
    if input_values['model']['resource'] not in ["https://platform.openai.com/docs/models/gpt-3-5", "https://platform.openai.com/docs/models/gpt-4", "https://platform.openai.com/docs/models/gpt-4o", "https://platform.openai.com/docs/models/gpt-4o-mini", "https://docs.anthropic.com/en/docs/about-claude/models"]:
        # check that max_new_tokens is at least 1, or -1
        if not (max_new_tokens > 0 or max_new_tokens == -1):
            st.error(
                'Error: Max Tokens must be at least 1. Choose -1 if you want to use the default model value.')
            max_new_tokens = -1
            allgood = False
        if max_new_tokens > 0:
            model_kwargs['max_new_tokens'] = max_new_tokens
        if do_sample not in ['True', 'False']:
            st.error(
                'Error: do_sample must be True or False')
            do_sample = False
            allgood = False
        do_sample = True if do_sample == 'True' else False
        model_kwargs['do_sample'] = do_sample
    if not (0 <= temperature <= 2 or temperature == -1):
        st.error(
            "Temperature value must be between 0 and 2. Choose -1 if you want to use the default model value.")
        temperature = -1
        allgood = False
    if 0 <= temperature <= 2:
        model_kwargs['temperature'] = temperature
    if not (0 <= top_p <= 1 or top_p == -1):
        st.error(
            "Top P value must be between 0 and 1. Choose -1 if you want to use the default model value.")
        top_p = -1
        allgood = False
    if 0 <= top_p <= 1:
        model_kwargs['top_p'] = top_p
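
    # model_kwargs now holds only the parameters that were explicitly set;
    # -1 sentinels were dropped above, so the model's own defaults apply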

    # create input area for task selection
    tasks_with_names = [task for task in promptlib['tasks'] if task['name']]
    task = st.selectbox('Select a task', tasks_with_names,
                        format_func=lambda x: x['name'] + " - " + x['authors'])

    # create input areas for prompts and user input
    if task:
        # concatenate all strings from the prompt array
        prompt = '\n'.join(task['prompt'])

        # create input area for the prompt
        input_values['prompt'] = st.text_area(
            "Inspect, and possibly modify, the prompt by ["+task['authors']+"]("+task['paper']+")", prompt, height=200)

        # allow the user to select the input type
        input_type = st.radio("Choose input type:",
                              ('Text input', 'Upload a CSV'), horizontal=True)

        if input_type == 'Text input':
            # create input area for user input
            input_values['user'] = st.text_area(
                "Input to be analyzed with the prompt (one thing per line):",
                "this user is happy\none user is just a user\nthe other user is a liar")
            # if the user's input is a string rather than a list, split it by newlines
            if isinstance(input_values['user'], str):
                input_values['user'] = input_values['user'].split('\n')
            original_data = pd.DataFrame(
                input_values['user'], columns=['user_input'])
        else:
            # upload CSV
            uploaded_file = st.file_uploader("Choose a CSV file", type="csv")
            if uploaded_file is not None:
                # convert the uploaded file to a dataframe
                original_data = pd.read_csv(uploaded_file)
                # ask the user to select a column
                column_to_extract = st.selectbox(
                    'Choose a column to apply the prompt on:', original_data.columns)
                # process the selected column from the dataframe
                input_values['user'] = original_data[column_to_extract].tolist()

        data = pd.DataFrame()

        # determine the output file name
        filename = uploaded_file.name if uploaded_file else 'output.csv'
        base_filename, file_extension = os.path.splitext(filename)
        output_filename = f"{base_filename}_promptcompass{file_extension}"
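        # e.g. an uploaded "data.csv" is offered for download as "data_promptcompass.csv"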

        repeat_input = st.number_input(
            'Enter the number of times the prompt/input combination should be repeated:', min_value=1, max_value=10, value=1, step=1)

        # submit button
        submit_button = st.button('Submit')

        st.write('---')  # add a horizontal line

        # process form submission
        if submit_button and allgood:
            if 'user' not in input_values or input_values['user'] is None:
                st.error("No user input provided")
            else:
                with st.spinner(text="In progress..."):
                    try:
                        start_time = time.time()
                        st.write("Start time: " +
                                 time.strftime("%H:%M:%S", time.localtime()))

                        if input_values['prompt'] and input_values['user']:
                            # add the user input to the prompt at the task's specified location
                            if task['location_of_input'] == 'before':
                                template = "{user_input}" + \
                                    "\n\n" + input_values['prompt']
                            elif task['location_of_input'] == 'after':
                                template = input_values['prompt'] + \
                                    "\n\n" + "{user_input}"
                            else:
                                template = input_values['prompt']
                            # make sure users don't forget the user input variable
                            if "{user_input}" not in template:
                                template = template + "\n\n{user_input}"

                            # build the prompt template
                            prompt_template = PromptTemplate(
                                input_variables=["user_input"], template=template)
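
                            # prompt_template has a single {user_input} variable that
                            # is filled in for every row of input below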

                            # loop over user values in prompt
                            for key, user_input in enumerate(input_values['user']):
                                for i in range(repeat_input):
                                    num_prompt_tokens = None
                                    num_completion_tokens = None
                                    cost = None

                                    user_input = str(user_input).strip()
                                    if user_input == "" or user_input == "nan":
                                        continue

                                    # set up and run the model
                                    model_id = input_values['model']['name']
                                    if model_id in ["claude-3-5-sonnet-20240620", "claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307"]:
                                        if claude_api_key is None or claude_api_key == "":
                                            st.error(
                                                "Please provide a Claude API Key")
                                            exit(1)
                                        llm = ChatAnthropic(
                                            model=model_id, api_key=claude_api_key, **model_kwargs)
                                        AIMessage = llm.invoke(prompt_template.format(
                                            user_input=user_input))
                                        output = AIMessage.content
                                        num_completion_tokens = AIMessage.usage_metadata['output_tokens']
                                        num_prompt_tokens = AIMessage.usage_metadata['input_tokens']
                                        st.success("Input: " + user_input + " \n\n " +
                                                   "Output: " + output)
                                    elif model_id in ['gpt-3.5-turbo', "gpt-3.5-turbo-instruct", 'gpt-4', 'gpt-4-turbo', "gpt-4o-mini", "gpt-4o", 'babbage-002', 'davinci-002']:
                                        if open_ai_key is None or open_ai_key == "":
                                            st.error(
                                                "Please provide an OpenAI API Key")
                                            exit(1)
                                        with get_openai_callback() as cb:
                                            if model_id in ['gpt-3.5-turbo', "gpt-3.5-turbo-instruct", 'gpt-4', 'gpt-4-turbo', "gpt-4o-mini", "gpt-4o"]:
                                                # chat models
                                                llm = ChatOpenAI(
                                                    model=model_id, openai_api_key=open_ai_key, **model_kwargs)
                                            else:
                                                # completion models (babbage-002, davinci-002)
                                                llm = OpenAI(
                                                    model=model_id, openai_api_key=open_ai_key, **model_kwargs)
                                            llm_chain = LLMChain(
                                                llm=llm, prompt=prompt_template)
                                            output = llm_chain.run(user_input)
                                            st.success("Input: " + user_input + " \n\n " +
                                                       "Output: " + output)
                                            st.text(cb)
                                            num_prompt_tokens = cb.prompt_tokens
                                            num_completion_tokens = cb.completion_tokens
                                            cost = cb.total_cost
                                    elif model_id in ['meta-llama/Llama-2-7b-chat-hf', 'meta-llama/Llama-2-13b-chat-hf', 'meta-llama/Meta-Llama-3-8B', 'meta-llama/Meta-Llama-3.1-8B', 'meta-llama/Meta-Llama-3-8B-Instruct', 'meta-llama/Meta-Llama-3.1-8B-Instruct']:
                                        if pipe is None:
                                            with st.status('Loading model %s' % model_id) as status:
                                                # to use the llama models,
                                                # you first need to get access to them via e.g. https://huggingface.co/meta-llama/Llama-2-7b-chat-hf
                                                # once accepted, get a hugging face auth token from https://huggingface.co/settings/tokens
                                                # and then run `huggingface-cli login` on the command line, filling in the generated token
                                                tokenizer = AutoTokenizer.from_pretrained(
                                                    model_id, token=True)
                                                if model_id == "meta-llama/Llama-2-13b-chat-hf":
                                                    pipe = pipeline(
                                                        "text-generation",
                                                        model=model_id,
                                                        tokenizer=tokenizer,
                                                        # torch_dtype="auto",
                                                        trust_remote_code=True,
                                                        device_map="auto",
                                                        num_return_sequences=1,
                                                        eos_token_id=tokenizer.eos_token_id,
                                                        **model_kwargs
                                                    )
                                                else:
                                                    pipe = pipeline(
                                                        "text-generation",
                                                        model=model_id,
                                                        tokenizer=tokenizer,
                                                        torch_dtype="auto",
                                                        trust_remote_code=True,
                                                        device_map="auto",
                                                        num_return_sequences=1,
                                                        eos_token_id=tokenizer.eos_token_id,
                                                        **model_kwargs
                                                    )
                                                local_llm = HuggingFacePipeline(
                                                    pipeline=pipe)
                                                status.update(
                                                    label='Model %s loaded' % model_id, state="complete")
                                        llm_chain = LLMChain(
                                            llm=local_llm, prompt=prompt_template)
                                        output = llm_chain.run(user_input)
                                        st.success("Input: " + user_input + " \n\n " +
                                                   "Output: " + output)
                                    elif model_id in ['google/flan-t5-large', 'google/flan-t5-xl', 'google/gemma-2b-it', 'google/gemma-7b-it', 'tiiuae/falcon-7b-instruct', 'tiiuae/falcon-40b-instruct', 'databricks/dolly-v2-3b', 'databricks/dolly-v2-7b']:
                                        if pipe is None:
                                            with st.status('Loading model %s' % model_id) as status:
                                                tokenizer = AutoTokenizer.from_pretrained(
                                                    model_id)
                                                if model_id in ['google/flan-t5-large', 'google/flan-t5-xl']:
                                                    # seq2seq models use the text2text-generation task;
                                                    # the pipeline loads the model itself from model_id
                                                    pipe = pipeline(
                                                        "text2text-generation",
                                                        model=model_id,
                                                        tokenizer=tokenizer,
                                                        torch_dtype="auto",
                                                        trust_remote_code=True,
                                                        device_map="auto",
                                                        num_return_sequences=1,
                                                        eos_token_id=tokenizer.eos_token_id,
                                                        **model_kwargs
                                                    )
                                                else:
                                                    pipe = pipeline(
                                                        "text-generation",
                                                        model=model_id,
                                                        tokenizer=tokenizer,
                                                        torch_dtype="auto",
                                                        trust_remote_code=True,
                                                        device_map="auto",
                                                        num_return_sequences=1,
                                                        eos_token_id=tokenizer.eos_token_id,
                                                        **model_kwargs
                                                    )
                                                local_llm = HuggingFacePipeline(
                                                    pipeline=pipe)
                                                status.update(
                                                    label='Model %s loaded' % model_id, state="complete")
                                        llm_chain = LLMChain(
                                            llm=local_llm, prompt=prompt_template)
                                        output = llm_chain.run(user_input)
                                        st.success("Input: " + user_input + " \n\n " +
                                                   "Output: " + output)
elif model_id == "mosaicml/mpt-7b-instruct":
if pipe is None:
with st.status('Loading model %s' % model_id) as status:
model = AutoModelForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
torch_dtype=torch.bfloat16,
max_seq_len=2048,
device_map="auto"
)
# MPT-7B model was trained using the EleutherAI/gpt-neox-20b tokenizer
tokenizer = AutoTokenizer.from_pretrained(
"EleutherAI/gpt-neox-20b")
# mtp-7b is trained to add "<|endoftext|>" at the end of generations
stop_token_ids = tokenizer.convert_tokens_to_ids(
["<|endoftext|>"])
# define custom stopping criteria object
class StopOnTokens(StoppingCriteria):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
for stop_id in stop_token_ids:
if input_ids[0][-1] == stop_id:
return True
return False
stopping_criteria = StoppingCriteriaList(
[StopOnTokens()])
pipe = pipeline(
task='text-generation',
model=model,
tokenizer=tokenizer,
torch_dtype="auto",
device_map="auto",
num_return_sequences=1,
eos_token_id=tokenizer.eos_token_id,
**model_kwargs,
return_full_text=True, # langchain expects the full text
stopping_criteria=stopping_criteria, # without this model will ramble
repetition_penalty=1.1 # without this output begins repeating
)
local_llm = HuggingFacePipeline(
pipeline=pipe)
status.update(
label='Model %s loaded' % model_id, state="complete")
llm_chain = LLMChain(
llm=local_llm, prompt=prompt_template)
output = llm_chain.run(user_input)
st.success("Input: " + user_input + " \n\n " +
"Output: " + output)
elif model_id == "allenai/OLMo-7B" or model_id == "ehartford/dolphin-2.1-mistral-7b" or model_id == "lvkaokao/mistral-7b-finetuned-orca-dpo-v2" or model_id == "lmsys/vicuna-13b-v1.5" or model_id == "microsoft/Orca-2-13b":
if pipe is None:
with st.status('Loading model %s' % model_id) as status:
model = AutoModelForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
torch_dtype=torch.bfloat16,
device_map="auto"
)
if model_id == "ehartford/dolphin-2.1-mistral-7b":
tokenizer = AutoTokenizer.from_pretrained(
model_id, use_fast=False)
else:
tokenizer = AutoTokenizer.from_pretrained(
model_id)
pipe = pipeline(
task='text-generation',
model=model,
tokenizer=tokenizer,
torch_dtype="auto",
device_map="auto",
num_return_sequences=1,
eos_token_id=tokenizer.eos_token_id,
**model_kwargs,
return_full_text=True, # langchain expects the full text
)
local_llm = HuggingFacePipeline(
pipeline=pipe)
status.update(
label='Model %s loaded' % model_id, state="complete")
llm_chain = LLMChain(
llm=local_llm, prompt=prompt_template)
output = llm_chain.run(user_input)
st.success("Input: " + user_input + " \n\n " +
"Output: " + output)
else:
st.error("Model %s not found" % model_id)
exit(1)
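
                                    # API models report token usage via callbacks/metadata;
                                    # for local models, fall back to counting with the tokenizer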
                                    if not num_prompt_tokens or not num_completion_tokens:
                                        num_prompt_tokens = len(tokenizer.tokenize(
                                            prompt_template.format(user_input=user_input)))
                                        num_completion_tokens = len(tokenizer.tokenize(
                                            output))

                                    # prepare data as dictionary
                                    original_row = original_data.loc[key].copy()
                                    new_row = {
                                        'user_input': user_input,
                                        'output': output,
                                        'llm': model_id,
                                        'prompt name': task['name'],
                                        'prompt authors': task['authors'],
                                        'prompt': template,
                                        'timestamp': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                                        '# prompt tokens': str(int(num_prompt_tokens)),
                                        '# completion tokens': str(int(num_completion_tokens)),
                                        'max_new_tokens': int(model_kwargs['max_new_tokens']) if "max_new_tokens" in model_kwargs else None,
                                        'do_sample': int(model_kwargs['do_sample']) if "do_sample" in model_kwargs else None,
                                        'temperature': model_kwargs['temperature'] if "temperature" in model_kwargs else None,
                                        'top_p': model_kwargs['top_p'] if "top_p" in model_kwargs else None,
                                        'cost': cost if cost is not None else None
                                    }

                                    # update the original row with the new data
                                    for key2, value in new_row.items():
                                        original_row[key2] = value

                                    # append the updated row to the DataFrame
                                    updated_row_df = pd.DataFrame([original_row])
                                    data = pd.concat(
                                        [data, updated_row_df], ignore_index=True)

                            st.subheader("Results")
                            st.dataframe(data, column_config={},
                                         hide_index=True)

                            # make output available as csv
                            csv = data.to_csv(index=False).encode('utf-8')
                            ste.download_button(
                                "Download CSV",
                                csv,
                                output_filename,
                                "text/csv",
                            )

                        end_time = time.time()
                        elapsed_time = end_time - start_time
                        st.write("End time: " +
                                 time.strftime("%H:%M:%S", time.localtime()))
                        st.write("Elapsed time: " +
                                 str(round(elapsed_time, 2)) + " seconds")
                    except Exception as e:
                        st.error(e)
                    finally:
                        # free up variables
                        if 'data' in locals() and data is not None:
                            del data
                        if 'pipe' in locals() and pipe is not None:
                            del pipe
                        if 'llm_chain' in locals() and llm_chain is not None:
                            del llm_chain
                        if 'llm' in locals() and llm is not None:
                            del llm
                        if 'local_llm' in locals() and local_llm is not None:
                            del local_llm
                        if 'model' in locals() and model is not None:
                            del model
                        if 'tokenizer' in locals() and tokenizer is not None:
                            del tokenizer
                        gc.collect()  # garbage collection
                        # empty the cuda cache
                        torch.cuda.empty_cache()


if __name__ == "__main__":
    main()