把所有的请求方法集中到一个地方。目的:利于后期维护,以及加入日志、异常处理等。
目的:框架封装好之后,功能测试不需要写代码,也能执行自动化测试
config.yml
base:
  base_php_url: http://47.107.116.139
  base_wx_url: https://api.weixin.qq.com
yaml_util.py
import os
import yaml
def get_path():
    """Return the absolute path of the directory that contains this module."""
    module_dir = os.path.dirname(__file__)
    return os.path.abspath(module_dir)
def read_config_file(one_node, two_node):
    """Look up a two-level key in config.yml.

    The file is read from the directory returned by ``get_path()`` on every
    call.

    Args:
        one_node: Top-level key in the YAML document.
        two_node: Key nested under ``one_node``.

    Returns:
        The value stored at ``one_node -> two_node``.
    """
    config_path = f'{get_path()}/config.yml'
    with open(config_path, encoding="utf-8") as stream:
        config = yaml.load(stream, Loader=yaml.FullLoader)
    section = config[one_node]
    return section[two_node]
# if __name__ == '__main__':
# print (read_config_file("base","base_php_url"))
requests_util.py
import requests
from pytestdemo.common.yaml_util import read_config_file
class RequestsUtil:
    """Send HTTP requests against a base URL taken from config.yml."""

    # Class-level default; every instance replaces it in __init__.
    base_url = ""

    # A single session object used by all instances of this class.
    session = requests.Session()

    def __init__(self, base, base_url):
        """Resolve the base URL from config.yml.

        Args:
            base: Top-level key in config.yml.
            base_url: Key under ``base`` whose value is the base URL.
        """
        self.base_url = read_config_file(base, base_url)

    def send_request(self, method, url, **kwargs):
        """Send a request through the class-level session.

        Args:
            method: HTTP method name; converted to a lower-case string.
            url: Path appended to the configured base URL.
            **kwargs: Passed through unchanged to ``Session.request``.

        Returns:
            The response object returned by ``Session.request``.
        """
        verb = str(method).lower()
        full_url = self.base_url + url
        return RequestsUtil.session.request(verb, full_url, **kwargs)
test_demo5.py
import re
from pytestdemo.common.requests_util import RequestsUtil
class Test5Api:
    """Fetch the phpwind page and keep its CSRF token on the class."""

    # Holds the token extracted by test_get_csrf_token.
    csrf_token = ""

    def test_get_csrf_token(self):
        """Request /phpwind and extract the csrf_token form value."""
        res = RequestsUtil("base", "base_php_url").send_request("get", url='/phpwind')
        # Pull the token out of the HTML with a regular expression.
        obj = re.search('name="csrf_token" value="(.*?)"', res.text)
        print(obj)
        # re.search returns None when nothing matches; fail with a clear
        # message instead of an AttributeError on obj.group.
        assert obj is not None, "csrf_token field not found in the /phpwind response"
        Test5Api.csrf_token = obj.group(1)
        print(Test5Api.csrf_token)
接口关联
新建extract.yml
yaml_util.py
import os
import yaml
def get_path():
    """Return the absolute path of the directory that contains ``common/``.

    The path is derived from this module's own location rather than from the
    current working directory, so the result does not change with the
    directory the tests are launched from, nor with "common" appearing
    elsewhere in the path.

    NOTE(review): assumes this module lives directly inside the ``common``
    directory (it is imported as ``pytestdemo.common.yaml_util``) -- confirm.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.dirname(module_dir)
def read_config_file(one_node, two_node):
    """Return the value at ``one_node -> two_node`` in common/config.yml.

    Args:
        one_node: Top-level key in the YAML document.
        two_node: Key nested under ``one_node``.

    Returns:
        The value found under the two keys.
    """
    location = f'{get_path()}/common/config.yml'
    with open(location, encoding="utf-8") as handle:
        document = yaml.load(handle, Loader=yaml.FullLoader)
    return document[one_node][two_node]
def read_extract_file(node_name):
    """Return the value stored under ``node_name`` in common/extract.yml.

    Args:
        node_name: Key to look up in the YAML document.

    Returns:
        The value stored for ``node_name``.

    Raises:
        KeyError: If the key is missing, or the file is empty (for example
            right after ``clean_extract_file()`` and before any write).
    """
    with open(f'{get_path()}/common/extract.yml', encoding="utf-8") as f:
        value = yaml.load(f, yaml.FullLoader)
    # An empty document parses to None; report it as a missing key rather
    # than letting None[...] raise an unrelated TypeError.
    if value is None:
        raise KeyError(f"{node_name!r} not found: extract.yml is empty")
    return value[node_name]
def write_extract_file(data):
    """Append ``data`` as YAML to common/extract.yml.

    The file is opened in append mode, so earlier content is kept.

    Args:
        data: Object handed to ``yaml.dump``.
    """
    target = f'{get_path()}/common/extract.yml'
    with open(target, mode="a", encoding="utf-8") as out:
        yaml.dump(data, stream=out, allow_unicode=True)
def clean_extract_file():
    """Empty common/extract.yml."""
    target = f'{get_path()}/common/extract.yml'
    with open(target, mode="w", encoding="utf-8") as out:
        # Opening in "w" mode already discards the content; truncate() is
        # then called at position 0.
        out.truncate()
# if __name__ == '__main__':
# # print (read_config_file("base","base_php_url"))
# print(get_path())
test_demo5.py
import re
from pytestdemo.common.requests_util import RequestsUtil
from pytestdemo.common.yaml_util import write_extract_file, read_extract_file
class Test5Api:
    """phpwind flow: fetch the CSRF token, then log in with it."""

    def test_get_csrf_token(self):
        """Request /phpwind and store the csrf_token in extract.yml."""
        res = RequestsUtil("base", "base_php_url").send_request("get", url='/phpwind')
        # Pull the token out of the HTML with a regular expression.
        obj = re.search('name="csrf_token" value="(.*?)"', res.text)
        print(obj)
        # re.search returns None when nothing matches; fail with a clear
        # message instead of an AttributeError on obj.group.
        assert obj is not None, "csrf_token field not found in the /phpwind response"
        write_extract_file({"csrf_token": obj.group(1)})

    def test_login(self):
        """Post the login form using the token read back from extract.yml."""
        data = {
            "username": "msxy",
            "password": "msxy",
            "csrf_token": read_extract_file("csrf_token"),
            "backurl": "http://47.107.116.139/phpwind/",
        }
        print(data)
        res = RequestsUtil("base", "base_php_url").send_request(
            "post",
            "/phpwind/index.php?m=u&c=login",
            data=data,
        )
        print(res.request.headers)
        print(res.text)
conftest.py
import pytest

from pytestdemo.common.yaml_util import clean_extract_file


@pytest.fixture(scope="session", autouse=True)
def clean_extract():
    """Empty extract.yml via ``clean_extract_file()``.

    Declared session-scoped and autouse, so no test has to request it.
    """
    clean_extract_file()
request 封装&接口关联
yaml_util.py
import os
import yaml
def get_path():
    """Return the absolute directory path of this module file."""
    # os.path.split(...)[0] is the directory part of the module's path.
    folder = os.path.split(__file__)[0]
    return os.path.abspath(folder)
def read_config_file(one_node, two_node):
    """Read config.yml next to this module and return a nested value.

    Args:
        one_node: Top-level key in the YAML document.
        two_node: Key nested under ``one_node``.

    Returns:
        The value stored at ``one_node -> two_node``.
    """
    yaml_path = f'{get_path()}/config.yml'
    with open(yaml_path, encoding="utf-8") as source:
        settings = yaml.load(source, Loader=yaml.FullLoader)
    group = settings[one_node]
    return group[two_node]
def read_extract_file(node_name):
    """Return the value stored under ``node_name`` in extract.yml.

    Args:
        node_name: Key to look up in the YAML document.

    Returns:
        The value stored for ``node_name``.

    Raises:
        KeyError: If the key is missing, or the file is empty (for example
            right after ``clean_extract_file()`` and before any write).
    """
    with open(f'{get_path()}/extract.yml', encoding="utf-8") as f:
        value = yaml.load(f, yaml.FullLoader)
    # An empty document parses to None; report it as a missing key rather
    # than letting None[...] raise an unrelated TypeError.
    if value is None:
        raise KeyError(f"{node_name!r} not found: extract.yml is empty")
    return value[node_name]
def write_extract_file(data):
    """Dump ``data`` as YAML onto the end of extract.yml.

    Append mode is used, so existing entries stay in the file.

    Args:
        data: Object handed to ``yaml.dump``.
    """
    extract_path = f'{get_path()}/extract.yml'
    with open(extract_path, mode="a", encoding="utf-8") as sink:
        yaml.dump(data, stream=sink, allow_unicode=True)
def clean_extract_file():
    """Reset extract.yml to an empty file."""
    extract_path = f'{get_path()}/extract.yml'
    with open(extract_path, mode="w", encoding="utf-8") as sink:
        # "w" mode discards existing content on open; truncate() is then
        # called at position 0.
        sink.truncate()
# if __name__ == '__main__':
# print (read_config_file("base","base_php_url"))
requests_util.py
import json
import requests
from pytestdemo.common.yaml_util import read_config_file, read_extract_file
class RequestsUtil:
    """Send HTTP requests with ``{{name}}`` placeholders filled from extract.yml.

    The base URL comes from config.yml. Placeholders in the URL and in the
    ``params`` / ``data`` / ``json`` keyword arguments are replaced with the
    value ``read_extract_file(name)`` returns before the request is sent.
    """

    # Class-level default; every instance replaces it in __init__.
    base_url = ""

    def __init__(self, base, base_url):
        """Resolve the base URL from config.yml.

        Args:
            base: Top-level key in config.yml.
            base_url: Key under ``base`` whose value is the base URL.
        """
        self.base_url = read_config_file(base, base_url)

    # A single session object used by all instances of this class.
    session = requests.Session()

    @staticmethod
    def _substitute(text):
        """Return *text* with every ``{{name}}`` replaced by its extracted value."""
        pieces = []
        cursor = 0
        while True:
            start = text.find("{{", cursor)
            if start == -1:
                break
            end = text.find("}}", start + 2)
            if end == -1:
                # Unterminated placeholder: leave the rest of the text as is.
                break
            name = text[start + 2:end]
            pieces.append(text[cursor:start])
            # Extracted values may be non-strings (e.g. numbers); coerce them
            # so they can be spliced into the surrounding text.
            pieces.append(str(read_extract_file(name)))
            cursor = end + 2
        pieces.append(text[cursor:])
        return "".join(pieces)

    def replace_value(self, data):
        """Replace ``{{name}}`` placeholders inside *data*.

        Strings are substituted directly. Dicts (keys and values), lists and
        tuples are walked recursively, so values are never spliced into
        serialized JSON text and need no escaping. Anything else (``None``,
        numbers, bytes, file objects, ...) is returned unchanged.

        Args:
            data: A string or a possibly nested container holding strings.

        Returns:
            A new object of the same shape with placeholders substituted;
            the input container is not modified.

        Raises:
            KeyError: If a placeholder name is not present in extract.yml
                (propagated from ``read_extract_file``).
        """
        if isinstance(data, str):
            return self._substitute(data)
        if isinstance(data, dict):
            return {
                self.replace_value(key): self.replace_value(value)
                for key, value in data.items()
            }
        if isinstance(data, list):
            return [self.replace_value(item) for item in data]
        if isinstance(data, tuple):
            return tuple(self.replace_value(item) for item in data)
        return data

    def send_request(self, method, url, **kwargs):
        """Send a request through the class-level session.

        Args:
            method: HTTP method name; converted to a lower-case string.
            url: Path appended to the configured base URL; may contain
                ``{{name}}`` placeholders.
            **kwargs: Passed to ``Session.request``. ``params``, ``data`` and
                ``json`` have their placeholders substituted first.

        Returns:
            The response object returned by ``Session.request``.
        """
        method = str(method).lower()
        url = self.base_url + self.replace_value(url)
        for key in ("params", "data", "json"):
            if key in kwargs:
                kwargs[key] = self.replace_value(kwargs[key])
        return RequestsUtil.session.request(method, url, **kwargs)
test_demo4.py
import random
from pytestdemo.common.requests_util import RequestsUtil
from pytestdemo.common.yaml_util import write_extract_file
class Test4Api:
    """WeChat API flow: obtain an access token, then call the tag update API."""

    access_token = ""

    def test_get_token(self):
        """Request an access token and store it in extract.yml."""
        # NOTE(review): appid/secret are hard-coded in the test source;
        # consider loading them from configuration instead.
        query = {
            "grant_type": "client_credential",
            "appid": "wx6b11b3efd1cdc290",
            "secret": "106a9c6157c4db5f6029918738f9529d",
        }
        client = RequestsUtil("base", "base_wx_url")
        res = client.send_request("get", url="/cgi-bin/token", params=query)
        payload = res.json()
        print(payload["access_token"])
        write_extract_file({"access_token": payload["access_token"]})

    def test_get_news(self):
        """Post a tag update whose URL and body use ``{{access_token}}``."""
        # A random six-digit suffix is appended to the tag name.
        tag_name = "ms" + str(random.randint(100000, 999999))
        body = {
            "tag": {
                "id": 23475,
                "name": tag_name,
                "keyss": "{{access_token}}",
            }
        }
        client = RequestsUtil("base", "base_wx_url")
        res = client.send_request(
            "post",
            "/cgi-bin/tags/update?access_token={{access_token}}",
            json=body,
        )
        print(res.text)