jsp-open-api-signature/openx_sdk/signer.py
Simple.C.Han 13c7f62870 init
2026-06-17 10:23:52 +08:00

182 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from dataclasses import dataclass
from hashlib import sha256
import hmac
import time
from typing import Any, Mapping, Optional, Union
from uuid import uuid4
QueryValue = Any
@dataclass(frozen=True)
class SignResult:
"""签名结果。
signature/timestamp/nonce/accessKey 需要放到请求 Header 中。
canonical_string 是实际参与 HMAC 计算的原文,主要用于排查验签失败问题。
"""
signature: str
timestamp: str
nonce: str
access_key: str
canonical_string: str
@property
def headers(self) -> dict[str, str]:
"""返回接口要求的鉴权 Header。"""
return {
"signature": self.signature,
"timestamp": self.timestamp,
"nonce": self.nonce,
"accessKey": self.access_key,
}
class OpenXSigner:
"""Open API 签名器。
该实现严格对齐 Go 源码 `openx.Sign`
- Header 名是 `accessKey`,但参与签名的参数名是 `ak`
- body 先做 SHA256再以 `body=<hash>` 参与签名
- 参数按 key 升序排序后直接拼接,不加 `&`,不做 URL 编码
- 使用 secret_key 字符串的 UTF-8 字节做 HMAC-SHA256 key
"""
def __init__(self, access_key: str, secret_key: str) -> None:
if not access_key:
raise ValueError("access_key is required")
if not secret_key:
raise ValueError("secret_key is required")
self.access_key = access_key
self.secret_key = secret_key
def sign(
self,
query: Optional[Mapping[str, QueryValue]] = None,
body: Any = None,
timestamp: Optional[Union[str, int]] = None,
nonce: Optional[str] = None,
) -> SignResult:
"""生成签名。
Args:
query: URL 查询参数,例如 {"comId": "100841795"}。
如果某个值是 list/tuple会像 Go 的 url.Values 一样取第一个值。
body: 请求体。GET 请求可不传POST JSON 建议传最终发送的 JSON 字符串或 bytes。
timestamp: 秒级时间戳。不传时自动使用当前时间。
nonce: 随机字符串。不传时自动生成 UUID。
"""
timestamp_text = str(timestamp if timestamp is not None else int(time.time()))
nonce_text = nonce or str(uuid4())
canonical = build_canonical_string(
access_key=self.access_key,
query=query,
body=body,
timestamp=timestamp_text,
nonce=nonce_text,
)
# Go 源码使用 hmac.New(sha256.New, []byte(sk)),输出 hex.EncodeToString。
signature = hmac.new(
self.secret_key.encode("utf-8"),
canonical.encode("utf-8"),
sha256,
).hexdigest()
return SignResult(
signature=signature,
timestamp=timestamp_text,
nonce=nonce_text,
access_key=self.access_key,
canonical_string=canonical,
)
def generate_signature(
access_key: str,
secret_key: str,
query: Optional[Mapping[str, QueryValue]] = None,
body: Any = None,
timestamp: Optional[Union[str, int]] = None,
nonce: Optional[str] = None,
) -> SignResult:
"""便捷函数:无需手动创建 OpenXSigner也可以直接生成签名。"""
return OpenXSigner(access_key, secret_key).sign(
query=query,
body=body,
timestamp=timestamp,
nonce=nonce,
)
def build_canonical_string(
access_key: str,
query: Optional[Mapping[str, QueryValue]],
body: Any,
timestamp: str,
nonce: str,
) -> str:
"""构建签名原文。
Go 源码中的拼接方式是 `key=valuekey=value`,中间没有 `&`。
例如:`ak=xxxbody=yyycomId=100...nonce=...timestamp=...`
"""
params = {
"ak": access_key,
"timestamp": timestamp,
"nonce": nonce,
"body": sign_body(body),
}
params.update(_normalize_query(query))
# sort.Strings(keys) 的 Python 等价写法sorted(params)。
return "".join(f"{key}={params[key]}" for key in sorted(params))
def sign_body(body: Any) -> str:
"""计算 body 的 SHA256 hex。
Go 里 nil body 会当作空字节切片处理,因此这里 None 等价于 b""
"""
return sha256(_to_body_bytes(body)).hexdigest()
def _normalize_query(query: Optional[Mapping[str, QueryValue]]) -> dict[str, str]:
"""把 query 参数转换成 Go `map[string]string` 等价结构。"""
if not query:
return {}
result: dict[str, str] = {}
for key, value in query.items():
# Go 源码对 url.Values 使用 v[0],所以多值参数只取第一个值。
if isinstance(value, (list, tuple)):
first_value = value[0] if value else ""
else:
first_value = value
result[str(key)] = "" if first_value is None else str(first_value)
return result
def _to_body_bytes(body: Any) -> bytes:
"""把用户传入的 body 转成参与 SHA256 的字节。"""
if body is None:
return b""
if isinstance(body, bytes):
return body
if isinstance(body, bytearray):
return bytes(body)
if isinstance(body, str):
return body.encode("utf-8")
raise TypeError("body must be None, bytes, bytearray, or str")