Redis的一个典型应用

1.redis服务器与python编程环境

#install server

sudo apt update

sudo apt install redis-server

#install python api

pip install redis --timeout 200 -i http://mirrors.aliyun.com/pypi/simple/  --trusted-host mirrors.aliyun.com 

1.1 测试代码

# 创建Redis客户端实例
r = redis.Redis(host='localhost', port=6379, db=0)

# 设置键值对
r.set('key', 'value')

# 获取值
value = r.get('key')
print(value)  # 输出: b'value',b表示bytes类型

# 删除键
r.delete('key')

2.接口封装1:命令行sh脚本的模式

使用redis缺省配置,就可以达到异常关机后,队列数据自动复原 

2.1 enqueue.sh

#!/bin/bash
# usage: enqueue key1 value1 5
# key1, push value1, the queue max length is 5.
# 陈旧数据会被抛弃

# 设置 Redis 连接信息
REDIS_CLI="redis-cli"  # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost"          # Redis 服务器主机名或 IP
REDIS_PORT="6379"               # Redis 服务器端口号
KEY=$1                          # 要设置的 Redis 键名
VALUE=$2                        # 要设置的 Redis 值
MAX_LENGTH=$3                   # 最大长度

# 检查列表当前长度
current_length=$(redis-cli -h $REDIS_HOST -p $REDIS_PORT LLEN $KEY)

# 如果列表长度小于最大长度,则添加新元素
if [ $current_length -gt $MAX_LENGTH ]; then
    $REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT RPOP $KEY
fi

# 使用 redis-cli 执行 SET 命令
$REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LPUSH $KEY $VALUE

2.2 fetch_top.sh

#!/bin/bash
#usage: fetch_top key1
#得到某个队列的最新值

# 设置 Redis 连接信息
REDIS_CLI="redis-cli"  # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost"          # Redis 服务器主机名或 IP
REDIS_PORT="6379"               # Redis 服务器端口号
KEY="$1"                    # Redis 列表键名

# 使用 redis-cli 执行 LINDEX 命令获取列表的首个元素
FIRST_ELEMENT=$($REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LINDEX $KEY 0)

# 检查 LINDEX 命令执行结果
if [ -n "$FIRST_ELEMENT" ]; then
  echo "$FIRST_ELEMENT"
else
  echo ""
fi

 2.3 dequeue.sh 

#!/bin/bash
#将当前队列最新的元素丢弃

# 设置 Redis 连接信息
REDIS_CLI="redis-cli"  # 替换为你的 redis-cli 路径,或者如果已在 PATH 中,直接使用 redis-cli
REDIS_HOST="localhost"          # Redis 服务器主机名或 IP
REDIS_PORT="6379"               # Redis 服务器端口号
KEY="$1"                    # Redis 列表键名

# 使用 redis-cli 执行 LINDEX 命令获取列表的首个元素
FIRST_ELEMENT=$($REDIS_CLI -h $REDIS_HOST -p $REDIS_PORT LPOP $KEY)

# 检查 LINDEX 命令执行结果
if [ -n "$FIRST_ELEMENT" ]; then
  echo 0
else
  echo 1
fi

3.接口封装2:python的格式,包含结构化对象的定义

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 获取当前脚本文件所在目录的父目录,并构建相对路径
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
project_path = os.path.join(current_dir, '..')
sys.path.append(project_path)
sys.path.append(current_dir)
import datetime
import paho.mqtt.client as mqtt
import cat4Config
import struct
import subprocess
import time
import json
import queue
import numpy as np
import threading
import subprocess
import pickle
import redis

class GpPublishMsg:
    def __init__(self, time, ar_class, pic, addr_web_api):
        self.time = time
        self.ar_class = ar_class,
        self.pic = pic
        self.addr_web_api = addr_web_api

class GpPublicMsgQueue:
    def __init__(self):
        self.MAX_QUEUE_LEN = 3600
        self.errCnt = 0
        self.keyname = current_dir.replace('/', ':').replace('\\', ':').replace('.', ':')
        
    def enqueue(self, msg:GpPublishMsg):
        try:
            # 创建Redis客户端实例
            r = redis.Redis(host='localhost', port=6379, db=0)
            size = r.llen(self.keyname)
            if(size>self.MAX_QUEUE_LEN):
                r.rpop(self.keyname)
            r.lpush(self.keyname, pickle.dumps(msg))
        except:
            self.errCnt = self.errCnt+1

    def dequeue(self) -> GpPublishMsg:
        try:
            # 创建Redis客户端实例
            r = redis.Redis(host='localhost', port=6379, db=0)
            br = r.lpop(self.keyname)
            if(br is not None):
                return pickle.loads(br)
            else:
                return None
        except:
            self.errCnt = self.errCnt+1
            return None

    def fetch_queue_top(self) -> GpPublishMsg:
        try:
            # 创建Redis客户端实例
            r = redis.Redis(host='localhost', port=6379, db=0)
            size = r.llen(self.keyname)
            if(size==0):
                return None
            br = r.lindex(self.keyname, 0)
            if(br is not None):
                return pickle.loads(br)
            else:
                return None
        except:
            self.errCnt = self.errCnt+1
            return None
            
    def test(self):
        item1 = GpPublishMsg(2024, 'class1', 'pic1', "http://1")
        item2 = GpPublishMsg(2024, 'class1', 'pic2', "http://1")
        item3 = GpPublishMsg(2024, 'class1', 'pic3', "http://1")
        dumb = self.fetch_queue_top()
        print('before first enqueue, top() = ', dumb)
        self.enqueue(item1)
        print('enqueue item1, top() = ', self.fetch_queue_top().pic)
        self.enqueue(item2)
        print('enqueue item2, top() = ', self.fetch_queue_top().pic)
        self.dequeue()
        print('first dequeue, top() = ', self.fetch_queue_top().pic)
        self.enqueue(item3)
        print('enqueue(item3), top() = ', self.fetch_queue_top().pic)
        self.dequeue()
        print('dequeue(), top() = ', self.fetch_queue_top().pic)
        self.dequeue()
        print('dequeue(), top() = ', self.fetch_queue_top())

3.1 python封装测试

'''
>>> import ext_pic_out_offline_queue as gpqueue
>>> q = gpqueue.GpPublicMsgQueue()
>>> q.test()
before first enqueue, top() =  None
enqueue item1, top() =  pic1
enqueue item2, top() =  pic2
first dequeue, top() =  pic1
enqueue(item3), top() =  pic3
dequeue(), top() =  pic1
dequeue(), top() =  None
'''


   def test(self):
        item1 = GpPublishMsg(2024, 'class1', 'pic1', "http://1")
        item2 = GpPublishMsg(2024, 'class1', 'pic2', "http://1")
        item3 = GpPublishMsg(2024, 'class1', 'pic3', "http://1")
        dumb = self.fetch_queue_top()
        print('before first enqueue, top() = ', dumb)
        self.enqueue(item1)
        print('enqueue item1, top() = ', self.fetch_queue_top().pic)
        self.enqueue(item2)
        print('enqueue item2, top() = ', self.fetch_queue_top().pic)
        self.dequeue()
        print('first dequeue, top() = ', self.fetch_queue_top().pic)
        self.enqueue(item3)
        print('enqueue(item3), top() = ', self.fetch_queue_top().pic)
        self.dequeue()
        print('dequeue(), top() = ', self.fetch_queue_top().pic)
        self.dequeue()
        print('dequeue(), top() = ', self.fetch_queue_top())

4.为什么要用redis

在编程模式上,订阅模式在分布式环境的一个实现形式,就是mqtt。它可以非常方便地处理消息分发。甚至可以将这个应用添加进嵌入式系统——这个分布式协同机制的开销极小。但是mqtt不负责持久化。顶多,它会保留同一个topic下的最后一笔数据,它不进行数据保存,只是把信息分发给当前在线的用户。

如果你需要一个存储缓冲的机制,那么redis就是非常适合的选择。

你可以手工实现,但是它有代价——你无法方便地调试,然后你会遇到重复发明轮子过程中必然遭遇的各种技术细节。

相关推荐

  1. Redis一个典型应用

    2024-07-13 19:10:02       17 阅读
  2. C语言中一些基本数据类型典型大小

    2024-07-13 19:10:02       51 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-13 19:10:02       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-13 19:10:02       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-13 19:10:02       58 阅读
  4. Python语言-面向对象

    2024-07-13 19:10:02       69 阅读

热门阅读

  1. Python 列表深度解析:功能强大的数据结构

    2024-07-13 19:10:02       24 阅读
  2. 什么是天使投资

    2024-07-13 19:10:02       20 阅读
  3. C++中的自定义数据类型:类和结构体

    2024-07-13 19:10:02       19 阅读
  4. 【PLC】基本概念

    2024-07-13 19:10:02       19 阅读
  5. package.json 脚本配置使用环境文件

    2024-07-13 19:10:02       22 阅读
  6. ADC分类

    2024-07-13 19:10:02       19 阅读
  7. Linq的常用方法

    2024-07-13 19:10:02       22 阅读
  8. 数据湖仓一体(四)安装hive

    2024-07-13 19:10:02       19 阅读