"""
SAP C4C attachment download tool.

Features:
    1. Download ServiceRequest-level attachments.
    2. Download XIssueItem-level attachments (BO_XSRIssueItemAttachmentFolder).
    3. Download Salesforce external-link attachments through the Scrapling crawler.
    4. Optional: upload the downloaded attachments to a Synology DSM.

Requirements:
    Python >= 3.8

Install dependencies:
    pip install requests scrapling[all] playwright
    python -m playwright install chromium

Usage:
    # Download attachments
    python sap-c4c-AttachmentFolder.py --tenant https://xxx.c4c.saphybriscloud.cn --user admin --password xxx --ticket 24588

    # Download attachments and upload them to Synology DSM
    python sap-c4c-AttachmentFolder.py --tenant https://xxx.c4c.saphybriscloud.cn --user admin --password xxx --ticket 24588 \\
        --dsm-url http://10.0.10.235:5000 --dsm-user PLM --dsm-password 123456 --dsm-path /Newgonow/AU-SPFJ

    # JSON mode (for Java / other calling programs)
    python sap-c4c-AttachmentFolder.py --ticket 24588 --dsm-url http://10.0.10.235:5000 --dsm-user PLM --dsm-password 123456 --dsm-path /Newgonow/AU-SPFJ --json

    # Credentials may also be passed through environment variables
    export C4C_TENANT=https://xxx.c4c.saphybriscloud.cn
    export C4C_USERNAME=admin
    export C4C_PASSWORD=xxx
    export DSM_URL=http://10.0.10.235:5000
    export DSM_USERNAME=PLM
    export DSM_PASSWORD=123456
    export DSM_PATH=/Newgonow/AU-SPFJ
    python sap-c4c-AttachmentFolder.py --ticket 24588 --json
"""

import argparse
import json
import mimetypes
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from threading import Lock

import requests
import urllib3

# The DSM calls below pass verify=False; silence the warning urllib3 would
# otherwise emit for every such request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Serialises console output produced by worker threads.
print_lock = Lock()


@dataclass
class Config:
    """Process-wide settings, filled in from the command line by ``main``."""

    tenant: str = ""          # C4C tenant base URL
    username: str = ""        # C4C basic-auth user
    password: str = ""        # C4C basic-auth password
    odata_c4c: str = ""       # standard OData service root (set by init_endpoints)
    odata_cust: str = ""      # custom ticket OData service root (set by init_endpoints)
    output_dir: str = "downloads"
    max_workers: int = 5      # size of every thread pool in this script
    dsm_url: str = ""
    dsm_user: str = ""
    dsm_password: str = ""
    dsm_path: str = ""        # remote base folder on the DSM

    def init_endpoints(self):
        """Derive both OData service roots from ``tenant``."""
        base = self.tenant.rstrip("/")
        self.odata_c4c = f"{base}/sap/c4c/odata/v1/c4codata"
        self.odata_cust = f"{base}/sap/c4c/odata/cust/v1/custticketapi"


cfg = Config()


def get_session():
    """Return a ``requests.Session`` with C4C basic auth and a JSON Accept header."""
    s = requests.Session()
    s.auth = (cfg.username, cfg.password)
    s.headers.update({"Accept": "application/json"})
    return s


def _odata_literal(value):
    """Escape ``value`` for use inside a single-quoted OData string literal.

    A single quote inside the literal is written as two single quotes;
    without this a quote in user input terminates the literal early.
    """
    return str(value).replace("'", "''")


def _sanitize_filename(name, default="attachment.bin"):
    """Reduce a remote-supplied file name to a bare base name.

    Directory components (with either separator) are dropped so the name
    cannot point outside the download directory; empty, ``.`` and ``..``
    fall back to ``default``.
    """
    cleaned = os.path.basename(str(name or "").replace("\\", "/")).strip()
    if cleaned in ("", ".", ".."):
        return default
    return cleaned


def _get_odata_results(session, url, params, default):
    """GET ``url`` and return the ``d.results`` member of the JSON body.

    Raises ``requests.HTTPError`` on a non-2xx status. ``default`` is
    returned when the body has no ``results`` member.
    """
    resp = session.get(url, params=params, timeout=60)
    resp.raise_for_status()
    return resp.json().get("d", {}).get("results", default)


def find_service_request_object_id(session, ticket_id):
    """Look up a ServiceRequest by its human-readable ticket ID.

    Returns:
        ``(ObjectID, SerialID)``; ``SerialID`` is ``""`` when absent.

    Raises:
        ValueError: no ServiceRequest matches ``ticket_id``.
    """
    url = f"{cfg.odata_c4c}/ServiceRequestCollection"
    params = {
        "$format": "json",
        "$filter": f"ID eq '{_odata_literal(ticket_id)}'",
    }
    results = _get_odata_results(session, url, params, [])
    if not results:
        raise ValueError(f"未找到 ID={ticket_id} 的 ServiceRequest")
    sr = results[0]
    return sr["ObjectID"], sr.get("SerialID", "")


def _parse_attachments(results):
    """Normalise OData attachment rows into this script's attachment dicts.

    Rows without a ``UUID`` are skipped. A missing ``Name`` becomes
    ``"<UUID>.bin"`` and a missing ``MimeType`` becomes
    ``"application/octet-stream"``.
    """
    attachments = []
    for row in results:
        uuid = row.get("UUID")
        if not uuid:
            continue
        attachments.append({
            "UUID": uuid,
            "ObjectID": row.get("ObjectID"),
            "ParentObjectID": row.get("ParentObjectID"),
            "FileName": row.get("Name") or f"{uuid}.bin",
            "MimeType": row.get("MimeType") or "application/octet-stream",
            "CategoryCode": row.get("CategoryCode"),  # 2=File, 3=Link
            "LinkWebURI": row.get("LinkWebURI"),
            "DocumentLink": row.get("DocumentLink"),
            "SizeInkB": row.get("SizeInkB"),
        })
    return attachments


def list_sr_attachments(session, sr_object_id):
    """Return the ServiceRequest-level attachments (c4codata navigation)."""
    url = (
        f"{cfg.odata_c4c}/ServiceRequestCollection('{sr_object_id}')"
        f"/ServiceRequestAttachmentFolder"
    )
    return _parse_attachments(
        _get_odata_results(session, url, {"$format": "json"}, [])
    )


def list_issue_items(session, ticket_id):
    """Return the raw XIssueItem rows of a ServiceRequest (custticketapi)."""
    url = f"{cfg.odata_cust}/ServiceRequest_XIssueItem_SDKCollection"
    params = {
        "$format": "json",
        "$filter": f"TicketID eq '{_odata_literal(ticket_id)}'",
    }
    return _get_odata_results(session, url, params, [])


def get_issue_item_detail(session, object_id):
    """Fetch one XIssueItem by ObjectID; callers read ``IssueID_SDK`` from it."""
    url = f"{cfg.odata_cust}/ServiceRequest_XIssueItem_SDKCollection('{object_id}')"
    return _get_odata_results(session, url, {"$format": "json"}, {})


def _fetch_attachment_folder(session, att_oid):
    """Fetch the AttachmentFolder rows of one attachment entry (run in a worker thread)."""
    folder_url = (
        f"{cfg.odata_cust}/BO_XSRIssueItemAttachmentCollection('{att_oid}')"
        f"/BO_XSRIssueItemAttachmentFolder"
    )
    return _get_odata_results(session, folder_url, {"$format": "json"}, [])


def list_issue_item_attachments(session, issue_item_uuid):
    """Return the attachments of one XIssueItem.

    Path: BO_XSRIssueItemAttachmentCollection (filtered by XIssueItemUUID)
    -> BO_XSRIssueItemAttachmentFolder (the actual attachment files).
    The second step is fetched concurrently; a folder that fails to load is
    reported on stderr and skipped.
    """
    url = f"{cfg.odata_cust}/BO_XSRIssueItemAttachmentCollection"
    params = {
        "$format": "json",
        "$filter": f"XIssueItemUUID eq guid'{issue_item_uuid}'",
    }
    att_results = _get_odata_results(session, url, params, [])
    if not att_results:
        return []

    all_attachments = []
    with ThreadPoolExecutor(max_workers=cfg.max_workers) as executor:
        futures = [
            executor.submit(_fetch_attachment_folder, session, att["ObjectID"])
            for att in att_results
        ]
        for future in as_completed(futures):
            try:
                all_attachments.extend(_parse_attachments(future.result()))
            except Exception as e:
                with print_lock:
                    print(f" ⚠ 获取附件文件夹失败: {e}", file=sys.stderr)
    return all_attachments


def download_file_via_odata(session, attachment, file_path, base_url=None):
    """Stream an attachment to ``file_path`` in 64 KiB chunks.

    The attachment's ``DocumentLink`` is used when present; otherwise the
    URL is ``<base_url>('<ObjectID>')/$value``, with ``base_url`` defaulting
    to the ServiceRequestAttachmentFolderCollection.

    Raises:
        requests.HTTPError: the server answered with an error status.
        Other ``requests``/``OSError`` exceptions propagate unchanged.
    """
    doc_link = attachment.get("DocumentLink")
    if doc_link:
        url = doc_link
    else:
        if base_url is None:
            base_url = f"{cfg.odata_c4c}/ServiceRequestAttachmentFolderCollection"
        url = f"{base_url}('{attachment['ObjectID']}')/$value"

    # The context manager releases the pooled connection even when the body
    # is not read to the end.
    with session.get(url, timeout=300, stream=True) as resp:
        resp.raise_for_status()
        try:
            with open(file_path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=65536):
                    if chunk:
                        f.write(chunk)
        except BaseException:
            # Do not leave a truncated file behind looking like a download.
            try:
                os.remove(file_path)
            except OSError:
                pass
            raise


def download_link_via_scrapling(link_url, save_name):
    """Open an external link page and click its download button.

    Returns:
        ``{"saved": <path or None>, "error": <message or None>}``. The
        function does not raise; failures are reported through ``error``.
    """
    result = {"saved": None, "error": None}
    try:
        from scrapling.fetchers import StealthyFetcher
    except ImportError as e:
        # Report a missing optional dependency through the result contract.
        result["error"] = str(e)
        return result

    selector = "button.downloadbutton[title='Download']"

    def click_download(page):
        page.wait_for_selector(selector, timeout=15000)
        with page.expect_download(timeout=120000) as download_info:
            page.click(selector)
        download = download_info.value
        # The suggested name comes from the remote site: keep the base name only.
        filename = _sanitize_filename(download.suggested_filename or save_name)
        save_path = os.path.join(cfg.output_dir, filename)
        download.save_as(save_path)
        result["saved"] = save_path
        # NOTE(review): some Scrapling versions appear to use the callback's
        # return value as the page - confirm against the installed version.
        return page

    try:
        StealthyFetcher.fetch(
            link_url,
            headless=True,
            network_idle=True,
            timeout=60000,
            page_action=click_download,
        )
    except Exception as e:
        result["error"] = str(e)

    if not result["saved"] and not result["error"]:
        # Nothing was saved and nothing was raised: still tell the caller why.
        result["error"] = "未能下载文件"
    return result


# ==================== Synology DSM upload ====================

def dsm_login():
    """Log in to the Synology DSM and return the session ID (SID).

    Raises:
        requests.HTTPError: non-2xx HTTP status.
        RuntimeError: the DSM answered with ``success`` false.
    """
    resp = requests.get(
        f"{cfg.dsm_url}/webapi/auth.cgi",
        params={
            "api": "SYNO.API.Auth",
            "version": "3",
            "method": "login",
            "account": cfg.dsm_user,
            "passwd": cfg.dsm_password,
            "session": "FileStation",
            "format": "sid",
        },
        verify=False,
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"DSM 登录失败: {data}")
    return data["data"]["sid"]


def dsm_upload_file(sid, local_path, remote_path):
    """Upload one local file into the DSM folder ``remote_path``.

    Missing parent folders are created and an existing file is overwritten
    (``create_parents`` / ``overwrite`` form fields).

    Raises:
        requests.HTTPError: non-2xx HTTP status.
        RuntimeError: the DSM answered with ``success`` false.
    """
    filename = os.path.basename(local_path)
    mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    form = {
        "api": "SYNO.FileStation.Upload",
        "version": "2",
        "method": "upload",
        "path": remote_path,
        "create_parents": "true",
        "overwrite": "true",
    }
    with open(local_path, "rb") as f:
        resp = requests.post(
            f"{cfg.dsm_url}/webapi/entry.cgi",
            data=form,
            files={"file": (filename, f, mime)},
            cookies={"id": sid},
            verify=False,
            timeout=600,
        )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"DSM 上传失败: {data}")
    return data
def dsm_upload_downloaded_files(downloaded_files, ticket_id, serial_id, json_mode=False):
    """Upload all downloaded files to the Synology DSM concurrently.

    Remote layout: ``<dsm_path>/<ticket>[_<serial>]/<issueId>[_<description>]``.
    Files without an issue ID go directly into the ticket folder. Files of an
    issue whose description contains "Quote & Chassis" are uploaded to their
    own folder and also copied into every other issue folder.

    Returns:
        ``[]`` when the DSM settings are incomplete or there is nothing to
        upload; ``[{"error": ...}]`` when the login fails; otherwise one dict
        per upload task with ``file``, ``localPath``, ``ticketId``,
        ``serialId``, ``remotePath``, ``success`` and, on failure, ``error``.
    """
    if not (cfg.dsm_url and cfg.dsm_user and cfg.dsm_password and cfg.dsm_path):
        return []

    files_to_upload = [
        f for f in downloaded_files if f.get("savedPath") and not f.get("error")
    ]
    if not files_to_upload:
        return []

    folder_name = f"{ticket_id}_{serial_id}" if serial_id else ticket_id
    ticket_root = f"{cfg.dsm_path}/{folder_name}"

    if not json_mode:
        print(f"\n{'='*60}")
        print(f"上传到群晖 DSM: {cfg.dsm_url}")
        print(f"目标路径: {ticket_root}")
        print('='*60)

    try:
        sid = dsm_login()
    except Exception as e:
        if not json_mode:
            print(f" DSM 登录失败: {e}", file=sys.stderr)
        return [{"error": f"DSM 登录失败: {e}"}]
    if not json_mode:
        print(" DSM 登录成功")

    # IDs of all issues except the "Quote & Chassis" one; sorted so the
    # order of the upload tasks is the same on every run.
    other_issue_ids = sorted(
        {
            f["issueId"]
            for f in files_to_upload
            if f.get("issueId")
            and "Quote & Chassis" not in (f.get("issueDescription") or "")
        },
        key=str,
    )

    def _upload_one(local_path, remote_path):
        filename = os.path.basename(local_path)
        full_remote_path = f"{remote_path}/{filename}"
        entry = {
            "file": filename,
            # Lets the caller match an upload result to its local file.
            "localPath": local_path,
            "ticketId": ticket_id,
            "serialId": serial_id,
            "remotePath": full_remote_path,
        }
        try:
            dsm_upload_file(sid, local_path, remote_path)
            entry["success"] = True
            if not json_mode:
                with print_lock:
                    print(f" 上传成功: {filename} -> {full_remote_path}")
        except Exception as e:
            entry["success"] = False
            entry["error"] = str(e)
            if not json_mode:
                with print_lock:
                    print(f" 上传失败: {filename}: {e}")
        return entry

    # issue ID -> remote folder name for that issue
    issue_folder_map = {}
    for f in files_to_upload:
        oid = f.get("issueId", "")
        if oid and oid not in issue_folder_map:
            desc = f.get("issueDescription", "")
            issue_folder_map[oid] = f"{oid}_{desc}" if desc else oid

    def _issue_dir(issue_key):
        # An empty issue ID maps to the ticket folder itself, not "<root>/".
        sub = issue_folder_map.get(issue_key, issue_key)
        return f"{ticket_root}/{sub}" if sub else ticket_root

    upload_tasks = []
    for f in files_to_upload:
        local_path = f["savedPath"]
        issue_id = f.get("issueId", "")
        upload_tasks.append((local_path, _issue_dir(issue_id)))
        if "Quote & Chassis" in (f.get("issueDescription") or ""):
            # Also distribute the file to every other issue folder.
            upload_tasks.extend(
                (local_path, _issue_dir(oid)) for oid in other_issue_ids
            )

    with ThreadPoolExecutor(max_workers=cfg.max_workers) as executor:
        upload_results = list(executor.map(lambda t: _upload_one(*t), upload_tasks))

    if not json_mode:
        ok = sum(1 for r in upload_results if r.get("success"))
        fail = len(upload_results) - ok
        print(f"\n 上传完成: {ok} 成功, {fail} 失败")
    return upload_results


def print_attachment_summary(all_attachments):
    """Print a summary of all attachments.

    Args:
        all_attachments: list of ``(label, attachments)`` pairs, where each
            attachment dict has at least ``CategoryCode`` and ``FileName``.
    """
    print(f"\n{'='*60}")
    print("附件清单汇总")
    print('='*60)
    if not all_attachments:
        print(" 无附件")
        return

    flat = [a for _, atts in all_attachments for a in atts]
    total_files = sum(1 for a in flat if a["CategoryCode"] == "2")
    total_links = sum(1 for a in flat if a["CategoryCode"] == "3")
    print(f" 合计: {total_files} 个文件附件, {total_links} 个链接附件\n")

    idx = 0
    for level, atts in all_attachments:
        if not atts:
            continue
        print(f" [{level}]")
        for a in atts:
            idx += 1
            cat = "文件" if a["CategoryCode"] == "2" else "链接"
            size_str = ""
            if a.get("SizeInkB") and cat == "文件":
                try:
                    kb = float(a["SizeInkB"])
                    size_str = f" ({kb/1024:.1f} MB)" if kb > 1024 else f" ({kb:.0f} KB)"
                except (TypeError, ValueError):
                    # A size that is not a number is simply not shown.
                    pass
            print(f" {idx}. [{cat}] {a['FileName']}{size_str}")
            if a.get("MimeType"):
                print(f" MIME: {a['MimeType']}")
            if a.get("LinkWebURI"):
                print(f" 链接: {a['LinkWebURI']}")
        print()


# Local paths handed out during this run. Downloads run in several threads,
# and two attachments may carry the same file name; each gets its own path.
_reserved_paths = set()
_reserved_paths_lock = Lock()


def _reserve_local_path(directory, file_name):
    """Return a local path in ``directory`` not yet used in this run.

    ``file_name`` comes from the remote system, so only its base name is
    kept (no directory components, either separator). An empty name becomes
    ``attachment.bin``. A name already handed out gets ``_1``, ``_2``, ...
    inserted before the extension.
    """
    cleaned = os.path.basename(str(file_name or "").replace("\\", "/")).strip()
    if cleaned in ("", ".", ".."):
        cleaned = "attachment.bin"
    stem, ext = os.path.splitext(cleaned)

    def _key(path):
        return os.path.normcase(os.path.abspath(path))

    candidate = os.path.join(directory, cleaned)
    with _reserved_paths_lock:
        counter = 0
        while _key(candidate) in _reserved_paths:
            counter += 1
            candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
        _reserved_paths.add(_key(candidate))
    return candidate


def _download_single_file(session, att, label, issue_id, odata_url, json_mode):
    """Download one file attachment to disk (run in a worker thread).

    Returns a result dict; on success it has ``savedPath``/``savedName``,
    on failure ``error``. Exceptions from the download are not re-raised.
    """
    entry = {
        "source": label,
        "issueId": issue_id,
        "c4cName": att["FileName"],
        "type": "file",
        "mime": att.get("MimeType"),
    }
    file_path = _reserve_local_path(cfg.output_dir, att["FileName"])
    try:
        download_file_via_odata(session, att, file_path, odata_url)
    except Exception as e:
        entry["error"] = str(e)
        if not json_mode:
            with print_lock:
                print(f" ✗ OData 下载失败 ({att['FileName']}): {e}")
        return entry

    entry["savedPath"] = os.path.abspath(file_path)
    # Equals the C4C name unless it had to be sanitised or de-duplicated.
    entry["savedName"] = os.path.basename(file_path)
    if not json_mode:
        with print_lock:
            print(f" ✓ saved: {file_path}")
    return entry


def _download_single_link(link_att, label, issue_id, json_mode):
    """Download one link attachment through Scrapling (run in a worker thread).

    Returns a result dict; on success it has ``savedPath``/``savedName``,
    otherwise ``error``.
    """
    link_url = link_att.get("LinkWebURI")
    entry = {
        "source": label,
        "issueId": issue_id,
        "c4cName": link_att["FileName"],
        "type": "link",
        "linkUrl": link_url,
    }
    if not link_url:
        entry["error"] = "无链接地址"
        return entry

    if not json_mode:
        with print_lock:
            print(f" {link_att['FileName']}: {link_url}")

    r = download_link_via_scrapling(link_url, link_att["FileName"])
    if r["saved"]:
        entry["savedPath"] = os.path.abspath(r["saved"])
        entry["savedName"] = os.path.basename(r["saved"])
        if not json_mode:
            with print_lock:
                print(f" ✓ saved: {r['saved']}")
    else:
        entry["error"] = r["error"]
        if not json_mode:
            with print_lock:
                print(f" ✗ 下载失败: {r['error']}")
    return entry


def _do_download(session, attachments, label, issue_id, odata_url, json_mode):
    """Download a list of attachments concurrently.

    Attachments with ``CategoryCode`` "2" are fetched as files, "3" as links;
    any other code is ignored. Returns one result dict per attempted
    attachment, in completion order.
    """
    downloaded_entries = []
    with ThreadPoolExecutor(max_workers=cfg.max_workers) as executor:
        futures = {}
        for att in attachments:
            code = att["CategoryCode"]
            if code == "2":
                fut = executor.submit(
                    _download_single_file, session, att, label, issue_id,
                    odata_url, json_mode,
                )
                futures[fut] = (att, "file")
            elif code == "3":
                fut = executor.submit(
                    _download_single_link, att, label, issue_id, json_mode,
                )
                futures[fut] = (att, "link")

        for future in as_completed(futures):
            try:
                downloaded_entries.append(future.result())
            except Exception as e:
                # Keep a record of the failed task so it shows up in the
                # result (JSON mode prints nothing here).
                att, kind = futures[future]
                downloaded_entries.append({
                    "source": label,
                    "issueId": issue_id,
                    "c4cName": att.get("FileName"),
                    "type": kind,
                    "error": str(e),
                })
                if not json_mode:
                    with print_lock:
                        print(f" ✗ 下载任务异常: {e}")
    return downloaded_entries


def _process_issue_item(session, item, list_only, json_mode):
    """Handle one XIssueItem: fetch detail, list attachments, download.

    Runs in a worker thread.

    Returns:
        ``(issue_entry, downloaded_entries)``; ``downloaded_entries`` is
        empty when ``list_only`` is set or the item has no UUID.
    """
    item_oid = item["ObjectID"]
    issue_uuid = item.get("XIssueItemUUIDcontent_SDK", "")
    issue_desc = (item.get("IssuesDescriptionX_SDK") or "")[:80]

    issue_id = ""
    try:
        issue_id = get_issue_item_detail(session, item_oid).get("IssueID_SDK", "")
    except Exception as e:
        # Best effort: without the IssueID the ObjectID is used for labels.
        with print_lock:
            print(f" ⚠ 获取 IssueID 失败 ({item_oid}): {e}", file=sys.stderr)

    if not json_mode:
        with print_lock:
            print(f"\n XIssueItem: {item_oid}")
            if issue_id:
                print(f" IssueID: {issue_id}")
            print(f" UUID: {issue_uuid}")
            print(f" 描述: {issue_desc}")

    issue_entry = {
        "objectId": item_oid,
        "issueId": issue_id,
        "uuid": issue_uuid,
        "description": issue_desc,
        "attachments": [],
    }
    downloaded_entries = []

    if not issue_uuid:
        if not json_mode:
            with print_lock:
                print(" ⚠ 无 XIssueItemUUID,跳过")
        return issue_entry, downloaded_entries

    atts = list_issue_item_attachments(session, issue_uuid)
    issue_entry["attachments"] = atts
    if not json_mode:
        with print_lock:
            print(f" 找到 {len(atts)} 个附件")

    if not list_only:
        label = f"IssueItem-{issue_id}" if issue_id else f"IssueItem-{item_oid[:12]}"
        downloaded_entries = _do_download(
            session,
            atts,
            label,
            issue_id,
            f"{cfg.odata_cust}/BO_XSRIssueItemAttachmentFolderCollection",
            json_mode,
        )
        # The upload step uses the description to build the remote folder.
        for e in downloaded_entries:
            e["issueDescription"] = issue_desc

    return issue_entry, downloaded_entries


def run(ticket_id, output_dir, list_only=False, json_mode=False):
    """Core workflow; returns a structured result dict.

    Sets ``cfg.output_dir`` and creates the directory, then lists (and,
    unless ``list_only``, downloads) the ServiceRequest-level and
    XIssueItem-level attachments. Does not raise: a failure sets
    ``success`` to False and ``error`` to the message.
    """
    cfg.output_dir = output_dir
    os.makedirs(cfg.output_dir, exist_ok=True)
    session = get_session()

    result = {
        "ticketId": ticket_id,
        "outputDir": os.path.abspath(cfg.output_dir),
        "success": True,
        "error": None,
        "srAttachments": [],
        "issueItems": [],
        "downloadedFiles": [],
    }

    try:
        # 1) Resolve the ticket ID to ObjectID and SerialID
        sr_object_id, serial_id = find_service_request_object_id(session, ticket_id)
        result["srObjectId"] = sr_object_id
        result["serialId"] = serial_id
        if not json_mode:
            print(f"ServiceRequest ID={ticket_id}, ObjectID={sr_object_id}, SerialID={serial_id}")

        # 2) ServiceRequest-level attachments
        if not json_mode:
            print(f"\n{'='*60}")
            print("ServiceRequest 级别附件")
            print('='*60)
        sr_attachments = list_sr_attachments(session, sr_object_id)
        result["srAttachments"] = sr_attachments
        if not json_mode:
            print(f"找到 {len(sr_attachments)} 个附件")
        if not list_only:
            result["downloadedFiles"].extend(
                _do_download(session, sr_attachments, "SR", "", None, json_mode)
            )

        # 3) XIssueItem-level attachments (each item handled in its own thread)
        if not json_mode:
            print(f"\n{'='*60}")
            print("XIssueItem 级别附件 (BO_XSRIssueItemAttachmentFolder)")
            print('='*60)
        issue_items = list_issue_items(session, ticket_id)
        if not json_mode:
            print(f"找到 {len(issue_items)} 个 XIssueItem")

        with ThreadPoolExecutor(max_workers=cfg.max_workers) as executor:
            futures = [
                executor.submit(_process_issue_item, session, item, list_only, json_mode)
                for item in issue_items
            ]
            for future in as_completed(futures):
                try:
                    issue_entry, downloaded_entries = future.result()
                except Exception as e:
                    # Reported on stderr in every mode, like the other
                    # warnings here, so JSON callers are not left with a
                    # silently missing issue item (stdout stays clean).
                    with print_lock:
                        print(f" ✗ XIssueItem 处理异常: {e}", file=sys.stderr)
                    continue
                result["issueItems"].append(issue_entry)
                result["downloadedFiles"].extend(downloaded_entries)

        # 4) Summary
        if not json_mode:
            all_attachments = [("SR", sr_attachments)]
            for ie in result["issueItems"]:
                ie_label = (
                    f"IssueItem-{ie['issueId']}"
                    if ie.get("issueId")
                    else f"IssueItem-{ie['objectId'][:12]}"
                )
                all_attachments.append((ie_label, ie["attachments"]))
            print_attachment_summary(all_attachments)

    except Exception as e:
        result["success"] = False
        result["error"] = str(e)
        if not json_mode:
            print(f"\n错误: {e}", file=sys.stderr)

    return result


def main():
    """Command-line entry point.

    Parses arguments (credentials fall back to environment variables),
    fills the global ``cfg``, runs the download and, when a DSM URL is
    configured, the upload. Local files are removed only after all of their
    uploads succeeded. Exits with status 0 on success and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="SAP C4C 附件下载工具")
    parser.add_argument("--tenant", default=os.environ.get("C4C_TENANT", ""),
                        help="C4C 租户地址 (如 https://xxx.c4c.saphybriscloud.cn),也可设 C4C_TENANT 环境变量")
    parser.add_argument("--user", default=os.environ.get("C4C_USERNAME", ""),
                        help="C4C 用户名,也可设 C4C_USERNAME 环境变量")
    parser.add_argument("--password", default=os.environ.get("C4C_PASSWORD", ""),
                        help="C4C 密码,也可设 C4C_PASSWORD 环境变量")
    parser.add_argument("--ticket", required=True,
                        help="ServiceRequest ticket ID (如 24588)")
    parser.add_argument("--output-dir", default="downloads",
                        help="附件保存目录 (默认: downloads)")
    parser.add_argument("--json", action="store_true", dest="json_mode",
                        help="JSON 输出模式(供程序调用)")
    parser.add_argument("--list-only", action="store_true",
                        help="仅列出附件清单,不下载")
    parser.add_argument("--max-workers", type=int, default=5,
                        help="并发线程数 (默认: 5)")
    # Synology DSM upload options
    parser.add_argument("--dsm-url", default=os.environ.get("DSM_URL", ""),
                        help="群晖 DSM 地址 (如 http://10.0.10.235:5000),也可设 DSM_URL 环境变量")
    parser.add_argument("--dsm-user", default=os.environ.get("DSM_USERNAME", ""),
                        help="群晖 DSM 用户名,也可设 DSM_USERNAME 环境变量")
    parser.add_argument("--dsm-password", default=os.environ.get("DSM_PASSWORD", ""),
                        help="群晖 DSM 密码,也可设 DSM_PASSWORD 环境变量")
    parser.add_argument("--dsm-path", default=os.environ.get("DSM_PATH", ""),
                        help="群晖 DSM 目标路径 (如 /Newgonow/AU-SPFJ),也可设 DSM_PATH 环境变量")
    args = parser.parse_args()

    if not args.tenant or not args.user or not args.password:
        parser.error("必须提供 --tenant, --user, --password 参数,或设置 C4C_TENANT, C4C_USERNAME, C4C_PASSWORD 环境变量")
    if args.max_workers < 1:
        # ThreadPoolExecutor rejects max_workers <= 0 with a ValueError.
        parser.error("--max-workers 必须为正整数")

    # Initialise the global configuration
    cfg.tenant = args.tenant.rstrip("/")
    cfg.username = args.user
    cfg.password = args.password
    cfg.max_workers = args.max_workers
    cfg.dsm_url = args.dsm_url.rstrip("/") if args.dsm_url else ""
    cfg.dsm_user = args.dsm_user
    cfg.dsm_password = args.dsm_password
    cfg.dsm_path = args.dsm_path
    cfg.init_endpoints()

    if not args.json_mode and not args.list_only:
        print(f"并发线程数: {cfg.max_workers}")

    result = run(args.ticket, args.output_dir, args.list_only, args.json_mode)

    # After the download, upload to the Synology DSM
    if cfg.dsm_url and not args.list_only and result["success"]:
        serial_id = result.get("serialId", "")
        upload_results = dsm_upload_downloaded_files(
            result["downloadedFiles"], args.ticket, serial_id, args.json_mode
        )
        result["dsmUpload"] = upload_results

        # local path -> True only if every upload of that file succeeded
        fully_uploaded = {}
        for r in upload_results:
            lp = r.get("localPath")
            if lp:
                fully_uploaded[lp] = fully_uploaded.get(lp, True) and bool(r.get("success"))

        # Clean up local copies, but only those that safely reached the DSM;
        # a failed login or upload must not destroy the only copy.
        for f in result["downloadedFiles"]:
            local_path = f.get("savedPath")
            if not local_path or not os.path.exists(local_path):
                continue
            if not fully_uploaded.get(local_path, False):
                if not args.json_mode:
                    print(f" 保留本地文件(未成功上传): {local_path}")
                continue
            try:
                os.remove(local_path)
                if not args.json_mode:
                    print(f" 已删除本地文件: {local_path}")
            except OSError as e:
                if not args.json_mode:
                    print(f" 删除失败: {local_path}: {e}")

        # Remove the download directory if it is now empty
        try:
            if os.path.isdir(args.output_dir) and not os.listdir(args.output_dir):
                os.rmdir(args.output_dir)
                if not args.json_mode:
                    print(f" 已删除目录: {args.output_dir}")
        except OSError:
            pass

    if args.json_mode:
        print(json.dumps(result, ensure_ascii=False, indent=2))

    sys.exit(0 if result["success"] else 1)


if __name__ == "__main__":
    main()