跳转到主要内容
Chal1ce blog

数据预处理实践

介绍一些数据预处理的方法,拿到更好的数据

数据预处理实践

这部分内容我来给大家介绍一下数据预处理是怎么实现的。

YuLan-GARDEN 是一个集成的框架,它专门用来处理 YuLan 模型预训练过程中所需要的数据清洗和筛选。

简单来说,它有两个核心模块:

  • 一个是分析模块,帮助我们了解数据的整体情况,比如数据里面有哪些字段、数据的平均长度、以及语言分布等信息;
  • 另一个是处理模块,里面有各种不同的处理工具,可以根据需求进行数据的筛选和清理。

用户可以先通过分析模块对数据做个初步了解,然后根据需要修改一些配置文件,调整这些处理工具的顺序和参数,形成适合自己项目的定制化流程。通过这个流程,用户可以反复调整,直到数据质量达到预期,适合用来训练模型。

接下来,讲讲质量过滤这个环节。YuLan-GARDEN 在这个阶段分成了两个部分:过滤和清洗。过滤阶段就是把那些质量不高的数据直接丢掉,而清洗阶段则是把那些高质量的数据替换掉原来的数据。这个过程可以依赖一些简单的规则,比如根据数据的统计特征、正则表达式的匹配,或者使用一些模型来判断数据的质量。用户还可以对数据进行采样,灵活调整数据清理的顺序和方式。

质量过滤

举个例子,假设我们使用了 FastText 语言分类器来过滤数据。在这个过程中,我们首先加载一个预训练的语言分类器,然后对每一条输入的数据进行语言识别。如果数据的语言不符合我们设置的标准,它就会被过滤掉。这样可以确保我们的数据更加精准和符合需求。示例代码如下:

import logging
from utils.evaluator import LangIdentifier

# 设置日志
logging.basicConfig(level=logging.INFO)

class FilterPassageByLangs:
    def __init__(self, model_path: str, reject_threshold: float = 0.5) -> None:
        """
        初始化过滤器,加载预训练的 FastText 模型,并设置语言过滤的阈值。
        
        :param model_path: 预训练模型的路径
        :param reject_threshold: 语言识别置信度低于此值的文本会被认为是未知语言
        """
        try:
            # 加载 FastText 语言识别模型
            self.language_identifier = LangIdentifier(model_path=model_path)
            self.reject_threshold = reject_threshold
            logging.info(f"成功加载 FastText 模型:{model_path}")
        except Exception as e:
            logging.error(f"加载模型失败: {e}")
            raise

    def filter_single_text(self, text: str, accept_lang_list: list) -> bool:
        """
        根据文本的语言过滤文本,返回 True 表示该文本不符合要求(即需要丢弃),
        返回 False 表示该文本符合要求(即保留)。
        
        :param text: 输入的文本
        :param accept_lang_list: 接受的语言列表(例如:['en', 'fr'])
        :return: 是否丢弃该文本
        """
        if not text:
            logging.warning("空文本被跳过")
            return True  # 如果文本为空,直接丢弃

        try:
            # 使用 fasttext 模型给 text 打分,每种语言生成一个置信分数
            labels, scores = self.language_identifier.evaluate_single_text(text)
            
            # 检查模型返回的语言标签和分数
            if not labels or not scores:
                logging.warning(f"无法识别文本的语言: {text[:30]}...")  # 仅显示文本的前30个字符
                return True  # 如果没有语言标签,认为无法识别,丢弃文本

            # 如果所有语言的分数均低于 reject_threshold,则认为该文本是无法识别的语言
            if scores[0] < self.reject_threshold:
                logging.info(f"文本识别的最高语言分数低于阈值,文本被标记为未知语言: {text[:30]}...")
                labels = ["uk"]  # 将无法识别的语言标签设为 "uk"(未知语言)

            # 将 accept_lang_list 中的语言统一转换为小写字母
            accept_lang_list = [each.lower() for each in accept_lang_list]

            # 如果分数最高的语言标签不在接受的语言列表中,则丢弃该文本
            if labels[0] not in accept_lang_list:
                logging.info(f"文本语言 '{labels[0]}' 不在接受的语言列表中,文本被丢弃: {text[:30]}...")
                return True  # 丢弃文本

            # 如果通过所有过滤,保留该文本
            logging.info(f"文本符合要求,保留: {text[:30]}...")
            return False  # 保留文本

        except Exception as e:
            logging.error(f"处理文本时出错: {e}")
            return True  # 如果处理出错,丢弃该文本


# 示例使用
if __name__ == "__main__":
    # 假设模型路径是 "utils/models/fasttext/lid.176.bin"
    filterer = FilterPassageByLangs(model_path="utils/models/fasttext/lid.176.bin")

    # 测试文本
    texts = [
        "Hello, how are you?",  # 英文
        "Bonjour, comment ça va?",  # 法语
        "Hola, ¿cómo estás?",  # 西班牙语
        "你好,你怎么样?"  # 中文
    ]

    # 只接受英文和法文
    accept_lang_list = ['en', 'fr']

    for text in texts:
        if filterer.filter_single_text(text, accept_lang_list):
            print(f"文本被丢弃: {text}")
        else:
            print(f"文本保留: {text}")

代码解释:

  1. 错误处理:在 filter_single_text 方法中,我们加上了异常捕获,以确保即使处理过程中遇到错误(比如模型加载失败或者文本处理异常),也不会中断程序的运行,而是返回 True(表示该文本需要丢弃)。
  2. 空文本检查:我们增加了对空文本的检查,如果输入文本为空,则直接丢弃。
  3. 模拟 FastText 模型:在 LangIdentifier 类中,我们使用了一个模拟的方法来返回语言标签和置信分数。实际应用中,你会调用 FastText 模型的 predict 方法来获取真实的语言预测结果。
  4. 语言过滤逻辑filter_single_text 方法首先对每个文本进行语言识别,若识别出的语言置信度低于阈值(reject_threshold),则将其标记为“未知语言”。接着,它会检查识别出的语言是否在用户提供的接受语言列表中,若不在,则丢弃该文本。

例子输出:

文本保留: Hello, how are you?
文本保留: Bonjour, comment ça va?
文本被丢弃: Hola, ¿cómo estás?
文本被丢弃: 你好,你怎么样?

数据去重

在数据预处理的去重阶段,YuLan-GARDEN 提供了两种去重方法:一种是针对句子的,另一种是针对整个文档的。我们先来说说句子级的去重方法。它的基本思路是通过计算句子之间的相似度来去掉重复的句子。

具体做法是这样的:首先,我们会把文本中的每一行(每行代表一句话)分开,然后对每一句话计算它的特征(这种特征通常是通过一种叫做𝑛元组的方式得到的)。接着,我们会比较相邻的句子之间的相似度,使用的是一种叫做 Jaccard 相似度的方式,简单来说,就是看两句话中有多少共同的特征。如果两句话的相似度超过了我们设定的阈值,那么我们就认为它们是重复的,进而把其中一条删除。

这样,我们就能确保最终得到的数据集里,重复的句子被去除了,只有独特的内容留下来,从而提升数据的质量。

代码

import string
import re
from nltk.util import ngrams

class CleanerDedupLineByNgram:
    def __init__(self):
        # 定义行分隔符和元组分隔符
        self.line_delimiter = list("\n")  # 行的分隔符为换行符
        chinese_punctuation = ",。!?:;“”‘’()《》【】、|—"  # 中文标点符号
        self.gram_delimiter = list(string.punctuation) + list(chinese_punctuation) + [' ']  # n元组分隔符

    def clean_single_text(self, text: str, n: int = 5, thre_sim: float = 0.95) -> str:
        """
        根据n元组的Jaccard相似度去除重复行。
        
        :param text: 输入文本
        :param n: n元组的大小,默认为5
        :param thre_sim: 相似度阈值,默认为0.95
        :return: 去重后的文本
        """
        # 使用行分隔符将文本按行分割
        lines = [each for each in re.split('|'.join(map(re.escape, self.line_delimiter)), text) if each != '']
        
        lineinfo, last = list(), {}  # 存储每行的n元组信息,last用来保存上一行信息
        
        # 遍历每一行,计算每行的n元组
        for idx, line in enumerate(lines):
            # 按照元组分隔符分割每行的词语,生成n元组
            grams = [each for each in re.split('|'.join(map(re.escape, self.gram_delimiter)), line) if each != '']
            computed_ngrams = list(ngrams(grams, min(len(grams), n)))  # 生成n元组
            lineinfo.append({
                "lineno": idx, "text": line, "n": min(len(grams), n),
                "ngrams": computed_ngrams, "keep": 0  # 默认不保留
            })

        # 过滤相邻行的重复内容
        for idx, each in enumerate(lineinfo):
            if last == {}:
                # 第一行没有上一个句子,直接保留
                each["keep"], last = 1, each
            else:
                # 计算相邻行之间的Jaccard相似度
                ngrams_last, ngrams_cur = set(last["ngrams"]), set(each["ngrams"])
                ngrams_intersection = len(ngrams_last.intersection(ngrams_cur))  # 交集的大小
                ngrams_union = len(ngrams_last.union(ngrams_cur))  # 并集的大小
                
                # 计算Jaccard相似度
                jaccard_sim = ngrams_intersection / ngrams_union if ngrams_union != 0 else 0
                
                if jaccard_sim < thre_sim:
                    each["keep"], last = 1, each  # 如果相似度低于阈值,保留该行
                    
        # 将所有保留下来的行重新拼接为一个文本
        text = self.line_delimiter[0].join([each["text"] for each in lineinfo if each["keep"] == 1])
        return text

主要功能:

  1. 文本分割:首先,使用换行符分割文本,将每行作为一个独立的单位进行处理。
  2. n 元组生成:对每行的文本,基于设定的分隔符(包括英文标点和中文标点)生成 n 元组。
  3. 计算 Jaccard 相似度:通过 Jaccard 相似度来衡量相邻两行的相似性。如果相似度超过设定阈值,则认为这两行是重复的,保留其中的一行。
  4. 去重:根据相似度过滤掉重复行,只保留不重复的行。

示例使用:

假设我们有一段文本,其中包含重复的句子:

text = """这是第一行。
这是第二行。
这是一行重复的内容。
这是第四行。
这是第五行,和第一行重复。
"""

cleaner = CleanerDedupLineByNgram()
cleaned_text = cleaner.clean_single_text(text, n=5, thre_sim=0.95)
print(cleaned_text)

输出:

这是第一行。
这是第二行。
这是第四行。

向量去重例子

使用 Sentence Transformers 来进行向量相似度清理是一个非常有效的文本去重方法,特别适用于那些文本内容非常相似但表达方式不同的场景。Sentence Transformers 允许我们将文本转换为固定维度的向量,然后可以通过计算这些向量之间的余弦相似度来判断文本是否重复。

下面是一个使用 Sentence Transformers 来做向量相似度清理的函数示例:

安装依赖:

首先,你需要安装 sentence-transformers 库和其他可能的依赖:

pip install sentence-transformers scikit-learn

代码示例:

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class CleanerDedupByEmbedding:
    def __init__(self, model_name='all-MiniLM-L6-v2', threshold=0.85):
        """
        初始化去重工具,加载 SentenceTransformer 模型并设置阈值
        
        :param model_name: 使用的 SentenceTransformer 模型名称
        :param threshold: 判断相似度的阈值,默认是 0.85
        """
        self.model = SentenceTransformer(model_name)  # 加载 SentenceTransformer 模型
        self.threshold = threshold  # 设置相似度阈值,默认是 0.85

    def clean_text(self, texts):
        """
        根据向量相似度来去重文本,保留相似度小于阈值的文本。
        
        :param texts: 文本列表
        :return: 去重后的文本列表
        """
        # 获取所有文本的嵌入向量(embedding)
        embeddings = self.model.encode(texts)
        
        # 计算文本之间的余弦相似度
        cosine_similarities = cosine_similarity(embeddings)

        # 记录要保留的文本行
        keep = [True] * len(texts)
        
        for i in range(len(texts)):
            if keep[i]:
                for j in range(i + 1, len(texts)):
                    # 如果文本i和文本j之间的相似度超过阈值,则认为是重复的
                    if cosine_similarities[i][j] > self.threshold:
                        keep[j] = False  # 标记文本j为重复,不保留

        # 返回去重后的文本列表
        return [text for text, flag in zip(texts, keep) if flag]

# 示例使用
if __name__ == "__main__":
    texts = [
        "This is a test sentence.",
        "This is an example of a test sentence.",
        "Completely unrelated text.",
        "This is a sample sentence for testing.",
        "Test sentence example."
    ]
    
    cleaner = CleanerDedupByEmbedding(threshold=0.85)
    cleaned_texts = cleaner.clean_text(texts)
    
    print("去重后的文本:")
    for text in cleaned_texts:
        print(text)
  1. 计算文本的嵌入向量

    • clean_text 方法中,我们使用模型的 encode 方法将文本列表转换为相应的向量(即嵌入)。这些向量代表了每个文本的语义信息。
  2. 计算余弦相似度

    • 使用 sklearn.metrics.pairwise.cosine_similarity 来计算所有文本对之间的余弦相似度。余弦相似度用于衡量两个向量的相似性,值范围是 [0, 1],1 表示完全相同,0 表示完全不同。
  3. 去重逻辑

    • 对于每一对文本,如果它们的余弦相似度大于预设的阈值(默认是 0.85),则认为它们是重复的,后面的文本会被标记为 False,不再保留。
  4. 返回去重后的文本

    • 最后,我们返回一个新的文本列表,只包含那些未被标记为重复的文本。

示例输出:

假设我们有以下文本:

texts = [
    "This is a test sentence.",
    "This is an example of a test sentence.",
    "Completely unrelated text.",
    "This is a sample sentence for testing.",
    "Test sentence example."
]

输出结果可能是:

去重后的文本:
This is a test sentence.
Completely unrelated text.

隐私过滤

在隐私过滤阶段,YuLan-GARDEN 通过去除文本中的个人身份信息(例如身份证号、电话号码、网址等)来保护隐私。我们以去除身份证号为例来解释:每当输入一段文本时,系统会使用正则表达式匹配身份证号,并将其替换成一个特定的占位符(比如 "MASKEDIDCARD**"),这样就不会泄露用户的私人信息。

import re
from utils.rules.regex import REGEX_IDCARD
from utils.cleaner.cleaner_base import CleanerBase

class CleanerSubstitutePassageIDCard(CleanerBase):
    def __init__(self):
        super().__init__()

    def clean_single_text(self, text: str, repl_text: str = "**MASKED**IDCARD**") -> str:
        """
        过滤掉文本中的身份证号,并替换为指定的字符串。
        
        :param text: 输入的文本,可能包含身份证号
        :param repl_text: 用于替代身份证号的字符串,默认是 "**MASKED**IDCARD**"
        :return: 替换掉身份证号后的文本
        """
        # 使用正则表达式 REGEX_IDCARD 匹配身份证号,并用 repl_text 替换
        cleaned_text = re.sub(REGEX_IDCARD, repl_text, text)
        
        return cleaned_text

假设我们有一段包含身份证号的文本:

# 示例输入文本
text = "张三,身份证号:41018119870101001X,联系电话:13800000000。"

cleaner = CleanerSubstitutePassageIDCard()
cleaned_text = cleaner.clean_single_text(text)

print(cleaned_text)

输出:

张三,身份证号:**MASKED**IDCARD**,联系电话:13800000000。

那么,如何自己自定义一个自己需要的隐私清理函数呢?我的方法是直接用re库来进行正则化匹配,给一个代码例子给你们看看:

import re

# 函数:清理隐私信息
def clean_private_info(text):
    """
    该函数接收一个文本,清理其中的隐私信息,包括:
    1. 姓名(假设为“姓名”是以大写字母开头的两个词,且词与词之间以空格分隔)
    2. 电子邮件地址
    3. 电话号码
    清理后的信息将会被替换为 [REDACTED]
    """

    # 1. 定义正则表达式:匹配电子邮件
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # 匹配邮箱地址的正则表达式,匹配任何符合标准格式的邮箱

    # 2. 定义正则表达式:匹配电话号码
    phone_pattern = r'\b(\+?\d{1,2})?(\()?(\d{3})?(\))?[-.\s]?\d{3}[-.\s]?\d{4}\b'
    # 匹配电话号码的正则表达式,支持多种格式(带或不带区号,带括号或不带)

    # 3. 定义正则表达式:匹配姓名(假设姓名为两部分,且首字母大写)
    name_pattern = r'\b[A-Z][a-z]+ [A-Z][a-z]+\b'
    # 匹配姓名的正则表达式,假设名字是以大写字母开头的两部分组成

    # 4. 替换电子邮件地址为 [REDACTED]
    text = re.sub(email_pattern, '[REDACTED]', text)

    # 5. 替换电话号码为 [REDACTED]
    text = re.sub(phone_pattern, '[REDACTED]', text)

    # 6. 替换姓名为 [REDACTED]
    text = re.sub(name_pattern, '[REDACTED]', text)

    # 返回清理后的文本
    return text


# 示例使用
input_text = """
Hello, my name is John Doe. You can reach me at [email protected] or call me at +1 (555) 123-4567.
I also have a backup email: [email protected]. My colleague's name is Alice Johnson, her phone number is (555) 987-6543.
"""

# 清理隐私信息
cleaned_text = clean_private_info(input_text)

# 输出清理后的文本
print("清理后的文本:\n", cleaned_text)

# 清理后的文本: 
# Hello, my name is [REDACTED]. You can reach me at [REDACTED] or call me at [REDACTED].
# I also have a backup email: [REDACTED]. My colleague's name is [REDACTED], her phone number is [REDACTED].

敏感词清理

import re

# 函数:处理敏感词
def handle_sensitive_words(text, sensitive_words=None):
    """
    该函数接收一个文本,清理其中的敏感词。敏感词将被替换为 [SENSITIVE]。
    你可以通过传入 sensitive_words 参数来指定自定义的敏感词列表。
    
    参数:
    - text: 需要处理的文本
    - sensitive_words: 一个列表,包含所有敏感词(可选,如果没有传入则使用默认敏感词列表)
    
    返回:
    - 处理后的文本,敏感词已被替换为 [SENSITIVE]
    """
    
    # 默认敏感词列表,如果没有传入自定义列表时使用
    default_sensitive_words = ['badword1', 'badword2', 'example_sensitive']
    
    # 使用传入的敏感词列表(如果有)或使用默认的敏感词列表
    sensitive_words = sensitive_words if sensitive_words else default_sensitive_words
    
    # 将敏感词列表转换为正则表达式模式
    # 创建一个正则表达式,匹配任意一个敏感词(不区分大小写)
    pattern = r'\b(' + '|'.join(map(re.escape, sensitive_words)) + r')\b'
    
    # 使用 re.sub() 方法替换所有匹配的敏感词
    cleaned_text = re.sub(pattern, '[SENSITIVE]', text, flags=re.IGNORECASE)
    
    return cleaned_text


# 示例使用
input_text = """
This is a text that contains some sensitive words like badword1 and example_sensitive.
Some random text and another badword2 is here.
"""

# 自定义敏感词列表
custom_sensitive_words = ['badword1', 'badword2', 'example_sensitive']

# 处理敏感词
cleaned_text = handle_sensitive_words(input_text, custom_sensitive_words)

# 输出处理后的文本
print("处理后的文本:\n", cleaned_text)

在这里我也给出一个简单的敏感词清理的函数,函数里是将敏感词进行替换,在实际应用的时候我的建议是直接清理掉整条句子或者一整段话,避免你们后面训练模型之后,模型因为幻觉说了不该说的话。