【Go】微服务架构下实现etcd服务注册与服务发现

中心网关:gateway
四个微服务:user、message、note、relationship
在这里插入图片描述

1 中心网关实现服务发现

1.1 设计EtcdDiscovery类

package entity

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"gonote/gateway/internal/option"
	messageService "gonote/message/service"
	noteService "gonote/note/service"
	relationshipService "gonote/relationship/service"
	userService "gonote/user/service"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"sync"
	"time"
)

type EtcdDiscovery struct {
   
	etcdClient *clientv3.Client
	serviceMap map[string]interface{
   }
	mu         sync.RWMutex
}

func NewEtcdDiscovery(ip string, port int) (*EtcdDiscovery, error) {
   
	etcdCli, err := clientv3.New(clientv3.Config{
   
		Endpoints:   []string{
   fmt.Sprintf("%v:%v", ip, port)},
		DialTimeout: time.Second * 3,
	})
	if err != nil {
   
		return nil, err
	}
	
	return &EtcdDiscovery{
   
		etcdClient: etcdCli,
		serviceMap: make(map[string]interface{
   }),
	}, nil
}

func (ed *EtcdDiscovery) Start(serviceNames []string) {
   
	for _, serviceName := range serviceNames {
   
		resp, err := ed.etcdClient.Get(context.TODO(), serviceName)
		if err != nil {
   
			panic(err)
		}
		addr := string(resp.Kvs[0].Value)

		conn, err := grpc.Dial(addr,
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
   
			panic(err)
		}

		// etcd存储的是各个微服务的监听地址,通过监听地址创建服务实例
		switch serviceName {
   
		case option.User:
			grpcCli := userService.NewUserServiceClient(conn)
			ed.serviceMap[serviceName] = grpcCli
		case option.Relationship:
			grpcCli := relationshipService.NewRelationshipServiceClient(conn)
			ed.serviceMap[serviceName] = grpcCli
		case option.Note:
			grpcCli := noteService.NewNoteServiceClient(conn)
			ed.serviceMap[serviceName] = grpcCli
		case option.Message:
			grpcCli := messageService.NewMessageServiceClient(conn)
			ed.serviceMap[serviceName] = grpcCli
		}
		
		// 开启协程,监听etcd的变化,动态维护各个grpc服务实例
		go ed.watch(serviceName)
	}
}

func (ed *EtcdDiscovery) watch(serviceName string) {
   
	watchChan := ed.etcdClient.Watch(context.TODO(), serviceName)
	for event := range watchChan {
   
		for _, e := range event.Events {
   
			if e.Type == clientv3.EventTypePut {
   
				addr := string(e.Kv.Value)
				conn, err := grpc.Dial(addr,
					grpc.WithTransportCredentials(insecure.NewCredentials()))
				if err != nil {
   
					continue
				}
				ed.mu.Lock()
				switch serviceName {
   
				case option.User:
					grpcCli := userService.NewUserServiceClient(conn)
					ed.serviceMap[serviceName] = grpcCli
				case option.Relationship:
					grpcCli := relationshipService.NewRelationshipServiceClient(conn)
					ed.serviceMap[serviceName] = grpcCli
				case option.Note:
					grpcCli := noteService.NewNoteServiceClient(conn)
					ed.serviceMap[serviceName] = grpcCli
				case option.Message:
					grpcCli := messageService.NewMessageServiceClient(conn)
					ed.serviceMap[serviceName] = grpcCli
				}
				ed.mu.Unlock()
			} else if e.Type == clientv3.EventTypeDelete {
   
				ed.mu.Lock()
				delete(ed.serviceMap, serviceName)
				ed.mu.Unlock()
			}
		}
	}
}

func (ed *EtcdDiscovery) GetServiceAddr(serviceName string) interface{
   } {
   
	ed.mu.RLock()
	defer ed.mu.RUnlock()

	return ed.serviceMap[serviceName]
}

1.2 在web启动时,初始化EtcdDiscovery

package main

import (
	"gonote/gateway/internal"
	"gonote/gateway/internal/option"
	"gonote/gateway/internal/util"
)

func init() {
   
	util.InitLogger()

	util.InitWebSocketServer(64)

	err := util.InitRedis()
	if err != nil {
   
		panic(err)
	}

	util.InitKafka(option.Topic)

	util.InitEtcdDiscovery([]string{
   
		option.User,
		option.Relationship,
		option.Note,
		option.Message})
}

func main() {
   
	engine := internal.Router()

	if err := engine.Run("0.0.0.0:9090"); err != nil {
   
		panic(err)
	}
}

1.3 调用EtcdDiscovery获取服务实例

举个用户注册的例子:

func UserLogin(c *gin.Context) {
   
	em := c.PostForm("email")
	pwd := c.PostForm("pwd")

    // 获取服务实例
	cli := util.EtcdDiscovery.GetServiceAddr(option.User).(service.UserServiceClient)
   
    // 调用服务
	_, err := cli.UserLogin(context.TODO(), &service.User{
   
		Email: em,
		Pwd:   pwd,
	})

	if err != nil {
   
		c.JSON(200, gin.H{
   
			"code": 1,
			"msg":  err.Error(),
		})
		return
	}

	// 生成jwt令牌
	token, err := util.GenToken(em)
	if err != nil {
   
		c.JSON(200, gin.H{
   
			"code": 1,
			"msg":  err.Error(),
		})
		return
	}

    // session维护长连接
	session := sessions.Default(c)
	session.Set("email", em)
	err = session.Save()
	if err != nil {
   
		c.JSON(200, gin.H{
   
			"code": 1,
			"msg":  err.Error(),
		})
		return
	}

	c.JSON(200, gin.H{
   
		"code": 0,
		"data": token,
	})
}

2 微服务端进行服务注册

user业务对应的微服务:

func init() {
   
	util.InitLogger()

	err := util.InitDB()
	if err != nil {
   
		panic(err)
	}

	util.InitKafka(option.Topic)

	util.InitEtcdCli()
}

func main() {
   
	addr := option.IP + ":" + option.Port
	ln, err := net.Listen("tcp", addr)
	if err != nil {
   
		panic(err)
	}
	defer ln.Close()

	defer util.EtcdCli.Close()
	defer util.KafkaCli.Close()

    // 服务注册
	_, err = util.EtcdCli.Put(context.TODO(), "user", addr)
	if err != nil {
   
		panic(err)
	}

	server := grpc.NewServer()

	service.RegisterUserServiceServer(server, &service.UserServiceImpl{
   })

	if err = server.Serve(ln); err != nil {
   
		panic(err)
	}
}

通过etcd命令查看相关注册信息
在这里插入图片描述

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-01-31 05:56:04       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-31 05:56:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-31 05:56:04       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-31 05:56:04       20 阅读

热门阅读

  1. Linux系统MySQL重置root密码

    2024-01-31 05:56:04       28 阅读
  2. 代码随想录算法训练营第17天

    2024-01-31 05:56:04       35 阅读
  3. react的withRouter高阶组件:

    2024-01-31 05:56:04       34 阅读
  4. 力扣0111——二叉树的最小深度

    2024-01-31 05:56:04       42 阅读
  5. ClickHouse(24)ClickHouse集成mongodb表引擎详细解析

    2024-01-31 05:56:04       37 阅读
  6. React 基础学习01

    2024-01-31 05:56:04       45 阅读
  7. 比VS Code快得多

    2024-01-31 05:56:04       33 阅读