BaseClient
def _enforce_trailing_slash(self, url: URL) -> URL:
if url.raw_path.endswith(b"/"):
return url
return url.copy_with(raw_path=url.raw_path + b"/")
确保URL的路径以/结尾
def _make_status_error_from_response(
self,
response: httpx.Response,
) -> APIStatusError:
if response.is_closed and not response.is_stream_consumed:
# We can't read the response body as it has been closed
# before it was read. This can happen if an event hook
# raises a status error.
body = None
err_msg = f"Error code: {response.status_code}"
else:
err_text = response.text.strip()
body = err_text
try:
body = json.loads(err_text)
err_msg = f"Error code: {response.status_code} - {body}"
except Exception:
err_msg = err_text or f"Error code: {response.status_code}"
return self._make_status_error(err_msg, body=body, response=response)
def _make_status_error(
self,
err_msg: str,
*,
body: object,
response: httpx.Response,
) -> _exceptions.APIStatusError:
raise NotImplementedError()
"""
class APIStatusError(APIError):
# Raised when an API response has a status code of 4xx or 5xx.
response: httpx.Response
status_code: int
def __init__(self, message: str, *, response: httpx.Response, body: object | None) -> None:
super().__init__(message, response.request, body=body)
self.response = response
self.status_code = response.status_code
"""
处理API响应的状态码,4xx 或 5xx 时被抛出
当响应体没有被完全读取,将 body 设置为 None,并将错误消息设置为响应状态码。
如果响应体未被关闭,我们尝试从中提取错误文本
提供一个统一的错误处理方式,无论响应体是文本、JSON 还是其他格式,都能够正确地创建一个 APIStatusError 实例,以便于在应用程序中进行错误处理
def _remaining_retries(
self,
remaining_retries: Optional[int],
options: FinalRequestOptions,
) -> int:
return remaining_retries if remaining_retries is not None else options.get_max_retries(self.max_retries)
"""
class FinalRequestOptions(pydantic.BaseModel):
method: str
url: str
params: Query = {}
headers: Union[Headers, NotGiven] = NotGiven()
max_retries: Union[int, NotGiven] = NotGiven()
timeout: Union[float, Timeout, None, NotGiven] = NotGiven()
files: Union[HttpxRequestFiles, None] = None
idempotency_key: Union[str, None] = None
post_parser: Union[Callable[[Any], Any], NotGiven] = NotGiven()
# It should be noted that we cannot use `json` here as that would override
# a BaseModel method in an incompatible fashion.
json_data: Union[Body, None] = None
extra_json: Union[AnyMapping, None] = None
if PYDANTIC_V2:
model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)
else:
class Config(pydantic.BaseConfig): # pyright: ignore[reportDeprecated]
arbitrary_types_allowed: bool = True
def get_max_retries(self, max_retries: int) -> int:
if isinstance(self.max_retries, NotGiven):
return max_retries
return self.max_retries
def _strip_raw_response_header(self) -> None:
if not is_given(self.headers):
return
if self.headers.get(RAW_RESPONSE_HEADER):
self.headers = {**self.headers}
self.headers.pop(RAW_RESPONSE_HEADER)
# override the `construct` method so that we can run custom transformations.
# this is necessary as we don't want to do any actual runtime type checking
# (which means we can't use validators) but we do want to ensure that `NotGiven`
# values are not present
#
# type ignore required because we're adding explicit types to `**values`
@classmethod
def construct( # type: ignore
cls,
_fields_set: set[str] | None = None,
**values: Unpack[FinalRequestOptionsInput],
) -> FinalRequestOptions:
kwargs: dict[str, Any] = {
# we unconditionally call `strip_not_given` on any value
# as it will just ignore any non-mapping types
key: strip_not_given(value)
for key, value in values.items()
}
if PYDANTIC_V2:
return super().model_construct(_fields_set, **kwargs)
return cast(FinalRequestOptions, super().construct(_fields_set, **kwargs)) # pyright: ignore[reportDeprecated]
if not TYPE_CHECKING:
# type checkers incorrectly complain about this assignment
model_construct = construct
"""
计算HTTP请求剩余的重试次数
FinalRequestOptions 提供一个灵活的HTTP请求配置类,使用pydantic来解析和验证数据,同时允许自定义处理逻辑,如删除请求头中的特定字段等
def _build_headers(self, options: FinalRequestOptions) -> httpx.Headers:
custom_headers = options.headers or {}
headers_dict = _merge_mappings(self.default_headers, custom_headers)
self._validate_headers(headers_dict, custom_headers)
# headers are case-insensitive while dictionaries are not.
headers = httpx.Headers(headers_dict)
idempotency_header = self._idempotency_header
if idempotency_header and options.method.lower() != "get" and idempotency_header not in headers:
headers[idempotency_header] = options.idempotency_key or self._idempotency_key()
return headers
将默认头部信息和自定义头部信息合并,然后添加幂等性头部字段(如果需要),最后返回一个httpx.Headers对象
def _prepare_url(self, url: str) -> URL:
"""
Merge a URL argument together with any 'base_url' on the client,
to create the URL used for the outgoing request.
"""
# Copied from httpx's `_merge_url` method.
merge_url = URL(url)
if merge_url.is_relative_url:
merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
return self.base_url.copy_with(raw_path=merge_raw_path)
return merge_url
确保发出的HTTP请求使用正确的URL。如果传入的URL是相对的,它会与客户端的base_url相对应地合并
def _build_request(
self,
options: FinalRequestOptions,
) -> httpx.Request:
if log.isEnabledFor(logging.DEBUG):
log.debug("Request options: %s", model_dump(options, exclude_unset=True))
kwargs: dict[str, Any] = {}
json_data = options.json_data
if options.extra_json is not None:
if json_data is None:
json_data = cast(Body, options.extra_json)
elif is_mapping(json_data):
json_data = _merge_mappings(json_data, options.extra_json)
else:
raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")
headers = self._build_headers(options)
params = _merge_mappings(self._custom_query, options.params)
# If the given Content-Type header is multipart/form-data then it
# has to be removed so that httpx can generate the header with
# additional information for us as it has to be in this form
# for the server to be able to correctly parse the request:
# multipart/form-data; boundary=---abc--
if headers.get("Content-Type") == "multipart/form-data":
headers.pop("Content-Type")
# As we are now sending multipart/form-data instead of application/json
# we need to tell httpx to use it, https://www.python-httpx.org/advanced/#multipart-file-encoding
if json_data:
if not is_dict(json_data):
raise TypeError(
f"Expected query input to be a dictionary for multipart requests but got {type(json_data)} instead."
)
kwargs["data"] = self._serialize_multipartform(json_data)
# TODO: report this error to httpx
return self._client.build_request( # pyright: ignore[reportUnknownMemberType]
headers=headers,
timeout=self.timeout if isinstance(options.timeout, NotGiven) else options.timeout,
method=options.method,
url=self._prepare_url(options.url),
# the `Query` type that we use is incompatible with qs'
# `Params` type as it needs to be typed as `Mapping[str, object]`
# so that passing a `TypedDict` doesn't cause an error.
# https://github.com/microsoft/pyright/issues/3526#event-6715453066
params=self.qs.stringify(cast(Mapping[str, Any], params)) if params else None,
json=json_data,
files=options.files,
**kwargs,
)
"""
def cast(typ, val):
return val
def isEnabledFor(self, level):
# Is this logger enabled for level 'level'?
if self.disabled:
return False
try:
return self._cache[level]
except KeyError:
_acquireLock()
try:
if self.manager.disable >= level:
is_enabled = self._cache[level] = False
else:
is_enabled = self._cache[level] = (
level >= self.getEffectiveLevel()
)
finally:
_releaseLock()
return is_enabled
def _merge_mappings(
obj1: Mapping[_T_co, Union[_T, Omit]],
obj2: Mapping[_T_co, Union[_T, Omit]],
) -> Dict[_T_co, _T]:
merged = {**obj1, **obj2}
return {key: value for key, value in merged.items() if not isinstance(value, Omit)}
"""
创建并返回一个httpx.Request对象,该对象包含了所有必要的请求信息。
如果请求头中包含multipart/form-data,则需要httpx处理表单数据,因此需要移除这个内容类型头
如果请求体是JSON数据,并且Content-Type是multipart/form-data,则需要将JSON数据转换为适合multipart格式的数据
cast 告诉类型检查器一个变量应该具有的类型,而不需要在运行时进行实际的类型转换
isEnabledFor 用于确定日志记录器是否应该处理和发出给定严重级别的日志消息
_merge_mappings 用于合并两个映射对象,合并时会优先考虑第二个映射对象中的值,并且会移除所有值为Omit的键值对
def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]:
items = self.qs.stringify_items(
# TODO: type ignore is required as stringify_items is well typed but we can't be
# well typed without heavy validation.
data, # type: ignore
array_format="brackets",
)
serialized: dict[str, object] = {}
for key, value in items:
if key in serialized:
raise ValueError(f"Duplicate key encountered: {key}; This behaviour is not supported")
serialized[key] = value
return serialized
"""
def stringify_items(
self,
params: Params,
*,
array_format: NotGivenOr[ArrayFormat] = NOT_GIVEN,
nested_format: NotGivenOr[NestedFormat] = NOT_GIVEN,
) -> list[tuple[str, str]]:
opts = Options(
qs=self,
array_format=array_format,
nested_format=nested_format,
)
return flatten([self._stringify_item(key, value, opts) for key, value in params.items()])
"""
将一个字典(data)中的键值对序列化为适用于multipart/form-data请求格式的字典
stringify_items 将一个Params对象中的键值对序列化为字符串,以便在URL查询字符串中使用
def _process_response(
self,
*,
cast_to: Type[ResponseT],
options: FinalRequestOptions,
response: httpx.Response,
stream: bool,
stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
) -> ResponseT:
api_response = APIResponse(
raw=response,
client=self,
cast_to=cast_to,
stream=stream,
stream_cls=stream_cls,
options=options,
)
if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
return cast(ResponseT, api_response)
return api_response.parse()
"""
def parse(self) -> R:
if self._parsed is not None:
return self._parsed
parsed = self._parse()
if is_given(self._options.post_parser):
parsed = self._options.post_parser(parsed)
self._parsed = parsed
return parsed
"""
处理HTTP响应,并根据需要将响应转换为特定的类型
parse 如果响应数据已经被解析,直接返回已解析的数据,否则使用_parse方法解析响应数据。如果指定了post_parser选项,则允许用户在响应解析后应用自定义的处理逻辑。将解析后的数据存储在APIResponse对象中返回解析后的数据
def _process_response_data(
self,
*,
data: object,
cast_to: type[ResponseT],
response: httpx.Response,
) -> ResponseT:
if data is None:
return cast(ResponseT, None)
if cast_to is UnknownResponse:
return cast(ResponseT, data)
try:
if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
return cast(ResponseT, cast_to.build(response=response, data=data))
if self._strict_response_validation:
return cast(ResponseT, validate_type(type_=cast_to, value=data))
return cast(ResponseT, construct_type(type_=cast_to, value=data))
except pydantic.ValidationError as err:
raise APIResponseValidationError(response=response, body=data) from err
def _should_stream_response_body(self, *, request: httpx.Request) -> bool:
if request.headers.get(STREAMED_RAW_RESPONSE_HEADER) == "true":
return True
return False
_process_response_data:处理响应数据,并根据指定的类型cast_to将数据转换成相应的类型
如果cast_to是一个类,并且是ModelBuilderProtocol的子类,则调用build方法来构建一个模型实例
如果设置了self._strict_response_validation,则使用validate_type函数来验证数据是否符合cast_to类型的期望
如果cast_to是一个普通类,而不是模型类,则使用construct_type函数来构造一个实例
_should_stream_response_body:判断响应体是否应该以流式方式处理
# 定义一个属性,返回一个Querystring类的实例
@property
def qs(self) -> Querystring:
return Querystring()
# 定义一个属性,返回一个httpx.Auth对象或None
@property
def custom_auth(self) -> httpx.Auth | None:
return None
# 定义一个属性,返回一个包含认证相关HTTP头部的字典
@property
def auth_headers(self) -> dict[str, str]:
return {}
# 定义一个属性,返回一个包含默认HTTP头部的字典
@property
def default_headers(self) -> dict[str, str | Omit]:
return {
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": self.user_agent,
**self.platform_headers(),
**self.auth_headers,
**self._custom_headers,
}
# 定义一个方法,用于验证传递给请求的默认头部和自定义头部
def _validate_headers(
self,
headers: Headers, # noqa: ARG002
custom_headers: Headers, # noqa: ARG002
) -> None:
"""Validate the given default headers and custom headers.
Does nothing by default.
"""
return
# 定义一个属性,返回一个字符串,包含用户代理信息
@property
def user_agent(self) -> str:
return f"{self.__class__.__name__}/Python {self._version}"
# 定义一个属性,用于获取或设置请求的基本URL
@property
def base_url(self) -> URL:
return self._base_url
# 定义一个设置器方法,用于设置请求的基本URL,并确保URL总是带有尾随的斜杠
@base_url.setter
def base_url(self, url: URL | str) -> None:
self._base_url = self._enforce_trailing_slash(url if isinstance(url, URL) else URL(url))
# 定义一个方法,返回与平台相关的HTTP头部信息
def platform_headers(self) -> Dict[str, str]:
return platform_headers(self._version)
处理HTTP请求的头部信息,用户代理,以及请求的基本URL
def _calculate_retry_timeout(
self,
remaining_retries: int,
options: FinalRequestOptions,
response_headers: Optional[httpx.Headers] = None,
) -> float:
max_retries = options.get_max_retries(self.max_retries)
try:
# About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
#
# <http-date>". See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax for
# details.
if response_headers is not None:
retry_header = response_headers.get("retry-after")
try:
retry_after = float(retry_header)
except Exception:
retry_date_tuple = email.utils.parsedate_tz(retry_header)
if retry_date_tuple is None:
retry_after = -1
else:
retry_date = email.utils.mktime_tz(retry_date_tuple)
retry_after = int(retry_date - time.time())
else:
retry_after = -1
except Exception:
retry_after = -1
# If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
if 0 < retry_after <= 60:
return retry_after
initial_retry_delay = 0.5
max_retry_delay = 8.0
nb_retries = max_retries - remaining_retries
# Apply exponential backoff, but not more than the max.
sleep_seconds = min(initial_retry_delay * pow(2.0, nb_retries), max_retry_delay)
# Apply some jitter, plus-or-minus half a second.
jitter = 1 - 0.25 * random()
timeout = sleep_seconds * jitter
return timeout if timeout >= 0 else 0
根据HTTP响应头部中的Retry-After信息或预定义的指数退避策略来计算在重试HTTP请求之前应该等待的时间
指数退避策略:将初始延迟乘以2的幂来实现的,幂次与剩余重试次数成正比。然后,它将这个值与最大延迟进行比较,取较小者
def _should_retry(self, response: httpx.Response) -> bool:
# Note: this is not a standard header
should_retry_header = response.headers.get("x-should-retry")
# If the server explicitly says whether or not to retry, obey.
if should_retry_header == "true":
return True
if should_retry_header == "false":
return False
# Retry on request timeouts.
if response.status_code == 408:
return True
# Retry on lock timeouts.
if response.status_code == 409:
return True
# Retry on rate limits.
if response.status_code == 429:
return True
# Retry internal errors.
if response.status_code >= 500:
return True
return False
def _idempotency_key(self) -> str:
return f"stainless-python-retry-{uuid.uuid4()}"
_should_retry 决定是否重试
_idempotency_key 生成一个唯一标识符来保证重试请求的幂等性
SyncAPIClient
pass