需求描述
给mqtt的topic发送信息
对应的topic接收请求后,执行发送短信指令
信息内容
SMS,10086,102
lua的mqtt里面,截取判断即可
–>可以实现
=====
定时任务,
每月开机一次
发短信?
或者使用开机通知?
定时消费方式,
定时执行
cron表达式,每隔2个月执行一次
cron定时表达式–每两月执行一次_corn 每月执行一次-CSDN博客
https://blog.csdn.net/weixin_44234800/article/details/129949900
0 0 2 1 /2 ?
0 0 1 1/2 * ?
# -*- coding: utf-8 -*-
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from datetime import datetime
#
topic='led001'
# 用户私钥
uid='7801e4ba1bf7406593d47250797860fd'
# 定义中文日期时间格式
chinese_format = "%Y年%m月%d日 %H时%M分%S秒"
# 存放结果信息
msg_success =datetime.now().month+'月份消费情况\n生成时间:'+datetime.now().strftime(chinese_format)+'\n========\n'
msg_failed=''
phone_list=[
13123456701,
13123456702,
13123456703,
13123456704
]
# 根据手机号获取发送的短信内容
def getSmsContent(phone_num):
# 获取当前日期和时间
now = datetime.now()
# 格式化日期和时间
formatted_date = now.strftime("%Y-%m-%d %H:%M:%S")
print("当前日期和时间:", formatted_date)
sms_content= '{}\t{}\t执行消费'.format(formatted_date,phone_num)
print("sms_content:",sms_content)
return sms_content
# 请求mqtt
def post_mqtt(phone_num):
content= getSmsContent(phone_num)
msg= 'SMS,{},{}'.format(phone_num,content)
print("msg:",msg)
url ='https://apis.bemfa.com/va/postJsonMsg'
# print("url:",url)
json_text ={
"uid": uid,
"topic": topic,
"type": 1,
"msg": msg
}
headers = {'Content-Type': 'application/json;charset=utf-8'}
response_str = requests.post(url, json.dumps(json_text), headers=headers)
print('response_str:',response_str)
response_dict = json.loads(response_str)
if response_dict['code'] == 0 and response_dict['message'] == 'ok':
print("请求成功")
msg_success+ phone_num+ '\t发短信成功\t消费成功\n========\n'
else:
print("请求失败")
msg_failed+phone_num+ '\t发短信失败\t消费失败\n========\n'
# 遍历list取出号码
def do_work():
for index, item in enumerate(phone_list):
print(f"Index: {index}, Item: {item}")
post_mqtt(item)
# 等待20s
time.sleep(20)
print("==============")
if __name__ == '__main__':
print("111111")
# 打印返回结果
# print("result:\n",result)
do_work()