go-redis
是 Go 语言的一个流行的 Redis 客户端库,它提供了丰富的功能来与 Redis 数据库进行交互。
1、简单应用
package main
import (
"context"
"fmt"
"log"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
// 设置值
err1 := rdb.Set(ctx, "your_key1", "your_value1", 0).Err()
if err1 != nil {
log.Fatalf("无法设置键值: %v", err1)
}
// 获取值
val, _ := rdb.Get(ctx, "your_key1").Result()
fmt.Println("your_key1: ", val)
err2 := rdb.LPush(ctx, "your_key2", 111, 222, 333, "444").Err()
if err2 != nil {
log.Fatalf("无法推送到列表: %v", err2)
}
// 循环处理邮件队列
for {
// 从 Redis 队列中获取邮件
email, err := rdb.RPop(ctx, "your_key2").Result()
if err == redis.Nil {
// 队列为空,退出循环
fmt.Println("电子邮件队列为空。")
break
} else if err != nil {
log.Fatalf("无法从队列中获取电子邮件: %v", err)
} else {
fmt.Println(email)
}
}
}
2、连接池管理
使用 go-redis
可以方便地管理 Redis 连接池,确保高效的连接复用。
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func main() {
var ctx = context.Background()
// 创建一个新的 Redis 客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 10, // 设置连接池大小
})
// 使用客户端
pong, err := client.Ping(ctx).Result()
if err != nil {
panic(err)
}
fmt.Println(pong) // 输出 PONG
}
3、管道和事务
go-redis
支持管道(pipelines)和事务(transactions),可以批量执行命令,减少网络往返次数。
事务(Transactions)
事务在 Redis 中用来执行一系列命令,这些命令被打包在一起,然后一起执行。如果在执行过程中遇到错误,整个事务将被取消。事务可以保证操作的原子性。
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func main() {
var ctx = context.Background()
// 创建 Redis 客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
// 检查连接
pong, err := client.Ping(ctx).Result()
if err != nil {
fmt.Printf("连接失败: %v", err)
return
}
fmt.Printf("连接成功: %s\n", pong)
key1 := "key1"
key2 := "key2"
// 使用 WATCH 命令监视键
err = client.Watch(ctx, func(tx *redis.Tx) error {
// 事务中的命令
// SetNX函数:当该键不存在时才设置。如果键已经存在,命令将不执行任何操作。第三个参数为设置过期时间,0表示永不过期
k1, err := tx.SetNX(ctx, key1, "value1", 0).Result() // 仅当 key1 不存在时设置值
if err != nil {
return err
}
if k1 {
fmt.Println("key1 设置成功")
} else {
fmt.Println("key1 已存在,未进行设置操作")
}
k2, err := tx.SetNX(ctx, key2, "value2", 0).Result() // 仅当 key2 不存在时设置值
if err != nil {
return err
}
if k2 {
fmt.Println("key2 设置成功")
} else {
fmt.Println("key2 已存在,未进行设置操作")
}
// 使用 MULTI 命令开始事务
pipe := tx.Pipeline()
// 将命令添加到事务队列
_, err = pipe.Set(ctx, key1, "value1", 0).Result()
if err != nil {
return err
}
_, err = pipe.Set(ctx, key2, "value2", 0).Result()
if err != nil {
return err
}
// 使用 EXEC 命令执行事务
_, err = pipe.Exec(ctx)
if err != nil {
return err
}
return nil
}, key1, key2)
if err != nil {
if err == redis.TxFailedErr {
fmt.Println("事务失败,可能因为键被其他客户端修改")
} else {
fmt.Printf("发生错误: %v", err)
}
return
}
// 如果事务成功,获取 key1 和 key2 的值
val1, err := client.Get(ctx, key1).Result()
if err != nil {
fmt.Printf("获取 key1 失败: %v", err)
return
}
val2, err := client.Get(ctx, key2).Result()
if err != nil {
fmt.Printf("获取 key2 失败: %v", err)
return
}
fmt.Printf("key1: %s, key2: %s\n", val1, val2)
}
使用 Watch
方法监视了两个键 "key1"
和 "key2"
。在 Watch
的回调函数中,我们尝试设置这两个键的值,如果设置成功,我们使用 Pipeline
方法将设置操作添加到事务队列中。最后,我们使用 Exec
方法提交事务。
请注意,如果 "key1"
或 "key2"
在我们设置它们之前已经被其他客户端设置,那么事务将不会执行,并且 Watch
方法将返回 redis.TxFailedErr
错误。在实际应用中,你可能需要根据这个错误来决定是否重试事务。
事务&管道
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func main() {
var ctx = context.Background()
// 创建一个新的 Redis 客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 10, // 设置连接池大小
})
// 假设我们想检查并更新的key以及期望的旧值和新值
key := "mykey"
oldValue := "expectedOldValue"
newValue := "newValue"
// 事务:使用 MULTI 和 EXEC 命令保证操作的原子性
err := client.Watch(ctx, func(tx *redis.Tx) error {
// 在事务中获取key的值
val, err := tx.Get(ctx, key).Result()
if err != nil && err != redis.Nil {
return err
}
// 检查值是否与预期相符
if val == oldValue {
// 如果相符,尝试更新值
_, err := tx.Set(ctx, key, newValue, 0).Result()
if err != nil {
return err
}
fmt.Println("更新成功")
return nil
} else {
fmt.Println("键存在,值与预期值不匹配。")
return nil // 这里返回nil表示不执行任何操作,事务会因为没有EXEC调用而自动取消
}
}, key)
if err != nil {
panic(err)
}
// 管道:批量发送命令
cmds, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, "key1", "value1", 0)
pipe.Get(ctx, "key1")
return nil
})
if err != nil {
panic(err)
}
fmt.Println(cmds) // 输出 [SetResult(string) <nil>, StringResult(value1, nil)]
}
4、发布/订阅模式
发布:
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
ctx := context.Background()
// 发送消息的循环
for {
err := client.Publish(ctx, "my_channel", "Hello, Redis!").Err()
if err != nil {
fmt.Printf("发布消息失败: %v\n", err)
return
}
fmt.Println("消息发布成功")
// 每5秒发布一条消息
time.Sleep(5 * time.Second)
}
}
订阅:
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"log"
"os"
"os/signal"
)
func main() {
// 创建 Redis 客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
ctx := context.Background()
// 订阅频道
pubsub := client.Subscribe(ctx, "my_channel")
// 处理接收到的消息
go func() {
defer func() {
if err := pubsub.Close(); err != nil {
log.Printf("关闭订阅失败: %v", err)
}
}()
for {
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
log.Printf("接收消息失败: %v", err)
return
}
fmt.Printf("接收到消息: %s\n", msg.Payload)
}
}()
// 等待中断信号以退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
<-sigChan
fmt.Println("正在关闭订阅...")
}
package main
import (
"context"
"github.com/redis/go-redis/v9"
"log"
"os"
"os/signal"
"syscall"
"time"
)
var ctx = context.Background()
func main() {
// 创建 Redis 客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
if err := client.Ping(ctx).Err(); err != nil {
log.Fatalf("无法连接到 Redis: %v", err)
}
// 启动订阅者
go runSubscriber(client)
// 发布者循环发送消息
runPublisher(client)
// 等待中断信号以优雅退出
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("接收到退出信号,正在关闭...")
if err := client.Close(); err != nil {
log.Fatalf("关闭 Redis 客户端失败: %v", err)
}
}
func runSubscriber(client *redis.Client) {
pubsub := client.Subscribe(ctx, "my_channel")
defer func() {
if err := pubsub.Close(); err != nil {
log.Printf("关闭订阅失败: %v", err)
}
}()
log.Println("订阅者启动,正在监听消息...")
for {
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
log.Printf("接收消息失败: %v", err)
return
}
log.Printf("接收到消息: %s\n", msg.Payload)
}
}
func runPublisher(client *redis.Client) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
log.Println("发布者启动,将定时发送消息...")
for {
select {
case <-ticker.C:
err := client.Publish(ctx, "my_channel", "Hello, Redis!").Err()
if err != nil {
log.Printf("发布消息失败: %v", err)
return
}
log.Println("消息发布成功")
}
}
}