Files
chem-risk-detect/scripts/collect_xlsx.py
2026-01-18 18:25:36 +08:00

316 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
收集并合并 xlsx 文件
功能:
1. 从 data/batch_output 子文件夹收集 results.xlsx图片分析结果
2. 与 data/data_all 中对应的原始数据({name}_text_img.xlsx合并
3. 通过图片名关联两个数据源
4. 保存合并后的文件到目标目录
用法:
python3 collect_xlsx.py # 默认合并并输出
python3 collect_xlsx.py -o ../data/merged # 指定输出目录
python3 collect_xlsx.py --no-merge # 不合并,只复制
python3 collect_xlsx.py -n # 预览模式
"""
import argparse
from pathlib import Path
from typing import Optional, Tuple, List
import pandas as pd
def extract_image_name(path: str) -> str:
"""
从完整路径提取图片文件名
支持 Windows 和 Unix 路径格式
"""
if pd.isna(path):
return ""
path_str = str(path).strip()
if not path_str:
return ""
# 同时处理 Windows (\) 和 Unix (/) 路径分隔符
# 先统一替换为 /,再提取文件名
normalized = path_str.replace("\\", "/")
filename = normalized.split("/")[-1]
return filename
def merge_xlsx_files(
results_file: Path,
original_file: Path,
results_image_col: str = "image_name",
original_image_cols: list = None,
original_text_col: str = "文本"
) -> Tuple[pd.DataFrame, dict]:
"""
合并分析结果和原始数据
Args:
results_file: 分析结果文件 (batch_output/.../results.xlsx)
original_file: 原始数据文件 (data_all/..._text_img.xlsx)
results_image_col: 结果文件中的图片名列
original_image_cols: 原始文件中可能的图片路径列(按优先级)
original_text_col: 原始文件中的文本列
Returns:
合并后的 DataFrame 和统计信息
"""
if original_image_cols is None:
original_image_cols = ["图片_新", "图片", "图片链接"]
# 读取文件
results_df = pd.read_excel(results_file)
original_df = pd.read_excel(original_file)
stats = {
"results_rows": len(results_df),
"original_rows": len(original_df),
"merged_rows": 0,
"unmatched_results": 0,
"original_columns_added": [],
"image_col_used": None
}
# 找到可用的图片列
image_col = None
for col in original_image_cols:
if col in original_df.columns:
image_col = col
break
if image_col is None:
raise ValueError(f"原始文件中未找到图片列,尝试过: {original_image_cols}")
stats["image_col_used"] = image_col
# 从原始数据提取图片名作为关联键
original_df["_image_name"] = original_df[image_col].apply(extract_image_name)
# 去重:原始数据可能有重复图片,保留第一条
original_dedup = original_df.drop_duplicates(subset=["_image_name"], keep="first")
# 确定要添加的原始数据列(排除图片路径列和临时列)
exclude_cols = set(original_image_cols + ["_image_name"])
original_cols_to_add = [col for col in original_df.columns
if col not in exclude_cols
and col not in results_df.columns]
stats["original_columns_added"] = original_cols_to_add
# 创建图片名到原始数据的映射
original_map = original_dedup.set_index("_image_name")[original_cols_to_add].to_dict("index")
# 合并:为结果数据添加原始数据列
merged_df = results_df.copy()
# 初始化新列
for col in original_cols_to_add:
merged_df[col] = None
# 逐行匹配并填充
matched_count = 0
for idx, row in merged_df.iterrows():
image_name = row[results_image_col]
if image_name in original_map:
for col in original_cols_to_add:
merged_df.at[idx, col] = original_map[image_name].get(col)
matched_count += 1
stats["merged_rows"] = len(merged_df)
stats["matched_count"] = matched_count
stats["unmatched_results"] = len(merged_df) - matched_count
return merged_df, stats
def collect_and_merge_xlsx(
source_dir: str,
data_all_dir: str,
output_dir: str,
merge: bool = True,
dry_run: bool = False
) -> List[dict]:
"""
收集并合并 xlsx 文件
Args:
source_dir: batch_output 目录路径
data_all_dir: data_all 目录路径
output_dir: 输出目录路径
merge: 是否合并原始数据
dry_run: 预览模式
Returns:
处理结果列表
"""
source_path = Path(source_dir)
data_all_path = Path(data_all_dir)
output_path = Path(output_dir)
if not source_path.exists():
print(f"错误: 源目录不存在: {source_dir}")
return []
# 创建输出目录
if not dry_run:
output_path.mkdir(parents=True, exist_ok=True)
results = []
# 遍历子文件夹
for folder in sorted(source_path.iterdir()):
if not folder.is_dir():
continue
folder_name = folder.name
results_file = folder / "results.xlsx"
if not results_file.exists():
continue
# 输出文件名
output_file = output_path / f"{folder_name}.xlsx"
# 查找对应的原始数据文件
original_file = data_all_path / f"{folder_name}_text_img.xlsx"
result_info = {
"folder": folder_name,
"results_file": str(results_file),
"original_file": str(original_file) if original_file.exists() else None,
"output_file": str(output_file),
"merged": False,
"stats": {}
}
if dry_run:
if merge and original_file.exists():
print(f"[预览] 合并: {folder_name}/results.xlsx + {folder_name}_text_img.xlsx -> {folder_name}.xlsx")
else:
print(f"[预览] 复制: {folder_name}/results.xlsx -> {folder_name}.xlsx")
results.append(result_info)
continue
# 执行合并或复制
if merge and original_file.exists():
try:
merged_df, stats = merge_xlsx_files(results_file, original_file)
merged_df.to_excel(output_file, index=False, engine="openpyxl")
result_info["merged"] = True
result_info["stats"] = stats
print(f"已合并: {folder_name}")
print(f" - 分析结果: {stats['results_rows']}")
print(f" - 原始数据: {stats['original_rows']}")
print(f" - 匹配成功: {stats['matched_count']}")
print(f" - 添加列: {stats['original_columns_added']}")
except Exception as e:
print(f"合并失败 {folder_name}: {e}")
# 回退到复制模式
import shutil
shutil.copy2(results_file, output_file)
print(f" 已回退到复制模式")
else:
# 只复制,不合并
import shutil
shutil.copy2(results_file, output_file)
if merge and not original_file.exists():
print(f"已复制: {folder_name} (原始数据不存在: {folder_name}_text_img.xlsx)")
else:
print(f"已复制: {folder_name}")
results.append(result_info)
return results
def main():
parser = argparse.ArgumentParser(
description="收集并合并 batch_output 和 data_all 中的 xlsx 文件",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python3 collect_xlsx.py # 默认合并并输出
python3 collect_xlsx.py -o ../data/merged # 指定输出目录
python3 collect_xlsx.py --no-merge # 不合并,只复制
python3 collect_xlsx.py -n # 预览模式
"""
)
parser.add_argument(
"-s", "--source",
default="../data/batch_output",
help="batch_output 目录路径 (默认: ../data/batch_output)"
)
parser.add_argument(
"-d", "--data-all",
default="../data/data_all",
help="data_all 目录路径 (默认: ../data/data_all)"
)
parser.add_argument(
"-o", "--output",
default="../data/collected_xlsx",
help="输出目录路径 (默认: ../data/collected_xlsx)"
)
parser.add_argument(
"--no-merge",
action="store_true",
help="不合并原始数据,只复制分析结果"
)
parser.add_argument(
"-n", "--dry-run",
action="store_true",
help="预览模式,只打印不执行"
)
args = parser.parse_args()
# 转换为绝对路径
script_dir = Path(__file__).parent
source_dir = (script_dir / args.source).resolve()
data_all_dir = (script_dir / args.data_all).resolve()
output_dir = (script_dir / args.output).resolve()
print("=" * 60)
print("收集并合并 xlsx 文件")
print("=" * 60)
print(f"分析结果目录: {source_dir}")
print(f"原始数据目录: {data_all_dir}")
print(f"输出目录: {output_dir}")
print(f"合并模式: {'' if args.no_merge else ''}")
print("-" * 60)
results = collect_and_merge_xlsx(
str(source_dir),
str(data_all_dir),
str(output_dir),
merge=not args.no_merge,
dry_run=args.dry_run
)
print("-" * 60)
merged_count = sum(1 for r in results if r.get("merged"))
print(f"共处理 {len(results)} 个文件")
if not args.no_merge:
print(f" - 合并成功: {merged_count}")
print(f" - 仅复制: {len(results) - merged_count}")
if __name__ == "__main__":
main()