Golang RPC实现-day01

Golang RPC实现

  • 先来一个最简单的版本,后续更新。
  • RPC也可以说是一种自定义的应用层协议
  • 所以我们需要自定义消息格式,消息包括 请求头和请求体,所以我们定义一个消息结构体
type request struct {
	h            *codec.Header // header of request
	argv, replyv reflect.Value // argv and replyv of request
}
  • 请求头结构体
type Header struct {
	ServiceMethod string // format "Service.Method" 调用的服务和方法
	Seq           uint64 // sequence number chosen by client 连接Id
	Error         string
}
  • 请求的第一条消息用来确定后续消息的格式和编码,这里规定第一条消息是以Json格式编码,对应的结构体如下
type Option struct {
	MagicNumber int        // MagicNumber marks this's a geerpc request
	CodecType   codec.Type // client may choose different Codec to encode body
}
  • 消息格式和编码结构体
var DefaultOption = &Option{
	MagicNumber: MagicNumber,
	CodecType:   codec.GobType,
}

一、主体逻辑设计

func main() {
	log.SetFlags(0)
	addr := make(chan string)
	go startServer(addr)//这是RPC的服务端逻辑

	// in fact, following code is like a simple geerpc client
	conn, _ := net.Dial("tcp", <-addr) //客户端拨号建立链接,每次服务逻辑,都通过conn来确定当前客户端/服务端是在哪一个连接上。

	defer func() { _ = conn.Close() }()

	time.Sleep(time.Second)
	// send options,先发了一个,定义后续数据的编码方式
	_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)//Encode方法就是发送一个Option结构体内容到conn连接中
	cc := codec.NewGobCodec(conn)
	// send request & receive response
	for i := 0; i < 5; i++ {//发五次请求
		h := &codec.Header{//定义每次请求的请求头
			ServiceMethod: "Foo.Sum",
			Seq:           uint64(i),
		}
		_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))//发送请求体和body内容
		_ = cc.ReadHeader(h)//接收服务端的响应的请求头内容,处理相应得按顺序,不能并发接收响应数据

		log.Println("clinet receive response:", h.ServiceMethod)
		var reply string
		_ = cc.ReadBody(&reply)//接收服务端的响应的请求体内容
		log.Println("reply:", reply)//打印
	}
}

二、服务设计

逻辑就是

  1. 监听请求
  2. 循环获取请求,异步处理请求
  3. 获取当前连接的第一条消息,确认后续消息的格式和编码
  4. 获取请求内容
  5. 响应请求

1、监听和接收请求

func startServer(addr chan string) {
	// pick a free port
	l, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal("network error:", err)
	}
	log.Println("start rpc server on", l.Addr())
	addr <- l.Addr().String()
	geerpc.Accept(l)//服务端接收请求
}

2、处理请求

(1)服务结构体定义
type Server struct{}
// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
(2)确认请求方和服务方编解码格式
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }

// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {
	for {//不断接收请求
		conn, err := lis.Accept()
		if err != nil {
			log.Println("rpc server: accept error:", err)
			return
		}

		go server.ServeConn(conn)//异步处理请求
	}
}
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	log.Println("服务端处理连接中..... ")
	defer func() { _ = conn.Close() }()
	var opt Option
	if err := json.NewDecoder(conn).Decode(&opt); err != nil {//解析第一个Option,确定后续协议消息的格式
		log.Println("rpc server: options error: ", err)
		return
	}
	if opt.MagicNumber != MagicNumber {//服务方编码方式是否与客户端相同
		log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
		return
	}
	f := codec.NewCodecFuncMap[opt.CodecType]//服务方编码方式是否与客户端相同
	if f == nil {//服务端是否存在客户端对应编码方式
		log.Printf("rpc server: invalid codec type %s", opt.CodecType)
		return
	}
	server.serveCodec(f(conn))//第一个确认包通过后,再发后续消息,通过conn拿到连接信息,保证服务端后续能向conn发送信息
}
(3)循环读取请求
func (server *Server) serveCodec(cc codec.Codec) {
	sending := new(sync.Mutex) // make sure to send a complete response
	wg := new(sync.WaitGroup)  // wait until all request are handled
	for {
		req, err := server.readRequest(cc)//反复从请求方接收请求,这里会把请求头和请求体内容获取到req的结构体中
		if err != nil {
			if req == nil {//直到没有请求过来
				break // it's not possible to recover, so close the connection
			}
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			continue
		}
		wg.Add(1)
		go server.handleRequest(cc, req, sending, wg)//异步处理数据
	}
	wg.Wait()
	_ = cc.Close()
}
(4)解析请求的内容
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
	// TODO, should call registered rpc methods to get the right replyv
	// day 1, just print argv and send a hello message
	defer wg.Done()
	log.Println("handleRequest :", req.h, req.argv.Elem())
	req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
	
	server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
(5)响应请求
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
	sending.Lock()
	defer sending.Unlock()
	//time.Sleep(time.Second)
	if err := cc.Write(h, body); err != nil {
		log.Println("rpc server: write response error:", err)
	}
}

三、读取和发送数据到连接中代码

package codec

import (
	"bufio"
	"encoding/gob"
	"io"
	"log"
)

type GobCodec struct {
	conn io.ReadWriteCloser
	buf  *bufio.Writer
	dec  *gob.Decoder
	enc  *gob.Encoder
}

var _ Codec = (*GobCodec)(nil)

func NewGobCodec(conn io.ReadWriteCloser) Codec {
	buf := bufio.NewWriter(conn)
	return &GobCodec{
		conn: conn,
		buf:  buf,
		dec:  gob.NewDecoder(conn),
		enc:  gob.NewEncoder(buf),
	}
}

func (c *GobCodec) ReadHeader(h *Header) error {
	return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body interface{}) error {
	return c.dec.Decode(body)
}

func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
	defer func() {
		_ = c.buf.Flush()
		if err != nil {
			_ = c.Close()
		}
	}()
	if err = c.enc.Encode(h); err != nil {
		log.Println("rpc: gob error encoding header:", err)
		return
	}
	if err = c.enc.Encode(body); err != nil {
		log.Println("rpc: gob error encoding body:", err)
		return
	}
	return
}

func (c *GobCodec) Close() error {
	return c.conn.Close()
}


欢迎大家关注我的博客在这里插入图片描述

相关推荐

  1. 实习记录——day01-day03

    2024-05-16 01:56:05       11 阅读
  2. <span style='color:red;'>Day</span>-<span style='color:red;'>02</span>-<span style='color:red;'>01</span>

    Day-02-01

    2024-05-16 01:56:05      39 阅读
  3. <span style='color:red;'>Day</span>-<span style='color:red;'>01</span>-<span style='color:red;'>02</span>

    Day-01-02

    2024-05-16 01:56:05      26 阅读
  4. DAY01

    2024-05-16 01:56:05       21 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-05-16 01:56:05       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-16 01:56:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-16 01:56:05       20 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-16 01:56:05       20 阅读

热门阅读

  1. LeetCode657.机器人能否返回原点

    2024-05-16 01:56:05       10 阅读
  2. Spacy的安装与使用教程

    2024-05-16 01:56:05       8 阅读
  3. ElasticSearch与机器学习:智能搜索的未来

    2024-05-16 01:56:05       11 阅读
  4. leetcode61-Rotate List

    2024-05-16 01:56:05       9 阅读
  5. RedisSearch深度解析:探索全文搜索的新境界

    2024-05-16 01:56:05       10 阅读
  6. MongoDB聚合运算符:$tsSecond

    2024-05-16 01:56:05       12 阅读
  7. vue使用postcss-pxtorem实现自适应

    2024-05-16 01:56:05       10 阅读
  8. 前端模块导入导出方式

    2024-05-16 01:56:05       9 阅读
  9. vue2 双向数据绑定的实现及原理

    2024-05-16 01:56:05       12 阅读
  10. OpenAI和互联网行业的发展,有着异曲同工之处

    2024-05-16 01:56:05       16 阅读
  11. 数据库SQL查询语句汇总详解

    2024-05-16 01:56:05       12 阅读