"""
SAP C4C 附件下载工具
功能:
1. 下载 ServiceRequest 级别附件
2. 下载 XIssueItem 级别附件 (BO_XSRIssueItemAttachmentFolder)
3. 通过 Scrapling 爬虫下载 Salesforce 外部链接附件
4. 可选:将下载的附件上传到群晖 DSM
环境要求:
Python >= 3.8
安装依赖:
pip install requests scrapling[all] playwright
python -m playwright install chromium
用法:
# 下载附件
python sap-c4c-AttachmentFolder.py --tenant https://xxx.c4c.saphybriscloud.cn --user admin --password xxx --ticket 24588
# 下载附件并上传到群晖
python sap-c4c-AttachmentFolder.py --tenant https://xxx.c4c.saphybriscloud.cn --user admin --password xxx --ticket 24588 \
--dsm-url http://10.0.10.235:5000 --dsm-user PLM --dsm-password 123456 --dsm-path /Newgonow/AU-SPFJ
# JSON 模式(供 Java/其他程序调用)
python sap-c4c-AttachmentFolder.py --ticket 24588 --dsm-url http://10.0.10.235:5000 --dsm-user PLM --dsm-password 123456 --dsm-path /Newgonow/AU-SPFJ --json
# 也可通过环境变量传入凭证
export C4C_TENANT=https://xxx.c4c.saphybriscloud.cn
export C4C_USERNAME=admin
export C4C_PASSWORD=xxx
export DSM_URL=http://10.0.10.235:5000
export DSM_USERNAME=PLM
export DSM_PASSWORD=123456
export DSM_PATH=/Newgonow/AU-SPFJ
python sap-c4c-AttachmentFolder.py --ticket 24588 --json
"""
import os
import sys
import json
import argparse
import base64
import mimetypes
import requests
import urllib3
import xml.etree.ElementTree as ET
from scrapling.fetchers import StealthyFetcher
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# C4C connection settings. Empty at import time; main() assigns the real values.
TENANT = ""
USERNAME = ""
PASSWORD = ""
# OData base paths (initialised in main() from TENANT)
ODATA_C4C = ""
ODATA_CUST = ""
# SOAP endpoint (replace with the real address if file content has to be
# downloaded via SOAP).
# NOTE: TENANT is still "" when this line runs, so the value here is only the
# path part; main() reassigns SOAP_URL once the tenant is known.
SOAP_URL = f"{TENANT}/sap/bc/srt/scs/sap/manageattachmentfolderin"
# Default download directory; run() overwrites it with its output_dir argument
OUTPUT_DIR = "downloads"
# Synology DSM settings (optional, initialised in main())
DSM_URL = ""
DSM_USER = ""
DSM_PASSWORD = ""
DSM_PATH = ""
def get_session():
    """Create the HTTP session used for all C4C requests.

    Returns:
        requests.Session: session whose auth is the module-level
        (USERNAME, PASSWORD) pair and whose default Accept header asks for JSON.
    """
    http = requests.Session()
    http.headers["Accept"] = "application/json"
    http.auth = (USERNAME, PASSWORD)
    return http
def find_service_request_object_id(session, ticket_id):
    """Look up the OData ObjectID of a ServiceRequest by its human-readable ID.

    Args:
        session: HTTP session (see get_session) used to query the C4C OData API.
        ticket_id: human-readable ticket ID, e.g. "24588".

    Returns:
        The "ObjectID" of the first matching ServiceRequest.

    Raises:
        ValueError: if no ServiceRequest matches ticket_id.
        requests.HTTPError: if the OData request returns an error status.
    """
    # The ID is embedded in an OData string literal; a single quote inside a
    # literal must be doubled, otherwise it terminates the literal early and
    # the rest of the value is interpreted as filter syntax.
    escaped_id = str(ticket_id).replace("'", "''")
    url = f"{ODATA_C4C}/ServiceRequestCollection"
    params = {
        "$format": "json",
        "$filter": f"ID eq '{escaped_id}'",
        "$select": "ObjectID,ID",
    }
    resp = session.get(url, params=params, timeout=60)
    resp.raise_for_status()
    results = resp.json().get("d", {}).get("results", [])
    if not results:
        raise ValueError(f"未找到 ID={ticket_id} 的 ServiceRequest")
    return results[0]["ObjectID"]
def _parse_attachments(results):
    """Normalise raw OData attachment rows into a uniform list of dicts.

    Rows without a truthy "UUID" are dropped. A missing name falls back to
    "<UUID>.bin" and a missing MIME type to "application/octet-stream".

    Args:
        results: iterable of row dicts as returned by the OData API.

    Returns:
        list[dict]: one dict per kept row with the keys UUID, ObjectID,
        ParentObjectID, FileName, MimeType, CategoryCode, LinkWebURI,
        DocumentLink and SizeInkB.
    """

    def to_entry(record):
        doc_uuid = record.get("UUID")
        return {
            "UUID": doc_uuid,
            "ObjectID": record.get("ObjectID"),
            "ParentObjectID": record.get("ParentObjectID"),
            "FileName": record.get("Name") or f"{doc_uuid}.bin",
            "MimeType": record.get("MimeType") or "application/octet-stream",
            "CategoryCode": record.get("CategoryCode"),  # 2=File, 3=Link
            "LinkWebURI": record.get("LinkWebURI"),
            "DocumentLink": record.get("DocumentLink"),
            "SizeInkB": record.get("SizeInkB"),
        }

    return [to_entry(record) for record in results if record.get("UUID")]
def list_sr_attachments(session, sr_object_id):
    """Fetch the attachments stored directly on a ServiceRequest.

    Uses the c4codata navigation ServiceRequestAttachmentFolder of the given
    ServiceRequest.

    Args:
        session: HTTP session used for the OData call.
        sr_object_id: OData ObjectID of the ServiceRequest.

    Returns:
        list[dict]: attachments in the format produced by _parse_attachments.
    """
    endpoint = (
        f"{ODATA_C4C}/ServiceRequestCollection('{sr_object_id}')"
        "/ServiceRequestAttachmentFolder"
    )
    reply = session.get(endpoint, params={"$format": "json"}, timeout=60)
    reply.raise_for_status()
    payload = reply.json()
    return _parse_attachments(payload.get("d", {}).get("results", []))
def list_issue_items(session, ticket_id):
    """List the XIssueItem rows of a ServiceRequest (via custticketapi).

    Args:
        session: HTTP session used for the OData call.
        ticket_id: human-readable ticket ID used in the TicketID filter.

    Returns:
        list[dict]: raw OData rows; empty when the response has no results.

    Raises:
        requests.HTTPError: if the OData request returns an error status.
    """
    # Double embedded single quotes so the value stays inside the OData string
    # literal instead of terminating it and being parsed as filter syntax.
    escaped_id = str(ticket_id).replace("'", "''")
    url = f"{ODATA_CUST}/ServiceRequest_XIssueItem_SDKCollection"
    params = {"$format": "json", "$filter": f"TicketID eq '{escaped_id}'"}
    resp = session.get(url, params=params, timeout=60)
    resp.raise_for_status()
    return resp.json().get("d", {}).get("results", [])
def list_issue_item_attachments(session, issue_item_uuid):
    """Fetch the attachments that belong to one XIssueItem.

    Path: BO_XSRIssueItemAttachmentCollection (filtered by XIssueItemUUID)
          -> BO_XSRIssueItemAttachmentFolder (the actual attachment files)

    Args:
        session: HTTP session used for the OData calls.
        issue_item_uuid: UUID of the XIssueItem, inserted as an OData guid literal.

    Returns:
        list[dict]: attachments of all matching rows, in the format produced
        by _parse_attachments; empty when nothing matches.
    """
    collection_url = f"{ODATA_CUST}/BO_XSRIssueItemAttachmentCollection"

    # Step 1: find the BO_XSRIssueItemAttachment rows for this XIssueItemUUID
    lookup = session.get(
        collection_url,
        params={
            "$format": "json",
            "$filter": f"XIssueItemUUID eq guid'{issue_item_uuid}'",
        },
        timeout=60,
    )
    lookup.raise_for_status()
    owners = lookup.json().get("d", {}).get("results", [])

    # Step 2: navigate from every row to its attachment folder
    collected = []
    for owner in owners:
        folder_reply = session.get(
            f"{collection_url}('{owner['ObjectID']}')/BO_XSRIssueItemAttachmentFolder",
            params={"$format": "json"},
            timeout=60,
        )
        folder_reply.raise_for_status()
        collected += _parse_attachments(
            folder_reply.json().get("d", {}).get("results", [])
        )
    return collected
def download_file_via_odata(session, attachment, base_url=None):
    """Download a file attachment's bytes through its OData $value URL.

    Intended for file attachments (CategoryCode=2).

    Args:
        session: HTTP session used for the download.
        attachment: attachment dict; "DocumentLink" is used as the URL when
            set, otherwise the URL is built from "ObjectID".
        base_url: collection URL used when there is no DocumentLink; defaults
            to the ServiceRequestAttachmentFolderCollection of c4codata.

    Returns:
        bytes: the complete response body.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # Prefer DocumentLink (the full $value URL returned by custticketapi)
    doc_link = attachment.get("DocumentLink")
    if doc_link:
        url = doc_link
    else:
        obj_id = attachment["ObjectID"]
        if base_url is None:
            base_url = f"{ODATA_C4C}/ServiceRequestAttachmentFolderCollection"
        url = f"{base_url}('{obj_id}')/$value"
    # With stream=True the connection only returns to the pool once the body
    # is consumed or the response is closed. The context manager guarantees
    # the close even when raise_for_status() raises before the body is read.
    with session.get(url, timeout=300, stream=True) as resp:
        resp.raise_for_status()
        return resp.content
def build_read_documents_payload(document_uuids, size_limit_kb=10240):
    """Build the request body text for the SOAP read-documents call.

    The result is a leading newline, every document UUID on its own line,
    then the size limit on its own line.

    NOTE(review): the template contains no XML element markup; confirm what
    the SOAP endpoint actually expects before relying on this payload.

    Args:
        document_uuids: iterable of document UUIDs.
        size_limit_kb: size limit value placed on the last line.

    Returns:
        str: the assembled payload text.
    """
    uuid_lines = "\n".join([f"{doc_uuid}" for doc_uuid in document_uuids])
    return "\n" + uuid_lines + "\n" + f"{size_limit_kb}" + "\n"
def read_documents_file_content(session, document_uuids):
    """POST the read-documents payload to SOAP_URL and return the response text.

    Args:
        session: HTTP session used for the POST.
        document_uuids: UUIDs passed to build_read_documents_payload.

    Returns:
        str: body of the SOAP response.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
    """
    body = build_read_documents_payload(document_uuids).encode("utf-8")
    reply = session.post(
        SOAP_URL,
        data=body,
        headers={"Content-Type": "text/xml; charset=utf-8"},
        timeout=120,
    )
    reply.raise_for_status()
    return reply.text
def parse_soap_response(xml_text):
    """Extract document contents and the more-hits flag from a SOAP response.

    Tags are matched by suffix, so namespace prefixes do not matter.

    Args:
        xml_text: XML text of the SOAP response.

    Returns:
        tuple: (items, more_hits). items is a list of
        {"DocumentUUID", "BinaryObject"} dicts for every
        AttachmentFolderDocumentFileContent element that has both values;
        more_hits is True when a MoreHitsAvailableIndicator element contains
        "true" (case-insensitive).
    """
    envelope = ET.fromstring(xml_text)
    documents = []
    has_more = False
    for node in envelope.iter():
        tag = node.tag
        if tag.endswith("AttachmentFolderDocumentFileContent"):
            fields = {"DocumentUUID": None, "BinaryObject": None}
            for sub in node:
                # First matching key wins for a child; a later child with the
                # same key overwrites the earlier value.
                for key in fields:
                    if sub.tag.endswith(key):
                        fields[key] = sub.text
                        break
            if all(fields.values()):
                documents.append(fields)
        if (
            not has_more
            and tag.endswith("MoreHitsAvailableIndicator")
            and (node.text or "").lower() == "true"
        ):
            has_more = True
    return documents, has_more
def save_files(content_items, attachment_index):
    """Decode base64 document contents and write them into OUTPUT_DIR.

    Args:
        content_items: dicts with "DocumentUUID" and base64 "BinaryObject",
            as produced by parse_soap_response.
        attachment_index: mapping from document UUID to attachment metadata;
            its "FileName" is used as the target name when usable.

    The target name falls back to "<DocumentUUID>.bin" when the metadata has
    no usable file name.
    """
    for item in content_items:
        doc_uuid = item["DocumentUUID"]
        binary_b64 = item["BinaryObject"]
        meta = attachment_index.get(doc_uuid, {})
        # Names and UUIDs come from the server. Keep only the final path
        # component so a value such as "../x" or an absolute path cannot make
        # the file land outside OUTPUT_DIR; a missing/empty name falls back
        # to the UUID-based default instead of crashing os.path.join.
        file_name = ""
        for candidate in (meta.get("FileName"), f"{doc_uuid}.bin"):
            file_name = os.path.basename(str(candidate or "").replace("\\", "/"))
            if file_name not in ("", ".", ".."):
                break
        file_path = os.path.join(OUTPUT_DIR, file_name)
        with open(file_path, "wb") as f:
            f.write(base64.b64decode(binary_b64))
        print(f" saved: {file_path}")
def chunked(seq, size):
    """Yield consecutive slices of seq, each holding at most size items.

    Args:
        seq: sliceable sequence with a length.
        size: maximum number of items per slice.

    Yields:
        Slices of seq in order; the last one may be shorter.
    """
    starts = range(0, len(seq), size)
    yield from (seq[start:start + size] for start in starts)
def download_link_via_scrapling(link_url, save_name):
    """Open an external link page and download its file via the Download button.

    The page is loaded with StealthyFetcher; the page action waits for the
    button matching "button.downloadbutton[title='Download']", clicks it and
    saves the resulting download into OUTPUT_DIR.

    Args:
        link_url: URL of the page that offers the download.
        save_name: file name used when the download suggests none.

    Returns:
        dict: {"saved": path or None, "error": message or None}. Exceptions
        raised by the fetch are captured in "error" instead of propagating.
    """
    outcome = {"saved": None, "error": None}
    button = "button.downloadbutton[title='Download']"

    def click_download(page):
        page.wait_for_selector(button, timeout=15000)
        with page.expect_download(timeout=120000) as pending:
            page.click(button)
        fetched = pending.value
        target = os.path.join(OUTPUT_DIR, fetched.suggested_filename or save_name)
        fetched.save_as(target)
        outcome["saved"] = target

    fetch_options = {
        "headless": True,
        "network_idle": True,
        "timeout": 60000,
        "page_action": click_download,
    }
    try:
        StealthyFetcher.fetch(link_url, **fetch_options)
    except Exception as exc:
        outcome["error"] = str(exc)
    return outcome
# ==================== 群晖 DSM 上传 ====================
def dsm_login():
    """Log in to the Synology DSM web API and return the session ID.

    Uses the module-level DSM_URL, DSM_USER and DSM_PASSWORD.

    Returns:
        The "sid" value from the login response.

    Raises:
        RuntimeError: if the response reports no success.
        requests.HTTPError: if the HTTP status indicates an error.
    """
    query = {
        "api": "SYNO.API.Auth", "version": "3", "method": "login",
        "account": DSM_USER, "passwd": DSM_PASSWORD,
        "session": "FileStation", "format": "sid",
    }
    reply = requests.get(
        f"{DSM_URL}/webapi/auth.cgi", params=query, verify=False, timeout=30
    )
    reply.raise_for_status()
    body = reply.json()
    if body.get("success"):
        return body["data"]["sid"]
    raise RuntimeError(f"DSM 登录失败: {body}")
def dsm_upload_file(sid, local_path, remote_path):
    """Upload one local file to a folder on the Synology DSM.

    Args:
        sid: session ID returned by dsm_login; sent as the "id" cookie.
        local_path: path of the file to upload.
        remote_path: destination folder, sent as the "path" form field.

    Returns:
        dict: the decoded JSON response.

    Raises:
        RuntimeError: if the response reports no success.
        requests.HTTPError: if the HTTP status indicates an error.
    """
    base_name = os.path.basename(local_path)
    content_type = mimetypes.guess_type(base_name)[0] or "application/octet-stream"
    fields = {
        "api": "SYNO.FileStation.Upload",
        "version": "2",
        "method": "upload",
        "path": remote_path,
        "create_parents": "true",
        "overwrite": "true",
    }
    with open(local_path, "rb") as handle:
        reply = requests.post(
            f"{DSM_URL}/webapi/entry.cgi",
            data=fields,
            files={"file": (base_name, handle, content_type)},
            cookies={"id": sid},
            verify=False,
            timeout=600,
        )
    reply.raise_for_status()
    body = reply.json()
    if not body.get("success"):
        raise RuntimeError(f"DSM 上传失败: {body}")
    return body
def dsm_upload_downloaded_files(downloaded_files, json_mode=False):
    """Upload every successfully downloaded file to the configured DSM folder.

    Args:
        downloaded_files: download entries; only those with a "savedPath" and
            no "error" are uploaded.
        json_mode: when true, no progress output is printed.

    Returns:
        list[dict]: one entry per attempted upload with "file", "remotePath",
        "success" and, on failure, "error". Empty when DSM is not fully
        configured or nothing is uploadable; a single {"error": ...} entry
        when the DSM login fails.
    """
    if not (DSM_URL and DSM_USER and DSM_PASSWORD and DSM_PATH):
        return []
    pending = [
        item for item in downloaded_files
        if item.get("savedPath") and not item.get("error")
    ]
    if not pending:
        return []

    verbose = not json_mode
    if verbose:
        print(f"\n{'='*60}")
        print(f"上传到群晖 DSM: {DSM_URL}")
        print(f"目标路径: {DSM_PATH}")
        print('='*60)

    try:
        sid = dsm_login()
        if verbose:
            print(" DSM 登录成功")
    except Exception as exc:
        if verbose:
            print(f" DSM 登录失败: {exc}", file=sys.stderr)
        return [{"error": f"DSM 登录失败: {exc}"}]

    outcomes = []
    for item in pending:
        source_path = item["savedPath"]
        base_name = os.path.basename(source_path)
        remote = f"{DSM_PATH}/{base_name}"
        record = {"file": base_name, "remotePath": remote}
        try:
            dsm_upload_file(sid, source_path, DSM_PATH)
            record["success"] = True
            if verbose:
                print(f" 上传成功: {base_name} -> {remote}")
        except Exception as exc:
            # A failed upload is recorded and the remaining files still run
            record["success"] = False
            record["error"] = str(exc)
            if verbose:
                print(f" 上传失败: {base_name}: {exc}")
        outcomes.append(record)

    if verbose:
        succeeded = sum(1 for record in outcomes if record.get("success"))
        failed = len(outcomes) - succeeded
        print(f"\n 上传完成: {succeeded} 成功, {failed} 失败")
    return outcomes
def print_attachment_summary(all_attachments):
    """Print a summary of all attachments, grouped by level.

    Args:
        all_attachments: list of (level_label, attachments) pairs, where
            attachments are dicts in the _parse_attachments format.

    Prints the file/link totals first, then every non-empty level with its
    numbered attachments (numbering continues across levels), followed by a
    blank line per level.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("附件清单汇总")
    print(rule)
    if not all_attachments:
        print(" 无附件")
        return

    codes = [att["CategoryCode"] for _, group in all_attachments for att in group]
    print(f" 合计: {codes.count('2')} 个文件附件, {codes.count('3')} 个链接附件\n")

    counter = 0
    for label, group in all_attachments:
        if not group:
            continue
        print(f" [{label}]")
        for att in group:
            counter += 1
            is_file = att["CategoryCode"] == "2"
            # Anything that is not a file (code "2") is labelled as a link
            kind = "文件" if is_file else "链接"

            suffix = ""
            raw_size = att.get("SizeInkB", "")
            if raw_size and is_file:
                try:
                    kb = float(raw_size)
                except ValueError:
                    pass  # unparsable size: print the entry without a size
                else:
                    suffix = f" ({kb/1024:.1f} MB)" if kb > 1024 else f" ({kb:.0f} KB)"

            print(f" {counter}. [{kind}] {att['FileName']}{suffix}")
            mime_type = att.get("MimeType") or ""
            if mime_type:
                print(f" MIME: {mime_type}")
            link_uri = att.get("LinkWebURI") or ""
            if link_uri:
                print(f" 链接: {link_uri}")
        print()
def run(ticket_id, output_dir, list_only=False, json_mode=False):
    """Collect (and, unless list_only, download) all attachments of one ticket.

    Args:
        ticket_id: human-readable ServiceRequest ID.
        output_dir: download directory; created if missing and stored in the
            module-level OUTPUT_DIR.
        list_only: when true, attachments are only listed, not downloaded.
        json_mode: when true, nothing is printed.

    Returns:
        dict: structured result with "ticketId", "outputDir", "success",
        "error", "srAttachments", "issueItems", "downloadedFiles" and, once
        resolved, "srObjectId". An exception during processing is caught: it
        sets "success" to False and its text is stored in "error".
    """
    global OUTPUT_DIR
    OUTPUT_DIR = output_dir
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    session = get_session()

    def say(*parts, **options):
        # Console output is suppressed entirely in JSON mode
        if not json_mode:
            print(*parts, **options)

    def banner(title):
        say(f"\n{'='*60}")
        say(title)
        say('='*60)

    result = {
        "ticketId": ticket_id,
        "outputDir": os.path.abspath(OUTPUT_DIR),
        "success": True,
        "error": None,
        "srAttachments": [],
        "issueItems": [],
        "downloadedFiles": [],
    }
    try:
        # 1) Resolve the ticket ID to its ObjectID
        sr_object_id = find_service_request_object_id(session, ticket_id)
        result["srObjectId"] = sr_object_id
        say(f"ServiceRequest ID={ticket_id}, ObjectID={sr_object_id}")

        # 2) ServiceRequest-level attachments
        banner("ServiceRequest 级别附件")
        sr_attachments = list_sr_attachments(session, sr_object_id)
        result["srAttachments"] = sr_attachments
        say(f"找到 {len(sr_attachments)} 个附件")
        if not list_only:
            _do_download(session, sr_attachments, "SR", None, result, json_mode)

        # 3) XIssueItem-level attachments
        banner("XIssueItem 级别附件 (BO_XSRIssueItemAttachmentFolder)")
        issue_items = list_issue_items(session, ticket_id)
        say(f"找到 {len(issue_items)} 个 XIssueItem")
        for item in issue_items:
            item_oid = item["ObjectID"]
            issue_uuid = item.get("XIssueItemUUIDcontent_SDK", "")
            issue_desc = (item.get("IssuesDescriptionX_SDK") or "")[:80]
            issue_entry = {
                "objectId": item_oid,
                "uuid": issue_uuid,
                "description": issue_desc,
                "attachments": [],
            }
            say(f"\n XIssueItem: {item_oid}")
            say(f" UUID: {issue_uuid}")
            say(f" 描述: {issue_desc}")
            if issue_uuid:
                atts = list_issue_item_attachments(session, issue_uuid)
                issue_entry["attachments"] = atts
                say(f" 找到 {len(atts)} 个附件")
                if not list_only:
                    _do_download(
                        session, atts, f"IssueItem-{item_oid[:12]}",
                        f"{ODATA_CUST}/BO_XSRIssueItemAttachmentFolderCollection",
                        result, json_mode,
                    )
            else:
                # Without a UUID the attachments cannot be looked up
                say(" ⚠ 无 XIssueItemUUID,跳过")
            result["issueItems"].append(issue_entry)

        # 4) Summary listing
        if not json_mode:
            all_attachments = [("SR", sr_attachments)]
            all_attachments.extend(
                (f"IssueItem-{ie['objectId'][:12]}", ie["attachments"])
                for ie in result["issueItems"]
            )
            print_attachment_summary(all_attachments)
    except Exception as e:
        result["success"] = False
        result["error"] = str(e)
        say(f"\n错误: {e}", file=sys.stderr)
    return result
def _safe_download_name(raw_name, fallback):
    """Return raw_name reduced to a bare file name, or fallback if nothing usable remains."""
    for candidate in (raw_name, fallback):
        base = os.path.basename(str(candidate or "").replace("\\", "/"))
        if base not in ("", ".", ".."):
            return base
    return "attachment.bin"


def _do_download(session, attachments, label, odata_url, result, json_mode):
    """Download the attachments and append one entry each to result['downloadedFiles'].

    Link attachments (CategoryCode "3") go through download_link_via_scrapling;
    file attachments (CategoryCode "2") go through download_file_via_odata and
    are written into OUTPUT_DIR. Failures are stored in the entry's "error"
    field instead of being raised. Other category codes are ignored.

    Args:
        session: HTTP session used for OData downloads.
        attachments: attachment dicts in the _parse_attachments format.
        label: value stored as "source" in every entry.
        odata_url: collection URL passed to download_file_via_odata
            (None selects its default).
        result: result dict whose "downloadedFiles" list is extended in place.
        json_mode: when true, nothing is printed.
    """
    file_atts = [a for a in attachments if a["CategoryCode"] == "2"]
    link_atts = [a for a in attachments if a["CategoryCode"] == "3"]

    # Link attachments -> Scrapling
    for a in link_atts:
        link_url = a.get("LinkWebURI")
        if not link_url:
            continue
        if not json_mode:
            print(f" {a['FileName']}: {link_url}")
        r = download_link_via_scrapling(link_url, a["FileName"])
        entry = {"source": label, "c4cName": a["FileName"], "type": "link", "linkUrl": link_url}
        if r["saved"]:
            entry["savedPath"] = os.path.abspath(r["saved"])
            entry["savedName"] = os.path.basename(r["saved"])
            if not json_mode:
                print(f" saved: {r['saved']}")
        else:
            entry["error"] = r["error"]
            if not json_mode:
                print(f" 下载失败: {r['error']}")
        result["downloadedFiles"].append(entry)

    # File attachments -> OData
    for att in file_atts:
        entry = {"source": label, "c4cName": att["FileName"], "type": "file", "mime": att.get("MimeType")}
        try:
            content = download_file_via_odata(session, att, odata_url)
            # The name is server-supplied: keep only its final path component
            # so it cannot direct the write outside OUTPUT_DIR.
            safe_name = _safe_download_name(att["FileName"], f"{att.get('UUID')}.bin")
            file_path = os.path.join(OUTPUT_DIR, safe_name)
            with open(file_path, "wb") as f:
                f.write(content)
            entry["savedPath"] = os.path.abspath(file_path)
            entry["savedName"] = safe_name
            if not json_mode:
                print(f" saved: {file_path}")
        except Exception as e:
            entry["error"] = str(e)
            if not json_mode:
                print(f" OData 下载失败 ({att['FileName']}): {e}")
        result["downloadedFiles"].append(entry)
def main():
    """Command-line entry point.

    Parses the arguments (credentials and DSM settings default to environment
    variables), initialises the module-level configuration, runs the download
    and, when a DSM URL is set and the run succeeded, uploads the downloaded
    files. In JSON mode the result is printed as JSON. Exits with status 0 on
    success and 1 otherwise.
    """
    global TENANT, USERNAME, PASSWORD, ODATA_C4C, ODATA_CUST, SOAP_URL
    global DSM_URL, DSM_USER, DSM_PASSWORD, DSM_PATH

    env = os.environ.get
    cli = argparse.ArgumentParser(description="SAP C4C 附件下载工具")
    cli.add_argument("--tenant", default=env("C4C_TENANT", ""),
                     help="C4C 租户地址 (如 https://xxx.c4c.saphybriscloud.cn),也可设 C4C_TENANT 环境变量")
    cli.add_argument("--user", default=env("C4C_USERNAME", ""),
                     help="C4C 用户名,也可设 C4C_USERNAME 环境变量")
    cli.add_argument("--password", default=env("C4C_PASSWORD", ""),
                     help="C4C 密码,也可设 C4C_PASSWORD 环境变量")
    cli.add_argument("--ticket", required=True, help="ServiceRequest ticket ID (如 24588)")
    cli.add_argument("--output-dir", default="downloads", help="附件保存目录 (默认: downloads)")
    cli.add_argument("--json", action="store_true", dest="json_mode", help="JSON 输出模式(供程序调用)")
    cli.add_argument("--list-only", action="store_true", help="仅列出附件清单,不下载")
    # Synology DSM upload options
    cli.add_argument("--dsm-url", default=env("DSM_URL", ""),
                     help="群晖 DSM 地址 (如 http://10.0.10.235:5000),也可设 DSM_URL 环境变量")
    cli.add_argument("--dsm-user", default=env("DSM_USERNAME", ""),
                     help="群晖 DSM 用户名,也可设 DSM_USERNAME 环境变量")
    cli.add_argument("--dsm-password", default=env("DSM_PASSWORD", ""),
                     help="群晖 DSM 密码,也可设 DSM_PASSWORD 环境变量")
    cli.add_argument("--dsm-path", default=env("DSM_PATH", ""),
                     help="群晖 DSM 目标路径 (如 /Newgonow/AU-SPFJ),也可设 DSM_PATH 环境变量")
    opts = cli.parse_args()

    if not (opts.tenant and opts.user and opts.password):
        cli.error("必须提供 --tenant, --user, --password 参数,或设置 C4C_TENANT, C4C_USERNAME, C4C_PASSWORD 环境变量")

    # Initialise the global C4C configuration
    TENANT = opts.tenant.rstrip("/")
    USERNAME, PASSWORD = opts.user, opts.password
    ODATA_C4C = f"{TENANT}/sap/c4c/odata/v1/c4codata"
    ODATA_CUST = f"{TENANT}/sap/c4c/odata/cust/v1/custticketapi"
    SOAP_URL = f"{TENANT}/sap/bc/srt/scs/sap/manageattachmentfolderin"

    # Initialise the DSM configuration
    DSM_URL = (opts.dsm_url or "").rstrip("/")
    DSM_USER, DSM_PASSWORD, DSM_PATH = opts.dsm_user, opts.dsm_password, opts.dsm_path

    result = run(opts.ticket, opts.output_dir, opts.list_only, opts.json_mode)

    # Upload to the Synology DSM once the download has finished
    if DSM_URL and not opts.list_only and result["success"]:
        result["dsmUpload"] = dsm_upload_downloaded_files(
            result["downloadedFiles"], opts.json_mode
        )

    if opts.json_mode:
        print(json.dumps(result, ensure_ascii=False, indent=2))
    sys.exit(0 if result["success"] else 1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()