316 lines
9.6 KiB
Python
316 lines
9.6 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
收集并合并 xlsx 文件
|
|||
|
|
|
|||
|
|
功能:
|
|||
|
|
1. 从 data/batch_output 子文件夹收集 results.xlsx(图片分析结果)
|
|||
|
|
2. 与 data/data_all 中对应的原始数据({name}_text_img.xlsx)合并
|
|||
|
|
3. 通过图片名关联两个数据源
|
|||
|
|
4. 保存合并后的文件到目标目录
|
|||
|
|
|
|||
|
|
用法:
|
|||
|
|
python3 collect_xlsx.py # 默认合并并输出
|
|||
|
|
python3 collect_xlsx.py -o ../data/merged # 指定输出目录
|
|||
|
|
python3 collect_xlsx.py --no-merge # 不合并,只复制
|
|||
|
|
python3 collect_xlsx.py -n # 预览模式
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Optional, Tuple, List
|
|||
|
|
|
|||
|
|
import pandas as pd
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_image_name(path: str) -> str:
    """Extract the bare image filename from a full path.

    Handles both Windows (``\\``) and Unix (``/``) separators; returns
    an empty string for NaN or blank inputs.
    """
    if pd.isna(path):
        return ""

    text = str(path).strip()
    if not text:
        return ""

    # Normalize every backslash to a forward slash first, then take the
    # last path component as the filename.
    return text.replace("\\", "/").rsplit("/", 1)[-1]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def merge_xlsx_files(
    results_file: Path,
    original_file: Path,
    results_image_col: str = "image_name",
    original_image_cols: Optional[List[str]] = None,
    original_text_col: str = "文本"
) -> Tuple[pd.DataFrame, dict]:
    """Merge analysis results with the matching original data sheet.

    Rows are joined on the bare image filename: the results sheet stores
    it directly in ``results_image_col``; the original sheet stores a full
    path in the first existing column from ``original_image_cols``.

    Args:
        results_file: analysis-results workbook (batch_output/.../results.xlsx).
        original_file: original-data workbook (data_all/..._text_img.xlsx).
        results_image_col: image-name column in the results sheet.
        original_image_cols: candidate image-path columns in the original
            sheet, tried in priority order. Defaults to
            ["图片_新", "图片", "图片链接"].
        original_text_col: text column in the original sheet.
            NOTE(review): currently unused; kept for interface compatibility.

    Returns:
        A ``(merged_df, stats)`` tuple where ``stats`` records row counts,
        the match count, the columns added and the image column used.

    Raises:
        ValueError: if none of ``original_image_cols`` is present in the
            original sheet.
    """
    if original_image_cols is None:
        original_image_cols = ["图片_新", "图片", "图片链接"]

    # Load both workbooks.
    results_df = pd.read_excel(results_file)
    original_df = pd.read_excel(original_file)

    stats = {
        "results_rows": len(results_df),
        "original_rows": len(original_df),
        "merged_rows": 0,
        "unmatched_results": 0,
        "original_columns_added": [],
        "image_col_used": None
    }

    # Pick the first candidate image column that actually exists.
    image_col = next((col for col in original_image_cols
                      if col in original_df.columns), None)
    if image_col is None:
        raise ValueError(f"原始文件中未找到图片列,尝试过: {original_image_cols}")

    stats["image_col_used"] = image_col

    # Derive the join key (bare filename) from the original sheet.
    original_df["_image_name"] = original_df[image_col].apply(extract_image_name)

    # The original sheet may repeat an image; keep the first occurrence.
    original_dedup = original_df.drop_duplicates(subset=["_image_name"], keep="first")

    # Columns to carry over: everything except image-path columns, the
    # temporary key, and columns the results sheet already has.
    exclude_cols = set(original_image_cols + ["_image_name"])
    original_cols_to_add = [col for col in original_df.columns
                            if col not in exclude_cols
                            and col not in results_df.columns]

    stats["original_columns_added"] = original_cols_to_add

    # Map image name -> {column: value} for O(1) lookups during the fill.
    original_map = original_dedup.set_index("_image_name")[original_cols_to_add].to_dict("index")

    merged_df = results_df.copy()

    # Pre-create the new columns so unmatched rows end up as None.
    for col in original_cols_to_add:
        merged_df[col] = None

    # Fill matched rows from the original data.
    matched_count = 0
    for idx, row in merged_df.iterrows():
        image_name = row[results_image_col]
        if image_name in original_map:
            for col in original_cols_to_add:
                merged_df.at[idx, col] = original_map[image_name].get(col)
            matched_count += 1

    stats["merged_rows"] = len(merged_df)
    stats["matched_count"] = matched_count
    stats["unmatched_results"] = len(merged_df) - matched_count

    return merged_df, stats
|
|||
|
|
|
|||
|
|
|
|||
|
|
def collect_and_merge_xlsx(
    source_dir: str,
    data_all_dir: str,
    output_dir: str,
    merge: bool = True,
    dry_run: bool = False
) -> List[dict]:
    """Collect per-folder results.xlsx files and merge them with data_all.

    Walks each subfolder of ``source_dir`` that contains a ``results.xlsx``;
    when ``merge`` is on and a matching ``{name}_text_img.xlsx`` exists in
    ``data_all_dir``, merges the two, otherwise copies the results file.

    Args:
        source_dir: batch_output directory path.
        data_all_dir: data_all directory path.
        output_dir: destination directory for merged/copied files.
        merge: merge with original data when True; plain copy otherwise.
        dry_run: preview mode — print planned actions without executing.

    Returns:
        One info dict per processed folder (paths, merged flag, stats).
    """
    # Hoisted once here; the original imported shutil in two branches.
    import shutil

    source_path = Path(source_dir)
    data_all_path = Path(data_all_dir)
    output_path = Path(output_dir)

    if not source_path.exists():
        print(f"错误: 源目录不存在: {source_dir}")
        return []

    # Create the output directory (skipped in preview mode).
    if not dry_run:
        output_path.mkdir(parents=True, exist_ok=True)

    results = []

    # Walk the subfolders in deterministic order.
    for folder in sorted(source_path.iterdir()):
        if not folder.is_dir():
            continue

        folder_name = folder.name
        results_file = folder / "results.xlsx"

        if not results_file.exists():
            continue

        # Output file name mirrors the folder name.
        output_file = output_path / f"{folder_name}.xlsx"

        # Matching original-data file lives in data_all as {name}_text_img.xlsx.
        original_file = data_all_path / f"{folder_name}_text_img.xlsx"

        result_info = {
            "folder": folder_name,
            "results_file": str(results_file),
            "original_file": str(original_file) if original_file.exists() else None,
            "output_file": str(output_file),
            "merged": False,
            "stats": {}
        }

        if dry_run:
            if merge and original_file.exists():
                print(f"[预览] 合并: {folder_name}/results.xlsx + {folder_name}_text_img.xlsx -> {folder_name}.xlsx")
            else:
                print(f"[预览] 复制: {folder_name}/results.xlsx -> {folder_name}.xlsx")
            results.append(result_info)
            continue

        # Execute the merge, or fall back to a plain copy.
        if merge and original_file.exists():
            try:
                merged_df, stats = merge_xlsx_files(results_file, original_file)
                merged_df.to_excel(output_file, index=False, engine="openpyxl")

                result_info["merged"] = True
                result_info["stats"] = stats

                print(f"已合并: {folder_name}")
                print(f" - 分析结果: {stats['results_rows']} 行")
                print(f" - 原始数据: {stats['original_rows']} 行")
                print(f" - 匹配成功: {stats['matched_count']} 行")
                print(f" - 添加列: {stats['original_columns_added']}")

            except Exception as e:
                print(f"合并失败 {folder_name}: {e}")
                # Fall back to copying so the folder is not dropped.
                shutil.copy2(results_file, output_file)
                print(f" 已回退到复制模式")
        else:
            # Copy only, no merge.
            shutil.copy2(results_file, output_file)

            if merge and not original_file.exists():
                print(f"已复制: {folder_name} (原始数据不存在: {folder_name}_text_img.xlsx)")
            else:
                print(f"已复制: {folder_name}")

        results.append(result_info)

    return results
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
    """CLI entry point: parse arguments, resolve the three directories
    relative to this script, run the collect-and-merge pipeline, and
    print a summary."""
    parser = argparse.ArgumentParser(
        description="收集并合并 batch_output 和 data_all 中的 xlsx 文件",
        # RawDescriptionHelpFormatter keeps the epilog's line breaks as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python3 collect_xlsx.py                     # 默认合并并输出
  python3 collect_xlsx.py -o ../data/merged   # 指定输出目录
  python3 collect_xlsx.py --no-merge          # 不合并,只复制
  python3 collect_xlsx.py -n                  # 预览模式
"""
    )

    parser.add_argument(
        "-s", "--source",
        default="../data/batch_output",
        help="batch_output 目录路径 (默认: ../data/batch_output)"
    )

    parser.add_argument(
        "-d", "--data-all",
        default="../data/data_all",
        help="data_all 目录路径 (默认: ../data/data_all)"
    )

    parser.add_argument(
        "-o", "--output",
        default="../data/collected_xlsx",
        help="输出目录路径 (默认: ../data/collected_xlsx)"
    )

    parser.add_argument(
        "--no-merge",
        action="store_true",
        help="不合并原始数据,只复制分析结果"
    )

    parser.add_argument(
        "-n", "--dry-run",
        action="store_true",
        help="预览模式,只打印不执行"
    )

    args = parser.parse_args()

    # Resolve all paths relative to the script's own directory so the
    # tool behaves the same regardless of the current working directory.
    script_dir = Path(__file__).parent
    source_dir = (script_dir / args.source).resolve()
    data_all_dir = (script_dir / args.data_all).resolve()
    output_dir = (script_dir / args.output).resolve()

    # Banner with the effective configuration.
    print("=" * 60)
    print("收集并合并 xlsx 文件")
    print("=" * 60)
    print(f"分析结果目录: {source_dir}")
    print(f"原始数据目录: {data_all_dir}")
    print(f"输出目录: {output_dir}")
    print(f"合并模式: {'否' if args.no_merge else '是'}")
    print("-" * 60)

    results = collect_and_merge_xlsx(
        str(source_dir),
        str(data_all_dir),
        str(output_dir),
        merge=not args.no_merge,
        dry_run=args.dry_run
    )

    # Summary: how many folders were merged vs. merely copied.
    print("-" * 60)
    merged_count = sum(1 for r in results if r.get("merged"))
    print(f"共处理 {len(results)} 个文件")
    if not args.no_merge:
        print(f" - 合并成功: {merged_count}")
        print(f" - 仅复制: {len(results) - merged_count}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    main()
|