Qwen1.5-1.8B Deployment

Modeled on the ChatGLM3 deployment, with reference to the Qwen model documentation. Model page: https://modelscope.cn/models/qwen/Qwen1.5-1.8B-Chat/summary
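
Before starting the services below, the model weights must be available locally. A minimal download sketch using ModelScope's snapshot_download (the cache_dir value is an assumption chosen to match the /root/autodl-tmp paths used in the server code):

# Hypothetical download script; cache_dir is an assumed path
from modelscope import snapshot_download

model_dir = snapshot_download('qwen/Qwen1.5-1.8B-Chat', cache_dir='/root/autodl-tmp')
print(model_dir)  # use the printed path as model_name_or_path below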

HTTP API

  • Server code api.py
from fastapi import FastAPI, Request
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import uvicorn
import json
import datetime
import torch

# Device settings
DEVICE = "cuda"  # use CUDA
DEVICE_ID = "0"  # CUDA device ID; leave empty to use the default device
CUDA_DEVICE = f"{DEVICE}:{DEVICE_ID}" if DEVICE_ID else DEVICE  # combined CUDA device string

# Load the pretrained tokenizer and model
model_name_or_path = '/root/autodl-tmp/qwen/Qwen1.5-1.8B-Chat'
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map="auto", torch_dtype=torch.bfloat16)

# Free GPU memory
def torch_gc():
    if torch.cuda.is_available():  # check that CUDA is available
        with torch.cuda.device(CUDA_DEVICE):  # select the CUDA device
            torch.cuda.empty_cache()  # release cached GPU memory
            torch.cuda.ipc_collect()  # collect CUDA IPC memory

# Create the FastAPI app
app = FastAPI()

# POST endpoint
@app.post("/")
async def create_item(request: Request):
    json_post = await request.json()  # parse the JSON body of the request
    prompt = json_post.get('prompt')  # extract the prompt from the request

    messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
    ]

    # Build the chat prompt and generate a reply
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([text], return_tensors="pt").to('cuda')
    generated_ids = model.generate(model_inputs.input_ids, max_new_tokens=512)
    # Strip the prompt tokens, keeping only the newly generated ones
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    now = datetime.datetime.now()  # current time
    time = now.strftime("%Y-%m-%d %H:%M:%S")  # formatted timestamp string
    # Build the JSON response
    answer = {
        "response": response,
        "status": 200,
        "time": time
    }
    # Build and print a log line
    log = f'[{time}] prompt: "{prompt}", response: {repr(response)}'
    print(log)
    torch_gc()  # free GPU memory
    return answer  # return the response

# Entry point
if __name__ == '__main__':
    # Start the FastAPI app. Port 6006 can be mapped from autodl to the local
    # machine so the API can be called locally.
    # Note: with workers=2, each worker process loads its own copy of the model.
    uvicorn.run("api:app", host='127.0.0.1', port=6006, workers=2)
    # gunicorn api:app -w 3 -k uvicorn.workers.UvicornWorker -b 127.0.0.1:6006
  • Client code clientapi.py
import requests
import json

def get_completion(prompt):
    headers = {'Content-Type': 'application/json'}
    data = {"prompt": prompt}
    response = requests.post(url='http://127.0.0.1:6006', headers=headers, data=json.dumps(data))
    return response.json()['response']

if __name__ == '__main__':
    print(get_completion('Can you remember what was said earlier?'))
  • Load-test code locusthttp.py. Run locust -f locusthttp.py (web UI, by default at http://localhost:8089); use vmstat/top to watch processes, memory, CPU, etc. A headless alternative is sketched after the script.
import json

from locust import HttpUser, TaskSet, task, between

# Define user behavior
class UserBehavior(TaskSet):
    # Runs once before each simulated user starts executing tasks
    def on_start(self):
        print('Starting the load test')

    # @task marks a method as a transaction; the server above only exposes
    # POST "/", so every task posts a prompt to the root path.
    @task(1)
    def index(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': 'Hello'}
        self.client.post(url='/', headers=headers, data=json.dumps(data))

    @task(2)  # the task() argument is the weight: larger values run proportionally more often (default 1)
    def index2(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': 'Do you know Mount Everest?'}
        self.client.post(url='/', headers=headers, data=json.dumps(data))

    @task(2)
    def index3(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': 'Who are you?'}
        self.client.post(url='/', headers=headers, data=json.dumps(data))

    @task(2)
    def index4(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': 'How do you stir-fry tomatoes with tomatoes?'}
        self.client.post(url='/', headers=headers, data=json.dumps(data))


# Load-test configuration
class WebsiteUser(HttpUser):
    # Points to the user-behavior class defined above.
    tasks = [UserBehavior]
    # Wait 3-6 seconds between tasks (the deprecated min_wait/max_wait, in
    # milliseconds, are replaced by wait_time in current Locust versions).
    wait_time = between(3, 6)
    # How many seconds Locust waits for a running task when stopping; None disables the timeout.
    stop_timeout = 5
    # Relative weight when several HttpUser subclasses are defined in one
    # locustfile; higher values are picked more often.
    weight = 3
    # With host set in the script, no --host argument is needed on the command line.
    host = "http://127.0.0.1:6006"
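
If the web UI is not needed, Locust can also be driven programmatically. A minimal headless sketch, assuming the script above is importable as locusthttp:

# Headless alternative to the locust CLI (a sketch)
import gevent
from locust.env import Environment
from locust.stats import stats_printer

from locusthttp import WebsiteUser  # the load-test classes defined above

env = Environment(user_classes=[WebsiteUser])
runner = env.create_local_runner()
gevent.spawn(stats_printer(env.stats))  # periodically print aggregated stats
runner.start(10, spawn_rate=2)          # 10 simulated users, spawned 2 per second
gevent.spawn_later(60, runner.quit)     # stop the test after 60 seconds
runner.greenlet.join()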

WebSocket persistent connection

  • Server code websocketapi.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import uvicorn
import torch

pretrained = "/root/autodl-tmp/qwen/Qwen1.5-1.8B-Chat"
tokenizer = AutoTokenizer.from_pretrained(pretrained, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(pretrained, device_map="auto", torch_dtype=torch.bfloat16)
model = model.eval()
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # without arguments the middleware allows nothing
    allow_methods=["*"],
    allow_headers=["*"],
)

with open('websocket.html') as f:  # the front-end page listed below
    html = f.read()


@app.get("/")
async def get():
    return HTMLResponse(html)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    input: JSON String of {"prompt": ""}
    output: JSON String of {"response": "", "status": 200}
        status 200 stand for response ended, else not
    """
    await websocket.accept()
    try:
        while True:
            json_request = await websocket.receive_json()
            prompt = json_request['prompt']
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt}
            ]
            # Build the chat prompt and generate a reply
            text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            model_inputs = tokenizer([text], return_tensors="pt").to('cuda')
            generated_ids = model.generate(model_inputs.input_ids, max_new_tokens=512)
            # Strip the prompt tokens, keeping only the newly generated ones
            generated_ids = [
                output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
            ]
            response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
            await websocket.send_json({
                "response": response,
                "status": 202,
            })
            await websocket.send_json({"status": 200})
    except WebSocketDisconnect:
        pass


def main():
    # The import string must match the file name (websocketapi.py) for workers to load the app.
    uvicorn.run("websocketapi:app", host='127.0.0.1', port=6006, workers=2)


if __name__ == '__main__':
    main()
    # gunicorn websocketapi:app -w 3 -k uvicorn.workers.UvicornWorker -b 127.0.0.1:6006
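
Before wiring up the browser front end, the /ws endpoint can be smoke-tested from Python. A minimal sketch using the websocket-client package (the prompt text is arbitrary):

# Quick smoke test for the /ws endpoint (pip install websocket-client)
import json
import websocket

ws = websocket.WebSocket()
ws.connect("ws://127.0.0.1:6006/ws")
ws.send(json.dumps({"prompt": "Hello"}))
while True:
    msg = json.loads(ws.recv())
    print(msg)
    if msg.get("status") == 200:  # the server sends status 200 when the reply is complete
        break
ws.close()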
  • Front-end code websocket.html
<!DOCTYPE html>
<html lang="en">
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="return false;" id="form">
    <label for="messageText"></label>
    <input type="text" id="messageText" autocomplete="off"/>
    <button type="submit">Send</button>
</form>
<ul id='messageBox'>
</ul>
<script>
    let ws = new WebSocket("ws://" + location.host + "/ws");
    let last_message_element = null;

    function appendMessage(text, sender, dom = null) {
        if (dom === null) {
            let messageBox = document.getElementById('messageBox');
            dom = document.createElement('li');
            messageBox.appendChild(dom);
        }
        dom.innerText = sender + ':' + text;
        return dom
    }

    function sendMessage(event) {
        if (last_message_element !== null) {  // the bot has not finished replying yet
            return;
        }
        let input = document.getElementById("messageText");
        if (input.value === "") {
            return;
        }
        let body = {"prompt": input.value};
        ws.send(JSON.stringify(body));
        appendMessage(input.value, 'User')
        input.value = '';
        event.preventDefault();
    }

    document.getElementById("form").addEventListener('submit', sendMessage)

    ws.onmessage = function (event) {
        let body = JSON.parse(event.data);
        let status = body['status']
        if (status === 200) {  // the reply is complete
            last_message_element = null;
        } else {
            last_message_element = appendMessage(body['response'], 'Qwen1.5-1.8B-chat', last_message_element)
        }
    };
</script>
</body>
</html>
  • Run result
    (screenshot of the chat page omitted)
  • Load-test code locustwebsocket.py, run with locust -f locustwebsocket.py
import json
import time

import websocket
from locust import User, TaskSet, task, events, between
import random

class WebSocketClient(object):

    def __init__(self, host):
        self.host = host
        self.ws = websocket.WebSocket()

    def connect(self, burl=None):
        # Connect to the given URL (defaults to the host passed at construction)
        # and report the handshake time to Locust's statistics.
        burl = burl or self.host
        start_time = time.time()
        try:
            self.ws.connect(url=burl)
        except websocket.WebSocketTimeoutException as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request.fire(request_type="websocket", name='connect', response_time=total_time,
                                response_length=0, context={}, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request.fire(request_type="websocket", name='connect', response_time=total_time,
                                response_length=0, context={}, exception=None)

    def recv(self):
        return self.ws.recv()

    def send(self, msg):
        self.ws.send(msg)

class WebsocketUser(TaskSet):
    # Created in on_start; this class attribute shadows TaskSet's client property.
    client = None

    def on_start(self):
        self.client = WebSocketClient("ws://127.0.0.1:6006/ws")
        self.client.connect()

    @task
    def send_message(self):
        # Send a prompt, then drain replies until the server signals completion (status 200)
        num = random.randint(0, 10)
        prompt = f"What is the number-{num} highest mountain in the world?"
        self.client.send(json.dumps({'prompt': prompt}))
        while True:
            response = json.loads(self.client.recv())
            print(response)
            if response.get('status') == 200:
                break


class WebsiteUser(User):
    tasks = [WebsocketUser]
    # Wait 3-6 seconds between tasks (replaces the deprecated min_wait/max_wait)
    wait_time = between(3, 6)
    stop_timeout = 5
