from __future__ import annotations import json from typing import Any, Mapping, Optional, Union from urllib.error import HTTPError from urllib.parse import urlencode, urljoin from urllib.request import Request, urlopen from .signer import OpenXSigner class SimpleResponse: """标准库 urllib fallback 返回的简单响应对象。 为了和 requests 的常用属性保持一致,这里提供 status_code、text、content、headers。 """ def __init__(self, status_code: int, content: bytes, headers: Mapping[str, str]) -> None: self.status_code = status_code self.content = content self.headers = dict(headers) self.text = content.decode("utf-8", errors="replace") def json(self) -> Any: """按 JSON 解析响应体。""" return json.loads(self.text) class OpenXClient: """带自动签名能力的 HTTP 客户端。 这是一个轻量封装。优先使用 requests;如果当前环境未安装 requests, 会自动使用 Python 标准库 urllib。每次 request/get/post 时会自动根据 params 和 body 生成 signature/timestamp/nonce/accessKey Header。 """ def __init__(self, base_url: str, access_key: str, secret_key: str) -> None: """初始化客户端。 Args: base_url: 接口根地址,例如 https://open-test.jsptax.com。 access_key: 接口分配的 accessKey。 secret_key: 接口分配的 secretKey,不会作为 Header 发送。 """ self.base_url = base_url.rstrip("/") + "/" self.signer = OpenXSigner(access_key, secret_key) def signed_headers( self, query: Optional[Mapping[str, Any]] = None, body: Any = None, timestamp: Optional[Union[str, int]] = None, nonce: Optional[str] = None, ) -> dict[str, str]: """只生成签名 Header,不发送 HTTP 请求。""" return self.signer.sign( query=query, body=body, timestamp=timestamp, nonce=nonce, ).headers def request( self, method: str, path: str, params: Optional[Mapping[str, Any]] = None, json_body: Any = None, data: Any = None, headers: Optional[Mapping[str, str]] = None, timeout: int = 30, ): """发送带签名的 HTTP 请求。 Args: method: HTTP 方法,例如 GET/POST。 path: 接口路径,例如 /ledger/multi-column。 params: URL Query 参数,这些参数会参与签名。 json_body: JSON 请求体。会按紧凑 JSON 序列化后参与签名并发送。 data: 原始请求体。适合用户已经准备好字符串或 bytes 的场景。 headers: 额外 Header,会和签名 Header 合并。 timeout: 请求超时时间,单位秒。 """ # 签名必须使用“最终实际发送”的 body 字节;JSON 场景先序列化再签名。 body = _encode_json(json_body) if json_body is not None else data signed_headers = self.signed_headers(query=params, body=body) request_headers = dict(headers or {}) request_headers.update(signed_headers) if json_body is not None: request_headers.setdefault("Content-Type", "application/json") url = urljoin(self.base_url, path.lstrip("/")) try: import requests return requests.request( method=method, url=url, params=params, data=body, headers=request_headers, timeout=timeout, ) except ModuleNotFoundError: return _urllib_request( method=method, url=url, params=params, body=body, headers=request_headers, timeout=timeout, ) def get(self, path: str, params: Optional[Mapping[str, Any]] = None, **kwargs): """发送 GET 请求。""" return self.request("GET", path, params=params, **kwargs) def post(self, path: str, params: Optional[Mapping[str, Any]] = None, **kwargs): """发送 POST 请求。""" return self.request("POST", path, params=params, **kwargs) def _encode_json(value: Any) -> bytes: """把 JSON 对象转成紧凑 UTF-8 字节,确保签名内容和发送内容一致。""" return json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8") def _urllib_request( method: str, url: str, params: Optional[Mapping[str, Any]], body: Any, headers: Mapping[str, str], timeout: int, ) -> SimpleResponse: """使用 Python 标准库发送请求,避免强依赖 requests。""" request_url = _append_query(url, params) body_bytes = _to_request_body(body) request = Request( url=request_url, data=body_bytes, headers=dict(headers), method=method.upper(), ) try: with urlopen(request, timeout=timeout) as response: return SimpleResponse( status_code=response.getcode(), content=response.read(), headers=response.headers, ) except HTTPError as error: return SimpleResponse( status_code=error.code, content=error.read(), headers=error.headers, ) def _append_query(url: str, params: Optional[Mapping[str, Any]]) -> str: if not params: return url separator = "&" if "?" in url else "?" return url + separator + urlencode(params, doseq=True) def _to_request_body(body: Any) -> Optional[bytes]: if body is None: return None if isinstance(body, bytes): return body if isinstance(body, bytearray): return bytes(body) if isinstance(body, str): return body.encode("utf-8") raise TypeError("data must be None, bytes, bytearray, or str")