fix: update keywords_match

This commit is contained in:
2026-01-18 18:25:36 +08:00
parent 29f6e25f70
commit 4ed90734df
7 changed files with 1406 additions and 269 deletions

315
scripts/collect_xlsx.py Normal file
View File

@@ -0,0 +1,315 @@
#!/usr/bin/env python3
"""
收集并合并 xlsx 文件
功能:
1. 从 data/batch_output 子文件夹收集 results.xlsx图片分析结果
2. 与 data/data_all 中对应的原始数据({name}_text_img.xlsx合并
3. 通过图片名关联两个数据源
4. 保存合并后的文件到目标目录
用法:
python3 collect_xlsx.py # 默认合并并输出
python3 collect_xlsx.py -o ../data/merged # 指定输出目录
python3 collect_xlsx.py --no-merge # 不合并,只复制
python3 collect_xlsx.py -n # 预览模式
"""
import argparse
from pathlib import Path
from typing import Optional, Tuple, List
import pandas as pd
def extract_image_name(path: str) -> str:
"""
从完整路径提取图片文件名
支持 Windows 和 Unix 路径格式
"""
if pd.isna(path):
return ""
path_str = str(path).strip()
if not path_str:
return ""
# 同时处理 Windows (\) 和 Unix (/) 路径分隔符
# 先统一替换为 /,再提取文件名
normalized = path_str.replace("\\", "/")
filename = normalized.split("/")[-1]
return filename
def merge_xlsx_files(
results_file: Path,
original_file: Path,
results_image_col: str = "image_name",
original_image_cols: list = None,
original_text_col: str = "文本"
) -> Tuple[pd.DataFrame, dict]:
"""
合并分析结果和原始数据
Args:
results_file: 分析结果文件 (batch_output/.../results.xlsx)
original_file: 原始数据文件 (data_all/..._text_img.xlsx)
results_image_col: 结果文件中的图片名列
original_image_cols: 原始文件中可能的图片路径列(按优先级)
original_text_col: 原始文件中的文本列
Returns:
合并后的 DataFrame 和统计信息
"""
if original_image_cols is None:
original_image_cols = ["图片_新", "图片", "图片链接"]
# 读取文件
results_df = pd.read_excel(results_file)
original_df = pd.read_excel(original_file)
stats = {
"results_rows": len(results_df),
"original_rows": len(original_df),
"merged_rows": 0,
"unmatched_results": 0,
"original_columns_added": [],
"image_col_used": None
}
# 找到可用的图片列
image_col = None
for col in original_image_cols:
if col in original_df.columns:
image_col = col
break
if image_col is None:
raise ValueError(f"原始文件中未找到图片列,尝试过: {original_image_cols}")
stats["image_col_used"] = image_col
# 从原始数据提取图片名作为关联键
original_df["_image_name"] = original_df[image_col].apply(extract_image_name)
# 去重:原始数据可能有重复图片,保留第一条
original_dedup = original_df.drop_duplicates(subset=["_image_name"], keep="first")
# 确定要添加的原始数据列(排除图片路径列和临时列)
exclude_cols = set(original_image_cols + ["_image_name"])
original_cols_to_add = [col for col in original_df.columns
if col not in exclude_cols
and col not in results_df.columns]
stats["original_columns_added"] = original_cols_to_add
# 创建图片名到原始数据的映射
original_map = original_dedup.set_index("_image_name")[original_cols_to_add].to_dict("index")
# 合并:为结果数据添加原始数据列
merged_df = results_df.copy()
# 初始化新列
for col in original_cols_to_add:
merged_df[col] = None
# 逐行匹配并填充
matched_count = 0
for idx, row in merged_df.iterrows():
image_name = row[results_image_col]
if image_name in original_map:
for col in original_cols_to_add:
merged_df.at[idx, col] = original_map[image_name].get(col)
matched_count += 1
stats["merged_rows"] = len(merged_df)
stats["matched_count"] = matched_count
stats["unmatched_results"] = len(merged_df) - matched_count
return merged_df, stats
def collect_and_merge_xlsx(
source_dir: str,
data_all_dir: str,
output_dir: str,
merge: bool = True,
dry_run: bool = False
) -> List[dict]:
"""
收集并合并 xlsx 文件
Args:
source_dir: batch_output 目录路径
data_all_dir: data_all 目录路径
output_dir: 输出目录路径
merge: 是否合并原始数据
dry_run: 预览模式
Returns:
处理结果列表
"""
source_path = Path(source_dir)
data_all_path = Path(data_all_dir)
output_path = Path(output_dir)
if not source_path.exists():
print(f"错误: 源目录不存在: {source_dir}")
return []
# 创建输出目录
if not dry_run:
output_path.mkdir(parents=True, exist_ok=True)
results = []
# 遍历子文件夹
for folder in sorted(source_path.iterdir()):
if not folder.is_dir():
continue
folder_name = folder.name
results_file = folder / "results.xlsx"
if not results_file.exists():
continue
# 输出文件名
output_file = output_path / f"{folder_name}.xlsx"
# 查找对应的原始数据文件
original_file = data_all_path / f"{folder_name}_text_img.xlsx"
result_info = {
"folder": folder_name,
"results_file": str(results_file),
"original_file": str(original_file) if original_file.exists() else None,
"output_file": str(output_file),
"merged": False,
"stats": {}
}
if dry_run:
if merge and original_file.exists():
print(f"[预览] 合并: {folder_name}/results.xlsx + {folder_name}_text_img.xlsx -> {folder_name}.xlsx")
else:
print(f"[预览] 复制: {folder_name}/results.xlsx -> {folder_name}.xlsx")
results.append(result_info)
continue
# 执行合并或复制
if merge and original_file.exists():
try:
merged_df, stats = merge_xlsx_files(results_file, original_file)
merged_df.to_excel(output_file, index=False, engine="openpyxl")
result_info["merged"] = True
result_info["stats"] = stats
print(f"已合并: {folder_name}")
print(f" - 分析结果: {stats['results_rows']}")
print(f" - 原始数据: {stats['original_rows']}")
print(f" - 匹配成功: {stats['matched_count']}")
print(f" - 添加列: {stats['original_columns_added']}")
except Exception as e:
print(f"合并失败 {folder_name}: {e}")
# 回退到复制模式
import shutil
shutil.copy2(results_file, output_file)
print(f" 已回退到复制模式")
else:
# 只复制,不合并
import shutil
shutil.copy2(results_file, output_file)
if merge and not original_file.exists():
print(f"已复制: {folder_name} (原始数据不存在: {folder_name}_text_img.xlsx)")
else:
print(f"已复制: {folder_name}")
results.append(result_info)
return results
def main():
parser = argparse.ArgumentParser(
description="收集并合并 batch_output 和 data_all 中的 xlsx 文件",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python3 collect_xlsx.py # 默认合并并输出
python3 collect_xlsx.py -o ../data/merged # 指定输出目录
python3 collect_xlsx.py --no-merge # 不合并,只复制
python3 collect_xlsx.py -n # 预览模式
"""
)
parser.add_argument(
"-s", "--source",
default="../data/batch_output",
help="batch_output 目录路径 (默认: ../data/batch_output)"
)
parser.add_argument(
"-d", "--data-all",
default="../data/data_all",
help="data_all 目录路径 (默认: ../data/data_all)"
)
parser.add_argument(
"-o", "--output",
default="../data/collected_xlsx",
help="输出目录路径 (默认: ../data/collected_xlsx)"
)
parser.add_argument(
"--no-merge",
action="store_true",
help="不合并原始数据,只复制分析结果"
)
parser.add_argument(
"-n", "--dry-run",
action="store_true",
help="预览模式,只打印不执行"
)
args = parser.parse_args()
# 转换为绝对路径
script_dir = Path(__file__).parent
source_dir = (script_dir / args.source).resolve()
data_all_dir = (script_dir / args.data_all).resolve()
output_dir = (script_dir / args.output).resolve()
print("=" * 60)
print("收集并合并 xlsx 文件")
print("=" * 60)
print(f"分析结果目录: {source_dir}")
print(f"原始数据目录: {data_all_dir}")
print(f"输出目录: {output_dir}")
print(f"合并模式: {'' if args.no_merge else ''}")
print("-" * 60)
results = collect_and_merge_xlsx(
str(source_dir),
str(data_all_dir),
str(output_dir),
merge=not args.no_merge,
dry_run=args.dry_run
)
print("-" * 60)
merged_count = sum(1 for r in results if r.get("merged"))
print(f"共处理 {len(results)} 个文件")
if not args.no_merge:
print(f" - 合并成功: {merged_count}")
print(f" - 仅复制: {len(results) - merged_count}")
if __name__ == "__main__":
main()