from threading import Thread
import socket
class HandelParse(Thread):
def __init__(self,socket_obj):
super().__init__()
self.socket_obj = socket_obj
def run(self):
while True:
recv_msg = self.socket_obj.recv(1024)
if len(recv_msg)!=0:
print(recv_msg.decode("utf-8"))
self.socket_obj.send(recv_msg) #再次原路发回去。注tcp用send方法,udp用sendto方法
else:
self.socket_obj.close()
break
class TcpServer(Thread):
def __init__(self):
super().__init__()
self.tcp_sercer = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.tcp_sercer.bind(("",7878))
self.tcp_sercer.listen(128) #设置监听数量
def run(self):
while True:
msg,client_info = self.tcp_sercer.accept() #等待连接
#创建子线程,实现多并发
handel_data_thread = HandelParse(msg)
handel_data_thread.start()
def __del__(self):
self.tcp_sercer.close()
if __name__ == '__main__':
tcp = TcpServer()
tcp.start()
使用网络调试工具测试。同时开通3 个调试助手软件进行连接。截图如下:
网络调试工具下载地址: