diff --git a/README.md b/README.md index e48bdc25..aacece0b 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ LightLLM is a Python-based LLM (Large Language Model) inference and serving fram > InternVL-Chat(InternLM2) needs to set the parameter '--eos_id 92542 --trust_remote_code'. -> Qwen2-VL-7b needs to set the parameter '--eos_id 151645 --trust_remote_code'. +> Qwen2-VL-7b needs to set the parameter '--eos_id 151645 --trust_remote_code', and requires upgrading transformers to the latest version with 'pip install git+https://github.com/huggingface/transformers'. > Stablelm needs to set the parameter '--trust_remote_code'. diff --git a/lightllm/models/internlm_xcomposer/internlm_visual.py b/lightllm/models/internlm_xcomposer/internlm_visual.py index 868aab0d..a2581885 100644 --- a/lightllm/models/internlm_xcomposer/internlm_visual.py +++ b/lightllm/models/internlm_xcomposer/internlm_visual.py @@ -7,17 +7,26 @@ from typing import List, Union from torchvision import transforms from torchvision.transforms.functional import InterpolationMode +from rpyc.utils.classic import obtain +from io import BytesIO +import rpyc +from lightllm.server.embed_cache.utils import tensor2bytes, read_shm, create_shm, get_shm_name_data, get_shm_name_embed +from lightllm.utils.log_utils import init_logger class InternVisionModel: - - def __init__(self): + def __init__(self, kvargs): + self.tp_rank_ = kvargs["tp_rank_id"] + self.world_size_ = kvargs["vit_tp"] + self.client_port = kvargs["client_port"] + self.cache_client = rpyc.connect("localhost", self.client_port) + self.visual_gpu = kvargs["visual_gpu"] + self.device = torch.device(f"cuda:{self.visual_gpu}") pass def load_projector_update(self, config, weight_dir): - projector_type = config.get("projector_type", 'mlp2x_gelu') + projector_type = config.get("projector_type", "mlp2x_gelu") projector_weights = [] - mlp_gelu_match = re.match(r'^mlp(\d+)x_gelu$', projector_type) + mlp_gelu_match = re.match(r"^mlp(\d+)x_gelu$", projector_type) if mlp_gelu_match: self.mlp_depth = int(mlp_gelu_match.group(1)) new_dict = {} @@ -25,54 +34,55 @@ def load_projector_update(self, config, weight_dir): if f.endswith(".bin"): d = torch.load(os.path.join(weight_dir, f), "cpu") for k, v in d.items(): - if 'vision_proj' in k: + if "vision_proj" in k: projector_weights.append(v.half()) elif "vit.vision_tower." 
in k: - new_dict[k[len("vit.vision_tower."):]] = v.half() + new_dict[k[len("vit.vision_tower.") :]] = v.half() self.vision_tower.load_state_dict(new_dict, strict=True) return projector_weights - if projector_type == 'identity': + if projector_type == "identity": return [] - raise ValueError(f'Unknown projector type: {projector_type}') + raise ValueError(f"Unknown projector type: {projector_type}") def load_model(self, weight_dir): config_file = os.path.join(weight_dir, "config.json") config = json.load(open(config_file)) - self.select_layer = config.get('mm_vision_select_layer', -1) - self.select_feature = config.get('mm_vision_select_feature', 'patch') + self.select_layer = config.get("mm_vision_select_layer", -1) + self.select_feature = config.get("mm_vision_select_feature", "patch") # load clip vision model by cfg['mm_vision_tower']: # huggingface_name or path_of_clip_relative_to_llava_model_dir - vision_path = config.get('mm_vision_tower', 'openai/clip-vit-large-patch14-336') + vision_path = config.get("mm_vision_tower", "openai/clip-vit-large-patch14-336") if isinstance(vision_path, list): vision_path = vision_path[0] if vision_path.startswith("./"): vision_path = os.path.join(weight_dir, vision_path) - - self.image_processor = transforms.Compose([ - transforms.Resize((config["img_size"], config["img_size"]), - interpolation=InterpolationMode.BICUBIC), - transforms.ToTensor(), - transforms.Normalize((0.48145466, 0.4578275, 0.40821073), - (0.26862954, 0.26130258, 0.27577711)), - ]) + + self.image_processor = transforms.Compose( + [ + transforms.Resize((config["img_size"], config["img_size"]), interpolation=InterpolationMode.BICUBIC), + transforms.ToTensor(), + transforms.Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)), + ] + ) from transformers import CLIPVisionModel + self.vision_tower = CLIPVisionModel.from_pretrained(vision_path) self.vision_tower.requires_grad_(False) self.resize_pos(config, vision_path) self.projector_weights = self.load_projector_update(config, weight_dir) self.vision_tower = self.vision_tower.half() - self.device = torch.device('cpu') + self.device = torch.device("cpu") # load projector weights assert len(self.projector_weights) == self.mlp_depth * 2 def resize_pos(self, config, vision_path): - mm_vision_tower = vision_path.split('/')[-1] - vision_tower_match = re.match(r'^clip-vit-large-patch(\d+)-(\d+)$', mm_vision_tower) + mm_vision_tower = vision_path.split("/")[-1] + vision_tower_match = re.match(r"^clip-vit-large-patch(\d+)-(\d+)$", mm_vision_tower) patch_size = int(vision_tower_match.group(1)) clip_imge_size = int(vision_tower_match.group(2)) - + orig_size = clip_imge_size // patch_size new_size = config["img_size"] // patch_size if orig_size == new_size: @@ -82,43 +92,37 @@ def resize_pos(self, config, vision_path): pos_embed_checkpoint = self.vision_tower.vision_model.embeddings.position_embedding.weight pos_embed_checkpoint = pos_embed_checkpoint.unsqueeze(0) - if pos_embed_checkpoint.shape[1] == new_size**2 + 1: + if pos_embed_checkpoint.shape[1] == new_size ** 2 + 1: self.is_resize_pos = True else: embedding_size = pos_embed_checkpoint.shape[-1] num_extra_tokens = 1 - new_num = new_size**2 + num_extra_tokens - print('Position interpolate from %dx%d to %dx%d' % - (orig_size, orig_size, new_size, new_size)) + new_num = new_size ** 2 + num_extra_tokens + print("Position interpolate from %dx%d to %dx%d" % (orig_size, orig_size, new_size, new_size)) extra_tokens = pos_embed_checkpoint[:, :num_extra_tokens] # only the 
position tokens are interpolated pos_tokens = pos_embed_checkpoint[:, num_extra_tokens:] - pos_tokens = pos_tokens.reshape(-1, orig_size, orig_size, - embedding_size).permute( - 0, 3, 1, 2) + pos_tokens = pos_tokens.reshape(-1, orig_size, orig_size, embedding_size).permute(0, 3, 1, 2) pos_tokens = torch.nn.functional.interpolate( - pos_tokens, - size=(new_size, new_size), - mode='bicubic', - align_corners=False) + pos_tokens, size=(new_size, new_size), mode="bicubic", align_corners=False + ) pos_tokens = pos_tokens.permute(0, 2, 3, 1).flatten(1, 2) new_pos_embed = torch.cat((extra_tokens, pos_tokens), dim=1) new_pos_embed = new_pos_embed.squeeze(0) - self.vision_tower.vision_model.embeddings.position_embedding = torch.nn.Embedding( - new_num, 1024) + self.vision_tower.vision_model.embeddings.position_embedding = torch.nn.Embedding(new_num, 1024) self.vision_tower.vision_model.embeddings.position_embedding.weight = torch.nn.Parameter( - new_pos_embed.to(pos_embed_checkpoint.dtype)) - self.vision_tower.vision_model.embeddings.position_ids = torch.arange( - new_num).expand((1, -1)) + new_pos_embed.to(pos_embed_checkpoint.dtype) + ) + self.vision_tower.vision_model.embeddings.position_ids = torch.arange(new_num).expand((1, -1)) self.is_resize_pos = True def cuda(self): self.vision_tower = self.vision_tower.cuda() for i in range(len(self.projector_weights)): self.projector_weights[i] = self.projector_weights[i].cuda() - self.device = torch.device('cuda') + torch.cuda.set_device(self.device) return self # batch images infer @@ -127,13 +131,13 @@ def forward(self, x): x = self.vision_tower(x, output_hidden_states=True) x = x.hidden_states[self.select_layer] - if self.select_feature == 'patch': + if self.select_feature == "patch": x = x[:, 1:].contiguous() - + if len(self.projector_weights) == 0: return x - - B, L, N = x.shape + + B, L, N = x.shape x = x.view(-1, N) # mm_project x = F.linear( @@ -151,16 +155,54 @@ def forward(self, x): x = x.view(B, L, -1) return x - def encode(self, image_items: List[Union[str, Image.Image]]): - images = [] - for item in image_items: + def encode(self, image_items: List[Union[int, str, torch.Tensor, Image.Image]]): + img_tensors = [] + uuids = [] + valid_id = 0 + valid_ids = [] + for i, item in enumerate(image_items): + if self.world_size_ != 1: + item = obtain(item) if isinstance(item, Image.Image): - image = item + image = item.convert("RGB") + t = self.image_processor.preprocess(image, return_tensors="pt")["pixel_values"] + img_tensors.append(t) + elif isinstance(item, torch.Tensor): + img_tensors.append(item) + elif isinstance(item, int): + uuids.append(item) + image_data = read_shm(get_shm_name_data(item)) + image_data = Image.open(BytesIO(image_data)).convert("RGB") + t = self.image_processor.preprocess(image_data, return_tensors="pt")["pixel_values"] + img_tensors.append(t) elif item.startswith("http://") or item.startswith("https://"): + import requests + image = Image.open(requests.get(item, stream=True).raw) else: - image = Image.open(item) - image = self.image_processor(image.convert('RGB')).unsqueeze(0).to(self.device) - images.append(image) - images = torch.cat(images, dim=0) - return self.forward(images) \ No newline at end of file + raise Exception("Unsupport input types: {} for {}".format(type(item), item)) + + cur_num = img_tensors[-1].shape[0] + + valid_ids.append([valid_id, valid_id + cur_num]) + valid_id += cur_num + + if len(img_tensors) <= 0: + return None + + img = torch.cat(img_tensors, dim=0) + pixel_values = img.to(self.device) + 
all_img_embeds = self.forward(pixel_values) + + if len(uuids) == 0: + return [all_img_embeds[start:end] for start, end in valid_ids] + else: + for i in range(len(uuids)): + uid = uuids[i] + if not self.cache_client.root.get_item_embed(uid): + start, end = valid_ids[i] + cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) + create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) + self.cache_client.root.set_item_embed(uuids[i]) + + return diff --git a/lightllm/models/internvl/internvl_visual.py b/lightllm/models/internvl/internvl_visual.py index b773141e..3c67a4e8 100644 --- a/lightllm/models/internvl/internvl_visual.py +++ b/lightllm/models/internvl/internvl_visual.py @@ -13,17 +13,24 @@ import rpyc from io import BytesIO from lightllm.models.internvl.img_process import load_image +from rpyc.utils.classic import obtain +from lightllm.utils.log_utils import init_logger + +logger = init_logger(__name__) class InternVLVisionModel: def __init__(self, kvargs): - self.cache_port = kvargs["client_port"] - self.cache_client = None + self.tp_rank_id = kvargs["tp_rank_id"] + self.vit_tp = kvargs["vit_tp"] + self.client_port = kvargs["client_port"] + self.cache_client = rpyc.connect("localhost", self.client_port) + self.visual_gpu = kvargs["visual_gpu"] + self.device = torch.device(f"cuda:{self.visual_gpu}") pass def load_model(self, weight_dir): assert torch.cuda.is_available() - self.device = torch.device("cuda") self.dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float32 self.config = json.load(open(os.path.join(weight_dir, "config.json"))) self.model = AutoModel.from_pretrained( @@ -37,15 +44,27 @@ def load_model(self, weight_dir): def cuda(self): return self - def encode(self, image_items: List[Union[str, torch.Tensor, Image.Image]]): + def encode(self, image_items: List[Union[int, str, torch.Tensor, Image.Image]]): img_tensors = [] valid_ids = [] valid_id = 0 + uuids = [] # load images to batch tensor + for i, url in enumerate(image_items): + if self.vit_tp != 1: + url = obtain(url) if isinstance(url, Image.Image): t = load_image(url, max_num=6) img_tensors.append(t) + elif isinstance(url, torch.Tensor): + img_tensors.append(url) + elif isinstance(url, int): + uuids.append(url) + image_data = read_shm(get_shm_name_data(url)) + image_data = Image.open(BytesIO(image_data)) + t = load_image(image_data) + img_tensors.append(t) else: raise Exception("Unsupport input types: {} for {}".format(type(url), url)) @@ -56,9 +75,20 @@ def encode(self, image_items: List[Union[str, torch.Tensor, Image.Image]]): if len(img_tensors) <= 0: return None - # (b, 3, 224, 224) + torch.cuda.set_device(self.device) imgs = torch.cat(img_tensors, dim=0) - pixel_values = imgs.to(self.device, dtype=self.dtype) + pixel_values = imgs.to(device=self.device, dtype=self.dtype) all_img_embeds = self.model.extract_feature(pixel_values) - return [all_img_embeds[start:end] for start, end in valid_ids] + + if len(uuids) == 0: + return [all_img_embeds[start:end] for start, end in valid_ids] + else: + for i in range(len(uuids)): + uid = uuids[i] + if not self.cache_client.root.get_item_embed(uid): + start, end = valid_ids[i] + cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) + create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) + self.cache_client.root.set_item_embed(uuids[i]) + return diff --git a/lightllm/models/llava/llava_visual.py b/lightllm/models/llava/llava_visual.py index 776f2959..1d07c40b 100644 --- a/lightllm/models/llava/llava_visual.py +++ b/lightllm/models/llava/llava_visual.py 
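For reference, the encode() changes above (InternLM-XComposer and InternVL) follow one shared caching pattern: an image that arrives as an integer uuid is read out of shared memory, embedded as part of the batch, and its slice of the result is written back to shared memory so the language-model workers can reuse it without re-encoding. The sketch below restates just that write path; the helper names come from lightllm.server.embed_cache.utils as imported in the diff, while the wrapper function itself is illustrative.

```python
# Illustrative write path for cached image embeddings (mirrors the encode() tails above).
from lightllm.server.embed_cache.utils import tensor2bytes, create_shm, get_shm_name_embed


def publish_image_embeds(uuids, valid_ids, all_img_embeds, cache_client):
    """Store each image's embedding slice in shared memory, keyed by its uuid.

    Assumes every image in the batch arrived as a uuid, so uuids and valid_ids align.
    """
    for i, uid in enumerate(uuids):
        # Another worker may have published this embedding already; skip if so.
        if cache_client.root.get_item_embed(uid):
            continue
        start, end = valid_ids[i]  # row range of this image inside the batched output
        cur_embed_bytes = tensor2bytes(all_img_embeds[start:end])
        create_shm(get_shm_name_embed(uid), cur_embed_bytes)
        cache_client.root.set_item_embed(uid)
```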
@@ -5,10 +5,24 @@ from PIL import Image from typing import List, Union from safetensors import safe_open +from rpyc.utils.classic import obtain +from io import BytesIO +import rpyc +from lightllm.server.embed_cache.utils import tensor2bytes, read_shm, create_shm, get_shm_name_data, get_shm_name_embed +from lightllm.utils.log_utils import init_logger + + +logger = init_logger(__name__) class LlavaVisionModel: - def __init__(self): + def __init__(self, kvargs): + self.tp_rank_ = kvargs["tp_rank"] + self.world_size_ = kvargs["vit_world_size"] + self.client_port = kvargs["client_port"] + self.cache_client = rpyc.connect("localhost", self.client_port) + self.visual_gpu = kvargs["visual_gpu"] + self.device = torch.device(f"cuda:{self.visual_gpu}") pass def load_model(self, weight_dir): @@ -31,6 +45,7 @@ def load_model(self, weight_dir): def load_hf_model(self, config, weight_dir): from transformers import AutoConfig, AutoProcessor, LlavaForConditionalGeneration + config = AutoConfig.from_pretrained(weight_dir, trust_remote_code=True) self.select_layer = config.vision_feature_layer self.select_feature = config.vision_feature_select_strategy @@ -48,12 +63,16 @@ def load_hf_model(self, config, weight_dir): self.projector_weights = {} for f in os.listdir(weight_dir): if f.endswith(".safetensors"): - d = safe_open(os.path.join(weight_dir, f), 'pt', 'cpu') + d = safe_open(os.path.join(weight_dir, f), "pt", "cpu") for k in d.keys(): if "multi_modal_projector.linear_1" in k: - self.projector_weights[k.replace("multi_modal_projector.linear_1", "model.mm_projector.0")] = d.get_tensor(k).half() + self.projector_weights[ + k.replace("multi_modal_projector.linear_1", "model.mm_projector.0") + ] = d.get_tensor(k).half() if "multi_modal_projector.linear_2" in k: - self.projector_weights[k.replace("multi_modal_projector.linear_2", "model.mm_projector.2")] = d.get_tensor(k).half() + self.projector_weights[ + k.replace("multi_modal_projector.linear_2", "model.mm_projector.2") + ] = d.get_tensor(k).half() def load_bin_model(self, config, weight_dir): self.select_layer = config.get("mm_vision_select_layer", -2) @@ -68,6 +87,7 @@ def load_bin_model(self, config, weight_dir): vision_path = os.path.join(weight_dir, vision_path) from transformers import CLIPVisionModel, CLIPImageProcessor + self.image_processor = CLIPImageProcessor.from_pretrained(vision_path) self.vision_tower = CLIPVisionModel.from_pretrained(vision_path).half() @@ -84,7 +104,8 @@ def cuda(self): self.vision_tower = self.vision_tower.cuda() for k, v in self.projector_weights.items(): self.projector_weights[k] = v.cuda() - self.device = torch.device("cuda") + self.device = torch.device(self.device) + torch.cuda.set_device(self.device) return self # batch images infer @@ -113,17 +134,54 @@ def forward(self, x): x = x.view(B, L, -1) return x - def encode(self, image_items: List[Union[str, Image.Image]]): - images = [] - for item in image_items: + def encode(self, image_items: List[Union[int, str, torch.Tensor, Image.Image]]): + img_tensors = [] + uuids = [] + valid_id = 0 + valid_ids = [] + for i, item in enumerate(image_items): + if self.world_size_ != 1: + item = obtain(item) if isinstance(item, Image.Image): - image = item + image = item.convert("RGB") + t = self.image_processor.preprocess(image, return_tensors="pt")["pixel_values"] + img_tensors.append(t) + elif isinstance(item, torch.Tensor): + img_tensors.append(item) + elif isinstance(item, int): + uuids.append(item) + image_data = read_shm(get_shm_name_data(item)) + image_data = 
Image.open(BytesIO(image_data)).convert("RGB") + t = self.image_processor.preprocess(image_data, return_tensors="pt")["pixel_values"] + img_tensors.append(t) elif item.startswith("http://") or item.startswith("https://"): import requests + image = Image.open(requests.get(item, stream=True).raw) else: - image = Image.open(item) - images.append(image.convert("RGB")) + raise Exception("Unsupport input types: {} for {}".format(type(item), item)) - images = self.image_processor.preprocess(images, return_tensors="pt")["pixel_values"] - return self.forward(images) + cur_num = img_tensors[-1].shape[0] + + valid_ids.append([valid_id, valid_id + cur_num]) + valid_id += cur_num + + if len(img_tensors) <= 0: + return None + + img = torch.cat(img_tensors, dim=0) + pixel_values = img.to(self.device) + all_img_embeds = self.forward(pixel_values) + + if len(uuids) == 0: + return [all_img_embeds[start:end] for start, end in valid_ids] + else: + for i in range(len(uuids)): + uid = uuids[i] + if not self.cache_client.root.get_item_embed(uid): + start, end = valid_ids[i] + cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) + create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) + self.cache_client.root.set_item_embed(uuids[i]) + + return diff --git a/lightllm/models/qwen2_vl/qwen2_visual.py b/lightllm/models/qwen2_vl/qwen2_visual.py index eced443a..e2e27339 100644 --- a/lightllm/models/qwen2_vl/qwen2_visual.py +++ b/lightllm/models/qwen2_vl/qwen2_visual.py @@ -32,6 +32,7 @@ from lightllm.server.embed_cache.utils import tensor2bytes, read_shm, create_shm, get_shm_name_data, get_shm_name_embed import rpyc from io import BytesIO +from rpyc.utils.classic import obtain from transformers.configuration_utils import PretrainedConfig from transformers.utils import logging from transformers.modeling_utils import PreTrainedModel @@ -295,6 +296,7 @@ def forward(self, hidden_states, cu_seqlens, rotary_pos_emb) -> torch.Tensor: class Qwen2VisionTransformerPretrainedModel(nn.Module): def __init__( self, + kvargs, depth=32, embed_dim=1280, hidden_size=3584, @@ -307,6 +309,12 @@ def __init__( temporal_patch_size=2, **kwargs, ): + self.tp_tank_ = kvargs["tp_rank"] + self.world_size_ = kvargs["vit_world_size"] + self.client_port = kvargs["client_port"] + self.cache_client = rpyc.connect("localhost", self.client_port) + self.visual_gpu = kvargs["visual_gpu"] + self.device = torch.device(f"cuda:{self.visual_gpu}") super().__init__() self.depth = depth self.embed_dim = embed_dim @@ -382,11 +390,11 @@ def rot_pos_emb(self, grid_thw): def forward(self, hidden_states: torch.Tensor, grid_thw: torch.Tensor) -> torch.Tensor: hidden_states = hidden_states.to( dtype=self.get_dtype(), - device=torch.device("cuda"), + device=self.device, ) grid_thw = grid_thw.to( dtype=torch.int32, - device=torch.device("cuda"), + device=self.device, ) hidden_states = self.patch_embed(hidden_states) @@ -427,15 +435,38 @@ def load_model(self, weight_dir): self.load_state_dict(weight_dict) - def encode(self, image_items: List[Union[str, Image.Image]]): + def encode(self, image_items: List[Union[int, str, torch.Tensor, Image.Image]]): img_tensors = [] valid_ids = [] valid_id = 0 img_grids = [] + uuids = [] for i, url in enumerate(image_items): + if self.world_size_ != 1: + url = obtain(url) if isinstance(url, Image.Image): t = get_image(url) image_inputs = self.processor.preprocess(images=t, return_tensors="pt") + pixel_values = image_inputs["pixel_values"].to(dtype=torch.bfloat16, device=self.device) + image_grid_thw = 
image_inputs["image_grid_thw"] + img_tensors.append(pixel_values) + img_grids.append(image_grid_thw) + elif isinstance(url, torch.Tensor): + img_tensors.append(url) + elif isinstance(url, int): + uuids.append(url) + image_data = read_shm(get_shm_name_data(url)) + image_data = Image.open(BytesIO(image_data)) + image_data = get_image(image_data) + image_inputs = self.processor.preprocess(images=image_data, return_tensors="pt") + pixel_values = image_inputs["pixel_values"].to(dtype=torch.bfloat16, device=self.device) + image_grid_thw = image_inputs["image_grid_thw"] + img_tensors.append(pixel_values) + img_grids.append(image_grid_thw) + elif url.startswith("http://") or url.startswith("https://"): + image_data = Image.open(requests.get(url, stream=True).raw) + image_data = get_image(image_data) + image_inputs = self.processor.preprocess(images=image_data, return_tensors="pt") pixel_values = image_inputs["pixel_values"].to(dtype=torch.bfloat16, device="cuda") image_grid_thw = image_inputs["image_grid_thw"] img_tensors.append(pixel_values) @@ -461,4 +492,15 @@ def encode(self, image_items: List[Union[str, Image.Image]]): pixel_values = pixel_values.type(self.get_dtype()) all_img_embeds = self.forward(pixel_values, grid_thw=image_grid_thw).to(self.device) - return [all_img_embeds[start:end] for start, end in valid_ids] + if len(uuids) == 0: + return [all_img_embeds[start:end] for start, end in valid_ids] + else: + for i in range(len(uuids)): + uid = uuids[i] + if not self.cache_client.root.get_item_embed(uid): + start, end = valid_ids[i] + cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) + create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) + self.cache_client.root.set_item_embed(uuids[i]) + + return diff --git a/lightllm/models/qwen_vl/layer_infer/pre_layer_infer.py b/lightllm/models/qwen_vl/layer_infer/pre_layer_infer.py index 616703e3..3a099b12 100644 --- a/lightllm/models/qwen_vl/layer_infer/pre_layer_infer.py +++ b/lightllm/models/qwen_vl/layer_infer/pre_layer_infer.py @@ -38,6 +38,9 @@ def context_forward(self, input_ids, infer_state: LlamaInferStateInfo, layer_wei img_start_loc = 0 img_start_locs = [] + device = layer_weight.wte_weight_.device + dtype = layer_weight.wte_weight_.dtype + hidden_size = layer_weight.wte_weight_.shape[1] for batch_id, p in enumerate(infer_state.multimodal_params): for img in p["images"]: # skip the same image @@ -45,15 +48,11 @@ def context_forward(self, input_ids, infer_state: LlamaInferStateInfo, layer_wei continue # pull the img_embeds by uid from shm data = read_shm(get_shm_name_embed(img["uuid"])) - img_weight.append(bytes2tensor(data).reshape(img["token_num"], -1)) + img_weight.append(bytes2tensor(data, device).reshape(img["token_num"], -1)) img_start_token_ids.append(img["token_id"]) img_token_lens.append(img["token_num"]) img_start_locs.append(img_start_loc) img_start_loc += img["token_num"] - - device = layer_weight.wte_weight_.device - dtype = layer_weight.wte_weight_.dtype - hidden_size = layer_weight.wte_weight_.shape[1] out = torch.zeros((len(input_ids), hidden_size), dtype=dtype, device=device) if len(img_weight) > 0: img_weight = torch.cat(img_weight, dim=0).to(device=device, dtype=dtype) diff --git a/lightllm/models/qwen_vl/qwen_visual.py b/lightllm/models/qwen_vl/qwen_visual.py index fa372d41..dcd88dac 100644 --- a/lightllm/models/qwen_vl/qwen_visual.py +++ b/lightllm/models/qwen_vl/qwen_visual.py @@ -11,13 +11,15 @@ from PIL import Image from typing import Callable, Optional, Sequence, Tuple, List, Union import numpy as np - 
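The pre_layer_infer change above is the consumer side of this cache: during prefill each image embedding is read back from shared memory by uuid and deserialized directly onto the embedding weight's device, which is why bytes2tensor now takes a device argument. A condensed sketch of that read path follows; the wrapper function is illustrative, the helpers are the ones used in the diff.

```python
# Illustrative read path: pull cached image embeddings out of shared memory for a prefill batch.
import torch
from lightllm.server.embed_cache.utils import bytes2tensor, read_shm, get_shm_name_embed


def gather_image_embeds(images, device, dtype):
    """`images` are dicts carrying at least 'uuid' and 'token_num', as in infer_state.multimodal_params."""
    img_weight = []
    for img in images:
        data = read_shm(get_shm_name_embed(img["uuid"]))
        # map_location=device inside bytes2tensor avoids staging the tensor on CPU first.
        img_weight.append(bytes2tensor(data, device).reshape(img["token_num"], -1))
    if len(img_weight) == 0:
        return None
    return torch.cat(img_weight, dim=0).to(device=device, dtype=dtype)
```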
+import rpyc +from lightllm.server.embed_cache.utils import tensor2bytes, read_shm, create_shm, get_shm_name_data, get_shm_name_embed import torch from torch import nn from torch.nn import functional as F from torch.nn.init import trunc_normal_ from torchvision import transforms from torchvision.transforms import InterpolationMode +from rpyc.utils.classic import obtain def get_abs_pos(abs_pos, tgt_size): @@ -29,15 +31,21 @@ def get_abs_pos(abs_pos, tgt_size): dtype = abs_pos.dtype if src_size != tgt_size: - return F.interpolate( - abs_pos.float().reshape(1, src_size, src_size, -1).permute(0, 3, 1, 2), - size=(tgt_size, tgt_size), - mode="bicubic", - align_corners=False, - ).permute(0, 2, 3, 1).flatten(0, 2).to(dtype=dtype) + return ( + F.interpolate( + abs_pos.float().reshape(1, src_size, src_size, -1).permute(0, 3, 1, 2), + size=(tgt_size, tgt_size), + mode="bicubic", + align_corners=False, + ) + .permute(0, 2, 3, 1) + .flatten(0, 2) + .to(dtype=dtype) + ) else: return abs_pos + # https://github.com/facebookresearch/mae/blob/efb2a8062c206524e35e47d04501ed4f544c0ae8/util/pos_embed.py#L20 def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False): """ @@ -64,7 +72,7 @@ def get_2d_sincos_pos_embed_from_grid(embed_dim, grid): emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0]) # (H*W, D/2) emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1]) # (H*W, D/2) - emb = np.concatenate([emb_h, emb_w], axis=1) # (H*W, D) + emb = np.concatenate([emb_h, emb_w], axis=1) # (H*W, D) return emb @@ -76,14 +84,14 @@ def get_1d_sincos_pos_embed_from_grid(embed_dim, pos): """ assert embed_dim % 2 == 0 omega = np.arange(embed_dim // 2, dtype=np.float32) - omega /= embed_dim / 2. - omega = 1. / 10000**omega # (D/2,) + omega /= embed_dim / 2.0 + omega = 1.0 / 10000 ** omega # (D/2,) pos = pos.reshape(-1) # (M,) - out = np.einsum('m,d->md', pos, omega) # (M, D/2), outer product + out = np.einsum("m,d->md", pos, omega) # (M, D/2), outer product - emb_sin = np.sin(out) # (M, D/2) - emb_cos = np.cos(out) # (M, D/2) + emb_sin = np.sin(out) # (M, D/2) + emb_cos = np.cos(out) # (M, D/2) emb = np.concatenate([emb_sin, emb_cos], axis=1) # (M, D) return emb @@ -96,14 +104,8 @@ class Resampler(nn.Module): Outputs: A tensor with the shape of (grid_size**2, embed_dim) """ - def __init__( - self, - grid_size, - embed_dim, - num_heads, - kv_dim=None, - norm_layer=nn.LayerNorm - ): + + def __init__(self, grid_size, embed_dim, num_heads, kv_dim=None, norm_layer=nn.LayerNorm): super().__init__() self.num_queries = grid_size ** 2 self.embed_dim = embed_dim @@ -114,7 +116,7 @@ def __init__( ).requires_grad_(False) self.query = nn.Parameter(torch.zeros(self.num_queries, embed_dim)) - trunc_normal_(self.query, std=.02) + trunc_normal_(self.query, std=0.02) if kv_dim is not None and kv_dim != embed_dim: self.kv_proj = nn.Linear(kv_dim, embed_dim, bias=False) @@ -124,12 +126,12 @@ def __init__( self.attn = nn.MultiheadAttention(embed_dim, num_heads) self.ln_q = norm_layer(embed_dim) self.ln_kv = norm_layer(embed_dim) - + # self.apply(self._init_weights) def _init_weights(self, m): if isinstance(m, nn.Linear): - trunc_normal_(m.weight, std=.02) + trunc_normal_(m.weight, std=0.02) if isinstance(m, nn.Linear) and m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.LayerNorm): @@ -146,10 +148,8 @@ def forward(self, x, attn_mask=None): N = x.shape[1] q = self.ln_q(self.query) out = self.attn( - self._repeat(q, N) + self.pos_embed.unsqueeze(1), - x + pos_embed.unsqueeze(1), - x, - 
attn_mask=attn_mask)[0] + self._repeat(q, N) + self.pos_embed.unsqueeze(1), x + pos_embed.unsqueeze(1), x, attn_mask=attn_mask + )[0] return out.permute(1, 0, 2) def _repeat(self, query, N: int): @@ -163,8 +163,7 @@ class VisualAttention(nn.Module): and returns output of the same size. """ - def __init__(self, embed_dim, num_heads, - bias=True, kdim=None, vdim=None): + def __init__(self, embed_dim, num_heads, bias=True, kdim=None, vdim=None): super(VisualAttention, self).__init__() self.embed_dim = embed_dim self.kdim = kdim if kdim is not None else embed_dim @@ -180,37 +179,37 @@ def __init__(self, embed_dim, num_heads, self.hidden_size_per_partition = embed_dim # Strided linear layer. - assert self._qkv_same_embed_dim, 'Only Support SelfAttention Currently' + assert self._qkv_same_embed_dim, "Only Support SelfAttention Currently" self.in_proj = nn.Linear(embed_dim, 3 * embed_dim) self.out_proj = nn.Linear(embed_dim, embed_dim) self.norm_factor = math.sqrt(self.hidden_size_per_attention_head) - def forward(self, query, key, value, attn_mask = None): + def forward(self, query, key, value, attn_mask=None): # query/key/value: [sq, b, h] sq, b, _ = query.size() - assert torch.allclose(query, key), 'Only Support Self-Attention Currently' + assert torch.allclose(query, key), "Only Support Self-Attention Currently" sk = sq mixed_x_layer = self.in_proj(query) # [sq, b, (np * 3 * hn)] --> [sq, b, np, 3 * hn] - new_tensor_shape = mixed_x_layer.size()[:-1] + \ - (self.num_attention_heads_per_partition, - 3 * self.hidden_size_per_attention_head) + new_tensor_shape = mixed_x_layer.size()[:-1] + ( + self.num_attention_heads_per_partition, + 3 * self.hidden_size_per_attention_head, + ) mixed_x_layer = mixed_x_layer.view(*new_tensor_shape) # [sq, b, np, 3 * hn] --> 3 [sq, b, np, hn] - query_layer, key_layer, value_layer = mixed_x_layer.split( - self.hidden_size_per_attention_head, dim=-1) + query_layer, key_layer, value_layer = mixed_x_layer.split(self.hidden_size_per_attention_head, dim=-1) # [sq, b, np, hn] -> [sq, b * np, hn] - query_layer = query_layer.view(sq, - b * self.num_attention_heads_per_partition, - self.hidden_size_per_attention_head).transpose(0, 1) + query_layer = query_layer.view( + sq, b * self.num_attention_heads_per_partition, self.hidden_size_per_attention_head + ).transpose(0, 1) # [sk, b, np, hn] -> [sk, b * np, hn] - key_layer = key_layer.view(sk, - b * self.num_attention_heads_per_partition, - self.hidden_size_per_attention_head).transpose(0, 1) + key_layer = key_layer.view( + sk, b * self.num_attention_heads_per_partition, self.hidden_size_per_attention_head + ).transpose(0, 1) q_scaled = query_layer / self.norm_factor if attn_mask is not None: @@ -219,24 +218,23 @@ def forward(self, query, key, value, attn_mask = None): attention_probs = torch.bmm(q_scaled, key_layer.transpose(-2, -1)) attention_probs = attention_probs.softmax(dim=-1) - value_layer = value_layer.view(sk, - b * self.num_attention_heads_per_partition, - self.hidden_size_per_attention_head).transpose(0, 1) + value_layer = value_layer.view( + sk, b * self.num_attention_heads_per_partition, self.hidden_size_per_attention_head + ).transpose(0, 1) # matmul: [b * np, sq, hn] context_layer = torch.bmm(attention_probs, value_layer) # change view [b, np, sq, hn] - context_layer = context_layer.view(b, - self.num_attention_heads_per_partition, - sq, self.hidden_size_per_attention_head) + context_layer = context_layer.view( + b, self.num_attention_heads_per_partition, sq, self.hidden_size_per_attention_head + ) # [b, np, 
sq, hn] --> [sq, b, np, hn] context_layer = context_layer.permute(2, 0, 1, 3).contiguous() # [sq, b, np, hn] --> [sq, b, hp] - new_context_layer_shape = context_layer.size()[:-2] + \ - (self.hidden_size_per_partition,) + new_context_layer_shape = context_layer.size()[:-2] + (self.hidden_size_per_partition,) context_layer = context_layer.view(*new_context_layer_shape) output = self.out_proj(context_layer) @@ -246,13 +244,13 @@ def forward(self, query, key, value, attn_mask = None): class VisualAttentionBlock(nn.Module): def __init__( - self, - d_model: int, - n_head: int, - mlp_ratio: float = 4.0, - act_layer: Callable = nn.GELU, - norm_layer: Callable = nn.LayerNorm, - is_cross_attention: bool = False, + self, + d_model: int, + n_head: int, + mlp_ratio: float = 4.0, + act_layer: Callable = nn.GELU, + norm_layer: Callable = nn.LayerNorm, + is_cross_attention: bool = False, ): super().__init__() @@ -263,18 +261,22 @@ def __init__( self.ln_2 = norm_layer(d_model) mlp_width = int(d_model * mlp_ratio) self.attn = VisualAttention(d_model, n_head) - self.mlp = nn.Sequential(OrderedDict([ - ("c_fc", nn.Linear(d_model, mlp_width)), - ("gelu", act_layer()), - ("c_proj", nn.Linear(mlp_width, d_model)) - ])) + self.mlp = nn.Sequential( + OrderedDict( + [ + ("c_fc", nn.Linear(d_model, mlp_width)), + ("gelu", act_layer()), + ("c_proj", nn.Linear(mlp_width, d_model)), + ] + ) + ) def attention( - self, - q_x: torch.Tensor, - k_x: Optional[torch.Tensor] = None, - v_x: Optional[torch.Tensor] = None, - attn_mask: Optional[torch.Tensor] = None, + self, + q_x: torch.Tensor, + k_x: Optional[torch.Tensor] = None, + v_x: Optional[torch.Tensor] = None, + attn_mask: Optional[torch.Tensor] = None, ): k_x = k_x if k_x is not None else q_x v_x = v_x if v_x is not None else q_x @@ -283,11 +285,11 @@ def attention( return self.attn(q_x, k_x, v_x, attn_mask=attn_mask) def forward( - self, - q_x: torch.Tensor, - k_x: Optional[torch.Tensor] = None, - v_x: Optional[torch.Tensor] = None, - attn_mask: Optional[torch.Tensor] = None, + self, + q_x: torch.Tensor, + k_x: Optional[torch.Tensor] = None, + v_x: Optional[torch.Tensor] = None, + attn_mask: Optional[torch.Tensor] = None, ): k_x = self.ln_1_kv(k_x) if hasattr(self, "ln_1_kv") and k_x is not None else None v_x = self.ln_1_kv(v_x) if hasattr(self, "ln_1_kv") and v_x is not None else None @@ -299,23 +301,24 @@ def forward( class TransformerBlock(nn.Module): def __init__( - self, - width: int, - layers: int, - heads: int, - mlp_ratio: float = 4.0, - act_layer: Callable = nn.GELU, - norm_layer: Callable = nn.LayerNorm, + self, + width: int, + layers: int, + heads: int, + mlp_ratio: float = 4.0, + act_layer: Callable = nn.GELU, + norm_layer: Callable = nn.LayerNorm, ): super().__init__() self.width = width self.layers = layers - self.resblocks = nn.ModuleList([ - VisualAttentionBlock( - width, heads, mlp_ratio, act_layer=act_layer, norm_layer=norm_layer) - for _ in range(layers) - ]) + self.resblocks = nn.ModuleList( + [ + VisualAttentionBlock(width, heads, mlp_ratio, act_layer=act_layer, norm_layer=norm_layer) + for _ in range(layers) + ] + ) def get_cast_dtype(self) -> torch.dtype: return self.resblocks[0].mlp.c_fc.weight.dtype @@ -330,19 +333,25 @@ def forward(self, x: torch.Tensor, attn_mask: Optional[torch.Tensor] = None): class QWenVisionTransformer(nn.Module): - def __init__( - self, - image_size: int, - patch_size: int, - width: int, - layers: int, - heads: int, - mlp_ratio: float, - n_queries: int = 256, - output_dim: int = 512, - **kwargs + self, + kvargs, + 
image_size: int, + patch_size: int, + width: int, + layers: int, + heads: int, + mlp_ratio: float, + n_queries: int = 256, + output_dim: int = 512, + **kwargs, ): + self.tp_rank_ = kvargs["tp_rank"] + self.world_size_ = kvargs["vit_world_size"] + self.client_port = kvargs["client_port"] + self.cache_client = rpyc.connect("localhost", self.client_port) + self.visual_gpu = kvargs["visual_gpu"] + self.device = torch.device(f"cuda:{self.visual_gpu}") super().__init__() image_height, image_width = self.image_size = (image_size, image_size) patch_height, patch_width = self.patch_size = (patch_size, patch_size) @@ -351,14 +360,13 @@ def __init__( mean = (0.48145466, 0.4578275, 0.40821073) std = (0.26862954, 0.26130258, 0.27577711) - self.image_transform = transforms.Compose([ - transforms.Resize( - (image_size, image_size), - interpolation=InterpolationMode.BICUBIC - ), - transforms.ToTensor(), - transforms.Normalize(mean=mean, std=std), - ]) + self.image_transform = transforms.Compose( + [ + transforms.Resize((image_size, image_size), interpolation=InterpolationMode.BICUBIC), + transforms.ToTensor(), + transforms.Normalize(mean=mean, std=std), + ] + ) self.conv1 = nn.Conv2d(in_channels=3, out_channels=width, kernel_size=patch_size, stride=patch_size, bias=False) @@ -387,7 +395,7 @@ def __init__( norm_layer=norm_layer, ) self.ln_post = norm_layer(output_dim) - self.proj = nn.Parameter((output_dim** -0.5) * torch.randn(output_dim, output_dim)) + self.proj = nn.Parameter((output_dim ** -0.5) * torch.randn(output_dim, output_dim)) def forward(self, x: torch.Tensor): x = x.to( @@ -413,28 +421,62 @@ def forward(self, x: torch.Tensor): x = x.to(dtype=torch.float16) return x - - def encode(self, image_items: List[Union[str, Image.Image]]): - images = [] - for item in image_items: + + def encode(self, image_items: List[Union[int, str, torch.Tensor, Image.Image]]): + img_tensors = [] + uuids = [] + valid_id = 0 + valid_ids = [] + for i, item in enumerate(image_items): + if self.world_size_ != 1: + item = obtain(item) if isinstance(item, Image.Image): - image = item + image = item.convert("RGB") + t = self.image_transform(image) + img_tensors.append(t) + elif isinstance(item, torch.Tensor): + img_tensors.append(item) + elif isinstance(item, int): + uuids.append(item) + image_data = read_shm(get_shm_name_data(item)) + image_data = Image.open(BytesIO(image_data)).convert("RGB") + t = self.image_transform(image_data) + img_tensors.append(t) elif item.startswith("http://") or item.startswith("https://"): image = Image.open(requests.get(item, stream=True).raw) else: - image = Image.open(item) - image = image.convert("RGB") - images.append(self.image_transform(image)) - images = torch.stack(images, dim=0) - return self(images) - + raise Exception("Unsupport input types: {} for {}".format(type(item), item)) + cur_num = img_tensors[-1].shape[0] + + valid_ids.append([valid_id, valid_id + cur_num]) + valid_id += cur_num + if len(img_tensors) <= 0: + return None + + pixel_values = torch.stack(img_tensors, dim=0) + all_img_embeds = self(pixel_values) + + if len(uuids) == 0: + return [all_img_embeds[start:end] for start, end in valid_ids] + else: + for i in range(len(uuids)): + uid = uuids[i] + if not self.cache_client.root.get_item_embed(uid): + start, end = valid_ids[i] + cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) + create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) + self.cache_client.root.set_item_embed(uuids[i]) + + return + def load_model(self, weight_dir): import os - weight_files = [file_ 
for file_ in os.listdir(weight_dir) if file_.endswith(".bin") ] + + weight_files = [file_ for file_ in os.listdir(weight_dir) if file_.endswith(".bin")] weight_dict = {} for file_ in weight_files: f_weight_dict = torch.load(os.path.join(weight_dir, file_), "cpu") for k, v in f_weight_dict.items(): if "visual" in k: - weight_dict[k[len("transformer.visual."):]] = v + weight_dict[k[len("transformer.visual.") :]] = v self.load_state_dict(weight_dict) diff --git a/lightllm/server/api_server.py b/lightllm/server/api_server.py index 91126425..73688de5 100755 --- a/lightllm/server/api_server.py +++ b/lightllm/server/api_server.py @@ -467,6 +467,15 @@ def make_argument_parser() -> argparse.ArgumentParser: "--grouping_key", action="append", default=[], help="grouping_key for the monitor in the form key=value" ) parser.add_argument("--push_interval", type=int, default=10, help="interval of pushing monitoring metrics") + parser.add_argument("--visual_gpu_ids", type=str, default="0", help="Comma-separated GPU IDs to use, e.g., '0,1,2'") + parser.add_argument("--visual_tp", type=int, default=1, help="number of tensor parallel instances for ViT") + parser.add_argument("--visual_dp", type=int, default=1, help="number of data parallel instances for ViT") + parser.add_argument( + "--visual_nccl_ports", + type=str, + default="29500", + help="Comma-separated list of NCCL ports to build a distributed environment for ViT.", + ) parser.add_argument( "--enable_monitor_auth", action="store_true", help="Whether to open authentication for push_gateway" ) @@ -523,6 +532,22 @@ def main(): if args.beam_mode or args.diverse_mode: assert args.router_token_ratio == 0.0 + # Check that enough GPUs were specified for the ViT workers + visual_gpu_ids = [int(gpu_id) for gpu_id in args.visual_gpu_ids.split(",")] + total_required_gpus = args.visual_dp * args.visual_tp + if len(visual_gpu_ids) < total_required_gpus: + raise ValueError( + f"Not enough GPUs specified. You need at least {total_required_gpus} GPUs, but got {len(visual_gpu_ids)}." 
+ ) + else: + args.visual_gpu_ids = visual_gpu_ids[:total_required_gpus] + + visual_nccl_port_ids = [int(nccl_port_id) for nccl_port_id in args.visual_nccl_ports.split(",")] + if len(visual_nccl_port_ids) != args.visual_dp: + raise ValueError(f"The number of ports ({len(visual_nccl_port_ids)}) does not match vit_dp ({args.visual_dp}).") + + args.visual_nccl_port = visual_nccl_port_ids + if not args.splitfuse_mode: # 普通模式下 if args.batch_max_tokens is None: @@ -553,9 +578,18 @@ def main(): logger.info(f"all start args:{args}") - can_use_ports = alloc_can_use_network_port(num=6 + args.tp, used_nccl_port=args.nccl_port) + can_use_ports = alloc_can_use_network_port( + num=6 + args.tp + args.visual_dp * args.visual_tp, used_nccl_port=[args.nccl_port, args.visual_nccl_port] + ) router_port, detokenization_port, httpserver_port, visual_port, cache_port, metric_port = can_use_ports[0:6] - model_rpc_ports = can_use_ports[6:] + model_rpc_ports = can_use_ports[6 : 6 + args.tp] + + visual_model_tp_ports = [] + for dp_index in range(args.visual_dp): + tp_ports_for_dp = can_use_ports[ + 6 + args.tp + dp_index * args.visual_tp : 6 + args.tp + (dp_index + 1) * args.visual_tp + ] + visual_model_tp_ports.append(tp_ports_for_dp) if args.enable_multimodal: start_submodule_processes( @@ -598,7 +632,7 @@ def main(): start_visual_process, ], start_args=[ - (args, router_port, visual_port, cache_port), + (args, router_port, visual_port, cache_port, visual_model_tp_ports), ], ) diff --git a/lightllm/server/embed_cache/utils.py b/lightllm/server/embed_cache/utils.py index 78bce1ea..20c78724 100644 --- a/lightllm/server/embed_cache/utils.py +++ b/lightllm/server/embed_cache/utils.py @@ -13,9 +13,9 @@ def tensor2bytes(t): return buf.read() -def bytes2tensor(b): +def bytes2tensor(b, device): # return torch.from_numpy(np.frombuffer(b, dtype=np.float16)).cuda() - return torch.load(BytesIO(b)) + return torch.load(BytesIO(b), map_location=device) def create_shm(name, data): diff --git a/lightllm/server/visualserver/manager.py b/lightllm/server/visualserver/manager.py index f3158dcb..66dd854f 100644 --- a/lightllm/server/visualserver/manager.py +++ b/lightllm/server/visualserver/manager.py @@ -24,6 +24,7 @@ def __init__( router_port, visual_port, client_port, + visual_model_rpc_ports, infer_batch_size=4, ): context = zmq.asyncio.Context(2) @@ -37,28 +38,40 @@ def __init__( self.waiting_reqs = [] self.model_weightdir = args.model_dir self.tp_world_size = args.tp - self.world_size = 1 + self.vit_dp = args.visual_dp + self.vit_tp = args.visual_tp self.infer_batch_size = infer_batch_size self.trust_remote_code = args.trust_remote_code self.args = args + self.visual_model_rpcs_ports = visual_model_rpc_ports async def wait_to_model_ready(self): - self.model_rpcs: List[VisualModelRpcClient] = [] - for rank_id in range(self.world_size): - rpc_model = await start_model_process(world_size=self.world_size) - self.model_rpcs.append(rpc_model) + self.model_rpcs: List[List[VisualModelRpcClient]] = [[] for _ in range(self.vit_dp)] + + for dp_rank_id in range(self.vit_dp): + tp_ports_each_dp = self.visual_model_rpcs_ports[dp_rank_id] + for tp_rank_id in range(self.vit_tp): + rpc_model = await start_model_process(port=tp_ports_each_dp[tp_rank_id], vit_tp=self.vit_tp) + self.model_rpcs[dp_rank_id].append(rpc_model) init_model_ret = [] - for rank_id in range(self.world_size): # async init model process - kvargs = { - "weight_dir": self.model_weightdir, - "trust_remote_code": self.trust_remote_code, - "client_port": self.client_port, - 
"rank_id": rank_id, - "data_type": self.args.data_type, - } - init_model_ret.append(self.model_rpcs[rank_id].init_model(kvargs)) + for dp_rank_id in range(self.vit_dp): # async init model process + for tp_rank_id in range(self.vit_tp): + kvargs = { + "weight_dir": self.model_weightdir, + "trust_remote_code": self.trust_remote_code, + "vit_dp": self.vit_dp, + "vit_tp": self.vit_tp, + "client_port": self.client_port, + "tp_rank_id": tp_rank_id, + "dp_rank_id": dp_rank_id, + "vit_rank_id": dp_rank_id * self.vit_tp + tp_rank_id, + "data_type": self.args.data_type, + "visual_nccl_port": self.args.visual_nccl_port[dp_rank_id], + "visual_gpu_ids": self.args.visual_gpu_ids, + } + init_model_ret.append(self.model_rpcs[dp_rank_id][tp_rank_id].init_model(kvargs)) await asyncio.gather(*init_model_ret) return @@ -73,25 +86,15 @@ async def infer_imgs(self, uuids): if len(uuids) == 0: return # uuids -> PIL Images - images = [] - for uid in uuids: - image_data = read_shm(get_shm_name_data(uid)) - images.append(Image.open(BytesIO(image_data))) - # print(" + got pil image:", images[-1].size, images[-1].mode) - rets = [self.model_rpcs[tp_rank].encode(images) for tp_rank in range(self.world_size)] - ans = await asyncio.gather(*rets) - if self.world_size != 1: - img_embed = obtain(ans[0]) - else: - img_embed = ans[0] - torch.cuda.synchronize() - # b = time.time() - for i in range(len(uuids)): - # print(" + set_item_embed:", uuids[i], img_embed[i].shape) - if not self.cache_client.root.get_item_embed(uuids[i]): - cur_embed_bytes = tensor2bytes(img_embed[i]) - create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) - self.cache_client.root.set_item_embed(uuids[i]) + tasks = [] + for vit_dp_rank in range(self.vit_dp): + assigned_uuids = [uuids[i] for i in range(vit_dp_rank, len(uuids), self.vit_dp)] + if assigned_uuids: + task = asyncio.create_task(self.model_rpcs[vit_dp_rank][0].encode(assigned_uuids)) + tasks.append(task) + + # rets = [self.model_rpcs[tp_rank].encode(images) for tp_rank in range(self.world_size)] + await asyncio.gather(*tasks) return async def loop_for_fwd(self): @@ -140,7 +143,7 @@ def clean_up(self): return -def start_visual_process(args, router_port, visual_port, client_port, pipe_writer): +def start_visual_process(args, router_port, visual_port, client_port, model_rpc_ports, pipe_writer): # 注册graceful 退出的处理 from lightllm.utils.graceful_utils import graceful_registry import inspect @@ -148,7 +151,7 @@ def start_visual_process(args, router_port, visual_port, client_port, pipe_write graceful_registry(inspect.currentframe().f_code.co_name) try: - visualserver = VisualManager(args, router_port, visual_port, client_port) + visualserver = VisualManager(args, router_port, visual_port, client_port, model_rpc_ports) asyncio.run(visualserver.wait_to_model_ready()) except Exception as e: import traceback diff --git a/lightllm/server/visualserver/model_infer/model_rpc.py b/lightllm/server/visualserver/model_infer/model_rpc.py index 55aebdef..cda1d2ef 100644 --- a/lightllm/server/visualserver/model_infer/model_rpc.py +++ b/lightllm/server/visualserver/model_infer/model_rpc.py @@ -2,6 +2,7 @@ import numpy as np import rpyc import torch +import os from datetime import timedelta from typing import Dict, List, Tuple from transformers.configuration_utils import PretrainedConfig @@ -18,42 +19,56 @@ class VisualModelRpcServer(rpyc.Service): def exposed_init_model(self, kvargs): - # 注册graceful 退出的处理 - from lightllm.utils.graceful_utils import graceful_registry - import inspect - - 
graceful_registry(inspect.currentframe().f_code.co_name) - - # import torch - # import torch.distributed as dist - # world_size = kvargs["world_size"] - # if world_size != 1: - # kvargs = obtain(kvargs) - # world_size = kvargs["world_size"] - # dist.init_process_group('nccl', init_method=f'tcp://127.0.0.1:{kvargs["nccl_port"]}', - # rank=self.tp_rank, world_size=world_size) - # torch.cuda.set_device(self.tp_rank) + import torch + import torch.distributed as dist + + self.vit_dp = kvargs["vit_dp"] + self.vit_tp = kvargs["vit_tp"] + self.dp_rank_id = kvargs["dp_rank_id"] + self.tp_rank_id = kvargs["tp_rank_id"] + client_port = kvargs["client_port"] + data_type = kvargs["data_type"] weight_dir = kvargs["weight_dir"] + visual_gpu_ids = kvargs["visual_gpu_ids"] + visual_nccl_port = kvargs["visual_nccl_port"] + self.vit_rank_id = kvargs["vit_rank_id"] + + model_kvargs = { + "tp_rank_id": self.tp_rank_id, + "vit_tp": self.vit_tp, + "weight_dir": weight_dir, + "client_port": client_port, + "data_type": data_type, + "vit_rank_id": self.vit_rank_id, + "visual_gpu": visual_gpu_ids[self.vit_rank_id], + } + if self.vit_tp != 1: + dist.init_process_group( + backend="nccl", + init_method=f"tcp://127.0.0.1:{visual_nccl_port}", + rank=self.tp_rank_id, + world_size=self.vit_tp, + ) + torch.cuda.set_device(visual_gpu_ids[self.vit_rank_id]) model_cfg, _ = PretrainedConfig.get_config_dict(weight_dir) + try: self.model_type = model_cfg["model_type"] if self.model_type == "qwen": - self.model = QWenVisionTransformer(**model_cfg["visual"]).eval().bfloat16() + self.model = QWenVisionTransformer(model_kvargs, **model_cfg["visual"]).eval().bfloat16() elif self.model_type == "qwen2_vl": - self.model = Qwen2VisionTransformerPretrainedModel(**model_cfg["vision_config"]).eval().bfloat16() + self.model = ( + Qwen2VisionTransformerPretrainedModel(model_kvargs, **model_cfg["vision_config"]).eval().bfloat16() + ) elif self.model_type == "llava": - self.model = LlavaVisionModel() + self.model = LlavaVisionModel(model_kvargs) elif self.model_type == "internlmxcomposer2": - self.model = InternVisionModel() + self.model = InternVisionModel(model_kvargs) elif self.model_type == "internvl_chat": - # tp_rank = kvargs['rank_id'] - client_port = kvargs["client_port"] - data_type = kvargs["data_type"] - model_kvargs = {"weight_dir": weight_dir, "client_port": client_port, "data_type": data_type} self.model = InternVLVisionModel(model_kvargs) - else: raise Exception(f"can not support {self.model_type} now") + self.model.load_model(weight_dir) self.model = self.model.cuda() except Exception as e: @@ -78,11 +93,11 @@ def exposed_encode(self, images): class VisualModelRpcClient: - def __init__(self, model_rpc, world_size, rpc_server_process=None): + def __init__(self, model_rpc, vit_tp, rpc_server_process=None): self.model: VisualModelRpcServer = model_rpc - self.world_size = world_size + self.vit_tp = vit_tp self.rpc_server_process = rpc_server_process - self.use_rpc = self.world_size != 1 + self.use_rpc = self.vit_tp != 1 if self.use_rpc: def async_wrap(f): @@ -111,14 +126,46 @@ async def init_model(self, kvargs): else: return - async def encode(self, images): - ans = self._encode(images) + async def encode(self, uuids): + ans = self._encode(uuids) if self.use_rpc: return await ans else: return ans -async def start_model_process(world_size): - if world_size == 1: - return VisualModelRpcClient(VisualModelRpcServer(), world_size) +def _init_env(port): + # 注册graceful 退出的处理 + from lightllm.utils.graceful_utils import graceful_registry + 
import inspect + + graceful_registry(inspect.currentframe().f_code.co_name) + + from rpyc.utils.server import ThreadedServer + + t = ThreadedServer(VisualModelRpcServer(), port=port, protocol_config={"allow_pickle": True}) + t.start() + return + + +async def start_model_process(port, vit_tp): + if vit_tp == 1: + return VisualModelRpcClient(VisualModelRpcServer(), vit_tp) + import multiprocessing + + proc = multiprocessing.Process(target=_init_env, args=(port,)) + proc.start() + await asyncio.sleep(2) + repeat_count = 0 + while repeat_count < 20: + try: + con = rpyc.connect("localhost", port, config={"allow_pickle": True}) + break + except BaseException: + await asyncio.sleep(1) + repeat_count += 1 + if repeat_count == 20: + raise Exception("init rpc env error!") + + assert proc.is_alive() + return VisualModelRpcClient(con.root, vit_tp, rpc_server_process=proc) diff --git a/lightllm/utils/net_utils.py b/lightllm/utils/net_utils.py index acac8fda..04ae751f 100644 --- a/lightllm/utils/net_utils.py +++ b/lightllm/utils/net_utils.py @@ -6,7 +6,7 @@ def alloc_can_use_network_port(num=3, used_nccl_port=None): for port in range(10000, 65536): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: result = s.connect_ex(("localhost", port)) - if result != 0 and port != used_nccl_port: + if result != 0 and port not in used_nccl_port: port_list.append(port) if len(port_list) == num:
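Taken together, the api_server and net_utils changes reserve 6 + tp + visual_dp * visual_tp free ports in a single pass (alloc_can_use_network_port now skips every NCCL port in the list it is given) and then partition them among the core services, the LLM tensor-parallel ranks, and the ViT data-parallel groups. A small worked sketch of that partitioning, with a hypothetical helper name, makes the arithmetic explicit.

```python
# Hypothetical helper mirroring the port slicing done in api_server.main() above.
def split_ports(can_use_ports, tp, visual_dp, visual_tp):
    # First six ports: router, detokenization, httpserver, visual, cache, metric.
    service_ports = can_use_ports[0:6]
    # Next `tp` ports: one rpc port per LLM tensor-parallel rank.
    model_rpc_ports = can_use_ports[6 : 6 + tp]
    # Remaining visual_dp * visual_tp ports: one group per ViT data-parallel replica.
    visual_model_tp_ports = []
    for dp_index in range(visual_dp):
        start = 6 + tp + dp_index * visual_tp
        visual_model_tp_ports.append(can_use_ports[start : start + visual_tp])
    return service_ports, model_rpc_ports, visual_model_tp_ports


# Example: tp=2, visual_dp=2, visual_tp=1 expects 6 + 2 + 2 = 10 free ports and
# yields two single-port groups, one per ViT data-parallel worker.
```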