rabbitMQ的简单使用

rabbitMQ的介绍

RabbitMQ是一个开源的消息代理和队列服务器,主要用于在不同的应用程序之间传递消息。它基于AMQP(Advanced Message Queuing Protocol)协议,提供了一种可靠的方式来处理异步通信。RabbitMQ使用Erlang语言编写,具有高可用性、可伸缩性和可靠性。

windows下安装

1.安装Erlang,官网:erlang
在这里插入图片描述

2.安装rabbitmq,官网:rabbitmq
在这里插入图片描述

启动rabbitmq

黑窗口进入…/sbin输入命令:

rabbitmq-plugins enable rabbitmq_management

在浏览器中输入地址查看是否安装成功:http://127.0.0.1:15672/
在这里插入图片描述

使用默认账号密码登录:guest/guest

rabbitMQ的优缺点

2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

优点:

  • 由于 erlang 语言的高并发特性,性能较好;
  • 吞吐量到万级,MQ功能比较完备、健壮、稳定、易用、跨平台、支持多种语言如Python、Java等,支持AJAX 文档齐全;
  • 开源提供的管理界面非常棒,用起来很好用,社区活跃度高;
  • 更新频率相当高。

缺点:

  • 商业版需要收费,学习成本较高。
  • 选用场景:结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

为什么使用rabbitmq

1️⃣ 高并发的流量削峰

举个例子,假设某订单系统每秒最多能处理一万次订单,也就是最多承受的10000qps,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
在这里插入图片描述
2️⃣ 应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
在这里插入图片描述

3️⃣ 异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息队列,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。

在这里插入图片描述

简单示例:

客户端(发消息):

package main

import (
	"context"
	"log"
	"os"
	"strings"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// 检查每个 AMQP电话
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

// 连接到 RabbitMQ 服务器
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	//创建一个通道,这是大多数 API 用于获取 完成的事情驻留
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	//声明一个队列供我们发送到;然后我们可以发布一条消息 到队列
	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // durable 持久
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare a queue")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	/*body := "Hello World!"
	err = ch.PublishWithContext(ctx,
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s\n", body)*/

	//允许从命令行发送任意消息
	body := bodyFrom(os.Args)
	err = ch.PublishWithContext(ctx,
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "text/plain",
			Body:         []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}

服务端(接收消息):

package main

import (
	"bytes"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // durable
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare a queue")
	//告诉 RabbitMQ 不要给超过 一次向工作人员发送一条消息。
	//或者,换句话说,不要派遣 向工作人员发送一条新消息,直到它处理并确认 上一个。
	//相反,它会将其分派给下一个尚未忙碌的工作人员。
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack  发送适当的确认 来自工作人员(这确认单次交付), 一旦我们完成了一项任务
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	/*go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()*/

	//需要 为邮件正文中的每个点伪造一秒钟的工作。它会弹出 来自队列的消息并执行任务
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dotCount := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
			d.Ack(false)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

运行效果:
在这里插入图片描述

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-06-18 14:00:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-18 14:00:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-18 14:00:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-18 14:00:03       20 阅读

热门阅读

  1. 【go】go初始化命令总结

    2024-06-18 14:00:03       6 阅读
  2. 【大数据】gRPC、Flink、Kafka 分别是什么?

    2024-06-18 14:00:03       6 阅读
  3. C#面:请说说C#引用和对象?

    2024-06-18 14:00:03       5 阅读
  4. IntelliJ IDEA调试技巧

    2024-06-18 14:00:03       7 阅读
  5. APK打包 |应用图标 | 应用名称设置

    2024-06-18 14:00:03       7 阅读
  6. 数据库引擎有哪些?

    2024-06-18 14:00:03       9 阅读
  7. 实际中如何应对ARP泛洪攻击

    2024-06-18 14:00:03       7 阅读
  8. USB - 常用开发工具

    2024-06-18 14:00:03       6 阅读