Files
c4c-download/sap-c4c-AttachmentFolder.py
2026-03-19 11:01:37 +08:00

673 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
SAP C4C 附件下载工具
功能:
1. 下载 ServiceRequest 级别附件
2. 下载 XIssueItem 级别附件 (BO_XSRIssueItemAttachmentFolder)
3. 通过 Scrapling 爬虫下载 Salesforce 外部链接附件
4. 可选:将下载的附件上传到群晖 DSM
环境要求:
Python >= 3.8
安装依赖:
pip install requests scrapling[all] playwright
python -m playwright install chromium
用法:
# 下载附件
python sap-c4c-AttachmentFolder.py --tenant https://xxx.c4c.saphybriscloud.cn --user admin --password xxx --ticket 24588
# 下载附件并上传到群晖
python sap-c4c-AttachmentFolder.py --tenant https://xxx.c4c.saphybriscloud.cn --user admin --password xxx --ticket 24588 \\
--dsm-url http://10.0.10.235:5000 --dsm-user PLM --dsm-password 123456 --dsm-path /Newgonow/AU-SPFJ
# JSON 模式(供 Java/其他程序调用)
python sap-c4c-AttachmentFolder.py --ticket 24588 --dsm-url http://10.0.10.235:5000 --dsm-user PLM --dsm-password 123456 --dsm-path /Newgonow/AU-SPFJ --json
# 也可通过环境变量传入凭证
export C4C_TENANT=https://xxx.c4c.saphybriscloud.cn
export C4C_USERNAME=admin
export C4C_PASSWORD=xxx
export DSM_URL=http://10.0.10.235:5000
export DSM_USERNAME=PLM
export DSM_PASSWORD=123456
export DSM_PATH=/Newgonow/AU-SPFJ
python sap-c4c-AttachmentFolder.py --ticket 24588 --json
"""
import os
import sys
import json
import argparse
import mimetypes
import requests
import urllib3
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
# The DSM requests in this module are sent with verify=False; silence the
# InsecureRequestWarning urllib3 would otherwise emit for them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Serializes console output written from worker threads.
print_lock = Lock()
@dataclass
class Config:
    """Runtime settings shared by the functions of this module."""

    # SAP C4C connection settings.
    tenant: str = ""
    username: str = ""
    password: str = ""
    # OData service roots; filled in by init_endpoints().
    odata_c4c: str = ""
    odata_cust: str = ""
    # Local download settings.
    output_dir: str = "downloads"
    max_workers: int = 5
    # Synology DSM upload settings.
    dsm_url: str = ""
    dsm_user: str = ""
    dsm_password: str = ""
    dsm_path: str = ""

    def init_endpoints(self):
        """Derive both OData service URLs from the tenant address."""
        odata_root = self.tenant.rstrip("/") + "/sap/c4c/odata"
        self.odata_c4c = odata_root + "/v1/c4codata"
        self.odata_cust = odata_root + "/cust/v1/custticketapi"
# Module-wide configuration instance; main() fills it from the CLI arguments
# and run() sets its output_dir.
cfg = Config()
def get_session():
    """Create a requests session with the C4C basic-auth credentials and a JSON Accept header."""
    session = requests.Session()
    session.headers.update({"Accept": "application/json"})
    session.auth = (cfg.username, cfg.password)
    return session
def find_service_request_object_id(session, ticket_id):
    """Look up a ServiceRequest by its human-readable ticket ID.

    Args:
        session: HTTP session used for the OData request.
        ticket_id: Ticket ID as shown to users (e.g. "24588").

    Returns:
        Tuple ``(ObjectID, SerialID)`` of the first matching entity; SerialID
        is "" when the entity does not carry one.

    Raises:
        ValueError: No ServiceRequest matches the ticket ID.
        Exception: Whatever ``raise_for_status()`` raises for an HTTP error.
    """
    # A single quote inside an OData string literal has to be doubled;
    # otherwise a ticket ID containing one would end the literal early and
    # break (or alter) the filter expression.
    literal = str(ticket_id).replace("'", "''")
    url = f"{cfg.odata_c4c}/ServiceRequestCollection"
    params = {"$format": "json", "$filter": f"ID eq '{literal}'"}
    resp = session.get(url, params=params, timeout=60)
    resp.raise_for_status()
    results = resp.json().get("d", {}).get("results", [])
    if not results:
        raise ValueError(f"未找到 ID={ticket_id} 的 ServiceRequest")
    sr = results[0]
    return sr["ObjectID"], sr.get("SerialID", "")
def _parse_attachments(results):
"""将 OData 返回的附件列表解析为统一格式"""
attachments = []
for row in results:
uuid = row.get("UUID")
file_name = row.get("Name") or f"{uuid}.bin"
mime_type = row.get("MimeType") or "application/octet-stream"
category = row.get("CategoryCode") # 2=File, 3=Link
link_uri = row.get("LinkWebURI")
if uuid:
attachments.append({
"UUID": uuid,
"ObjectID": row.get("ObjectID"),
"ParentObjectID": row.get("ParentObjectID"),
"FileName": file_name,
"MimeType": mime_type,
"CategoryCode": category,
"LinkWebURI": link_uri,
"DocumentLink": row.get("DocumentLink"),
"SizeInkB": row.get("SizeInkB"),
})
return attachments
def list_sr_attachments(session, sr_object_id):
    """Return the attachments stored on the ServiceRequest itself (c4codata navigation)."""
    endpoint = (
        f"{cfg.odata_c4c}/ServiceRequestCollection('{sr_object_id}')"
        "/ServiceRequestAttachmentFolder"
    )
    response = session.get(endpoint, params={"$format": "json"}, timeout=60)
    response.raise_for_status()
    payload = response.json()
    return _parse_attachments(payload.get("d", {}).get("results", []))
def list_issue_items(session, ticket_id):
    """List the XIssueItem entities of a ServiceRequest (via custticketapi).

    Args:
        session: HTTP session used for the OData request.
        ticket_id: Human-readable ticket ID the items are filtered by.

    Returns:
        The raw ``d.results`` list of the OData response ([] when absent).

    Raises:
        Exception: Whatever ``raise_for_status()`` raises for an HTTP error.
    """
    # Double embedded single quotes so the ID stays one OData string literal.
    literal = str(ticket_id).replace("'", "''")
    url = f"{cfg.odata_cust}/ServiceRequest_XIssueItem_SDKCollection"
    params = {"$format": "json", "$filter": f"TicketID eq '{literal}'"}
    resp = session.get(url, params=params, timeout=60)
    resp.raise_for_status()
    return resp.json().get("d", {}).get("results", [])
def get_issue_item_detail(session, object_id):
    """Fetch one XIssueItem entity by ObjectID; callers read IssueID_SDK from the result."""
    entity_url = f"{cfg.odata_cust}/ServiceRequest_XIssueItem_SDKCollection('{object_id}')"
    response = session.get(entity_url, params={"$format": "json"}, timeout=60)
    response.raise_for_status()
    envelope = response.json().get("d", {})
    return envelope.get("results", {})
def _fetch_attachment_folder(session, att_oid):
    """Return the raw AttachmentFolder rows of one attachment entry (run from a thread pool)."""
    collection = f"{cfg.odata_cust}/BO_XSRIssueItemAttachmentCollection"
    response = session.get(
        f"{collection}('{att_oid}')/BO_XSRIssueItemAttachmentFolder",
        params={"$format": "json"},
        timeout=60,
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("d", {}).get("results", [])
def list_issue_item_attachments(session, issue_item_uuid):
    """
    Collect the attachments that belong to one XIssueItem.

    Step 1 queries BO_XSRIssueItemAttachmentCollection filtered by
    XIssueItemUUID; step 2 fetches the BO_XSRIssueItemAttachmentFolder of every
    hit concurrently. A folder whose request fails is reported on stderr and
    skipped.
    """
    response = session.get(
        f"{cfg.odata_cust}/BO_XSRIssueItemAttachmentCollection",
        params={
            "$format": "json",
            "$filter": f"XIssueItemUUID eq guid'{issue_item_uuid}'",
        },
        timeout=60,
    )
    response.raise_for_status()
    att_rows = response.json().get("d", {}).get("results", [])

    collected = []
    if not att_rows:
        return collected

    with ThreadPoolExecutor(max_workers=cfg.max_workers) as pool:
        pending = [
            pool.submit(_fetch_attachment_folder, session, row["ObjectID"])
            for row in att_rows
        ]
        for finished in as_completed(pending):
            try:
                collected += _parse_attachments(finished.result())
            except Exception as e:
                with print_lock:
                    print(f" ⚠ 获取附件文件夹失败: {e}", file=sys.stderr)
    return collected
def download_file_via_odata(session, attachment, file_path, base_url=None):
    """Stream an attachment's binary content to ``file_path``.

    The body is written chunk by chunk so large files are never held in
    memory as a whole.

    Args:
        session: HTTP session exposing ``get(url, timeout=..., stream=...)``.
        attachment: Attachment dict. Its "DocumentLink" is used as the URL when
            present; otherwise the URL is built from "ObjectID".
        file_path: Destination path on the local disk.
        base_url: Entity collection URL used together with "ObjectID"; defaults
            to the ServiceRequestAttachmentFolderCollection of c4codata.

    Raises:
        Exception: Errors from the HTTP request, ``raise_for_status()`` or the
            file write are propagated unchanged.
    """
    url = attachment.get("DocumentLink")
    if not url:
        obj_id = attachment["ObjectID"]
        if base_url is None:
            base_url = f"{cfg.odata_c4c}/ServiceRequestAttachmentFolderCollection"
        url = f"{base_url}('{obj_id}')/$value"

    resp = session.get(url, timeout=300, stream=True)
    try:
        resp.raise_for_status()
        # Opened outside the cleanup guard below: if open() itself fails there
        # is no partial file of ours to remove.
        out = open(file_path, "wb")
        try:
            with out:
                for chunk in resp.iter_content(chunk_size=65536):
                    if chunk:  # skip empty keep-alive chunks
                        out.write(chunk)
        except Exception:
            # Do not leave a truncated file behind that looks like a download.
            try:
                os.remove(file_path)
            except OSError:
                pass
            raise
    finally:
        # A streamed response holds its connection until it is closed.
        resp.close()
def download_link_via_scrapling(link_url, save_name):
    """Open an external link page with Scrapling and download via its Download button.

    Args:
        link_url: URL of the page that hosts the download button.
        save_name: File name used when the browser suggests none.

    Returns:
        Dict ``{"saved": path-or-None, "error": message-or-None}``. When no
        file was saved, "error" is always a non-empty message.
    """
    from scrapling.fetchers import StealthyFetcher

    result = {"saved": None, "error": None}
    button = "button.downloadbutton[title='Download']"

    def click_download(page):
        page.wait_for_selector(button, timeout=15000)
        with page.expect_download(timeout=120000) as download_info:
            page.click(button)
        download = download_info.value
        # Both candidate names come from remote systems; keep only the final
        # path component so the file cannot land outside the output directory.
        raw_name = download.suggested_filename or save_name or ""
        filename = os.path.basename(raw_name.replace("\\", "/"))
        if filename in ("", ".", ".."):
            filename = "download"
        save_path = os.path.join(cfg.output_dir, filename)
        download.save_as(save_path)
        result["saved"] = save_path
        # NOTE(review): some Scrapling releases document that page_action must
        # return the page; returning it should be harmless otherwise - confirm
        # against the installed version.
        return page

    try:
        StealthyFetcher.fetch(
            link_url,
            headless=True,
            network_idle=True,
            timeout=60000,
            page_action=click_download,
        )
    except Exception as e:
        result["error"] = str(e)

    # If the fetch returned without raising but the callback never stored a
    # file, report that explicitly instead of handing back error=None.
    if result["saved"] is None and result["error"] is None:
        result["error"] = "下载未完成: 未保存任何文件"
    return result
# ==================== 群晖 DSM 上传 ====================
def dsm_login():
    """Log in to Synology DSM (SYNO.API.Auth) and return the session ID (SID).

    Raises RuntimeError when DSM answers without ``success``.
    """
    query = {
        "api": "SYNO.API.Auth",
        "version": "3",
        "method": "login",
        "account": cfg.dsm_user,
        "passwd": cfg.dsm_password,
        "session": "FileStation",
        "format": "sid",
    }
    resp = requests.get(
        f"{cfg.dsm_url}/webapi/auth.cgi",
        params=query,
        verify=False,
        timeout=30,
    )
    resp.raise_for_status()
    payload = resp.json()
    if payload.get("success"):
        return payload["data"]["sid"]
    raise RuntimeError(f"DSM 登录失败: {payload}")
def dsm_upload_file(sid, local_path, remote_path):
    """Upload one local file into a DSM folder via SYNO.FileStation.Upload.

    Returns the decoded JSON answer; raises RuntimeError when DSM answers
    without ``success``.
    """
    name = os.path.basename(local_path)
    guessed_type = mimetypes.guess_type(name)[0]
    fields = {
        "api": "SYNO.FileStation.Upload",
        "version": "2",
        "method": "upload",
        "path": remote_path,
        "create_parents": "true",
        "overwrite": "true",
    }
    with open(local_path, "rb") as handle:
        resp = requests.post(
            f"{cfg.dsm_url}/webapi/entry.cgi",
            data=fields,
            files={"file": (name, handle, guessed_type or "application/octet-stream")},
            cookies={"id": sid},
            verify=False,
            timeout=600,
        )
    resp.raise_for_status()
    answer = resp.json()
    if answer.get("success"):
        return answer
    raise RuntimeError(f"DSM 上传失败: {answer}")
def dsm_upload_downloaded_files(downloaded_files, ticket_id, serial_id, json_mode=False):
    """Upload every successfully downloaded file to Synology DSM concurrently.

    Files are placed under ``<dsm_path>/<ticket>[_<serial>]/[<issueId>/]``.
    Returns [] when DSM is not fully configured or nothing is uploadable, a
    single ``{"error": ...}`` entry when the login fails, and otherwise one
    result dict per file in input order.
    """
    dsm_ready = cfg.dsm_url and cfg.dsm_user and cfg.dsm_password and cfg.dsm_path
    if not dsm_ready:
        return []

    candidates = []
    for item in downloaded_files:
        if item.get("savedPath") and not item.get("error"):
            candidates.append(item)
    if not candidates:
        return []

    folder_name = ticket_id if not serial_id else f"{ticket_id}_{serial_id}"
    verbose = not json_mode
    rule = '=' * 60

    if verbose:
        print(f"\n{rule}")
        print(f"上传到群晖 DSM: {cfg.dsm_url}")
        print(f"目标路径: {cfg.dsm_path}/{folder_name}")
        print(rule)

    try:
        sid = dsm_login()
        if verbose:
            print(" DSM 登录成功")
    except Exception as e:
        if verbose:
            print(f" DSM 登录失败: {e}", file=sys.stderr)
        return [{"error": f"DSM 登录失败: {e}"}]

    def _upload_one(item):
        # Upload one file and describe the outcome; never raises for upload errors.
        source_path = item["savedPath"]
        base_name = os.path.basename(source_path)
        issue_id = item.get("issueId", "")
        target_dir = f"{cfg.dsm_path}/{folder_name}"
        if issue_id:
            target_dir = f"{target_dir}/{issue_id}"
        target_file = f"{target_dir}/{base_name}"
        report = {
            "file": base_name,
            "ticketId": ticket_id,
            "serialId": serial_id,
            "issueId": issue_id,
            "remotePath": target_file,
        }
        try:
            dsm_upload_file(sid, source_path, target_dir)
            report["success"] = True
            if verbose:
                with print_lock:
                    print(f" 上传成功: {base_name} -> {target_file}")
        except Exception as e:
            report["success"] = False
            report["error"] = str(e)
            if verbose:
                with print_lock:
                    print(f" 上传失败: {base_name}: {e}")
        return report

    with ThreadPoolExecutor(max_workers=cfg.max_workers) as pool:
        jobs = [pool.submit(_upload_one, item) for item in candidates]
        upload_results = [job.result() for job in jobs]

    if verbose:
        ok = len([r for r in upload_results if r.get("success")])
        fail = len(upload_results) - ok
        print(f"\n 上传完成: {ok} 成功, {fail} 失败")
    return upload_results
def print_attachment_summary(all_attachments):
    """Print a numbered overview of all attachments, grouped by level.

    ``all_attachments`` is a list of ``(level_label, attachments)`` pairs.
    CategoryCode "2" is shown as a file; every other code is shown as a link.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print("附件清单汇总")
    print(rule)
    if not all_attachments:
        print(" 无附件")
        return

    codes = [att["CategoryCode"] for _, group in all_attachments for att in group]
    total_files = codes.count("2")
    total_links = codes.count("3")
    print(f" 合计: {total_files} 个文件附件, {total_links} 个链接附件\n")

    counter = 0
    for level, group in all_attachments:
        if group:
            print(f" [{level}]")
            for att in group:
                counter += 1
                is_file = att["CategoryCode"] == "2"
                kind = "文件" if is_file else "链接"
                size_note = ""
                raw_size = att.get("SizeInkB")
                if raw_size and is_file:
                    try:
                        kb = float(raw_size)
                    except ValueError:
                        pass
                    else:
                        if kb > 1024:
                            size_note = f" ({kb/1024:.1f} MB)"
                        else:
                            size_note = f" ({kb:.0f} KB)"
                print(f" {counter}. [{kind}] {att['FileName']}{size_note}")
                if att.get("MimeType"):
                    print(f" MIME: {att['MimeType']}")
                if att.get("LinkWebURI"):
                    print(f" 链接: {att['LinkWebURI']}")
            print()
def _download_single_file(session, att, label, issue_id, odata_url, json_mode):
    """Download one file attachment to the output directory (thread-pool worker).

    Args:
        session: HTTP session passed on to download_file_via_odata().
        att: Attachment dict as produced by _parse_attachments().
        label: Source label stored in the result entry.
        issue_id: Issue ID stored in the result entry ("" at SR level).
        odata_url: Entity collection URL passed on as the download base URL.
        json_mode: When true, nothing is printed.

    Returns:
        Result entry dict. On success it carries "savedPath" and "savedName",
        on failure "error". Download errors are captured, never raised.
    """
    entry = {
        "source": label, "issueId": issue_id,
        "c4cName": att["FileName"], "type": "file", "mime": att.get("MimeType"),
    }
    # The name is supplied by the server. Keep only its last path component so
    # names such as "../x" or "dir/x" cannot escape (or miss) the output dir.
    safe_name = os.path.basename(str(att["FileName"]).replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = f"{att.get('UUID') or 'attachment'}.bin"
    # NOTE(review): attachments sharing a file name still overwrite each other
    # inside output_dir; that is not addressed here.
    file_path = os.path.join(cfg.output_dir, safe_name)
    try:
        download_file_via_odata(session, att, file_path, odata_url)
        entry["savedPath"] = os.path.abspath(file_path)
        entry["savedName"] = safe_name
        if not json_mode:
            with print_lock:
                print(f" ✓ saved: {file_path}")
    except Exception as e:
        entry["error"] = str(e)
        if not json_mode:
            with print_lock:
                print(f" ✗ OData 下载失败 ({att['FileName']}): {e}")
    return entry
def _download_single_link(link_att, label, issue_id, json_mode):
    """Download one link attachment through the browser helper (thread-pool worker).

    Returns a result entry dict carrying "savedPath"/"savedName" on success
    and "error" otherwise.
    """
    link_url = link_att.get("LinkWebURI")
    display_name = link_att["FileName"]
    verbose = not json_mode
    entry = {
        "source": label,
        "issueId": issue_id,
        "c4cName": display_name,
        "type": "link",
        "linkUrl": link_url,
    }

    if not link_url:
        entry["error"] = "无链接地址"
        return entry

    if verbose:
        with print_lock:
            print(f" {display_name}: {link_url}")

    outcome = download_link_via_scrapling(link_url, display_name)
    saved = outcome["saved"]
    if not saved:
        entry["error"] = outcome["error"]
        if verbose:
            with print_lock:
                print(f" ✗ 下载失败: {outcome['error']}")
        return entry

    entry["savedPath"] = os.path.abspath(saved)
    entry["savedName"] = os.path.basename(saved)
    if verbose:
        with print_lock:
            print(f" ✓ saved: {saved}")
    return entry
def _do_download(session, attachments, label, issue_id, odata_url, json_mode):
    """Download a list of attachments concurrently.

    Attachments with CategoryCode "2" are downloaded as files and those with
    "3" as links; other codes are ignored.

    Args:
        session: HTTP session used for file downloads.
        attachments: Attachment dicts as produced by _parse_attachments().
        label: Source label stored in every result entry.
        issue_id: Issue ID stored in every result entry ("" at SR level).
        odata_url: Entity collection URL for file downloads (may be None).
        json_mode: When true, nothing is printed.

    Returns:
        List of result entry dicts in completion order. A worker that raised
        unexpectedly yields an entry with "error" instead of being dropped.
    """
    file_atts = [a for a in attachments if a["CategoryCode"] == "2"]
    link_atts = [a for a in attachments if a["CategoryCode"] == "3"]
    downloaded_entries = []
    with ThreadPoolExecutor(max_workers=cfg.max_workers) as executor:
        # Remember which attachment each future belongs to so that a crashed
        # task can still be reported against its attachment.
        tasks = {}
        for att in file_atts:
            future = executor.submit(
                _download_single_file, session, att, label, issue_id, odata_url, json_mode
            )
            tasks[future] = ("file", att)
        for att in link_atts:
            future = executor.submit(_download_single_link, att, label, issue_id, json_mode)
            tasks[future] = ("link", att)

        for future in as_completed(tasks):
            kind, att = tasks[future]
            try:
                downloaded_entries.append(future.result())
            except Exception as e:
                # Previously such a failure vanished from the result (and was
                # entirely silent in JSON mode); record it like any other
                # failed download.
                failure = {
                    "source": label, "issueId": issue_id,
                    "c4cName": att.get("FileName"), "type": kind,
                }
                if kind == "link":
                    failure["linkUrl"] = att.get("LinkWebURI")
                else:
                    failure["mime"] = att.get("MimeType")
                failure["error"] = str(e)
                downloaded_entries.append(failure)
                if not json_mode:
                    with print_lock:
                        print(f" ✗ 下载任务异常: {e}")
    return downloaded_entries
def _process_issue_item(session, item, list_only, json_mode):
    """Handle one XIssueItem: fetch its detail, list its attachments, download them.

    Returns ``(issue_entry, downloaded_entries)``. A failing detail lookup is
    reported on stderr and leaves the issue ID empty.
    """
    verbose = not json_mode
    item_oid = item["ObjectID"]
    issue_uuid = item.get("XIssueItemUUIDcontent_SDK", "")
    issue_desc = (item.get("IssuesDescriptionX_SDK") or "")[:80]

    try:
        issue_id = get_issue_item_detail(session, item_oid).get("IssueID_SDK", "")
    except Exception as e:
        issue_id = ""
        with print_lock:
            print(f" ⚠ 获取 IssueID 失败 ({item_oid}): {e}", file=sys.stderr)

    if verbose:
        header = [f"\n XIssueItem: {item_oid}"]
        if issue_id:
            header.append(f" IssueID: {issue_id}")
        header.append(f" UUID: {issue_uuid}")
        header.append(f" 描述: {issue_desc}")
        # Emit the whole header under one lock so it is not interleaved.
        with print_lock:
            for line in header:
                print(line)

    issue_entry = {
        "objectId": item_oid,
        "issueId": issue_id,
        "uuid": issue_uuid,
        "description": issue_desc,
        "attachments": [],
    }

    if not issue_uuid:
        if verbose:
            with print_lock:
                print(" ⚠ 无 XIssueItemUUID跳过")
        return issue_entry, []

    found = list_issue_item_attachments(session, issue_uuid)
    issue_entry["attachments"] = found
    if verbose:
        with print_lock:
            print(f" 找到 {len(found)} 个附件")

    if list_only:
        return issue_entry, []

    if issue_id:
        label = f"IssueItem-{issue_id}"
    else:
        label = f"IssueItem-{item_oid[:12]}"
    folder_collection = f"{cfg.odata_cust}/BO_XSRIssueItemAttachmentFolderCollection"
    downloaded = _do_download(session, found, label, issue_id, folder_collection, json_mode)
    return issue_entry, downloaded
def run(ticket_id, output_dir, list_only=False, json_mode=False):
    """Core workflow: resolve the ticket, list and download attachments, return a result dict.

    Any exception raised by the workflow is captured in the result
    ("success" False, "error" message) instead of being raised.
    """
    cfg.output_dir = output_dir
    os.makedirs(cfg.output_dir, exist_ok=True)
    session = get_session()
    verbose = not json_mode
    rule = '=' * 60

    def _heading(title):
        # Section banner used between the workflow steps.
        print(f"\n{rule}")
        print(title)
        print(rule)

    result = {
        "ticketId": ticket_id,
        "outputDir": os.path.abspath(cfg.output_dir),
        "success": True,
        "error": None,
        "srAttachments": [],
        "issueItems": [],
        "downloadedFiles": [],
    }

    try:
        # 1) Resolve the ticket ID to ObjectID and SerialID.
        sr_object_id, serial_id = find_service_request_object_id(session, ticket_id)
        result["srObjectId"] = sr_object_id
        result["serialId"] = serial_id
        if verbose:
            print(f"ServiceRequest ID={ticket_id}, ObjectID={sr_object_id}, SerialID={serial_id}")

        # 2) Attachments on the ServiceRequest itself.
        if verbose:
            _heading("ServiceRequest 级别附件")
        sr_attachments = list_sr_attachments(session, sr_object_id)
        result["srAttachments"] = sr_attachments
        if verbose:
            print(f"找到 {len(sr_attachments)} 个附件")
        if not list_only:
            result["downloadedFiles"].extend(
                _do_download(session, sr_attachments, "SR", "", None, json_mode)
            )

        # 3) Attachments on each XIssueItem, one worker per item.
        if verbose:
            _heading("XIssueItem 级别附件 (BO_XSRIssueItemAttachmentFolder)")
        issue_items = list_issue_items(session, ticket_id)
        if verbose:
            print(f"找到 {len(issue_items)} 个 XIssueItem")

        with ThreadPoolExecutor(max_workers=cfg.max_workers) as pool:
            jobs = [
                pool.submit(_process_issue_item, session, item, list_only, json_mode)
                for item in issue_items
            ]
            for job in as_completed(jobs):
                try:
                    issue_entry, entries = job.result()
                except Exception as e:
                    if verbose:
                        print(f" ✗ XIssueItem 处理异常: {e}", file=sys.stderr)
                    continue
                result["issueItems"].append(issue_entry)
                result["downloadedFiles"].extend(entries)

        # 4) Overview of everything that was found.
        if verbose:
            groups = [("SR", sr_attachments)]
            for ie in result["issueItems"]:
                if ie.get("issueId"):
                    group_label = f"IssueItem-{ie['issueId']}"
                else:
                    group_label = f"IssueItem-{ie['objectId'][:12]}"
                groups.append((group_label, ie["attachments"]))
            print_attachment_summary(groups)
    except Exception as e:
        result["success"] = False
        result["error"] = str(e)
        if verbose:
            print(f"\n错误: {e}", file=sys.stderr)
    return result
def main():
    """Command-line entry point.

    Parses the arguments (credentials may also come from environment
    variables), fills the module configuration, runs the download workflow
    and, when a DSM URL is configured, uploads the downloaded files. A local
    file is deleted only after its upload was reported successful. In JSON
    mode the result dict is printed as JSON. Exits with status 0 when the
    download workflow succeeded and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="SAP C4C 附件下载工具")
    parser.add_argument("--tenant", default=os.environ.get("C4C_TENANT", ""),
                        help="C4C 租户地址 (如 https://xxx.c4c.saphybriscloud.cn),也可设 C4C_TENANT 环境变量")
    parser.add_argument("--user", default=os.environ.get("C4C_USERNAME", ""),
                        help="C4C 用户名,也可设 C4C_USERNAME 环境变量")
    parser.add_argument("--password", default=os.environ.get("C4C_PASSWORD", ""),
                        help="C4C 密码,也可设 C4C_PASSWORD 环境变量")
    parser.add_argument("--ticket", required=True, help="ServiceRequest ticket ID (如 24588)")
    parser.add_argument("--output-dir", default="downloads", help="附件保存目录 (默认: downloads)")
    parser.add_argument("--json", action="store_true", dest="json_mode", help="JSON 输出模式(供程序调用)")
    parser.add_argument("--list-only", action="store_true", help="仅列出附件清单,不下载")
    parser.add_argument("--max-workers", type=int, default=5, help="并发线程数 (默认: 5)")
    # Synology DSM upload options.
    parser.add_argument("--dsm-url", default=os.environ.get("DSM_URL", ""),
                        help="群晖 DSM 地址 (如 http://10.0.10.235:5000),也可设 DSM_URL 环境变量")
    parser.add_argument("--dsm-user", default=os.environ.get("DSM_USERNAME", ""),
                        help="群晖 DSM 用户名,也可设 DSM_USERNAME 环境变量")
    parser.add_argument("--dsm-password", default=os.environ.get("DSM_PASSWORD", ""),
                        help="群晖 DSM 密码,也可设 DSM_PASSWORD 环境变量")
    parser.add_argument("--dsm-path", default=os.environ.get("DSM_PATH", ""),
                        help="群晖 DSM 目标路径 (如 /Newgonow/AU-SPFJ),也可设 DSM_PATH 环境变量")
    args = parser.parse_args()
    if not args.tenant or not args.user or not args.password:
        parser.error("必须提供 --tenant, --user, --password 参数,或设置 C4C_TENANT, C4C_USERNAME, C4C_PASSWORD 环境变量")
    # ThreadPoolExecutor rejects max_workers <= 0 with a ValueError deep inside
    # the workflow; fail early with a clear usage error instead.
    if args.max_workers < 1:
        parser.error("--max-workers 必须是大于等于 1 的整数")

    # Fill the module-wide configuration.
    cfg.tenant = args.tenant.rstrip("/")
    cfg.username = args.user
    cfg.password = args.password
    cfg.max_workers = args.max_workers
    cfg.dsm_url = args.dsm_url.rstrip("/") if args.dsm_url else ""
    cfg.dsm_user = args.dsm_user
    cfg.dsm_password = args.dsm_password
    cfg.dsm_path = args.dsm_path
    cfg.init_endpoints()

    if not args.json_mode and not args.list_only:
        print(f"并发线程数: {cfg.max_workers}")
    result = run(args.ticket, args.output_dir, args.list_only, args.json_mode)

    # Upload to Synology DSM once the downloads are done.
    if cfg.dsm_url and not args.list_only and result["success"]:
        serial_id = result.get("serialId", "")
        upload_results = dsm_upload_downloaded_files(
            result["downloadedFiles"], args.ticket, serial_id, args.json_mode
        )
        result["dsmUpload"] = upload_results

        # Remove a local copy only when its upload was confirmed. If the DSM
        # login failed, the DSM settings were incomplete or a single upload
        # failed, the file stays on disk so nothing is lost.
        uploaded = {
            (r.get("issueId", ""), r.get("file"))
            for r in upload_results
            if r.get("success")
        }
        for f in result["downloadedFiles"]:
            local_path = f.get("savedPath")
            if not local_path or not os.path.exists(local_path):
                continue
            key = (f.get("issueId", ""), os.path.basename(local_path))
            if key not in uploaded:
                if not args.json_mode:
                    print(f" 保留本地文件 (上传未成功): {local_path}")
                continue
            try:
                os.remove(local_path)
                if not args.json_mode:
                    print(f" 已删除本地文件: {local_path}")
            except OSError as e:
                if not args.json_mode:
                    print(f" 删除失败: {local_path}: {e}")

    if args.json_mode:
        print(json.dumps(result, ensure_ascii=False, indent=2))
    sys.exit(0 if result["success"] else 1)
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()