需求
传入请求参数后，自动判断请求方法（GET/POST），并根据 Content-Type 对请求体进行格式化，然后发起请求。
代码
import json
import requests
def str_to_json(data):
    """Best-effort JSON decoding of *data*.

    Args:
        data: A JSON document as ``str``/``bytes``, or any other value.

    Returns:
        The decoded Python object when *data* parses as JSON; otherwise
        *data* itself, unchanged. ``None`` is returned as ``None``.
    """
    if data is None:
        return data
    try:
        return json.loads(data)
    except (TypeError, ValueError, RecursionError):
        # TypeError: data is not str/bytes (e.g. already a dict).
        # ValueError: malformed JSON (JSONDecodeError) or undecodable bytes.
        # RecursionError: pathologically deep nesting.
        # In every case fall back to the caller's original value.
        return data
def _header_value(headers, name):
    """Return the value of header *name* (matched case-insensitively), or None."""
    wanted = name.lower()
    for key, value in (headers or {}).items():
        if str(key).lower() == wanted:
            return value
    return None


def requester(flow):
    """Send the HTTP request described by *flow* and return the response.

    Args:
        flow: Mapping describing the request. Keys read here:
            ``url`` (required), ``method`` (required; ``GET`` or ``POST``,
            matched case-insensitively), and the optional ``headers``,
            ``post_data``, ``proxy`` (passed to requests as ``proxies``) and
            ``timeout``. Missing optional keys default to ``None``; any
            other keys are ignored.

    Returns:
        The ``requests`` response object, or ``None`` when the method is
        unsupported or sending the request raised an exception (a message
        is printed in both cases).

    For a POST whose Content-Type header contains ``application/json`` the
    body is passed through ``str_to_json`` and sent via ``json=``; any other
    POST body is sent verbatim via ``data=``.
    """
    url = flow["url"]
    method = flow["method"]
    headers = flow.get("headers")
    post_data = flow.get("post_data")

    request_kwargs = {
        "url": url,
        "headers": headers,
        "proxies": flow.get("proxy"),
        # NOTE(security): TLS certificate verification is disabled, exactly
        # as in the original code. Only acceptable against test targets.
        "verify": False,
        # None keeps the previous behaviour (no timeout); callers can opt in.
        "timeout": flow.get("timeout"),
    }

    normalized_method = str(method).upper()
    if normalized_method == "POST":
        content_type = _header_value(headers, "content-type")
        if content_type and "application/json" in str(content_type).lower():
            request_kwargs["json"] = str_to_json(post_data)
        else:
            request_kwargs["data"] = post_data
    elif normalized_method != "GET":
        print("不支持的方法: {}, url: {}".format(method, url))
        # Return explicitly: there is no response object to hand back.
        return None

    try:
        if normalized_method == "POST":
            return requests.post(**request_kwargs)
        return requests.get(**request_kwargs)
    except Exception as e:
        # Deliberately broad: this is a best-effort sender that reports the
        # failure and lets the caller continue with the next request.
        print("请求 {} 错误: {}".format(url, e))
        return None
测试
if __name__ == "__main__":
    # Manual smoke test: replay each sample request through requester() and
    # print the response body.
    request_datas = [
        {
            # Alternate GET sample (swap in together with "request_method": "GET"):
            # "request_url": "http://172.16.12.133:5888/api/shop/order/detail?order_id=9&_signature=11111&aaa=888",
            "request_url": "http://172.16.12.133:5888/api/shop/product/buy",
            "resource_type": "document",
            # "request_method": "GET",
            "request_method": "POST",
            # Alternate body sample:
            # "request_post_data": "{\"id\":1,\"first_name\":\"gesilaadmin\",\"surname\":\"gesilaadmin\"}",
            "request_post_data": "{\"product_id\":\"4\",\"coupon_code\":\"dbe9fc5ba2e2da23acdae64dc5a2b2f2\"}",
            # The key is spelled "requset_headers" in the sample data; the
            # lookup below uses the same spelling.
            "requset_headers": {
                # NOTE(review): value repeats the header name ("Accept: ") — confirm intended.
                "accept": "Accept: */*",
                "accept-encoding": "gzip, deflate",
                "accept-language": "zh-CN,zh;q=0.9",
                "content-type": "application/json",
                "Origin": "http://172.16.12.133:5888",
                "Referer": "http://172.16.12.133:5888/checkout?product_id=4",
                "connection": "close",
                "cookie": "session=eyJ1c2VybmFtZSI6ImFkbWluIn0.Zb8-BA.3DX4qB1yNha7w3hz1R3_BedIZ_g",
                # NOTE(review): Host normally carries host[:port] without a scheme — confirm intended.
                "host": "http://172.16.12.133:5888",
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                # NOTE(review): "Bareer" may be a typo for "Bearer" — confirm intended.
                "Authorization": "Bareer 11111"
            }
        }
    ]
    for data in request_datas:
        # Only the keys requester() reads are passed. The original also set
        # "query", "cookie2" and "matchreplace" from names that are not
        # defined in this file, which raised NameError.
        flow = {
            "url": data["request_url"],
            "headers": data["requset_headers"],
            "method": data["request_method"],
            "post_data": data["request_post_data"],
            "proxy": None,
        }
        r = requester(flow)
        # Compare against None explicitly: a response object is falsy for
        # 4xx/5xx statuses, which are still successfully completed requests.
        if r is None:
            print("请求出错")
        else:
            print(r.text)