From 13c7f62870fe12adbc6f284cf4520e8730461ff9 Mon Sep 17 00:00:00 2001 From: "Simple.C.Han" Date: Wed, 17 Jun 2026 10:23:52 +0800 Subject: [PATCH] init --- examples/multi_column.py | 49 +++++++++++ examples/sign_only.py | 30 +++++++ openx_sdk/__init__.py | 9 ++ openx_sdk/client.py | 185 +++++++++++++++++++++++++++++++++++++++ openx_sdk/signer.py | 181 ++++++++++++++++++++++++++++++++++++++ pyproject.toml | 11 +++ 6 files changed, 465 insertions(+) create mode 100644 examples/multi_column.py create mode 100644 examples/sign_only.py create mode 100644 openx_sdk/__init__.py create mode 100644 openx_sdk/client.py create mode 100644 openx_sdk/signer.py create mode 100644 pyproject.toml diff --git a/examples/multi_column.py b/examples/multi_column.py new file mode 100644 index 0000000..410c2f4 --- /dev/null +++ b/examples/multi_column.py @@ -0,0 +1,49 @@ +from pathlib import Path +import sys + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from openx_sdk import OpenXClient + + +# 正式使用时建议从环境变量或配置文件读取,不要硬编码在业务代码中。 +ACCESS_KEY = "" +SECRET_KEY = "" + +BASE_URL = "https://open.jsptax.com" +PATH = "/invoice/output/list" +QUERY = { + "comId": "100342309", + "period": "202604", + "invoiceType": [ + "1" + ], + "pageNum": "1", + "pageSize": "10", + "taxRate":1, + "autoLabel": [ + "0" + ] +} + +BODY = { + "comId": 101379804, + "period": "202605", + "taxRate": 1, + "invoiceType": [ + ], + "pageNum":1, + "pageSize":10 +} + + +def call_api() -> None: + client = OpenXClient(BASE_URL, ACCESS_KEY, SECRET_KEY) + # response = client.get(PATH, params=QUERY) + response = client.post(PATH, json_body=BODY) + print("status:", response.status_code) + print(response.text) + + +if __name__ == "__main__": + call_api() diff --git a/examples/sign_only.py b/examples/sign_only.py new file mode 100644 index 0000000..04a96b6 --- /dev/null +++ b/examples/sign_only.py @@ -0,0 +1,30 @@ +from pathlib import Path +import sys + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from openx_sdk import OpenXSigner + + +# 只演示签名生成,不依赖 requests,也不会发送网络请求。 +signer = OpenXSigner( + access_key="", + secret_key="", +) + +# 固定 timestamp 和 nonce 时,输出应与 curl 示例中的 signature 完全一致。 +signed = signer.sign( + query={ + "comId": "100787681", + "accountId": "861533101421002919", + "startDate": "2025-07-01", + "endDate": "2025-07-30", + "pageNum": "1", + "pageSize": "20" + }, + timestamp="1780889319", + nonce="9fb5235a-175c-4d24-adde-aca39ad1e7bc", +) + +print(signed.signature) +print(signed.headers) diff --git a/openx_sdk/__init__.py b/openx_sdk/__init__.py new file mode 100644 index 0000000..8ba7a30 --- /dev/null +++ b/openx_sdk/__init__.py @@ -0,0 +1,9 @@ +from .client import OpenXClient +from .signer import OpenXSigner, SignResult, generate_signature + +__all__ = [ + "OpenXClient", + "OpenXSigner", + "SignResult", + "generate_signature", +] diff --git a/openx_sdk/client.py b/openx_sdk/client.py new file mode 100644 index 0000000..d117f02 --- /dev/null +++ b/openx_sdk/client.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import json +from typing import Any, Mapping, Optional, Union +from urllib.error import HTTPError +from urllib.parse import urlencode, urljoin +from urllib.request import Request, urlopen + +from .signer import OpenXSigner + + +class SimpleResponse: + """标准库 urllib fallback 返回的简单响应对象。 + + 为了和 requests 的常用属性保持一致,这里提供 status_code、text、content、headers。 + """ + + def __init__(self, status_code: int, content: bytes, headers: Mapping[str, str]) -> None: + self.status_code = status_code + self.content = content + self.headers = dict(headers) + self.text = content.decode("utf-8", errors="replace") + + def json(self) -> Any: + """按 JSON 解析响应体。""" + + return json.loads(self.text) + + +class OpenXClient: + """带自动签名能力的 HTTP 客户端。 + + 这是一个轻量封装。优先使用 requests;如果当前环境未安装 requests, + 会自动使用 Python 标准库 urllib。每次 request/get/post 时会自动根据 + params 和 body 生成 signature/timestamp/nonce/accessKey Header。 + """ + + def __init__(self, base_url: str, access_key: str, secret_key: str) -> None: + """初始化客户端。 + + Args: + base_url: 接口根地址,例如 https://open-test.jsptax.com。 + access_key: 接口分配的 accessKey。 + secret_key: 接口分配的 secretKey,不会作为 Header 发送。 + """ + + self.base_url = base_url.rstrip("/") + "/" + self.signer = OpenXSigner(access_key, secret_key) + + def signed_headers( + self, + query: Optional[Mapping[str, Any]] = None, + body: Any = None, + timestamp: Optional[Union[str, int]] = None, + nonce: Optional[str] = None, + ) -> dict[str, str]: + """只生成签名 Header,不发送 HTTP 请求。""" + + return self.signer.sign( + query=query, + body=body, + timestamp=timestamp, + nonce=nonce, + ).headers + + def request( + self, + method: str, + path: str, + params: Optional[Mapping[str, Any]] = None, + json_body: Any = None, + data: Any = None, + headers: Optional[Mapping[str, str]] = None, + timeout: int = 30, + ): + """发送带签名的 HTTP 请求。 + + Args: + method: HTTP 方法,例如 GET/POST。 + path: 接口路径,例如 /ledger/multi-column。 + params: URL Query 参数,这些参数会参与签名。 + json_body: JSON 请求体。会按紧凑 JSON 序列化后参与签名并发送。 + data: 原始请求体。适合用户已经准备好字符串或 bytes 的场景。 + headers: 额外 Header,会和签名 Header 合并。 + timeout: 请求超时时间,单位秒。 + """ + + # 签名必须使用“最终实际发送”的 body 字节;JSON 场景先序列化再签名。 + body = _encode_json(json_body) if json_body is not None else data + signed_headers = self.signed_headers(query=params, body=body) + request_headers = dict(headers or {}) + request_headers.update(signed_headers) + + if json_body is not None: + request_headers.setdefault("Content-Type", "application/json") + + url = urljoin(self.base_url, path.lstrip("/")) + try: + import requests + + return requests.request( + method=method, + url=url, + params=params, + data=body, + headers=request_headers, + timeout=timeout, + ) + except ModuleNotFoundError: + return _urllib_request( + method=method, + url=url, + params=params, + body=body, + headers=request_headers, + timeout=timeout, + ) + + def get(self, path: str, params: Optional[Mapping[str, Any]] = None, **kwargs): + """发送 GET 请求。""" + return self.request("GET", path, params=params, **kwargs) + + def post(self, path: str, params: Optional[Mapping[str, Any]] = None, **kwargs): + """发送 POST 请求。""" + return self.request("POST", path, params=params, **kwargs) + + +def _encode_json(value: Any) -> bytes: + """把 JSON 对象转成紧凑 UTF-8 字节,确保签名内容和发送内容一致。""" + + return json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8") + + +def _urllib_request( + method: str, + url: str, + params: Optional[Mapping[str, Any]], + body: Any, + headers: Mapping[str, str], + timeout: int, +) -> SimpleResponse: + """使用 Python 标准库发送请求,避免强依赖 requests。""" + + request_url = _append_query(url, params) + body_bytes = _to_request_body(body) + request = Request( + url=request_url, + data=body_bytes, + headers=dict(headers), + method=method.upper(), + ) + + try: + with urlopen(request, timeout=timeout) as response: + return SimpleResponse( + status_code=response.getcode(), + content=response.read(), + headers=response.headers, + ) + except HTTPError as error: + return SimpleResponse( + status_code=error.code, + content=error.read(), + headers=error.headers, + ) + + +def _append_query(url: str, params: Optional[Mapping[str, Any]]) -> str: + if not params: + return url + + separator = "&" if "?" in url else "?" + return url + separator + urlencode(params, doseq=True) + + +def _to_request_body(body: Any) -> Optional[bytes]: + if body is None: + return None + if isinstance(body, bytes): + return body + if isinstance(body, bytearray): + return bytes(body) + if isinstance(body, str): + return body.encode("utf-8") + raise TypeError("data must be None, bytes, bytearray, or str") diff --git a/openx_sdk/signer.py b/openx_sdk/signer.py new file mode 100644 index 0000000..a3fc378 --- /dev/null +++ b/openx_sdk/signer.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from dataclasses import dataclass +from hashlib import sha256 +import hmac +import time +from typing import Any, Mapping, Optional, Union +from uuid import uuid4 + + +QueryValue = Any + + +@dataclass(frozen=True) +class SignResult: + """签名结果。 + + signature/timestamp/nonce/accessKey 需要放到请求 Header 中。 + canonical_string 是实际参与 HMAC 计算的原文,主要用于排查验签失败问题。 + """ + + signature: str + timestamp: str + nonce: str + access_key: str + canonical_string: str + + @property + def headers(self) -> dict[str, str]: + """返回接口要求的鉴权 Header。""" + return { + "signature": self.signature, + "timestamp": self.timestamp, + "nonce": self.nonce, + "accessKey": self.access_key, + } + + +class OpenXSigner: + """Open API 签名器。 + + 该实现严格对齐 Go 源码 `openx.Sign`: + - Header 名是 `accessKey`,但参与签名的参数名是 `ak` + - body 先做 SHA256,再以 `body=` 参与签名 + - 参数按 key 升序排序后直接拼接,不加 `&`,不做 URL 编码 + - 使用 secret_key 字符串的 UTF-8 字节做 HMAC-SHA256 key + """ + + def __init__(self, access_key: str, secret_key: str) -> None: + if not access_key: + raise ValueError("access_key is required") + if not secret_key: + raise ValueError("secret_key is required") + + self.access_key = access_key + self.secret_key = secret_key + + def sign( + self, + query: Optional[Mapping[str, QueryValue]] = None, + body: Any = None, + timestamp: Optional[Union[str, int]] = None, + nonce: Optional[str] = None, + ) -> SignResult: + """生成签名。 + + Args: + query: URL 查询参数,例如 {"comId": "100841795"}。 + 如果某个值是 list/tuple,会像 Go 的 url.Values 一样取第一个值。 + body: 请求体。GET 请求可不传;POST JSON 建议传最终发送的 JSON 字符串或 bytes。 + timestamp: 秒级时间戳。不传时自动使用当前时间。 + nonce: 随机字符串。不传时自动生成 UUID。 + """ + + timestamp_text = str(timestamp if timestamp is not None else int(time.time())) + nonce_text = nonce or str(uuid4()) + + canonical = build_canonical_string( + access_key=self.access_key, + query=query, + body=body, + timestamp=timestamp_text, + nonce=nonce_text, + ) + + # Go 源码使用 hmac.New(sha256.New, []byte(sk)),输出 hex.EncodeToString。 + signature = hmac.new( + self.secret_key.encode("utf-8"), + canonical.encode("utf-8"), + sha256, + ).hexdigest() + + return SignResult( + signature=signature, + timestamp=timestamp_text, + nonce=nonce_text, + access_key=self.access_key, + canonical_string=canonical, + ) + + +def generate_signature( + access_key: str, + secret_key: str, + query: Optional[Mapping[str, QueryValue]] = None, + body: Any = None, + timestamp: Optional[Union[str, int]] = None, + nonce: Optional[str] = None, +) -> SignResult: + """便捷函数:无需手动创建 OpenXSigner,也可以直接生成签名。""" + return OpenXSigner(access_key, secret_key).sign( + query=query, + body=body, + timestamp=timestamp, + nonce=nonce, + ) + + +def build_canonical_string( + access_key: str, + query: Optional[Mapping[str, QueryValue]], + body: Any, + timestamp: str, + nonce: str, +) -> str: + """构建签名原文。 + + Go 源码中的拼接方式是 `key=valuekey=value`,中间没有 `&`。 + 例如:`ak=xxxbody=yyycomId=100...nonce=...timestamp=...` + """ + + params = { + "ak": access_key, + "timestamp": timestamp, + "nonce": nonce, + "body": sign_body(body), + } + params.update(_normalize_query(query)) + + # sort.Strings(keys) 的 Python 等价写法:sorted(params)。 + return "".join(f"{key}={params[key]}" for key in sorted(params)) + + +def sign_body(body: Any) -> str: + """计算 body 的 SHA256 hex。 + + Go 里 nil body 会当作空字节切片处理,因此这里 None 等价于 b""。 + """ + + return sha256(_to_body_bytes(body)).hexdigest() + + +def _normalize_query(query: Optional[Mapping[str, QueryValue]]) -> dict[str, str]: + """把 query 参数转换成 Go `map[string]string` 等价结构。""" + + if not query: + return {} + + result: dict[str, str] = {} + for key, value in query.items(): + # Go 源码对 url.Values 使用 v[0],所以多值参数只取第一个值。 + if isinstance(value, (list, tuple)): + first_value = value[0] if value else "" + else: + first_value = value + result[str(key)] = "" if first_value is None else str(first_value) + return result + + +def _to_body_bytes(body: Any) -> bytes: + """把用户传入的 body 转成参与 SHA256 的字节。""" + + if body is None: + return b"" + if isinstance(body, bytes): + return body + if isinstance(body, bytearray): + return bytes(body) + if isinstance(body, str): + return body.encode("utf-8") + raise TypeError("body must be None, bytes, bytearray, or str") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d7adf5a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "jsp-open-api-signature" +version = "0.1.0" +description = "Python SDK for JSP Open API signature generation" +requires-python = ">=3.9" +dependencies = [ + "requests>=2.31.0", +] + +[tool.setuptools.packages.find] +where = ["."]