#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多模式关键词匹配工具(重构版) - CAS号识别:专注于 `CAS号` 列,支持多种格式(-, 空格, 无分隔符等) - 模糊识别:对所有候选文本(含CAS)进行容错匹配 """ import argparse import re import sys import time import traceback from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Set, Optional, Tuple import pandas as pd # 可选依赖 try: import ahocorasick HAS_AC = True except ImportError: HAS_AC = False # ========== 常量定义 ========== SEPARATOR = "|||" MATCH_RESULT_SEPARATOR = " | " PROGRESS_INTERVAL = 1000 DEFAULT_FUZZY_THRESHOLD = 85 # CAS号正则表达式 # 匹配格式:2-7位数字 + 分隔符(可选) + 2位数字 + 分隔符(可选) + 1位数字 # 支持分隔符:- (连字符), 空格, . (点), _ (下划线), 或无分隔符 CAS_REGEX_PATTERN = r'\b(\d{2,7})[\s\-._]?(\d{2})[\s\-._]?(\d)\b' MODE_KEYWORD_COLUMNS: Dict[str, List[str]] = { "cas": ["CAS号"], "exact": ["中文名", "英文名", "CAS号", "简称", "可能名称"], } MODE_LABELS = { "cas": "CAS号识别", "exact": "精确匹配", } # 常见的文本列名(按优先级排序) COMMON_TEXT_COLUMNS = [ "detected_text", # 新格式(图片分析结果) "文本", # 旧格式 / 合并后的原始文本 "text", "content", "summary", ] # 默认多列匹配组合 DEFAULT_TEXT_COLUMNS = ["detected_text", "文本"] # ========== 数据类 ========== @dataclass class MatchResult: """匹配结果数据类""" matched_indices: List[int] matched_keywords: List[str] elapsed_time: float total_rows: int matcher_name: str @property def match_count(self) -> int: return len(self.matched_indices) @property def match_rate(self) -> float: return (self.match_count / self.total_rows * 100) if self.total_rows > 0 else 0.0 @property def speed(self) -> float: return (self.total_rows / self.elapsed_time) if self.elapsed_time > 0 else 0.0 # ========== 工具函数 ========== def normalize_cas(cas_str: str) -> str: """ 将各种格式的CAS号规范化为标准格式 XXX-XX-X 支持的输入格式: - 123-45-6 (标准格式) - 123 45 6 (空格分隔) - 123.45.6 (点分隔) - 123_45_6 (下划线分隔) - 12345 6 或 1234 56 (部分分隔) - 123456 (无分隔符,仅当总长度正确时) 返回:标准格式的CAS号,如果无法解析则返回原字符串 """ if not cas_str or not isinstance(cas_str, str): return str(cas_str) # 移除所有非数字字符,只保留数字 digits_only = re.sub(r'[^\d]', '', cas_str) # CAS号至少需要5位数字(最短格式:XX-XX-X) if len(digits_only) < 5: return cas_str # 重新格式化为标准格式:前n-3位-中间2位-最后1位 # 例如:123456 -> 1234-5-6, 12345 -> 123-4-5 return f"{digits_only[:-3]}-{digits_only[-3:-1]}-{digits_only[-1]}" def extract_cas_numbers(text: str, pattern: str = CAS_REGEX_PATTERN) -> Set[str]: """ 从文本中提取所有CAS号并规范化 参数: text: 待搜索的文本 pattern: CAS号正则表达式 返回:规范化后的CAS号集合 """ if not text: return set() matches = re.finditer(pattern, str(text)) cas_numbers = set() for match in matches: # 提取完整匹配 raw_cas = match.group(0) # 规范化 normalized = normalize_cas(raw_cas) cas_numbers.add(normalized) return cas_numbers def split_value(value: str, separator: str) -> List[str]: """将单元格内容拆分为多个候选关键词""" if separator and separator in value: parts = value.split(separator) else: parts = [value] return [part.strip() for part in parts if part and part.strip()] def detect_text_columns( df: pd.DataFrame, specified_columns: Optional[List[str]] = None ) -> List[str]: """ 检测并验证文本列名 参数: df: 数据框 specified_columns: 用户指定的列名列表 返回:存在的文本列名列表 异常:如果找不到任何合适的列则抛出 ValueError """ # 如果用户指定了列名 if specified_columns: available = [col for col in specified_columns if col in df.columns] missing = [col for col in specified_columns if col not in df.columns] if missing: print(f"警告: 以下指定的列不存在: {missing}") if available: return available else: print("警告: 所有指定的列都不存在,尝试自动检测...") # 自动检测:优先使用默认多列组合 available_default = [col for col in DEFAULT_TEXT_COLUMNS if col in df.columns] if available_default: print(f"自动检测到文本列: {available_default}") return available_default # 回退:使用第一个找到的常见列 for col in COMMON_TEXT_COLUMNS: if col in df.columns: print(f"自动检测到文本列: ['{col}']") return [col] # 都没找到,抛出异常 raise ValueError( f"无法自动检测文本列。可用列: {df.columns.tolist()}\n" f"请使用 -c 参数指定文本列名" ) def combine_text_columns(row: pd.Series, text_columns: List[str]) -> str: """ 合并多列文本内容 参数: row: DataFrame 的一行 text_columns: 要合并的列名列表 返回:合并后的文本(用换行符分隔) """ texts = [] for col in text_columns: val = row.get(col) if pd.notna(val) and str(val).strip(): texts.append(str(val).strip()) return "\n".join(texts) def load_keywords_for_mode( df: pd.DataFrame, mode: str, separator: str = SEPARATOR ) -> Set[str]: """根据模式加载关键词集合""" mode_lower = mode.lower() if mode_lower not in MODE_KEYWORD_COLUMNS: raise ValueError(f"不支持的模式: {mode}") target_columns = MODE_KEYWORD_COLUMNS[mode_lower] available_columns = [col for col in target_columns if col in df.columns] missing_columns = [col for col in target_columns if col not in df.columns] if not available_columns: raise ValueError( f"模式 '{mode_lower}' 需要的列 {target_columns} 均不存在," f"当前可用列: {df.columns.tolist()}" ) if missing_columns: print(f"警告: 以下列在关键词文件中缺失: {missing_columns}") keywords: Set[str] = set() for column in available_columns: for value in df[column].dropna(): value_str = str(value).strip() if not value_str or value_str in ['#N/A#', 'nan', 'None']: continue for token in split_value(value_str, separator): # CAS模式下规范化CAS号 if mode_lower == "cas": token = normalize_cas(token) keywords.add(token) label = MODE_LABELS.get(mode_lower, mode_lower) print(f"模式「{label}」共加载 {len(keywords)} 个候选关键词,来源列: {available_columns}") if not keywords: print(f"警告: 模式 '{mode_lower}' 未加载到任何关键词!") return keywords def show_progress( current: int, total: int, start_time: float, matched_count: int, interval: int = PROGRESS_INTERVAL ) -> None: """显示处理进度""" if (current + 1) % interval == 0: elapsed = time.time() - start_time speed = (current + 1) / elapsed print(f"已处理 {current + 1}/{total} 行,速度: {speed:.1f} 行/秒,匹配到 {matched_count} 行") # ========== 匹配器基类(策略模式) ========== class KeywordMatcher(ABC): """关键词匹配器抽象基类""" def __init__(self, name: str): self.name = name def match( self, df: pd.DataFrame, keywords: Set[str], text_columns: List[str] ) -> MatchResult: """执行匹配(模板方法) 参数: df: 数据框 keywords: 关键词集合 text_columns: 文本列名列表(支持多列) """ print(f"开始匹配(使用{self.name})...") print(f"搜索列: {text_columns}") self._prepare(keywords) matched_indices = [] matched_keywords_list = [] start_time = time.time() for idx in range(len(df)): row = df.iloc[idx] # 合并多列文本 combined_text = combine_text_columns(row, text_columns) if not combined_text: continue matches = self._match_single_text(combined_text, keywords) if matches: matched_indices.append(idx) formatted = self._format_matches(matches) matched_keywords_list.append(formatted) show_progress(idx, len(df), start_time, len(matched_indices)) elapsed = time.time() - start_time return MatchResult( matched_indices=matched_indices, matched_keywords=matched_keywords_list, elapsed_time=elapsed, total_rows=len(df), matcher_name=self.name ) def _prepare(self, keywords: Set[str]) -> None: """预处理(子类可选实现)""" pass @abstractmethod def _match_single_text(self, text: str, keywords: Set[str]) -> Set[str]: """匹配单条文本(子类必须实现)""" pass def _format_matches(self, matches: Set[str]) -> str: """格式化匹配结果(子类可重写)""" return MATCH_RESULT_SEPARATOR.join(sorted(matches)) # ========== 具体匹配器实现 ========== class AhoCorasickMatcher(KeywordMatcher): """Aho-Corasick 自动机匹配器""" def __init__(self): super().__init__("Aho-Corasick 自动机") self.automaton = None def _prepare(self, keywords: Set[str]) -> None: """构建自动机""" if not HAS_AC: raise RuntimeError("pyahocorasick 未安装") print("正在构建Aho-Corasick自动机...") self.automaton = ahocorasick.Automaton() for keyword in keywords: self.automaton.add_word(keyword, keyword) self.automaton.make_automaton() print("自动机构建完成") def _match_single_text(self, text: str, keywords: Set[str]) -> Set[str]: """使用自动机匹配""" matched = set() for end_index, keyword in self.automaton.iter(text): matched.add(keyword) return matched class SetMatcher(KeywordMatcher): """标准集合匹配器""" def __init__(self): super().__init__("标准集合匹配") def _match_single_text(self, text: str, keywords: Set[str]) -> Set[str]: """使用集合成员检查匹配""" return {kw for kw in keywords if kw in text} class CASRegexMatcher(KeywordMatcher): """CAS号正则表达式匹配器(支持多种格式)""" def __init__(self): super().__init__("CAS号正则匹配(支持多种格式)") self.pattern = re.compile(CAS_REGEX_PATTERN) def _match_single_text(self, text: str, keywords: Set[str]) -> Set[str]: """ 使用正则表达式提取CAS号,规范化后与关键词库比对 流程: 1. 用正则提取文本中所有可能的CAS号 2. 将提取的CAS号规范化为标准格式 3. 与关键词库(已规范化)进行比对 """ # 从文本中提取并规范化CAS号 found_cas_numbers = extract_cas_numbers(text, self.pattern) # 与关键词库求交集 matched = found_cas_numbers & keywords return matched class RegexExactMatcher(KeywordMatcher): """正则表达式精确匹配器(支持词边界)""" def __init__(self): super().__init__("正则表达式精确匹配(词边界)") self.pattern = None def _prepare(self, keywords: Set[str]) -> None: """构建正则表达式模式""" print("正在构建正则表达式模式...") # 转义所有特殊字符,并使用词边界 \b 确保完整词匹配 escaped_keywords = [re.escape(kw) for kw in keywords] # 构建正则模式:\b(keyword1|keyword2|...)\b # 词边界确保不会匹配到部分词 pattern_str = r'\b(' + '|'.join(escaped_keywords) + r')\b' self.pattern = re.compile(pattern_str) print(f"正则模式构建完成,共 {len(keywords)} 个关键词") def _match_single_text(self, text: str, keywords: Set[str]) -> Set[str]: """使用正则表达式精确匹配""" if not self.pattern: return set() # 查找所有匹配项 matches = self.pattern.findall(text) return set(matches) # ========== 匹配器工厂 ========== def create_matcher(algorithm: str, fuzzy_threshold: int = DEFAULT_FUZZY_THRESHOLD, mode: str = None) -> KeywordMatcher: """ 根据算法类型和模式创建匹配器 参数: algorithm: 匹配算法 (auto, set, exact) fuzzy_threshold: 已废弃,保留仅为向后兼容 mode: 检测模式 (cas, exact),用于选择特定匹配器 """ algorithm_lower = algorithm.lower() # CAS模式使用CAS正则匹配器 if mode and mode.lower() == "cas": return CASRegexMatcher() # exact模式或exact算法使用正则精确匹配器 if mode and mode.lower() == "exact": return RegexExactMatcher() if algorithm_lower == "exact": return RegexExactMatcher() elif algorithm_lower == "set": return SetMatcher() elif algorithm_lower == "auto": if HAS_AC: return AhoCorasickMatcher() else: print("警告: 未安装 pyahocorasick,使用标准匹配方法") return SetMatcher() else: raise ValueError(f"不支持的算法: {algorithm}") # ========== 结果处理 ========== def save_results( df: pd.DataFrame, result: MatchResult, output_file: str ) -> Optional[pd.DataFrame]: """保存匹配结果到 Excel""" if result.match_count == 0: print("未找到任何匹配") return None result_df = df.iloc[result.matched_indices].copy() result_df.insert(0, "匹配到的关键词", result.matched_keywords) result_df.to_excel(output_file, index=False, engine='openpyxl') print(f"结果已保存到: {output_file}") return result_df def print_statistics(result: MatchResult) -> None: """打印匹配统计信息""" print(f"\n{'='*60}") print(f"匹配完成!") print(f"匹配模式: {result.matcher_name}") print(f"总耗时: {result.elapsed_time:.2f} 秒") print(f"处理速度: {result.speed:.1f} 行/秒") print(f"匹配到 {result.match_count} 行数据 ({result.match_rate:.2f}%)") print(f"{'='*60}\n") def preview_results(result_df: pd.DataFrame, num_rows: int = 5) -> None: """预览匹配结果""" if result_df.empty: return print(f"前{num_rows}行匹配结果预览:") print("=" * 80) pd.set_option('display.max_columns', None) pd.set_option('display.width', None) pd.set_option('display.max_colwidth', 100) print(result_df.head(num_rows)) print("=" * 80) print(f"\n✓ 总共匹配到 {len(result_df)} 行数据") # ========== 主流程 ========== def perform_matching( df: pd.DataFrame, keywords: Set[str], text_columns: List[str], output_file: str, algorithm: str = "auto", mode: str = None ) -> Optional[pd.DataFrame]: """执行完整的匹配流程 参数: df: 数据框 keywords: 关键词集合 text_columns: 文本列名列表(支持多列) output_file: 输出文件路径 algorithm: 匹配算法 mode: 匹配模式 """ # 验证列存在 missing_cols = [col for col in text_columns if col not in df.columns] if missing_cols: print(f"警告: 以下列不存在: {missing_cols}") text_columns = [col for col in text_columns if col in df.columns] if not text_columns: print(f"可用列名: {df.columns.tolist()}") raise ValueError("没有可用的文本列") print(f"文本文件共有 {len(df)} 行数据\n") # 创建匹配器并执行匹配 matcher = create_matcher(algorithm, mode=mode) result = matcher.match(df, keywords, text_columns) # 输出统计信息 print_statistics(result) # 保存结果 result_df = save_results(df, result, output_file) return result_df def process_single_mode( keywords_df: pd.DataFrame, text_df: pd.DataFrame, mode: str, text_columns: List[str], output_file: Path, separator: str = SEPARATOR, save_to_file: bool = True ) -> Optional[pd.DataFrame]: """ 处理单个检测模式 参数: text_columns: 文本列名列表(支持多列) 返回:匹配结果 DataFrame(包含原始索引) """ mode_lower = mode.lower() label = MODE_LABELS.get(mode_lower, mode_lower) print(f"\n{'='*60}") print(f">>> 正在执行识别模式: {label}") print(f"{'='*60}") # 加载关键词 keywords = load_keywords_for_mode(keywords_df, mode_lower, separator=separator) # 显示关键词样例 sample_keywords = list(keywords)[:10] if sample_keywords: print(f"\n{label} - 关键词样例(前10个):") for idx, kw in enumerate(sample_keywords, 1): print(f" {idx}. {kw}") print() # 选择算法 algorithm = "exact" if mode_lower == "exact" else "auto" # 执行匹配(如果不需要保存到文件,传入临时路径) temp_output = str(output_file) if save_to_file else "/tmp/temp_match.xlsx" result_df = perform_matching( df=text_df, keywords=keywords, text_columns=text_columns, output_file=temp_output, algorithm=algorithm, mode=mode_lower # 传递模式参数 ) # 如果不保存文件,删除临时文件 if not save_to_file and result_df is not None: import os if os.path.exists(temp_output): os.remove(temp_output) # 预览结果 if result_df is not None and save_to_file: preview_results(result_df) # 添加模式标识列 if result_df is not None: result_df['匹配模式'] = label return result_df def run_multiple_modes( keywords_file: Path, text_file: Path, output_file: Path, text_columns: Optional[List[str]], modes: List[str], separator: str = SEPARATOR ) -> None: """运行多个检测模式,合并结果到单一文件 参数: text_columns: 文本列名列表(支持多列),None 表示自动检测 """ # 验证文件存在 if not keywords_file.exists(): raise FileNotFoundError(f"找不到关键词文件: {keywords_file}") if not text_file.exists(): raise FileNotFoundError(f"找不到文本文件: {text_file}") # 加载数据 print(f"正在加载关键词文件: {keywords_file}") keywords_df = pd.read_excel(keywords_file) print(f"可用列: {keywords_df.columns.tolist()}\n") print(f"正在加载文本文件: {text_file}") text_df = pd.read_excel(text_file) # 自动检测或验证文本列 actual_text_columns = detect_text_columns(text_df, text_columns) print(f"使用文本列: {actual_text_columns}\n") # 验证模式 if not modes: raise ValueError("modes 不能为空,请至少指定一个模式") for mode in modes: if mode.lower() not in MODE_KEYWORD_COLUMNS: raise ValueError(f"不支持的识别模式: {mode}") # 收集所有模式的匹配结果 all_results = [] multiple_modes = len(modes) > 1 for mode in modes: mode_lower = mode.lower() # 处理该模式(不保存到单独文件) result_df = process_single_mode( keywords_df=keywords_df, text_df=text_df, mode=mode_lower, text_columns=actual_text_columns, output_file=output_file, # 这个参数在 save_to_file=False 时不使用 separator=separator, save_to_file=False # 不保存到单独文件 ) if result_df is not None and not result_df.empty: all_results.append(result_df) # 合并所有结果 if not all_results: print("\n所有模式均未匹配到数据") return print(f"\n{'='*60}") print("正在合并所有模式的匹配结果...") print(f"{'='*60}") # 合并结果 merged_df = merge_mode_results(all_results, text_df) # 保存合并后的结果 merged_df.to_excel(output_file, index=False, engine='openpyxl') print(f"\n合并结果已保存到: {output_file}") print(f" 总匹配行数: {len(merged_df)} 行") # 统计每个模式的贡献 print(f"\n各模式匹配统计:") for mode_result in all_results: mode_name = mode_result['匹配模式'].iloc[0] count = len(mode_result) print(f" {mode_name:20s}: {count:4d} 行") # 预览合并结果 print(f"\n{'='*60}") print("合并结果预览(前5行):") print(f"{'='*60}") preview_results(merged_df, num_rows=5) def merge_mode_results( results: List[pd.DataFrame], original_df: pd.DataFrame ) -> pd.DataFrame: """ 合并多个模式的匹配结果 策略: 1. 按原始数据行索引合并 2. 如果同一行被多个模式匹配,合并关键词和模式标识 3. 保留原始数据的所有列 """ if not results: return pd.DataFrame() # 记录每个原始行索引的匹配信息 row_matches = {} for result_df in results: for idx, row in result_df.iterrows(): if idx not in row_matches: # 首次出现该行 row_matches[idx] = { 'keywords': row['匹配到的关键词'], 'modes': [row['匹配模式']] } else: # 该行已被其他模式匹配过,合并关键词和模式 existing_keywords = row_matches[idx]['keywords'] new_keywords = row['匹配到的关键词'] # 合并关键词(去重) all_keywords = set(str(existing_keywords).split(' | ')) | set(str(new_keywords).split(' | ')) row_matches[idx]['keywords'] = ' | '.join(sorted(all_keywords)) # 添加模式标识 row_matches[idx]['modes'].append(row['匹配模式']) # 构建最终结果 final_indices = list(row_matches.keys()) final_df = original_df.loc[final_indices].copy() # 添加合并后的列 final_df.insert(0, '匹配到的关键词', [row_matches[idx]['keywords'] for idx in final_indices]) final_df.insert(1, '匹配模式', [' + '.join(row_matches[idx]['modes']) for idx in final_indices]) # 按原始索引排序 final_df = final_df.sort_index() return final_df # ========== 命令行接口 ========== def parse_args(): """解析命令行参数""" parser = argparse.ArgumentParser( description='多模式关键词匹配工具', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 使用默认配置(自动检测 detected_text 和 文本 列) python keyword_matcher.py # 仅执行 CAS 号识别 python keyword_matcher.py -m cas # 仅执行精确匹配 python keyword_matcher.py -m exact # 指定单个文本列 python keyword_matcher.py -c detected_text # 指定多个文本列 python keyword_matcher.py -c detected_text 文本 summary # 指定自定义文件路径 python keyword_matcher.py -k ../data/input/keywords.xlsx -t ../data/input/text.xlsx """ ) parser.add_argument( '-k', '--keywords', type=str, help='关键词文件路径 (默认: ../data/input/keywords.xlsx)' ) parser.add_argument( '-t', '--text', type=str, help='文本文件路径 (默认: ../data/input/clickin_text_img.xlsx)' ) parser.add_argument( '-o', '--output', type=str, help='输出文件路径 (默认: ../data/output/keyword_matched_results.xlsx)' ) parser.add_argument( '-c', '--text-columns', nargs='+', type=str, default=None, help='文本列名,支持多列 (默认: 自动检测 detected_text 和 文本)' ) parser.add_argument( '-m', '--modes', nargs='+', choices=['cas', 'exact'], default=['cas', 'exact'], help='识别模式 (默认: cas exact)' ) parser.add_argument( '--separator', type=str, default=SEPARATOR, help=f'关键词分隔符 (默认: {SEPARATOR})' ) return parser.parse_args() def main(): """主函数""" args = parse_args() # 确定文件路径 base_dir = Path(__file__).resolve().parent keywords_file = Path(args.keywords) if args.keywords else ( base_dir.parent / "data" / "input" / "keywords.xlsx" ) text_file = Path(args.text) if args.text else ( base_dir.parent / "data" / "input" / "clickin_text_img.xlsx" ) output_file = Path(args.output) if args.output else ( base_dir.parent / "data" / "output" / "keyword_matched_results.xlsx" ) # 显示依赖库状态 print("=" * 60) print("依赖库状态:") print(f" pyahocorasick: {'已安装 ✓' if HAS_AC else '未安装 ✗'}") print("=" * 60) print() if not HAS_AC: print("提示: 安装 pyahocorasick 可获得 5x 性能提升: pip install pyahocorasick\n") try: run_multiple_modes( keywords_file=keywords_file, text_file=text_file, output_file=output_file, text_columns=args.text_columns, modes=args.modes, separator=args.separator ) print("\n" + "=" * 60) print("✓ 所有模式处理完成!") print("=" * 60) except Exception as e: print(f"\n错误: {e}") traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()