python-网络并发模型

3. 网络并发模型

3.1 网络并发模型概述

  • 什么是网络并发

    在实际工作中,一个服务端程序往往要应对多个客户端同时发起访问的情况。如果让服务端程序能够更好的同时满足更多客户端网络请求的情形,这就是并发网络模型。

    在这里插入图片描述

  • 循环网络模型问题

    循环网络模型只能循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。这样的网络模型虽然简单,资源占用不多,但是无法同时处理多个客户端请求就是其最大的弊端,往往只有在一些低频的小请求任务中才会使用。

3.2 多进程/线程并发模型

多进程/线程并发模中每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程,多任务并发模型也是实际工作中最为常用的服务端处理模型。

  • 模型特点

    • 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
    • 缺点: 资源消耗较大
    • 适用情况:客户端请求较复杂,需要长时间占有服务器。
  • 创建流程

    • 创建网络套接字
    • 等待客户端连接
    • 有客户端连接,则创建新的进程/线程具体处理客户端请求
    • 主进程/线程继续等待处理其他客户端连接
    • 如果客户端退出,则销毁对应的进程/线程
多进程并发模型示例:

"""
基于多进程的网络并发模型
重点代码 !!

创建tcp套接字
等待客户端连接
有客户端连接,则创建新的进程具体处理客户端请求
父进程继续等待处理其他客户端连接
如果客户端退出,则销毁对应的进程
"""
from socket import *
from multiprocessing import Process
import sys

# 地址变量
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST, PORT)

# 处理客户端具体请求
def handle(connfd):
    while True:
        data = connfd.recv(1024)
        if not data:
            break
        print(data.decode())
    connfd.close()

# 服务入口函数
def main():
    # 创建tcp套接字
    tcp_socket = socket()
    tcp_socket.bind(ADDR)
    tcp_socket.listen(5)
    print("Listen the port %d"%PORT)

    # 循环连接客户端
    while True:
        try:
            connfd, addr = tcp_socket.accept()
            print("Connect from", addr)
        except KeyboardInterrupt:
            tcp_socket.close()
            sys.exit("服务结束")

        # 创建进程 处理客户端请求
        p = Process(target=handle, args=(connfd,),daemon=True)
        p.start()

if __name__ == '__main__':
    main()
多线程并发模型示例:
"""
基于多线程的网络并发模型
重点代码 !!

思路: 网络构建    线程搭建    /   具体处理请求
"""
from socket import *
from threading import Thread


# 处理客户端具体请求
class Handle:
    # 具体处理请求函数 (逻辑处理,数据处理)
    def request(self, data):
        print(data)


# 创建线程得到请求
class ThreadServer(Thread):
    def __init__(self, connfd):
        self.connfd = connfd
        self.handle = Handle()
        super().__init__(daemon=True)

    # 接收客户端的请求
    def run(self):
        while True:
            data = self.connfd.recv(1024).decode()
            if not data:
                break
            self.handle.request(data)
        self.connfd.close()


# 网络搭建
class ConcurrentServer:
    """
    提供网络功能
    """
    def __init__(self, *, host="", port=0):
        self.host = host
        self.port = port
        self.address = (host, port)
        self.sock = self.__create_socket()

    def __create_socket(self):
        tcp_socket = socket()
        tcp_socket.bind(self.address)
        return tcp_socket

    # 启动服务 --> 准备连接客户端
    def serve_forever(self):
        self.sock.listen(5)
        print("Listen the port %d" % self.port)

        while True:
            connfd, addr = self.sock.accept()
            print("Connect from", addr)
            # 创建线程
            t = ThreadServer(connfd)
            t.start()


if __name__ == '__main__':
    server = ConcurrentServer(host="0.0.0.0", port=8888)
    server.serve_forever()  # 启动服务

ftp 文件服务器

【1】 分为服务端和客户端,要求可以有多个客户端同时操作。

【2】 客户端可以查看服务器文件库中有什么文件。

【3】 客户端可以从文件库中下载文件到本地。

【4】 客户端可以上传一个本地文件到文件库。

【5】 使用print在客户端打印命令输入提示,引导操作

参考代码:

######################### 服务端 ############################
from socket import *
from threading import Thread
import os
from time import sleep

# 文件库
FTP = "/home/tarena/FTP/"


# 处理客户端具体请求
class Handle:
    def __init__(self, connfd):
        self.connfd = connfd

    def do_list(self):
        filelist = os.listdir(FTP)
        if filelist:
            self.connfd.send(b"OK")
            sleep(0.1)
            # 发送文件列表
            files = "\n".join(filelist)
            self.connfd.send(files.encode())
        else:
            self.connfd.send(b"FAIL")

    def do_get(self, filename):
        try:
            file = open(FTP + filename, 'rb')
        except:
            self.connfd.send(b"FAIL")
        else:
            self.connfd.send(b"OK")
            sleep(0.1)
            #  发送文件
            while True:
                data = file.read(1024)
                if not data:
                    break
                self.connfd.send(data)
            file.close()
            sleep(0.1)
            self.connfd.send(b"##")

    def do_put(self, filename):
        # 判断文件是否存在
        if os.path.exists(FTP + filename):
            self.connfd.send(b"FAIL")
        else:
            self.connfd.send(b"OK")
            # 接收文件
            file = open(FTP + filename, 'wb')
            while True:
                data = self.connfd.recv(1024)
                if data == b"##":
                    break
                file.write(data)
            file.close()

    def request(self):
        while True:
            data = self.connfd.recv(1024).decode()
            # 分情况具体处理请求函数
            tmp = data.split(' ')
            if not data or tmp[0] == "EXIT":
                break
            elif tmp[0] == "LIST":
                self.do_list()
            elif tmp[0] == "GET":
                # tmp-> [GET,filename]
                self.do_get(tmp[1])
            elif tmp[0] == "PUT":
                self.do_put(tmp[1])


# 创建线程得到请求
class FTPThread(Thread):
    def __init__(self, connfd):
        self.connfd = connfd
        self.handle = Handle(connfd)
        super().__init__(daemon=True)

    # 接收客户端的请求
    def run(self):
        self.handle.request()
        self.connfd.close()


# 网络搭建
class ConcurrentServer:
    """
    提供网络功能
    """

    def __init__(self, *, host="", port=0):
        self.host = host
        self.port = port
        self.address = (host, port)
        self.sock = self.__create_socket()

    def __create_socket(self):
        tcp_socket = socket()
        tcp_socket.bind(self.address)
        return tcp_socket

    # 启动服务 --> 准备连接客户端
    def serve_forever(self):
        self.sock.listen(5)
        print("Listen the port %d" % self.port)

        while True:
            connfd, addr = self.sock.accept()
            print("Connect from", addr)
            # 创建线程
            t = FTPThread(connfd)
            t.start()


if __name__ == '__main__':
    server = ConcurrentServer(host="0.0.0.0", port=8880)
    server.serve_forever()  # 启动服务

    
########################### 客户端 ###############################

"""
文件服务器客户端
"""
from socket import *
import sys
from time import sleep


# 具体发起请求,逻辑处理
class Handle:
    def __init__(self):
        self.server_address = ("127.0.0.1", 8880)
        self.sock = self.__connect_server()

    def __connect_server(self):
        tcp_socket = socket()
        tcp_socket.connect(self.server_address)
        return tcp_socket

    def do_list(self):
        self.sock.send(b"LIST")  # 发送请求
        response = self.sock.recv(1024)  # 接收响应
        if response == b"OK":
            # 接收文件列表 file1\nfile2\n..
            files = self.sock.recv(1024 * 1024)
            print(files.decode())
        else:
            print("获取文件列表失败")

    def do_exit(self):
        self.sock.send(b"EXIT")
        self.sock.close()
        sys.exit("谢谢使用")

    def do_get(self, filename):
        request = "GET " + filename
        self.sock.send(request.encode())  # 发送请求
        response = self.sock.recv(128)  # 接收响应
        if response == b"OK":
            file = open(filename, 'wb')
            # 接收文件内容,写入文件
            while True:
                data = self.sock.recv(1024)
                if data == b"##":
                    break
                file.write(data)
            file.close()
        else:
            print("该文件不存在")

    def do_put(self, filename):
        try:
            file = open(filename, 'rb')
        except:
            print("该文件不存在")
        else:
            filename = filename.split("/")[-1]  # 获取文件名
            request = "PUT " + filename
            self.sock.send(request.encode())
            response = self.sock.recv(128)
            if response == b"OK":
                # 发送文件
                while True:
                    data = file.read(1024)
                    if not data:
                        break
                    self.sock.send(data)
                file.close()
                sleep(0.1)
                self.sock.send(b"##")
            else:
                print("上传失败")


# 图形交互类
class FTPView:
    def __init__(self):
        self.__handle = Handle()

    def __display_menu(self):
        print()
        print("1. 查看文件")
        print("2. 下载文件")
        print("3. 上传文件")
        print("4. 退   出")
        print()

    def __select_menu(self):
        item = input("请输入选项:")
        if item == "1":
            self.__handle.do_list()
        elif item == "2":
            filename = input("要下载的文件:")
            self.__handle.do_get(filename)
        elif item == "3":
            filename = input("要上传的文件:")
            self.__handle.do_put(filename)
        elif item == "4":
            self.__handle.do_exit()
        else:
            print("请输入正确选项!")

    def main(self):
        while True:
            self.__display_menu()
            self.__select_menu()


if __name__ == '__main__':
    ftp = FTPView()
    ftp.main()  # 启动

3.3 IO并发模型

3.3.1 IO概述
  • 什么是IO

    在程序中存在读写数据操作行为的事件均是IO行为,比如终端输入输出 ,文件读写,数据库修改和网络消息收发等。

  • 程序分类

    • IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长。
    • 计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞。
  • IO分类:阻塞IO ,非阻塞IO,IO多路复用等。

3.3.2 阻塞IO
  • 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
  • 效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为。
  • 阻塞情况
    • 因为某种执行条件没有满足造成的函数阻塞
      e.g. accept input recv
    • 处理IO的时间较长产生的阻塞状态
      e.g. 网络传输,大文件读写
3.3.3 非阻塞IO
  • 定义 :通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态。
  • 设置套接字为非阻塞IO

    sockfd.setblocking(bool)
    功能:设置套接字为非阻塞IO
    参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
    
  • 超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。

    sockfd.settimeout(sec)
    功能:设置套接字的超时时间
    参数:设置的时间
    
    非阻塞IO示例:
    """
    设置非阻塞的套接字
    """
    from socket import *
    from time import sleep, ctime
    
    # 日志文件模拟与网络无关IO
    file = open("my.log", "a")
    
    # 创建tcp套接字
    sock = socket()
    sock.bind(("127.0.0.1", 8888))
    sock.listen(5)
    
    # 设置为非阻塞
    # sock.setblocking(False)
    
    # 设置超时事件
    sock.settimeout(3)
    
    # 循环处理客户端连接
    while True:
        try:
            connfd, addr = sock.accept()
            print("Connect from", addr)
        except timeout as e:
            # 模拟一个与accept 无关的事件
            msg = "%s : %s\n" % (ctime(), e)
            file.write(msg)
        except BlockingIOError as e:
            # 模拟一个与accept 无关的事件
            msg = "%s : %s\n" % (ctime(), e)
            file.write(msg)
            sleep(2)
        else:
            # accept 正常执行
            data = connfd.recv(1024)
            print(data.decode())
    
    
3.3.4 IO多路复用
  • 定义

    同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。

  • 具体方案

    • select方法 : Windows Linux Unix
    • epoll方法: Linux
  • select 方法

rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数: rlist  列表  读IO列表,添加等待发生的或者可读的IO事件
      wlist  列表  写IO列表,存放要可以主动处理的或者可写的IO事件
      xlist  列表 异常IO列表,存放出现异常要处理的IO事件
      timeout  超时时间

返回值: rs 列表  rlist中准备就绪的IO
        ws 列表  wlist中准备就绪的IO
	      xs 列表  xlist中准备就绪的IO
select 方法示例:
"""
IO多路复用 基础演示 select
"""
from select import select
from socket import *

# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)

file = open("my.log",'r')

udp_sock = socket(AF_INET,SOCK_DGRAM)

print("开始监控IO")
rs,ws,xs = select([file,udp_sock],[file,udp_sock],[])
print("rlist:",rs)
print("wlist:",ws)
print("xlist:",xs)
  • epoll方法
ep = select.epoll()
功能 : 创建epoll对象
返回值: epoll对象
ep.register(fd,event)   
功能: 注册关注的IO事件
参数:fd  要关注的IO
      event  要关注的IO事件类型
  	     常用类型EPOLLIN  读IO事件(rlist)
		      EPOLLOUT 写IO事件 (wlist)
		      EPOLLERR 异常IO  (xlist)
		  e.g. ep.register(sockfd,EPOLLIN|EPOLLERR)

ep.unregister(fd)
功能:取消对IO的关注
参数:IO对象或者IO对象的fileno
events = ep.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IO
        events格式  [(fileno,event),()....]
        每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
epoll方法示例:
"""
IO多路复用 基础演示 epoll
"""
from select import *
from socket import *

# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)

file = open("my.log",'r+')

udp_sock = socket(AF_INET,SOCK_DGRAM)


# 创建epoll对象
ep = epoll()

# 关注IO对象
ep.register(tcp_sock,EPOLLIN)
ep.register(udp_sock,EPOLLOUT|EPOLLERR)

# 建立查找字典
map = {
    tcp_sock.fileno():tcp_sock,
    udp_sock.fileno():udp_sock,
}

print("开始监控IO")
events = ep.poll()
print(events) # 就绪的IO

# 不再关注
ep.unregister(udp_sock)
del map[udp_sock.fileno()]
  • select 方法与epoll方法对比
    • epoll 效率比select要高
    • epoll 同时监控IO数量比select要多
    • epoll 支持EPOLLET触发方式
3.3.5 IO并发模型

利用IO多路复用等技术,同时处理多个客户端IO请求。

  • 优点 : 资源消耗少,能同时高效处理多个IO行为

  • 缺点 : 只针对处理并发产生的IO事件

  • 适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求。

  • 网络并发服务实现过程

    【1】将套接字对象设置为关注的IO,通常设置为非阻塞状态。

    【2】通过IO多路复用方法提交,进行IO监控。

    【3】阻塞等待,当监控的IO有事件发生时结束阻塞。

    【4】遍历返回值列表,确定就绪的IO事件类型。

    【5】处理发生的IO事件。

    【6】继续循环监控IO发生。

IO多路复用并发模型

################################# select 方法 ####################################
"""
基于select的并发服务模型
使用函数完成
"""
from select import select
from socket import *

# 服务器地址
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST,PORT)

# 监控列表
rlist = []
wlist = []
xlist = []

# 处理客户端连接
def connect_client(sock):
    connfd, addr = sock.accept()
    print("Connect from", addr)
    connfd.setblocking(False)
    rlist.append(connfd)  # 增加关注对象

# 处理客户端消息
def handle_client(connfd):
    data = connfd.recv(1024)
    # 处理客户端退出
    if not data:
        rlist.remove(connfd) # 不再关注
        connfd.close()
        return
    print(data.decode())
    connfd.send(b"Thanks")


def main():
    # 创建监听套接字
    sock = socket()
    sock.bind(ADDR)
    sock.listen(3)
    # 配合非阻塞IO防止网络中断带来的内部阻塞
    sock.setblocking(False)
    rlist.append(sock) #  初始监控的IO对象

    # 循环监控关注的IO发生
    while True:
        rs,ws,xs = select(rlist,wlist,xlist)
        for r in rs:
            if r is sock:
                connect_client(r) # 连接客户端
            else:
                handle_client(r) # 处理客户端消息

if __name__ == '__main__':
    main()



################################ epoll 方法 ################################
"""
基于epoll的并发服务模型
使用类实现
"""
from select import *
from socket import *


class EpollServer:
    def __init__(self, host="", port=0):
        self.host = host
        self.port = port
        self.address = (host, port)
        self.sock = self._create_socket()
        self.ep = epoll()
        self.map = {} # 查找字典

    def _create_socket(self):
        sock = socket()
        sock.bind(self.address)
        sock.setblocking(False)
        return sock

    # 处理客户端连接
    def _connect_client(self, fd):
        connfd, addr = self.map[fd].accept()
        print("Connect from", addr)
        connfd.setblocking(False)
        # 增加关注对象,设置边缘触发
        self.ep.register(connfd, EPOLLIN | EPOLLET)
        self.map[connfd.fileno()] = connfd  # 维护字典

    # 处理客户端消息
    def _handle_client(self, fd):
        data = self.map[fd].recv(1024)
        # 处理客户端退出
        if not data:
            self.ep.unregister(fd)  # 不再关注
            self.map[fd].close()
            del self.map[fd]  # 从字典删除
            return
        print(data.decode())
        self.map[fd].send(b"Thanks")

    # 启动服务
    def serve_forever(self):
        self.sock.listen(3)
        print("Listen the port %d" % self.port)
        self.ep.register(self.sock, EPOLLIN)  # 设置关注
        self.map[self.sock.fileno()] = self.sock

        while True:
            events = self.ep.poll()
            # 循环查看哪个IO发生就处理哪个
            for fd, event in events:
                if fd == self.sock.fileno():
                    self._connect_client(fd)
                elif event == EPOLLIN:
                    self._handle_client(fd)


if __name__ == '__main__':
    ep = EpollServer(host="0.0.0.0", port=8888)
    ep.serve_forever()  # 启动服务

相关推荐

  1. Python并发

    2024-07-22 11:20:03       54 阅读
  2. Python网络爬虫项目开发实战:如何处理并发下载

    2024-07-22 11:20:03       30 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-22 11:20:03       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-22 11:20:03       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-22 11:20:03       45 阅读
  4. Python语言-面向对象

    2024-07-22 11:20:03       55 阅读

热门阅读

  1. 为什么vue3项目中推荐使用const,而不是let语法

    2024-07-22 11:20:03       20 阅读
  2. NLP专业术语及工具【hanlp、jiolp】

    2024-07-22 11:20:03       18 阅读
  3. 【Python】探索 Python 中的 any 和 all 方法

    2024-07-22 11:20:03       19 阅读
  4. dsa加训

    dsa加训

    2024-07-22 11:20:03      20 阅读
  5. 等保测评与ISO27001认证的区别全解析

    2024-07-22 11:20:03       18 阅读
  6. (leetcode)20. 有效的括号

    2024-07-22 11:20:03       17 阅读
  7. TiDB分布式数据库索引

    2024-07-22 11:20:03       15 阅读
  8. 速盾:cdn能防御ddos吗?

    2024-07-22 11:20:03       13 阅读