#!/usr/bin/env python3 """ 收集并合并 xlsx 文件 功能: 1. 从 data/batch_output 子文件夹收集 results.xlsx(图片分析结果) 2. 与 data/data_all 中对应的原始数据({name}_text_img.xlsx)合并 3. 通过图片名关联两个数据源 4. 保存合并后的文件到目标目录 用法: python3 collect_xlsx.py # 默认合并并输出 python3 collect_xlsx.py -o ../data/merged # 指定输出目录 python3 collect_xlsx.py --no-merge # 不合并,只复制 python3 collect_xlsx.py -n # 预览模式 """ import argparse from pathlib import Path from typing import Optional, Tuple, List import pandas as pd def extract_image_name(path: str) -> str: """ 从完整路径提取图片文件名 支持 Windows 和 Unix 路径格式 """ if pd.isna(path): return "" path_str = str(path).strip() if not path_str: return "" # 同时处理 Windows (\) 和 Unix (/) 路径分隔符 # 先统一替换为 /,再提取文件名 normalized = path_str.replace("\\", "/") filename = normalized.split("/")[-1] return filename def merge_xlsx_files( results_file: Path, original_file: Path, results_image_col: str = "image_name", original_image_cols: list = None, original_text_col: str = "文本" ) -> Tuple[pd.DataFrame, dict]: """ 合并分析结果和原始数据 Args: results_file: 分析结果文件 (batch_output/.../results.xlsx) original_file: 原始数据文件 (data_all/..._text_img.xlsx) results_image_col: 结果文件中的图片名列 original_image_cols: 原始文件中可能的图片路径列(按优先级) original_text_col: 原始文件中的文本列 Returns: 合并后的 DataFrame 和统计信息 """ if original_image_cols is None: original_image_cols = ["图片_新", "图片", "图片链接"] # 读取文件 results_df = pd.read_excel(results_file) original_df = pd.read_excel(original_file) stats = { "results_rows": len(results_df), "original_rows": len(original_df), "merged_rows": 0, "unmatched_results": 0, "original_columns_added": [], "image_col_used": None } # 找到可用的图片列 image_col = None for col in original_image_cols: if col in original_df.columns: image_col = col break if image_col is None: raise ValueError(f"原始文件中未找到图片列,尝试过: {original_image_cols}") stats["image_col_used"] = image_col # 从原始数据提取图片名作为关联键 original_df["_image_name"] = original_df[image_col].apply(extract_image_name) # 去重:原始数据可能有重复图片,保留第一条 original_dedup = original_df.drop_duplicates(subset=["_image_name"], keep="first") # 确定要添加的原始数据列(排除图片路径列和临时列) exclude_cols = set(original_image_cols + ["_image_name"]) original_cols_to_add = [col for col in original_df.columns if col not in exclude_cols and col not in results_df.columns] stats["original_columns_added"] = original_cols_to_add # 创建图片名到原始数据的映射 original_map = original_dedup.set_index("_image_name")[original_cols_to_add].to_dict("index") # 合并:为结果数据添加原始数据列 merged_df = results_df.copy() # 初始化新列 for col in original_cols_to_add: merged_df[col] = None # 逐行匹配并填充 matched_count = 0 for idx, row in merged_df.iterrows(): image_name = row[results_image_col] if image_name in original_map: for col in original_cols_to_add: merged_df.at[idx, col] = original_map[image_name].get(col) matched_count += 1 stats["merged_rows"] = len(merged_df) stats["matched_count"] = matched_count stats["unmatched_results"] = len(merged_df) - matched_count return merged_df, stats def collect_and_merge_xlsx( source_dir: str, data_all_dir: str, output_dir: str, merge: bool = True, dry_run: bool = False ) -> List[dict]: """ 收集并合并 xlsx 文件 Args: source_dir: batch_output 目录路径 data_all_dir: data_all 目录路径 output_dir: 输出目录路径 merge: 是否合并原始数据 dry_run: 预览模式 Returns: 处理结果列表 """ source_path = Path(source_dir) data_all_path = Path(data_all_dir) output_path = Path(output_dir) if not source_path.exists(): print(f"错误: 源目录不存在: {source_dir}") return [] # 创建输出目录 if not dry_run: output_path.mkdir(parents=True, exist_ok=True) results = [] # 遍历子文件夹 for folder in sorted(source_path.iterdir()): if not folder.is_dir(): continue folder_name = folder.name results_file = folder / "results.xlsx" if not results_file.exists(): continue # 输出文件名 output_file = output_path / f"{folder_name}.xlsx" # 查找对应的原始数据文件 original_file = data_all_path / f"{folder_name}_text_img.xlsx" result_info = { "folder": folder_name, "results_file": str(results_file), "original_file": str(original_file) if original_file.exists() else None, "output_file": str(output_file), "merged": False, "stats": {} } if dry_run: if merge and original_file.exists(): print(f"[预览] 合并: {folder_name}/results.xlsx + {folder_name}_text_img.xlsx -> {folder_name}.xlsx") else: print(f"[预览] 复制: {folder_name}/results.xlsx -> {folder_name}.xlsx") results.append(result_info) continue # 执行合并或复制 if merge and original_file.exists(): try: merged_df, stats = merge_xlsx_files(results_file, original_file) merged_df.to_excel(output_file, index=False, engine="openpyxl") result_info["merged"] = True result_info["stats"] = stats print(f"已合并: {folder_name}") print(f" - 分析结果: {stats['results_rows']} 行") print(f" - 原始数据: {stats['original_rows']} 行") print(f" - 匹配成功: {stats['matched_count']} 行") print(f" - 添加列: {stats['original_columns_added']}") except Exception as e: print(f"合并失败 {folder_name}: {e}") # 回退到复制模式 import shutil shutil.copy2(results_file, output_file) print(f" 已回退到复制模式") else: # 只复制,不合并 import shutil shutil.copy2(results_file, output_file) if merge and not original_file.exists(): print(f"已复制: {folder_name} (原始数据不存在: {folder_name}_text_img.xlsx)") else: print(f"已复制: {folder_name}") results.append(result_info) return results def main(): parser = argparse.ArgumentParser( description="收集并合并 batch_output 和 data_all 中的 xlsx 文件", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: python3 collect_xlsx.py # 默认合并并输出 python3 collect_xlsx.py -o ../data/merged # 指定输出目录 python3 collect_xlsx.py --no-merge # 不合并,只复制 python3 collect_xlsx.py -n # 预览模式 """ ) parser.add_argument( "-s", "--source", default="../data/batch_output", help="batch_output 目录路径 (默认: ../data/batch_output)" ) parser.add_argument( "-d", "--data-all", default="../data/data_all", help="data_all 目录路径 (默认: ../data/data_all)" ) parser.add_argument( "-o", "--output", default="../data/collected_xlsx", help="输出目录路径 (默认: ../data/collected_xlsx)" ) parser.add_argument( "--no-merge", action="store_true", help="不合并原始数据,只复制分析结果" ) parser.add_argument( "-n", "--dry-run", action="store_true", help="预览模式,只打印不执行" ) args = parser.parse_args() # 转换为绝对路径 script_dir = Path(__file__).parent source_dir = (script_dir / args.source).resolve() data_all_dir = (script_dir / args.data_all).resolve() output_dir = (script_dir / args.output).resolve() print("=" * 60) print("收集并合并 xlsx 文件") print("=" * 60) print(f"分析结果目录: {source_dir}") print(f"原始数据目录: {data_all_dir}") print(f"输出目录: {output_dir}") print(f"合并模式: {'否' if args.no_merge else '是'}") print("-" * 60) results = collect_and_merge_xlsx( str(source_dir), str(data_all_dir), str(output_dir), merge=not args.no_merge, dry_run=args.dry_run ) print("-" * 60) merged_count = sum(1 for r in results if r.get("merged")) print(f"共处理 {len(results)} 个文件") if not args.no_merge: print(f" - 合并成功: {merged_count}") print(f" - 仅复制: {len(results) - merged_count}") if __name__ == "__main__": main()