diff --git a/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml b/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml
index f60224138..3cdfe1262 100644
--- a/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml
+++ b/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml
@@ -25,7 +25,7 @@ spec:
       image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
       imagePullPolicy: IfNotPresent
     runner:
-      image: kubeagi/arcadia-fastchat-worker:v0.2.36
+      image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
       imagePullPolicy: IfNotPresent
       resources:
         limits:
diff --git a/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml b/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml
index f1ddd8de3..670a64502 100644
--- a/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml
+++ b/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml
@@ -12,7 +12,7 @@ spec:
       image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
       imagePullPolicy: IfNotPresent
     runner:
-      image: kubeagi/arcadia-fastchat-worker:v0.2.36
+      image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
       imagePullPolicy: IfNotPresent
   model:
     kind: "Models"
diff --git a/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml b/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml
index f3c99ed12..05bcabb0d 100644
--- a/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml
+++ b/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml
@@ -15,7 +15,7 @@ spec:
      image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
      imagePullPolicy: IfNotPresent
    runner:
-     image: kubeagi/arcadia-fastchat-worker:v0.2.36
+     image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
      imagePullPolicy: IfNotPresent
      resources:
        limits:
diff --git a/config/samples/ray.io_v1_raycluster.yaml b/config/samples/ray.io_v1_raycluster.yaml
index 6ec0b612f..c19deb51a 100644
--- a/config/samples/ray.io_v1_raycluster.yaml
+++ b/config/samples/ray.io_v1_raycluster.yaml
@@ -18,7 +18,7 @@ spec:
         runAsGroup: 0
         fsGroup: 0
       containers:
-        - image: kubeagi/ray-ml:2.9.3-py39-vllm
+        - image: kubeagi/ray-ml:2.9.3-py39-vllm-0.4.0
          name: ray-head
          resources:
            limits:
@@ -48,7 +48,7 @@ spec:
           app.kubernetes.io/name: kuberay
     spec:
       containers:
-        - image: kubeagi/ray-ml:2.9.3-py39-vllm
+        - image: kubeagi/ray-ml:2.9.3-py39-vllm-0.4.0
          name: ray-worker
          resources:
            limits:
diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml
index a74ee8d19..4f25507cf 100644
--- a/deploy/charts/arcadia/Chart.yaml
+++ b/deploy/charts/arcadia/Chart.yaml
@@ -2,7 +2,7 @@ apiVersion: v2
 name: arcadia
 description: A Helm chart(Also a KubeBB Component) for KubeAGI Arcadia
 type: application
-version: 0.3.29
+version: 0.3.30
 appVersion: "0.2.1"

 keywords:
diff --git a/deploy/charts/llm-worker/Chart.yaml b/deploy/charts/llm-worker/Chart.yaml
index 2d1b41538..baf8167f0 100644
--- a/deploy/charts/llm-worker/Chart.yaml
+++ b/deploy/charts/llm-worker/Chart.yaml
@@ -30,6 +30,3 @@ sources:
 keywords:
   - kubeagi
   - LLMOps
-maintainers:
-  - name: lanture1064
-    url: https://github.com/lanture1064
diff --git a/deploy/charts/llm-worker/values.yaml b/deploy/charts/llm-worker/values.yaml
index 94ea01e11..ef67bc2fe 100644
--- a/deploy/charts/llm-worker/values.yaml
+++ b/deploy/charts/llm-worker/values.yaml
@@ -5,7 +5,7 @@ image:
   repository: kubeagi/arcadia-fastchat-worker
   pullPolicy: IfNotPresent
   # Overrides the image tag whose default is the chart appVersion.
-  tag: "v0.2.0"
+  tag: "vllm-v0.4.0-hotfix"
 env:
   - name: FASTCHAT_MODEL_NAME
     value: "baichuan2-7b"
diff --git a/deploy/llms/start-worker.sh b/deploy/llms/start-worker.sh
index 506857d05..abfefc1da 100755
--- a/deploy/llms/start-worker.sh
+++ b/deploy/llms/start-worker.sh
@@ -31,4 +31,4 @@ python3.9 -m $FASTCHAT_WORKER_NAME --model-names $FASTCHAT_REGISTRATION_MODEL_NA
     --model-path $FASTCHAT_MODEL_NAME_PATH --worker-address $FASTCHAT_WORKER_ADDRESS \
     --controller-address $FASTCHAT_CONTROLLER_ADDRESS \
     --num-gpus $NUMBER_GPUS \
-    --host 0.0.0.0 --port 21002 $EXTRA_ARGS
+    --host 0.0.0.0 --port 21002 $SYSTEM_ARGS $EXTRA_ARGS
diff --git a/pkg/appruntime/agent/streamhandler.go b/pkg/appruntime/agent/streamhandler.go
index 15d5eff94..71febb291 100644
--- a/pkg/appruntime/agent/streamhandler.go
+++ b/pkg/appruntime/agent/streamhandler.go
@@ -35,11 +35,11 @@ type StreamHandler struct {
 var _ callbacks.Handler = StreamHandler{}

 func (handler StreamHandler) HandleStreamingFunc(ctx context.Context, chunk []byte) {
-	if _, ok := handler.args[base.OutputAnserStreamChanKeyInArg]; ok {
+	if _, ok := handler.args[base.OutputAnswerStreamChanKeyInArg]; ok {
 		logger := klog.FromContext(ctx)
-		streamChan, ok := handler.args[base.OutputAnserStreamChanKeyInArg].(chan string)
+		streamChan, ok := handler.args[base.OutputAnswerStreamChanKeyInArg].(chan string)
 		if !ok {
-			err := fmt.Errorf("answer_stream is not chan string, but %T", handler.args[base.OutputAnserStreamChanKeyInArg])
+			err := fmt.Errorf("answer_stream is not chan string, but %T", handler.args[base.OutputAnswerStreamChanKeyInArg])
 			logger.Error(err, "answer_stream is not chan string")
 			return
 		}
diff --git a/pkg/appruntime/app_runtime.go b/pkg/appruntime/app_runtime.go
index 9045ca778..82e3d5595 100644
--- a/pkg/appruntime/app_runtime.go
+++ b/pkg/appruntime/app_runtime.go
@@ -145,7 +145,7 @@ func (a *Application) Run(ctx context.Context, cli client.Client, respStream cha
 	out := map[string]any{
 		base.InputQuestionKeyInArg:                 input.Question,
 		"files":                                    input.Files,
-		base.OutputAnserStreamChanKeyInArg:         respStream,
+		base.OutputAnswerStreamChanKeyInArg:        respStream,
 		base.InputIsNeedStreamKeyInArg:             input.NeedStream,
 		base.LangchaingoChatMessageHistoryKeyInArg: input.History,
 		// Use an empty context before run
@@ -205,7 +205,7 @@ func (a *Application) Run(ctx context.Context, cli client.Client, respStream cha
 	var er *base.RetrieverGetNullDocError
 	if errors.As(err, &er) {
 		agentReturnNothing := true
-		v, ok := out[base.OutputAnserKeyInArg]
+		v, ok := out[base.OutputAnswerKeyInArg]
 		if ok {
 			if answer, ok := v.(string); ok && len(answer) > 0 {
 				agentReturnNothing = false
@@ -229,7 +229,7 @@ func (a *Application) Run(ctx context.Context, cli client.Client, respStream cha
 				waitRunningNodes.PushBack(n)
 			}
 		}
-		if a, ok := out[base.OutputAnserKeyInArg]; ok {
+		if a, ok := out[base.OutputAnswerKeyInArg]; ok {
 			if answer, ok := a.(string); ok && len(answer) > 0 {
 				output = Output{Answer: answer}
 			}
diff --git a/pkg/appruntime/base/keyword.go b/pkg/appruntime/base/keyword.go
index 465e19443..2171c84bc 100644
--- a/pkg/appruntime/base/keyword.go
+++ b/pkg/appruntime/base/keyword.go
@@ -20,10 +20,10 @@ const (
 	InputQuestionKeyInArg                 = "question"
 	InputIsNeedStreamKeyInArg             = "_need_stream"
 	LangchaingoChatMessageHistoryKeyInArg = "_history"
-	OutputAnserKeyInArg                   = "_answer"
+	OutputAnswerKeyInArg                  = "_answer"
 	AgentOutputInArg                      = "_agent_answer"
 	MapReduceDocumentOutputInArg          = "_mapreduce_document_answer"
-	OutputAnserStreamChanKeyInArg         = "_answer_stream"
+	OutputAnswerStreamChanKeyInArg        = "_answer_stream"
 	RuntimeRetrieverReferencesKeyInArg    = "_references"
 	LangchaingoRetrieverKeyInArg          = "retriever"
 	LangchaingoLLMKeyInArg                = "llm"
diff --git a/pkg/appruntime/chain/apichain.go b/pkg/appruntime/chain/apichain.go
index abc7bf96b..13e0a684d 100644
--- a/pkg/appruntime/chain/apichain.go
+++ b/pkg/appruntime/chain/apichain.go
@@ -112,7 +112,7 @@ func (l *APIChain) Run(ctx context.Context, _ client.Client, args map[string]any
 	out, err = handleNoErrNoOut(ctx, needStream, out, err, l.APIChain, args, options)
 	klog.FromContext(ctx).V(5).Info("use apichain, blocking out:" + out)
 	if err == nil {
-		args[base.OutputAnserKeyInArg] = out
+		args[base.OutputAnswerKeyInArg] = out
 		return args, nil
 	}
 	return args, fmt.Errorf("apichain run error: %w", err)
diff --git a/pkg/appruntime/chain/common.go b/pkg/appruntime/chain/common.go
index e8f518ed8..7fedfd57c 100644
--- a/pkg/appruntime/chain/common.go
+++ b/pkg/appruntime/chain/common.go
@@ -33,11 +33,11 @@ import (

 func stream(res map[string]any) func(ctx context.Context, chunk []byte) error {
 	return func(ctx context.Context, chunk []byte) error {
-		if _, ok := res[base.OutputAnserStreamChanKeyInArg]; ok {
+		if _, ok := res[base.OutputAnswerStreamChanKeyInArg]; ok {
 			logger := klog.FromContext(ctx)
-			streamChan, ok := res[base.OutputAnserStreamChanKeyInArg].(chan string)
+			streamChan, ok := res[base.OutputAnswerStreamChanKeyInArg].(chan string)
 			if !ok {
-				err := fmt.Errorf("answer_stream is not chan string, but %T", res[base.OutputAnserStreamChanKeyInArg])
+				err := fmt.Errorf("answer_stream is not chan string, but %T", res[base.OutputAnswerStreamChanKeyInArg])
 				logger.Error(err, "answer_stream is not chan string")
 				return err
 			}
diff --git a/pkg/appruntime/chain/llmchain.go b/pkg/appruntime/chain/llmchain.go
index dacd99a56..9883693bc 100644
--- a/pkg/appruntime/chain/llmchain.go
+++ b/pkg/appruntime/chain/llmchain.go
@@ -153,7 +153,7 @@ func (l *LLMChain) Run(ctx context.Context, cli client.Client, args map[string]a
 	out, err = handleNoErrNoOut(ctx, needStream, out, err, l.LLMChain, args, options)
 	klog.FromContext(ctx).V(5).Info("use llmchain, blocking out:" + out)
 	if err == nil {
-		args[base.OutputAnserKeyInArg] = out
+		args[base.OutputAnswerKeyInArg] = out
 		return args, nil
 	}
 	return args, fmt.Errorf("llmchain run error: %w", err)
diff --git a/pkg/appruntime/chain/retrievalqachain.go b/pkg/appruntime/chain/retrievalqachain.go
index 366586c94..e14a2eece 100644
--- a/pkg/appruntime/chain/retrievalqachain.go
+++ b/pkg/appruntime/chain/retrievalqachain.go
@@ -186,7 +186,7 @@ func (l *RetrievalQAChain) Run(ctx context.Context, cli client.Client, args map[
 	out, err = handleNoErrNoOut(ctx, needStream, out, err, l.ConversationalRetrievalQA, args, options)
 	klog.FromContext(ctx).V(5).Info("use retrievalqachain, blocking out:" + out)
 	if err == nil {
-		args[base.OutputAnserKeyInArg] = out
+		args[base.OutputAnswerKeyInArg] = out
 		return args, nil
 	}
 	return args, fmt.Errorf("retrievalqachain run error: %w", err)
diff --git a/pkg/worker/runner.go b/pkg/worker/runner.go
index a7a3f38d8..bce283559 100644
--- a/pkg/worker/runner.go
+++ b/pkg/worker/runner.go
@@ -34,9 +34,9 @@ import (

 const (
 	// tag is the same version as fastchat
-	defaultFastChatImage = "kubeagi/arcadia-fastchat-worker:v0.2.36"
+	defaultFastChatImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix"
 	// For ease of maintenance and stability, VLLM module is now included in standard image as a default feature.
-	defaultFastchatVLLMImage = "kubeagi/arcadia-fastchat-worker:v0.2.36"
+	defaultFastchatVLLMImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix"
 	// defaultKubeAGIImage for RunnerKubeAGI
 	defaultKubeAGIImage = "kubeagi/core-library-cli:v0.0.1"

@@ -93,24 +93,16 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
 		return nil, fmt.Errorf("failed to get arcadia config with %w", err)
 	}

-	extraAgrs := ""
-	for _, envItem := range runner.w.Spec.AdditionalEnvs {
-		if envItem.Name == "EXTRA_ARGS" {
-			extraAgrs = envItem.Value
-			break
-		}
-	}
-
 	modelFileDir := fmt.Sprintf("%s/%s", defaultModelMountPath, model.Name)
 	additionalEnvs := []corev1.EnvVar{}
-	extraArgs := fmt.Sprintf("--device %s %s", runner.Device().String(), extraAgrs)
+	systemArgs := fmt.Sprintf("--device %s", runner.Device().String())
 	if runner.modelFileFromRemote {
 		m := arcadiav1alpha1.Model{}
 		if err := runner.c.Get(ctx, types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, &m); err != nil {
 			return nil, err
 		}
 		if m.Spec.Revision != "" {
-			extraArgs += fmt.Sprintf(" --revision %s ", m.Spec.Revision)
+			systemArgs += fmt.Sprintf(" --revision %s ", m.Spec.Revision)
 		}
 		if m.Spec.ModelSource == modelSourceFromHugginfFace {
 			modelFileDir = m.Spec.HuggingFaceRepo
@@ -139,7 +131,6 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
 			{Name: "FASTCHAT_WORKER_ADDRESS", Value: fmt.Sprintf("http://%s.%s:%d", runner.w.Name+WokerCommonSuffix, runner.w.Namespace, arcadiav1alpha1.DefaultWorkerPort)},
 			{Name: "FASTCHAT_CONTROLLER_ADDRESS", Value: gw.Controller},
 			{Name: "NUMBER_GPUS", Value: runner.NumberOfGPUs()},
-			{Name: "EXTRA_ARGS", Value: extraArgs},
 		},
 		Ports: []corev1.ContainerPort{
 			{Name: "http", ContainerPort: arcadiav1alpha1.DefaultWorkerPort},
@@ -149,6 +140,7 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
 		},
 		Resources: runner.w.Spec.Resources,
 	}
+	additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "SYSTEM_ARGS", Value: systemArgs})
 	container.Env = append(container.Env, additionalEnvs...)

 	return container, nil
@@ -193,12 +185,12 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
 		return nil, fmt.Errorf("failed to get arcadia config with %w", err)
 	}

-	extraAgrs := ""
+	systemArgs := ""
 	additionalEnvs := []corev1.EnvVar{}

 	// configure ray cluster
 	resources := runner.w.Spec.Resources
-	gpus := runner.NumberOfGPUs()
+	gpuEnvExist := false
 	// default ray cluster which can only utilize gpus on single nodes
 	rayCluster := config.DefaultRayCluster()
 	for _, envItem := range runner.w.Spec.AdditionalEnvs {
@@ -223,12 +215,10 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
 		// By default, gpu_memory_utilization will be 0.9
 		if envItem.Name == "GPU_MEMORY_UTILIZATION" {
 			gpuMemoryUtilization, _ := strconv.ParseFloat(envItem.Value, 64)
-			extraAgrs += fmt.Sprintf(" --gpu_memory_utilization %f", gpuMemoryUtilization)
+			systemArgs += fmt.Sprintf(" --gpu_memory_utilization %f", gpuMemoryUtilization)
 		}
-
-		// extra arguments to run llm
-		if envItem.Name == "EXTRA_ARGS" {
-			extraAgrs = envItem.Value
+		if envItem.Name == "NUMBER_GPUS" {
+			gpuEnvExist = true
 		}
 	}
 	klog.V(5).Infof("run worker with raycluster:\n %s", rayCluster.String())
@@ -245,20 +235,16 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
 		Name:  "PYTHON_VERSION",
 		Value: rayCluster.GetPythonVersion(),
 	})
-	// Set gpu number to the number of GPUs in the worker's resource
-	additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "NUMBER_GPUS", Value: gpus})

 	modelFileDir := fmt.Sprintf("%s/%s", defaultModelMountPath, model.Name)
-	// --enforce-eager to disable cupy
-	// TODO: remove --enforce-eager when https://github.com/kubeagi/arcadia/issues/878 is fixed
-	extraAgrs = fmt.Sprintf("%s --trust-remote-code --enforce-eager", extraAgrs)
+	systemArgs = fmt.Sprintf("%s --trust-remote-code", systemArgs)
 	if runner.modelFileFromRemote {
 		m := arcadiav1alpha1.Model{}
 		if err := runner.c.Get(ctx, types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, &m); err != nil {
 			return nil, err
 		}
 		if m.Spec.Revision != "" {
-			extraAgrs += fmt.Sprintf(" --revision %s", m.Spec.Revision)
+			systemArgs += fmt.Sprintf(" --revision %s", m.Spec.Revision)
 		}
 		if m.Spec.ModelSource == modelSourceFromHugginfFace {
 			modelFileDir = m.Spec.HuggingFaceRepo
@@ -285,7 +271,6 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
 			{Name: "FASTCHAT_MODEL_NAME", Value: model.Name},
 			{Name: "FASTCHAT_WORKER_ADDRESS", Value: fmt.Sprintf("http://%s.%s:%d", runner.w.Name+WokerCommonSuffix, runner.w.Namespace, arcadiav1alpha1.DefaultWorkerPort)},
 			{Name: "FASTCHAT_CONTROLLER_ADDRESS", Value: gw.Controller},
-			{Name: "EXTRA_ARGS", Value: extraAgrs},
 		},
 		Ports: []corev1.ContainerPort{
 			{Name: "http", ContainerPort: arcadiav1alpha1.DefaultWorkerPort},
@@ -297,6 +282,13 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
 		},
 		Resources: resources,
 	}
+	if !gpuEnvExist {
+		// if env doesn't exist, set gpu number to the number of GPUs in the worker's resource
+		additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "NUMBER_GPUS", Value: runner.NumberOfGPUs()})
+	}
+
+	additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "SYSTEM_ARGS", Value: systemArgs})
+
 	container.Env = append(container.Env, additionalEnvs...)
 	return container, nil
 }
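
Note: the runner changes above split the worker launch flags in two. SYSTEM_ARGS is now fully controller-owned (--device, --revision, and for the vLLM runner --gpu_memory_utilization and --trust-remote-code), while EXTRA_ARGS is no longer read or rewritten by the controller and belongs entirely to the user; start-worker.sh expands $SYSTEM_ARGS before $EXTRA_ARGS, so user-supplied flags land last on the command line. For the vLLM runner, NUMBER_GPUS is only derived from the worker's GPU resource when it is not already set via additionalEnvs. A minimal sketch of a Worker using the new contract (the apiVersion, names, and values below are illustrative assumptions, not taken from this diff):

# Illustrative Worker only; verify group/version and field names against
# the files under config/samples.
apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1
kind: Worker
metadata:
  name: qwen-7b-chat
  namespace: kubeagi-system
spec:
  type: "fastchat-vllm"
  model:
    kind: "Models"
    name: "qwen-7b-chat"
  additionalEnvs:
    # Passed through to the pod untouched and expanded after $SYSTEM_ARGS in
    # start-worker.sh, so user flags can override controller defaults
    # (e.g. re-adding --enforce-eager, which is no longer set by default).
    - name: EXTRA_ARGS
      value: "--enforce-eager"
    # When set here, the controller skips deriving NUMBER_GPUS from resources.
    - name: NUMBER_GPUS
      value: "2"
  runner:
    image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
    resources:
      limits:
        nvidia.com/gpu: "2"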