Go: IM系统接入ws进行消息发送以及群聊功能 (5)

概述

  • 在即时通讯(IM)系统中,实现多媒体消息(如文本、表情包、拍照、图片、音频、视频)的实时传输是一项核心功能
  • 随着HTML5和WebSocket技术的发展,现代Web应用能够支持更高效、更实时的通信方式
  • 本文将详细探讨如何使用Go语言结合WebSocket技术,在IM系统中实现多媒体消息的发送和接收

基于MVC的目录设计

im-project
├── go.mod
├── main.go          主程序
├── ctrl             控制器层
│     └── chat.go
├── views            模板层
│     └── chat
│           ├── foot.html
│           └── x.html

主程序

main.go 核心代码

package main

import (
	"net/http"
	"im-project/ctrl"
)

func main() {
	// 1. 绑定请求和处理函数
	http.HandleFunc("/chat", ctrl.Chat)
	http.HandleFunc("/attach/upload", ctrl.Upload)
	// 2. 指定目录的静态文件
	http.Handle("/asset/",http.FileServer(http.Dir(".")))
	http.Handle("/mnt/",http.FileServer(http.Dir(".")))
	// 3. 启动
	http.ListenAndServe(":80",nil)
}

控制器

ctrl/attach.go

package ctrl

import (
	"net/http"
	"im-project/util"
	"os"
	"strings"
	"fmt"
	"time"
	"math/rand"
	"io"
	"github.com/aliyun/aliyun-oss-go-sdk/oss"
)

func init(){
	os.MkdirAll("./mnt",os.ModePerm)
}

func Upload(w http.ResponseWriter, r *http.Request){
	//UploadLocal(w,r)
	UploadOss(w,r)
}

// 1.存储位置 ./mnt,需要确保已经创建好
// 2.url格式 /mnt/xxxx.png  需要确保网络能访问/mnt/
func UploadLocal(writer http.ResponseWriter, request * http.Request) {
	// 获得上传的源文件
    srcfile,head,err:=request.FormFile("file")
    if err!=nil{
    	util.RespFail(writer,err.Error())
	}
	// 创建一个新文件
	suffix := ".png"
	// 如果前端文件名称包含后缀 xx.xx.png
	ofilename := head.Filename
	tmp := strings.Split(ofilename,".")
	if len(tmp)>1 {
		suffix = "."+tmp[len(tmp)-1]
	}
	// 如果前端指定filetype
	// formdata.append("filetype",".png")
	filetype := request.FormValue("filetype")
	if len(filetype)>0 {
		suffix = filetype
	}
	// time.Now().Unix()
    filename := fmt.Sprintf("%d%04d%s", time.Now().Unix(), rand.Int31(), suffix)
    dstfile,err:= os.Create("./mnt/"+filename)
    if err!=nil {
    	util.RespFail(writer,err.Error())
    	return
	}
	// todo 将源文件内容copy到新文件
	_,err = io.Copy(dstfile,srcfile)
	if err!=nil{
		util.RespFail(writer,err.Error())
		return
	}
	// 将新文件路径转换成url地址
	url := "/mnt/"+filename
	// 响应到前端
	util.RespOk(writer,url,"")
}

// 即将删掉,定期更新
const (
	AccessKeyId="5p2RZ******nMuQw9" // 填入自己的 key
	AccessKeySecret="bsNmjU8Au08*****S5XIFAkK" // 填入自己的secret
	EndPoint="oss-cn-shenzhen.aliyuncs.com"
	Bucket="winliondev"
)

// 权限设置为公共读状态
// 需要安装
func UploadOss(writer http.ResponseWriter, request * http.Request) {
	// 获得上传的文件
	srcfile,head,err := request.FormFile("file")
	if err!=nil {
		util.RespFail(writer,err.Error())
		return
	}
	// 获得文件后缀.png/.mp3
	suffix := ".png"
	//如果前端文件名称包含后缀 xx.xx.png
	ofilename := head.Filename
	tmp := strings.Split(ofilename,".")
	if len(tmp)>1 {
		suffix = "."+tmp[len(tmp)-1]
	}
	// 如果前端指定filetype
	// formdata.append("filetype",".png")
	filetype := request.FormValue("filetype")
	if len(filetype)>0{
		suffix = filetype
	}
	// 初始化ossclient
	client,err:=oss.New(EndPoint,AccessKeyId,AccessKeySecret)
	if err!=nil{
		util.RespFail(writer,err.Error())
		return
	}
	// todo 获得bucket
	bucket,err := client.Bucket(Bucket)
	if err!=nil{
		util.RespFail(writer,err.Error())
		return
	}
	// 设置文件名称
	// time.Now().Unix()
	filename := fmt.Sprintf("mnt/%d%04d%s", time.Now().Unix(), rand.Int31(), suffix)
	// 通过bucket上传
	err = bucket.PutObject(filename, srcfile)
	if err!=nil {
		util.RespFail(writer,err.Error())
		return
	}
	// 获得url地址
	url := "http://"+Bucket+"."+EndPoint+"/"+filename
	// 响应到前端
	util.RespOk(writer,url,"")
}

ctrl/chat.go

package ctrl

import (
	"net/http"
	"github.com/gorilla/websocket"
	"gopkg.in/fatih/set.v0"
	"sync"
	"strconv"
	"log"
	"fmt"
	"encoding/json"
)

const (
	CMD_SINGLE_MSG = 10
	CMD_ROOM_MSG   = 11
	CMD_HEART      = 0
)

type Message struct {
	Id      int64  `json:"id,omitempty" form:"id"` //消息ID
	Userid  int64  `json:"userid,omitempty" form:"userid"` //谁发的
	Cmd     int    `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
	Dstid   int64  `json:"dstid,omitempty" form:"dstid"`//对端用户ID/群ID
	Media   int    `json:"media,omitempty" form:"media"` //消息按照什么样式展示
	Content string `json:"content,omitempty" form:"content"` //消息的内容
	Pic     string `json:"pic,omitempty" form:"pic"` //预览图片
	Url     string `json:"url,omitempty" form:"url"` //服务的URL
	Memo    string `json:"memo,omitempty" form:"memo"` //简单描述
	Amount  int    `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
/**
消息发送结构体
1、MEDIA_TYPE_TEXT
{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
2、MEDIA_TYPE_News
{id:1,userid:2,dstid:3,cmd:10,media:2,content:"标题",pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/dsturl","memo":"这是描述"}
3、MEDIA_TYPE_VOICE,amount单位秒
{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
4、MEDIA_TYPE_IMG
{id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
5、MEDIA_TYPE_REDPACKAGR //红包amount 单位分
{id:1,userid:2,dstid:3,cmd:10,media:5,url:"http://www.baidu.com/a/b/c/redpackageaddress?id=100000","amount":300,"memo":"恭喜发财"}
6、MEDIA_TYPE_EMOJ 6
{id:1,userid:2,dstid:3,cmd:10,media:6,"content":"cry"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

8、MEDIA_TYPE_VIDEO 8
{id:1,userid:2,dstid:3,cmd:10,media:8,pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/a.mp4"}

9、MEDIA_TYPE_CONTACT 9
{id:1,userid:2,dstid:3,cmd:10,media:9,"content":"10086","pic":"http://www.baidu.com/a/avatar,jpg","memo":"胡大力"}

*/

// 本核心在于形成userid和Node的映射关系
type Node struct {
	Conn *websocket.Conn
	//并行转串行,
	DataQueue chan []byte
	GroupSets set.Interface
}
// 映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node,0)
// 读写锁
var rwlocker sync.RWMutex

// ws://127.0.0.1/chat?id=1&token=xxxx
func Chat(writer http.ResponseWriter, request *http.Request) {

	// 检验接入是否合法
    // checkToken(userId int64, token string)
    query := request.URL.Query()
    id := query.Get("id")
    token := query.Get("token")
    userId ,_ := strconv.ParseInt(id,10,64)
	isvalida := checkToken(userId,token)
	// 如果 isvalida = true
	// isvalida = false
	conn,err :=(&websocket.Upgrader {
		CheckOrigin: func(r *http.Request) bool {
			return isvalida
		},
	}).Upgrade(writer,request,nil)
	
	if err!=nil {
		log.Println(err.Error())
		return
	}
	//  获得conn
	node := &Node {
		Conn:conn,
		DataQueue:make(chan []byte,50),
		GroupSets:set.New(set.ThreadSafe),
	}
	// 获取用户全部群Id
	comIds := contactService.SearchComunityIds(userId)
	for _,v:=range comIds {
		node.GroupSets.Add(v)
	}
	// userid和node形成绑定关系
	rwlocker.Lock()
	clientMap[userId] = node
	rwlocker.Unlock()
	// 完成发送逻辑, con
	go sendproc(node)
	// 完成接收逻辑
	go recvproc(node)
	sendMsg(userId,[]byte("hello,world!"))
}

//发送协程
func sendproc(node *Node) {
	for {
		select {
			case data:= <-node.DataQueue:
				err := node.Conn.WriteMessage(websocket.TextMessage,data)
			    if err!=nil{
			    	log.Println(err.Error())
			    	return
				}
		}
	}
}
// 添加新的群ID到用户的groupset中
func AddGroupId(userId,gid int64) {
	// 取得node
	rwlocker.Lock()
	node,ok := clientMap[userId]
	if ok {
		node.GroupSets.Add(gid)
	}
	// clientMap[userId] = node
	rwlocker.Unlock()
	// 添加gid到set
}
// 接收协程
func recvproc(node *Node) {
	for {
		_,data,err := node.Conn.ReadMessage()
		if err!=nil {
			log.Println(err.Error())
			return
		}
		// 对data进一步处理
		dispatch(data)
		fmt.Printf("recv<=%s",data)
	}
}
//后端调度逻辑处理
func dispatch(data[]byte) {
	// 解析data为message
	msg := Message{}
	err := json.Unmarshal(data, &msg)
	if err!=nil {
		log.Println(err.Error())
		return
	}
	// 根据cmd对逻辑进行处理
	switch msg.Cmd {
		case CMD_SINGLE_MSG:
			sendMsg(msg.Dstid, data)
		case CMD_ROOM_MSG:
			// 群聊转发逻辑
			for _,v:= range clientMap {
				if v.GroupSets.Has(msg.Dstid){
					v.DataQueue <- data
				}
			}
		case CMD_HEART:
			// 一般啥都不做
		}
}

// 发送消息
func sendMsg(userId int64,msg []byte) {
	rwlocker.RLock()
	node,ok := clientMap[userId]
	rwlocker.RUnlock()
	if ok {
		node.DataQueue<- msg
	}
}

// 检测是否有效
func checkToken(userId int64,token string) bool {
	// 从数据库里面查询并比对
	user := userService.Find(userId)
	return user.Token == token
}

视图层


这里,视图层要着重说明下

1 )发送文本

sendtxtmsg:function(txt) {
    //{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
    var msg =this.createmsgcontext();
    msg.media=1;
    msg.content=txt;
    this.showmsg(userInfo(),msg);
    this.webSocket.send(JSON.stringify(msg))
}
  • 前端user1拼接好数据对象Message msg={id:1,userid:2,dstid:3,cmd:10,media:1,content:txt}
  • 转化成json字符串jsonstr : jsonstr = JSON.stringify(msg)
  • 通过websocket.send(jsonstr)发送, 后端S在recvproc中接收收数据data
  • 并做相应的逻辑处理dispatch(data)-转发给user2
  • user2通过websocket.onmessage收到消息后做解析并显示

2 )发送表情包

{
	loaddoutures:function(){
      var res=[];
      var config = this.doutu.config;
      for(var i in config.pkgids){
          res[config.pkgids[i]]= (config.baseurl+"/"+config.pkgids[i]+"/info.json")
      }
      var that = this;
      for(var id in res){
          //console.log("res[i]",id,res[id])
          post(res[id],{},function(pkginfo){
              //console.log("post res[i]",id,res[id],pkginfo)
              var baseurl= config.baseurl+"/"+pkginfo.id+"/"
              for(var j in pkginfo.assets){
                  pkginfo.assets[j] = baseurl+pkginfo.assets[j];
              }
              pkginfo.icon = baseurl + pkginfo.icon;
              that.doutu.packages.push(pkginfo)
              if(that.doutu.choosed.pkgid==pkginfo.id){
                  that.doutu.choosed.assets=pkginfo.assets;
              }

          })
      }
  },
}
  • 表情包简单逻辑:弹出一个窗口, 选择图片获得一个连接地址
  • 调用sendpicmsg方法开始发送流程

3 ) 拍照

3.1 照片

<input accept=\"image/gif,image/jpeg,,image/png\" type=\"file\" οnchange=\"upload(this)\" class='upload' />

3.2 拍照

<input accept=\"image/*\" capture=\"camera\" type=\"file\" οnchange=\"upload(this)\" class='upload' />
function upload(dom){
    uploadfile("attach/upload",dom,function(res){
        if(res.code==0){
            app.sendpicmsg(res.data)
        }
    })
}

function uploadfile(uri,dom,fn){
    var xhr = new XMLHttpRequest();
    xhr.open("POST","//"+location.host+"/"+uri, true);
    // 添加http头,发送信息至服务器时内容编码类型
    xhr.onreadystatechange = function() {
        if (xhr.readyState == 4 && (xhr.status == 200 || xhr.status == 304)) {
            fn.call(this, JSON.parse(xhr.responseText));
        }
    };
    var _data=[];
    var formdata = new FormData();
    if(!! userId()){
        formdata.append("userid",userId());
    }
    formdata.append("file",dom.files[0])
    xhr.send(formdata);
}

// vue methods 中的发送图片方法
{
	sendpicmsg:function(picurl){
		// {id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
        var msg =this.createmsgcontext();
        msg.media=4;
        msg.url=picurl;
        this.showmsg(userInfo(),msg)
        this.webSocket.send(JSON.stringify(msg))
    },
}
  • 发送图片/拍照, 弹出一个窗口,
  • 选择图片,上传到服务器, 获得一个链接地址
  • 调用sendpicmsg方法开始发送流程

4 )音视频

// 上传语音
function uploadblob(uri,blob,filetype,fn){
    var xhr = new XMLHttpRequest();
    xhr.open("POST","//"+location.host+"/"+uri, true);
    // 添加http头,发送信息至服务器时内容编码类型
    xhr.onreadystatechange = function() {
        if (xhr.readyState == 4 && (xhr.status == 200 || xhr.status == 304)) {
            fn.call(this, JSON.parse(xhr.responseText));
        }
    };
    var _data=[];
    var formdata = new FormData();
    formdata.append("filetype",filetype);
    formdata.append("file",blob)
    xhr.send(formdata);
}

// vue methods 中的方法
{
	playaudio:function(url) {
      document.getElementById('audio4play').src = url;
      document.getElementById('audio4play').play();
    },
    startrecorder:function(){
      let audioTarget = document.getElementById('audio');
      var types = ["video/webm",
          "audio/webm",
          "video/webm\;codecs=vp8",
          "video/webm\;codecs=daala",
          "video/webm\;codecs=h264",
          "audio/webm\;codecs=opus",
          "video/mpeg"];
      var suporttype ="";
      for (var i in types) {
          if(MediaRecorder.isTypeSupported(types[i])){
              suporttype = types[i];
          }
      }
      if(!suporttype){
          mui.toast("编码不支持")
          return ;
      }
      this.duration = new Date().getTime();
      navigator.mediaDevices.getUserMedia({audio: true, video: false})
      .then(function(stream){
          this.showprocess = true
          this.recorder = new MediaRecorder(stream);
          audioTarget.srcObject = stream;

          this.recorder.ondataavailable = (event) => {
              console.log("ondataavailable");
              uploadblob("attach/upload",event.data,".mp3",res=>{
                  var duration = Math.ceil((new Date().getTime()-this.duration)/1000);
                  this.sendaudiomsg(res.data,duration);
              })
              stream.getTracks().forEach(function (track) {
                  track.stop();
              });
              this.showprocess = false
          }
          this.recorder.start();
      }.bind(this))
       .catch(function(err){
          console.log(err)
          mui.toast(err)
          this.showprocess = false
      }.bind(this));
  },
  stoprecorder :function() {
      if(typeof this.recorder.stop=="function"){
          this.recorder.stop();
      }
      this.showprocess = false
      console.log("stoprecorder")

  },
  sendaudiomsg:function(url,num) {
  	//{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
    var msg = this.createmsgcontext();
    msg.media = 3;
    msg.url = url;
    msg.amount = num;
    this.showmsg(userInfo(), msg)
    // console.log("sendaudiomsg",this.msglist);
    this.webSocket.send(JSON.stringify(msg))
  },
}

5 )单聊

{
	singlemsg:function(user){
	    //console.log(user)
	    this.win = "single";
	    this.title = "和"+user.nickname+"聊天中";
	    this.msgcontext.dstid = parseInt(user.id);
	    this.msgcontext.cmd = 10;
	},
}

6 )群聊

// vue methods 中的方法
{
	groupmsg: function(group){
        this.win = "group";
        this.title=group.name;
        this.msgcontext.dstid = parseInt(group.id);
        this.msgcontext.cmd = 11;
    },
}
  • 群聊原理: 分析群id,找到加了这个群的用户,把消息发送过去
  • 方案一:map<qunid1,qunid2,qunid3>
    • 优势是锁的频次低
    • 劣势是要轮训全部map
      type Node struct {
      	Conn *websocket.Conn
      	//并行转串行,
      	DataQueue chan []byte
      	GroupSets set.Interface
      }
      // 映射关系表
      var clientMap map[int64]*Node = make(map[int64]*Node,0)
      
  • 方案二: map<群id><userid1,userid2,userid3>
    • 优势是找用户ID非常快
    • 劣势是发送信息时需要根据userid获取node,锁的频次太高
      type Node struct {
      	Conn *websocket.Conn
      	//并行转串行,
      	DataQueue chan []byte
      }
      // 映射关系表
      var clientMap map[int64]*Node = make(map[int64]*Node,0)
      var comMap map[int64]set.Interface= make(map[int64]set.Interface,0)
      
  • 需要处理的问题
    • 当用户接入的时候初始化groupset
    • 当用户加入群的时候刷新groupset
    • 完成信息分发

相关推荐

  1. Go: IM系统接入ws进行消息发送以及功能 (5)

    2024-07-21 05:58:02       15 阅读
  2. MFC模拟消息发送,自定义以及系统消息

    2024-07-21 05:58:02       44 阅读
  3. 如何使用 RabbitMQ 进行消息发送接收

    2024-07-21 05:58:02       39 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-21 05:58:02       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-21 05:58:02       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-21 05:58:02       45 阅读
  4. Python语言-面向对象

    2024-07-21 05:58:02       55 阅读

热门阅读

  1. Perl编程秘籍:匿名数组与哈希的隐秘力量

    2024-07-21 05:58:02       15 阅读
  2. 寻茶索味,齐赴安康

    2024-07-21 05:58:02       14 阅读
  3. 实战:shell脚本练习

    2024-07-21 05:58:02       14 阅读
  4. Spring Boot 单元测试什么时候需要添加 @RunWith

    2024-07-21 05:58:02       23 阅读
  5. leetcode--链表类题目总结

    2024-07-21 05:58:02       15 阅读
  6. Python实现精确读取PDF文件的全部内容(8)

    2024-07-21 05:58:02       17 阅读
  7. Python模块化编程:import机制剖析

    2024-07-21 05:58:02       18 阅读
  8. setlocal enabledelayedexpansion 详解

    2024-07-21 05:58:02       21 阅读
  9. MySQL中EXPLAIN关键字详解

    2024-07-21 05:58:02       14 阅读