from __future__ import annotations from dataclasses import dataclass from hashlib import sha256 import hmac import time from typing import Any, Mapping, Optional, Union from uuid import uuid4 QueryValue = Any @dataclass(frozen=True) class SignResult: """签名结果。 signature/timestamp/nonce/accessKey 需要放到请求 Header 中。 canonical_string 是实际参与 HMAC 计算的原文,主要用于排查验签失败问题。 """ signature: str timestamp: str nonce: str access_key: str canonical_string: str @property def headers(self) -> dict[str, str]: """返回接口要求的鉴权 Header。""" return { "signature": self.signature, "timestamp": self.timestamp, "nonce": self.nonce, "accessKey": self.access_key, } class OpenXSigner: """Open API 签名器。 该实现严格对齐 Go 源码 `openx.Sign`: - Header 名是 `accessKey`,但参与签名的参数名是 `ak` - body 先做 SHA256,再以 `body=` 参与签名 - 参数按 key 升序排序后直接拼接,不加 `&`,不做 URL 编码 - 使用 secret_key 字符串的 UTF-8 字节做 HMAC-SHA256 key """ def __init__(self, access_key: str, secret_key: str) -> None: if not access_key: raise ValueError("access_key is required") if not secret_key: raise ValueError("secret_key is required") self.access_key = access_key self.secret_key = secret_key def sign( self, query: Optional[Mapping[str, QueryValue]] = None, body: Any = None, timestamp: Optional[Union[str, int]] = None, nonce: Optional[str] = None, ) -> SignResult: """生成签名。 Args: query: URL 查询参数,例如 {"comId": "100841795"}。 如果某个值是 list/tuple,会像 Go 的 url.Values 一样取第一个值。 body: 请求体。GET 请求可不传;POST JSON 建议传最终发送的 JSON 字符串或 bytes。 timestamp: 秒级时间戳。不传时自动使用当前时间。 nonce: 随机字符串。不传时自动生成 UUID。 """ timestamp_text = str(timestamp if timestamp is not None else int(time.time())) nonce_text = nonce or str(uuid4()) canonical = build_canonical_string( access_key=self.access_key, query=query, body=body, timestamp=timestamp_text, nonce=nonce_text, ) # Go 源码使用 hmac.New(sha256.New, []byte(sk)),输出 hex.EncodeToString。 signature = hmac.new( self.secret_key.encode("utf-8"), canonical.encode("utf-8"), sha256, ).hexdigest() return SignResult( signature=signature, timestamp=timestamp_text, nonce=nonce_text, access_key=self.access_key, canonical_string=canonical, ) def generate_signature( access_key: str, secret_key: str, query: Optional[Mapping[str, QueryValue]] = None, body: Any = None, timestamp: Optional[Union[str, int]] = None, nonce: Optional[str] = None, ) -> SignResult: """便捷函数:无需手动创建 OpenXSigner,也可以直接生成签名。""" return OpenXSigner(access_key, secret_key).sign( query=query, body=body, timestamp=timestamp, nonce=nonce, ) def build_canonical_string( access_key: str, query: Optional[Mapping[str, QueryValue]], body: Any, timestamp: str, nonce: str, ) -> str: """构建签名原文。 Go 源码中的拼接方式是 `key=valuekey=value`,中间没有 `&`。 例如:`ak=xxxbody=yyycomId=100...nonce=...timestamp=...` """ params = { "ak": access_key, "timestamp": timestamp, "nonce": nonce, "body": sign_body(body), } params.update(_normalize_query(query)) # sort.Strings(keys) 的 Python 等价写法:sorted(params)。 return "".join(f"{key}={params[key]}" for key in sorted(params)) def sign_body(body: Any) -> str: """计算 body 的 SHA256 hex。 Go 里 nil body 会当作空字节切片处理,因此这里 None 等价于 b""。 """ return sha256(_to_body_bytes(body)).hexdigest() def _normalize_query(query: Optional[Mapping[str, QueryValue]]) -> dict[str, str]: """把 query 参数转换成 Go `map[string]string` 等价结构。""" if not query: return {} result: dict[str, str] = {} for key, value in query.items(): # Go 源码对 url.Values 使用 v[0],所以多值参数只取第一个值。 if isinstance(value, (list, tuple)): first_value = value[0] if value else "" else: first_value = value result[str(key)] = "" if first_value is None else str(first_value) return result def _to_body_bytes(body: Any) -> bytes: """把用户传入的 body 转成参与 SHA256 的字节。""" if body is None: return b"" if isinstance(body, bytes): return body if isinstance(body, bytearray): return bytes(body) if isinstance(body, str): return body.encode("utf-8") raise TypeError("body must be None, bytes, bytearray, or str")