OpenAI Client

BaseClient

    def _enforce_trailing_slash(self, url: URL) -> URL:
        if url.raw_path.endswith(b"/"):
            return url
        return url.copy_with(raw_path=url.raw_path + b"/")

确保URL的路径以/结尾

    def _make_status_error_from_response(
        self,
        response: httpx.Response,
    ) -> APIStatusError:
        if response.is_closed and not response.is_stream_consumed:
            # We can't read the response body as it has been closed
            # before it was read. This can happen if an event hook
            # raises a status error.
            body = None
            err_msg = f"Error code: {response.status_code}"
        else:
            err_text = response.text.strip()
            body = err_text

            try:
                body = json.loads(err_text)
                err_msg = f"Error code: {response.status_code} - {body}"
            except Exception:
                err_msg = err_text or f"Error code: {response.status_code}"

        return self._make_status_error(err_msg, body=body, response=response)

    def _make_status_error(
        self,
        err_msg: str,
        *,
        body: object,
        response: httpx.Response,
    ) -> _exceptions.APIStatusError:
        raise NotImplementedError()

"""
class APIStatusError(APIError):
    # Raised when an API response has a status code of 4xx or 5xx.

    response: httpx.Response
    status_code: int

    def __init__(self, message: str, *, response: httpx.Response, body: object | None) -> None:
        super().__init__(message, response.request, body=body)
        self.response = response
        self.status_code = response.status_code
"""

处理API响应的状态码,4xx 或 5xx 时被抛出

当响应体没有被完全读取,将 body 设置为 None,并将错误消息设置为响应状态码。

如果响应体未被关闭,我们尝试从中提取错误文本

提供一个统一的错误处理方式,无论响应体是文本、JSON 还是其他格式,都能够正确地创建一个 APIStatusError 实例,以便于在应用程序中进行错误处理

    def _remaining_retries(
        self,
        remaining_retries: Optional[int],
        options: FinalRequestOptions,
    ) -> int:
        return remaining_retries if remaining_retries is not None else options.get_max_retries(self.max_retries)

"""
class FinalRequestOptions(pydantic.BaseModel):
    method: str
    url: str
    params: Query = {}
    headers: Union[Headers, NotGiven] = NotGiven()
    max_retries: Union[int, NotGiven] = NotGiven()
    timeout: Union[float, Timeout, None, NotGiven] = NotGiven()
    files: Union[HttpxRequestFiles, None] = None
    idempotency_key: Union[str, None] = None
    post_parser: Union[Callable[[Any], Any], NotGiven] = NotGiven()

    # It should be noted that we cannot use `json` here as that would override
    # a BaseModel method in an incompatible fashion.
    json_data: Union[Body, None] = None
    extra_json: Union[AnyMapping, None] = None

    if PYDANTIC_V2:
        model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)
    else:

        class Config(pydantic.BaseConfig):  # pyright: ignore[reportDeprecated]
            arbitrary_types_allowed: bool = True

    def get_max_retries(self, max_retries: int) -> int:
        if isinstance(self.max_retries, NotGiven):
            return max_retries
        return self.max_retries

    def _strip_raw_response_header(self) -> None:
        if not is_given(self.headers):
            return

        if self.headers.get(RAW_RESPONSE_HEADER):
            self.headers = {**self.headers}
            self.headers.pop(RAW_RESPONSE_HEADER)

    # override the `construct` method so that we can run custom transformations.
    # this is necessary as we don't want to do any actual runtime type checking
    # (which means we can't use validators) but we do want to ensure that `NotGiven`
    # values are not present
    #
    # type ignore required because we're adding explicit types to `**values`
    @classmethod
    def construct(  # type: ignore
        cls,
        _fields_set: set[str] | None = None,
        **values: Unpack[FinalRequestOptionsInput],
    ) -> FinalRequestOptions:
        kwargs: dict[str, Any] = {
            # we unconditionally call `strip_not_given` on any value
            # as it will just ignore any non-mapping types
            key: strip_not_given(value)
            for key, value in values.items()
        }
        if PYDANTIC_V2:
            return super().model_construct(_fields_set, **kwargs)
        return cast(FinalRequestOptions, super().construct(_fields_set, **kwargs))  # pyright: ignore[reportDeprecated]

    if not TYPE_CHECKING:
        # type checkers incorrectly complain about this assignment
        model_construct = construct
"""

计算HTTP请求剩余的重试次数

FinalRequestOptions 提供一个灵活的HTTP请求配置类,使用pydantic来解析和验证数据,同时允许自定义处理逻辑,如删除请求头中的特定字段等

    def _build_headers(self, options: FinalRequestOptions) -> httpx.Headers:
        custom_headers = options.headers or {}
        headers_dict = _merge_mappings(self.default_headers, custom_headers)
        self._validate_headers(headers_dict, custom_headers)

        # headers are case-insensitive while dictionaries are not.
        headers = httpx.Headers(headers_dict)

        idempotency_header = self._idempotency_header
        if idempotency_header and options.method.lower() != "get" and idempotency_header not in headers:
            headers[idempotency_header] = options.idempotency_key or self._idempotency_key()

        return headers

将默认头部信息和自定义头部信息合并,然后添加幂等性头部字段(如果需要),最后返回一个httpx.Headers对象

    def _prepare_url(self, url: str) -> URL:
        """
        Merge a URL argument together with any 'base_url' on the client,
        to create the URL used for the outgoing request.
        """
        # Copied from httpx's `_merge_url` method.
        merge_url = URL(url)
        if merge_url.is_relative_url:
            merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
            return self.base_url.copy_with(raw_path=merge_raw_path)

        return merge_url

确保发出的HTTP请求使用正确的URL。如果传入的URL是相对的,它会与客户端的base_url相对应地合并

    def _build_request(
        self,
        options: FinalRequestOptions,
    ) -> httpx.Request:
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Request options: %s", model_dump(options, exclude_unset=True))

        kwargs: dict[str, Any] = {}

        json_data = options.json_data
        if options.extra_json is not None:
            if json_data is None:
                json_data = cast(Body, options.extra_json)
            elif is_mapping(json_data):
                json_data = _merge_mappings(json_data, options.extra_json)
            else:
                raise RuntimeError(f"Unexpected JSON data type, {type(json_data)}, cannot merge with `extra_body`")

        headers = self._build_headers(options)
        params = _merge_mappings(self._custom_query, options.params)

        # If the given Content-Type header is multipart/form-data then it
        # has to be removed so that httpx can generate the header with
        # additional information for us as it has to be in this form
        # for the server to be able to correctly parse the request:
        # multipart/form-data; boundary=---abc--
        if headers.get("Content-Type") == "multipart/form-data":
            headers.pop("Content-Type")

            # As we are now sending multipart/form-data instead of application/json
            # we need to tell httpx to use it, https://www.python-httpx.org/advanced/#multipart-file-encoding
            if json_data:
                if not is_dict(json_data):
                    raise TypeError(
                        f"Expected query input to be a dictionary for multipart requests but got {type(json_data)} instead."
                    )
                kwargs["data"] = self._serialize_multipartform(json_data)

        # TODO: report this error to httpx
        return self._client.build_request(  # pyright: ignore[reportUnknownMemberType]
            headers=headers,
            timeout=self.timeout if isinstance(options.timeout, NotGiven) else options.timeout,
            method=options.method,
            url=self._prepare_url(options.url),
            # the `Query` type that we use is incompatible with qs'
            # `Params` type as it needs to be typed as `Mapping[str, object]`
            # so that passing a `TypedDict` doesn't cause an error.
            # https://github.com/microsoft/pyright/issues/3526#event-6715453066
            params=self.qs.stringify(cast(Mapping[str, Any], params)) if params else None,
            json=json_data,
            files=options.files,
            **kwargs,
        )

"""
def cast(typ, val):
    return val


    def isEnabledFor(self, level):
        # Is this logger enabled for level 'level'?
        if self.disabled:
            return False

        try:
            return self._cache[level]
        except KeyError:
            _acquireLock()
            try:
                if self.manager.disable >= level:
                    is_enabled = self._cache[level] = False
                else:
                    is_enabled = self._cache[level] = (
                        level >= self.getEffectiveLevel()
                    )
            finally:
                _releaseLock()
            return is_enabled

def _merge_mappings(
    obj1: Mapping[_T_co, Union[_T, Omit]],
    obj2: Mapping[_T_co, Union[_T, Omit]],
) -> Dict[_T_co, _T]:
    merged = {**obj1, **obj2}
    return {key: value for key, value in merged.items() if not isinstance(value, Omit)}
"""

创建并返回一个httpx.Request对象,该对象包含了所有必要的请求信息。

如果请求头中包含multipart/form-data,则需要httpx处理表单数据,因此需要移除这个内容类型头

如果请求体是JSON数据,并且Content-Type是multipart/form-data,则需要将JSON数据转换为适合multipart格式的数据

cast 告诉类型检查器一个变量应该具有的类型,而不需要在运行时进行实际的类型转换

isEnabledFor 用于确定日志记录器是否应该处理和发出给定严重级别的日志消息

_merge_mappings 用于合并两个映射对象,合并时会优先考虑第二个映射对象中的值,并且会移除所有值为Omit的键值对

    def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]:
        items = self.qs.stringify_items(
            # TODO: type ignore is required as stringify_items is well typed but we can't be
            # well typed without heavy validation.
            data,  # type: ignore
            array_format="brackets",
        )
        serialized: dict[str, object] = {}
        for key, value in items:
            if key in serialized:
                raise ValueError(f"Duplicate key encountered: {key}; This behaviour is not supported")
            serialized[key] = value
        return serialized

"""
    def stringify_items(
        self,
        params: Params,
        *,
        array_format: NotGivenOr[ArrayFormat] = NOT_GIVEN,
        nested_format: NotGivenOr[NestedFormat] = NOT_GIVEN,
    ) -> list[tuple[str, str]]:
        opts = Options(
            qs=self,
            array_format=array_format,
            nested_format=nested_format,
        )
        return flatten([self._stringify_item(key, value, opts) for key, value in params.items()])
"""

将一个字典(data)中的键值对序列化为适用于multipart/form-data请求格式的字典

stringify_items 将一个Params对象中的键值对序列化为字符串,以便在URL查询字符串中使用

    def _process_response(
        self,
        *,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        response: httpx.Response,
        stream: bool,
        stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
    ) -> ResponseT:
        api_response = APIResponse(
            raw=response,
            client=self,
            cast_to=cast_to,
            stream=stream,
            stream_cls=stream_cls,
            options=options,
        )

        if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
            return cast(ResponseT, api_response)

        return api_response.parse()

"""
    def parse(self) -> R:
        if self._parsed is not None:
            return self._parsed

        parsed = self._parse()
        if is_given(self._options.post_parser):
            parsed = self._options.post_parser(parsed)

        self._parsed = parsed
        return parsed
"""

处理HTTP响应,并根据需要将响应转换为特定的类型

parse 如果响应数据已经被解析,直接返回已解析的数据,否则使用_parse方法解析响应数据。如果指定了post_parser选项,则允许用户在响应解析后应用自定义的处理逻辑。将解析后的数据存储在APIResponse对象中返回解析后的数据

    def _process_response_data(
        self,
        *,
        data: object,
        cast_to: type[ResponseT],
        response: httpx.Response,
    ) -> ResponseT:
        if data is None:
            return cast(ResponseT, None)

        if cast_to is UnknownResponse:
            return cast(ResponseT, data)

        try:
            if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
                return cast(ResponseT, cast_to.build(response=response, data=data))

            if self._strict_response_validation:
                return cast(ResponseT, validate_type(type_=cast_to, value=data))

            return cast(ResponseT, construct_type(type_=cast_to, value=data))
        except pydantic.ValidationError as err:
            raise APIResponseValidationError(response=response, body=data) from err

    def _should_stream_response_body(self, *, request: httpx.Request) -> bool:
        if request.headers.get(STREAMED_RAW_RESPONSE_HEADER) == "true":
            return True

        return False

_process_response_data:处理响应数据,并根据指定的类型cast_to将数据转换成相应的类型

如果cast_to是一个类,并且是ModelBuilderProtocol的子类,则调用build方法来构建一个模型实例
如果设置了self._strict_response_validation,则使用validate_type函数来验证数据是否符合cast_to类型的期望
如果cast_to是一个普通类,而不是模型类,则使用construct_type函数来构造一个实例

_should_stream_response_body:判断响应体是否应该以流式方式处理

# 定义一个属性,返回一个Querystring类的实例
@property
def qs(self) -> Querystring:
    return Querystring()

# 定义一个属性,返回一个httpx.Auth对象或None
@property
def custom_auth(self) -> httpx.Auth | None:
    return None

# 定义一个属性,返回一个包含认证相关HTTP头部的字典
@property
def auth_headers(self) -> dict[str, str]:
    return {}

# 定义一个属性,返回一个包含默认HTTP头部的字典
@property
def default_headers(self) -> dict[str, str | Omit]:
    return {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": self.user_agent,
        **self.platform_headers(),
        **self.auth_headers,
        **self._custom_headers,
    }

# 定义一个方法,用于验证传递给请求的默认头部和自定义头部
def _validate_headers(
    self,
    headers: Headers,  # noqa: ARG002
    custom_headers: Headers,  # noqa: ARG002
) -> None:
    """Validate the given default headers and custom headers.

    Does nothing by default.
    """
    return

# 定义一个属性,返回一个字符串,包含用户代理信息
@property
def user_agent(self) -> str:
    return f"{self.__class__.__name__}/Python {self._version}"

# 定义一个属性,用于获取或设置请求的基本URL
@property
def base_url(self) -> URL:
    return self._base_url

# 定义一个设置器方法,用于设置请求的基本URL,并确保URL总是带有尾随的斜杠
@base_url.setter
def base_url(self, url: URL | str) -> None:
    self._base_url = self._enforce_trailing_slash(url if isinstance(url, URL) else URL(url))

# 定义一个方法,返回与平台相关的HTTP头部信息
def platform_headers(self) -> Dict[str, str]:
    return platform_headers(self._version)

处理HTTP请求的头部信息,用户代理,以及请求的基本URL

    def _calculate_retry_timeout(
        self,
        remaining_retries: int,
        options: FinalRequestOptions,
        response_headers: Optional[httpx.Headers] = None,
    ) -> float:
        max_retries = options.get_max_retries(self.max_retries)
        try:
            # About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
            #
            # <http-date>". See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax for
            # details.
            if response_headers is not None:
                retry_header = response_headers.get("retry-after")
                try:
                    retry_after = float(retry_header)
                except Exception:
                    retry_date_tuple = email.utils.parsedate_tz(retry_header)
                    if retry_date_tuple is None:
                        retry_after = -1
                    else:
                        retry_date = email.utils.mktime_tz(retry_date_tuple)
                        retry_after = int(retry_date - time.time())
            else:
                retry_after = -1

        except Exception:
            retry_after = -1

        # If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
        if 0 < retry_after <= 60:
            return retry_after

        initial_retry_delay = 0.5
        max_retry_delay = 8.0
        nb_retries = max_retries - remaining_retries

        # Apply exponential backoff, but not more than the max.
        sleep_seconds = min(initial_retry_delay * pow(2.0, nb_retries), max_retry_delay)

        # Apply some jitter, plus-or-minus half a second.
        jitter = 1 - 0.25 * random()
        timeout = sleep_seconds * jitter
        return timeout if timeout >= 0 else 0

根据HTTP响应头部中的Retry-After信息或预定义的指数退避策略来计算在重试HTTP请求之前应该等待的时间

指数退避策略:将初始延迟乘以2的幂来实现的,幂次与剩余重试次数成正比。然后,它将这个值与最大延迟进行比较,取较小者

    def _should_retry(self, response: httpx.Response) -> bool:
        # Note: this is not a standard header
        should_retry_header = response.headers.get("x-should-retry")

        # If the server explicitly says whether or not to retry, obey.
        if should_retry_header == "true":
            return True
        if should_retry_header == "false":
            return False

        # Retry on request timeouts.
        if response.status_code == 408:
            return True

        # Retry on lock timeouts.
        if response.status_code == 409:
            return True

        # Retry on rate limits.
        if response.status_code == 429:
            return True

        # Retry internal errors.
        if response.status_code >= 500:
            return True

        return False

    def _idempotency_key(self) -> str:
        return f"stainless-python-retry-{uuid.uuid4()}"

_should_retry 决定是否重试

_idempotency_key 生成一个唯一标识符来保证重试请求的幂等性

SyncAPIClient

pass

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-16 23:44:02       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-16 23:44:02       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-16 23:44:02       87 阅读
  4. Python语言-面向对象

    2024-03-16 23:44:02       96 阅读

热门阅读

  1. latex入门笔记

    2024-03-16 23:44:02       36 阅读
  2. springboot项目jwt认证鉴权(企业级实现方案)

    2024-03-16 23:44:02       33 阅读
  3. react的高阶组件怎么用?

    2024-03-16 23:44:02       42 阅读
  4. 安卓UI面试题 51-55

    2024-03-16 23:44:02       42 阅读
  5. Acwing101 --- 最高的牛(差分)

    2024-03-16 23:44:02       39 阅读
  6. MATLAB中的数据类型

    2024-03-16 23:44:02       42 阅读
  7. Redis语法总结

    2024-03-16 23:44:02       59 阅读
  8. 【2024-03-16】蚂蚁金服春招实习笔试三道编程题解

    2024-03-16 23:44:02       57 阅读
  9. python-0008-修改django数据库为mysql

    2024-03-16 23:44:02       35 阅读
  10. libcurl test

    2024-03-16 23:44:02       43 阅读
  11. 【LeetCode】动态规划--题目练习

    2024-03-16 23:44:02       41 阅读