-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathllm.py
139 lines (134 loc) · 4.31 KB
/
llm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import backoff
import threading
from typing import Dict
from collections import defaultdict
from protocol import ModelResponse
from errors import RetryConstantError, RetryExpoError, UnknownLLMError
# Tracing toggle: by default the Langfuse-wrapped OpenAI SDK is used so every
# completion is recorded as a trace; setting DISABLE_TRACKING=1 swaps in the
# vanilla `openai` package (same API surface) and sets a flag the request
# helpers below use to strip Langfuse-only kwargs before calling the client.
if os.getenv("DISABLE_TRACKING", "0") == "1":
    print("Tracking is disabled")
    disable_tracking = True
    import openai
else:
    disable_tracking = False
    from langfuse.openai import openai
# Upstream vLLM service endpoint; overridable via RC_API_BASE.
API_BASE=os.environ.get("RC_API_BASE", "http://140.238.223.13:8092/v1/service/llm/v1")
# Module-level client shared by chat_completion()/completion() below.
client = openai.OpenAI(
    api_key=os.getenv("RC_VLLM_API_KEY", "YOUR_API_KEY"),
    base_url=API_BASE,
)
def handle_llm_exception(e: Exception):
    """Map an exception from the OpenAI SDK onto this service's retry policy.

    Always raises; never returns.

    Raises:
        RetryExpoError: on rate limiting (retry with exponential backoff).
        RetryConstantError: on timeouts and other API errors (retry at a
            constant interval).
        openai.AuthenticationError / openai.APIConnectionError: re-raised
            unchanged -- retrying cannot help.
        UnknownLLMError: for any non-OpenAI exception.

    NOTE(review): checks must go most- to least-specific.  In openai>=1.0
    (which this file uses via ``openai.OpenAI(...)``) RateLimitError,
    AuthenticationError and APIConnectionError are all subclasses of
    APIError, so the original code's first ``isinstance(e, APIError)`` test
    made every later branch unreachable; ``openai.Timeout`` also no longer
    exists in v1 (renamed ``APITimeoutError``) and would itself raise
    AttributeError here.
    """
    if isinstance(e, openai.RateLimitError):
        raise RetryExpoError from e
    elif isinstance(e, openai.AuthenticationError):
        # Bad credentials -- fail fast, no retry.
        raise e
    elif isinstance(e, openai.APITimeoutError):
        # Must precede APIConnectionError: APITimeoutError subclasses it,
        # and timeouts are retryable while generic connection errors are not.
        raise RetryConstantError from e
    elif isinstance(e, openai.APIConnectionError):
        raise e
    elif isinstance(e, openai.APIError):
        raise RetryConstantError from e
    else:
        raise UnknownLLMError from e
@backoff.on_exception(
    wait_gen=backoff.constant,
    exception=RetryConstantError,
    max_tries=3,
    interval=3,
)
@backoff.on_exception(
    wait_gen=backoff.expo,
    exception=RetryExpoError,
    jitter=backoff.full_jitter,
    max_value=100,
    factor=1.5,
)
def chat_completion(**kwargs) -> ModelResponse:
    """Proxy a chat-completion request to the backing vLLM server.

    ``kwargs`` must contain ``user_key`` and ``master_key``; both are popped
    so they are never forwarded to the API.  The remaining kwargs are passed
    through to ``client.chat.completions.create``.  Retries are driven by
    the backoff decorators via the exceptions handle_llm_exception raises.

    Raises:
        KeyError: if ``user_key`` or ``master_key`` is missing from kwargs.
        RetryConstantError / RetryExpoError: consumed by the decorators;
            surfaced only once retries are exhausted.
        openai.AuthenticationError / openai.APIConnectionError /
        UnknownLLMError: re-raised immediately by handle_llm_exception.
    """
    user_key = kwargs.pop("user_key")
    # master_key is consumed so it never reaches the API.  The original
    # admin (user_key == master_key) and end-user branches were byte-for-byte
    # identical, so no comparison is needed; per-user budget control for the
    # non-admin path is still TODO.
    kwargs.pop("master_key")

    # Langfuse tracing metadata: 'name' tags the generation, 'user_id'
    # attributes it to the caller for end-user-based rate limiting.
    kwargs["name"] = "chat-generation"
    kwargs["user_id"] = user_key

    if disable_tracking:
        # The vanilla openai client rejects Langfuse-only kwargs, so strip
        # them when tracking is disabled ('user_key' kept for parity with
        # the original cleanup list; it is already popped above).
        for tracing_key in ("trace_id", "user_id", "user_key", "session_id", "name"):
            kwargs.pop(tracing_key, None)

    try:
        return client.chat.completions.create(**kwargs)
    except Exception as e:
        # Translates into Retry*Error (triggering the backoff decorators)
        # or re-raises; it never returns.
        handle_llm_exception(e)
@backoff.on_exception(
    wait_gen=backoff.constant,
    exception=RetryConstantError,
    max_tries=3,
    interval=3,
)
@backoff.on_exception(
    wait_gen=backoff.expo,
    exception=RetryExpoError,
    jitter=backoff.full_jitter,
    max_value=100,
    factor=1.5,
)
def completion(**kwargs) -> ModelResponse:
    """Proxy a (non-chat) text-completion request to the backing vLLM server.

    ``kwargs`` must contain ``user_key`` and ``master_key``; both are popped
    so they are never forwarded to the API.  The remaining kwargs are passed
    through to ``client.completions.create``.  Retries are driven by the
    backoff decorators via the exceptions handle_llm_exception raises.

    Raises:
        KeyError: if ``user_key`` or ``master_key`` is missing from kwargs.
        RetryConstantError / RetryExpoError: consumed by the decorators;
            surfaced only once retries are exhausted.
        openai.AuthenticationError / openai.APIConnectionError /
        UnknownLLMError: re-raised immediately by handle_llm_exception.
    """
    user_key = kwargs.pop("user_key")
    # master_key is consumed so it never reaches the API.  The original
    # admin (user_key == master_key) and end-user branches were byte-for-byte
    # identical, so no comparison is needed; per-user budget control for the
    # non-admin path is still TODO.
    kwargs.pop("master_key")

    # Langfuse tracing metadata ('chat-generation' name retained verbatim
    # from the original, even for this non-chat endpoint).
    kwargs["name"] = "chat-generation"
    kwargs["user_id"] = user_key

    if disable_tracking:
        # The vanilla openai client rejects Langfuse-only kwargs, so strip
        # them when tracking is disabled ('user_key' kept for parity with
        # the original cleanup list; it is already popped above).
        for tracing_key in ("trace_id", "user_id", "user_key", "session_id", "name"):
            kwargs.pop(tracing_key, None)

    try:
        return client.completions.create(**kwargs)
    except Exception as e:
        # Translates into Retry*Error (triggering the backoff decorators)
        # or re-raises; it never returns.
        handle_llm_exception(e)