182 lines
5.3 KiB
Python
182 lines
5.3 KiB
Python
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
from hashlib import sha256
|
||
import hmac
|
||
import time
|
||
from typing import Any, Mapping, Optional, Union
|
||
from uuid import uuid4
|
||
|
||
|
||
QueryValue = Any
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class SignResult:
|
||
"""签名结果。
|
||
|
||
signature/timestamp/nonce/accessKey 需要放到请求 Header 中。
|
||
canonical_string 是实际参与 HMAC 计算的原文,主要用于排查验签失败问题。
|
||
"""
|
||
|
||
signature: str
|
||
timestamp: str
|
||
nonce: str
|
||
access_key: str
|
||
canonical_string: str
|
||
|
||
@property
|
||
def headers(self) -> dict[str, str]:
|
||
"""返回接口要求的鉴权 Header。"""
|
||
return {
|
||
"signature": self.signature,
|
||
"timestamp": self.timestamp,
|
||
"nonce": self.nonce,
|
||
"accessKey": self.access_key,
|
||
}
|
||
|
||
|
||
class OpenXSigner:
|
||
"""Open API 签名器。
|
||
|
||
该实现严格对齐 Go 源码 `openx.Sign`:
|
||
- Header 名是 `accessKey`,但参与签名的参数名是 `ak`
|
||
- body 先做 SHA256,再以 `body=<hash>` 参与签名
|
||
- 参数按 key 升序排序后直接拼接,不加 `&`,不做 URL 编码
|
||
- 使用 secret_key 字符串的 UTF-8 字节做 HMAC-SHA256 key
|
||
"""
|
||
|
||
def __init__(self, access_key: str, secret_key: str) -> None:
|
||
if not access_key:
|
||
raise ValueError("access_key is required")
|
||
if not secret_key:
|
||
raise ValueError("secret_key is required")
|
||
|
||
self.access_key = access_key
|
||
self.secret_key = secret_key
|
||
|
||
def sign(
|
||
self,
|
||
query: Optional[Mapping[str, QueryValue]] = None,
|
||
body: Any = None,
|
||
timestamp: Optional[Union[str, int]] = None,
|
||
nonce: Optional[str] = None,
|
||
) -> SignResult:
|
||
"""生成签名。
|
||
|
||
Args:
|
||
query: URL 查询参数,例如 {"comId": "100841795"}。
|
||
如果某个值是 list/tuple,会像 Go 的 url.Values 一样取第一个值。
|
||
body: 请求体。GET 请求可不传;POST JSON 建议传最终发送的 JSON 字符串或 bytes。
|
||
timestamp: 秒级时间戳。不传时自动使用当前时间。
|
||
nonce: 随机字符串。不传时自动生成 UUID。
|
||
"""
|
||
|
||
timestamp_text = str(timestamp if timestamp is not None else int(time.time()))
|
||
nonce_text = nonce or str(uuid4())
|
||
|
||
canonical = build_canonical_string(
|
||
access_key=self.access_key,
|
||
query=query,
|
||
body=body,
|
||
timestamp=timestamp_text,
|
||
nonce=nonce_text,
|
||
)
|
||
|
||
# Go 源码使用 hmac.New(sha256.New, []byte(sk)),输出 hex.EncodeToString。
|
||
signature = hmac.new(
|
||
self.secret_key.encode("utf-8"),
|
||
canonical.encode("utf-8"),
|
||
sha256,
|
||
).hexdigest()
|
||
|
||
return SignResult(
|
||
signature=signature,
|
||
timestamp=timestamp_text,
|
||
nonce=nonce_text,
|
||
access_key=self.access_key,
|
||
canonical_string=canonical,
|
||
)
|
||
|
||
|
||
def generate_signature(
|
||
access_key: str,
|
||
secret_key: str,
|
||
query: Optional[Mapping[str, QueryValue]] = None,
|
||
body: Any = None,
|
||
timestamp: Optional[Union[str, int]] = None,
|
||
nonce: Optional[str] = None,
|
||
) -> SignResult:
|
||
"""便捷函数:无需手动创建 OpenXSigner,也可以直接生成签名。"""
|
||
return OpenXSigner(access_key, secret_key).sign(
|
||
query=query,
|
||
body=body,
|
||
timestamp=timestamp,
|
||
nonce=nonce,
|
||
)
|
||
|
||
|
||
def build_canonical_string(
|
||
access_key: str,
|
||
query: Optional[Mapping[str, QueryValue]],
|
||
body: Any,
|
||
timestamp: str,
|
||
nonce: str,
|
||
) -> str:
|
||
"""构建签名原文。
|
||
|
||
Go 源码中的拼接方式是 `key=valuekey=value`,中间没有 `&`。
|
||
例如:`ak=xxxbody=yyycomId=100...nonce=...timestamp=...`
|
||
"""
|
||
|
||
params = {
|
||
"ak": access_key,
|
||
"timestamp": timestamp,
|
||
"nonce": nonce,
|
||
"body": sign_body(body),
|
||
}
|
||
params.update(_normalize_query(query))
|
||
|
||
# sort.Strings(keys) 的 Python 等价写法:sorted(params)。
|
||
return "".join(f"{key}={params[key]}" for key in sorted(params))
|
||
|
||
|
||
def sign_body(body: Any) -> str:
|
||
"""计算 body 的 SHA256 hex。
|
||
|
||
Go 里 nil body 会当作空字节切片处理,因此这里 None 等价于 b""。
|
||
"""
|
||
|
||
return sha256(_to_body_bytes(body)).hexdigest()
|
||
|
||
|
||
def _normalize_query(query: Optional[Mapping[str, QueryValue]]) -> dict[str, str]:
|
||
"""把 query 参数转换成 Go `map[string]string` 等价结构。"""
|
||
|
||
if not query:
|
||
return {}
|
||
|
||
result: dict[str, str] = {}
|
||
for key, value in query.items():
|
||
# Go 源码对 url.Values 使用 v[0],所以多值参数只取第一个值。
|
||
if isinstance(value, (list, tuple)):
|
||
first_value = value[0] if value else ""
|
||
else:
|
||
first_value = value
|
||
result[str(key)] = "" if first_value is None else str(first_value)
|
||
return result
|
||
|
||
|
||
def _to_body_bytes(body: Any) -> bytes:
|
||
"""把用户传入的 body 转成参与 SHA256 的字节。"""
|
||
|
||
if body is None:
|
||
return b""
|
||
if isinstance(body, bytes):
|
||
return body
|
||
if isinstance(body, bytearray):
|
||
return bytes(body)
|
||
if isinstance(body, str):
|
||
return body.encode("utf-8")
|
||
raise TypeError("body must be None, bytes, bytearray, or str")
|