数据预处理实践
介绍一些数据预处理的方法,拿到更好的数据
数据预处理实践
这部分内容我来给大家介绍一下数据预处理是怎么实现的。
YuLan-GARDEN
是一个集成的框架,它专门用来处理 YuLan 模型预训练过程中所需要的数据清洗和筛选。
简单来说,它有两个核心模块:
- 一个是分析模块,帮助我们了解数据的整体情况,比如数据里面有哪些字段、数据的平均长度、以及语言分布等信息;
- 另一个是处理模块,里面有各种不同的处理工具,可以根据需求进行数据的筛选和清理。
用户可以先通过分析模块对数据做个初步了解,然后根据需要修改一些配置文件,调整这些处理工具的顺序和参数,形成适合自己项目的定制化流程。通过这个流程,用户可以反复调整,直到数据质量达到预期,适合用来训练模型。
接下来,讲讲质量过滤这个环节。YuLan-GARDEN
在这个阶段分成了两个部分:过滤和清洗。过滤阶段就是把那些质量不高的数据直接丢掉,而清洗阶段则是把那些高质量的数据替换掉原来的数据。这个过程可以依赖一些简单的规则,比如根据数据的统计特征、正则表达式的匹配,或者使用一些模型来判断数据的质量。用户还可以对数据进行采样,灵活调整数据清理的顺序和方式。
质量过滤
举个例子,假设我们使用了 FastText
语言分类器来过滤数据。在这个过程中,我们首先加载一个预训练的语言分类器,然后对每一条输入的数据进行语言识别。如果数据的语言不符合我们设置的标准,它就会被过滤掉。这样可以确保我们的数据更加精准和符合需求。示例代码如下:
import logging
from utils.evaluator import LangIdentifier
# 设置日志
logging.basicConfig(level=logging.INFO)
class FilterPassageByLangs:
def __init__(self, model_path: str, reject_threshold: float = 0.5) -> None:
"""
初始化过滤器,加载预训练的 FastText 模型,并设置语言过滤的阈值。
:param model_path: 预训练模型的路径
:param reject_threshold: 语言识别置信度低于此值的文本会被认为是未知语言
"""
try:
# 加载 FastText 语言识别模型
self.language_identifier = LangIdentifier(model_path=model_path)
self.reject_threshold = reject_threshold
logging.info(f"成功加载 FastText 模型:{model_path}")
except Exception as e:
logging.error(f"加载模型失败: {e}")
raise
def filter_single_text(self, text: str, accept_lang_list: list) -> bool:
"""
根据文本的语言过滤文本,返回 True 表示该文本不符合要求(即需要丢弃),
返回 False 表示该文本符合要求(即保留)。
:param text: 输入的文本
:param accept_lang_list: 接受的语言列表(例如:['en', 'fr'])
:return: 是否丢弃该文本
"""
if not text:
logging.warning("空文本被跳过")
return True # 如果文本为空,直接丢弃
try:
# 使用 fasttext 模型给 text 打分,每种语言生成一个置信分数
labels, scores = self.language_identifier.evaluate_single_text(text)
# 检查模型返回的语言标签和分数
if not labels or not scores:
logging.warning(f"无法识别文本的语言: {text[:30]}...") # 仅显示文本的前30个字符
return True # 如果没有语言标签,认为无法识别,丢弃文本
# 如果所有语言的分数均低于 reject_threshold,则认为该文本是无法识别的语言
if scores[0] < self.reject_threshold:
logging.info(f"文本识别的最高语言分数低于阈值,文本被标记为未知语言: {text[:30]}...")
labels = ["uk"] # 将无法识别的语言标签设为 "uk"(未知语言)
# 将 accept_lang_list 中的语言统一转换为小写字母
accept_lang_list = [each.lower() for each in accept_lang_list]
# 如果分数最高的语言标签不在接受的语言列表中,则丢弃该文本
if labels[0] not in accept_lang_list:
logging.info(f"文本语言 '{labels[0]}' 不在接受的语言列表中,文本被丢弃: {text[:30]}...")
return True # 丢弃文本
# 如果通过所有过滤,保留该文本
logging.info(f"文本符合要求,保留: {text[:30]}...")
return False # 保留文本
except Exception as e:
logging.error(f"处理文本时出错: {e}")
return True # 如果处理出错,丢弃该文本
# 示例使用
if __name__ == "__main__":
# 假设模型路径是 "utils/models/fasttext/lid.176.bin"
filterer = FilterPassageByLangs(model_path="utils/models/fasttext/lid.176.bin")
# 测试文本
texts = [
"Hello, how are you?", # 英文
"Bonjour, comment ça va?", # 法语
"Hola, ¿cómo estás?", # 西班牙语
"你好,你怎么样?" # 中文
]
# 只接受英文和法文
accept_lang_list = ['en', 'fr']
for text in texts:
if filterer.filter_single_text(text, accept_lang_list):
print(f"文本被丢弃: {text}")
else:
print(f"文本保留: {text}")
代码解释:
- 错误处理:在
filter_single_text
方法中,我们加上了异常捕获,以确保即使处理过程中遇到错误(比如模型加载失败或者文本处理异常),也不会中断程序的运行,而是返回True
(表示该文本需要丢弃)。 - 空文本检查:我们增加了对空文本的检查,如果输入文本为空,则直接丢弃。
- 模拟 FastText 模型:在
LangIdentifier
类中,我们使用了一个模拟的方法来返回语言标签和置信分数。实际应用中,你会调用 FastText 模型的predict
方法来获取真实的语言预测结果。 - 语言过滤逻辑:
filter_single_text
方法首先对每个文本进行语言识别,若识别出的语言置信度低于阈值(reject_threshold
),则将其标记为“未知语言”。接着,它会检查识别出的语言是否在用户提供的接受语言列表中,若不在,则丢弃该文本。
例子输出:
文本保留: Hello, how are you?
文本保留: Bonjour, comment ça va?
文本被丢弃: Hola, ¿cómo estás?
文本被丢弃: 你好,你怎么样?
数据去重
在数据预处理的去重阶段,YuLan-GARDEN
提供了两种去重方法:一种是针对句子的,另一种是针对整个文档的。我们先来说说句子级的去重方法。它的基本思路是通过计算句子之间的相似度来去掉重复的句子。
具体做法是这样的:首先,我们会把文本中的每一行(每行代表一句话)分开,然后对每一句话计算它的特征(这种特征通常是通过一种叫做𝑛元组的方式得到的)。接着,我们会比较相邻的句子之间的相似度,使用的是一种叫做 Jaccard 相似度的方式,简单来说,就是看两句话中有多少共同的特征。如果两句话的相似度超过了我们设定的阈值,那么我们就认为它们是重复的,进而把其中一条删除。
这样,我们就能确保最终得到的数据集里,重复的句子被去除了,只有独特的内容留下来,从而提升数据的质量。
代码
import string
import re
from nltk.util import ngrams
class CleanerDedupLineByNgram:
def __init__(self):
# 定义行分隔符和元组分隔符
self.line_delimiter = list("\n") # 行的分隔符为换行符
chinese_punctuation = ",。!?:;“”‘’()《》【】、|—" # 中文标点符号
self.gram_delimiter = list(string.punctuation) + list(chinese_punctuation) + [' '] # n元组分隔符
def clean_single_text(self, text: str, n: int = 5, thre_sim: float = 0.95) -> str:
"""
根据n元组的Jaccard相似度去除重复行。
:param text: 输入文本
:param n: n元组的大小,默认为5
:param thre_sim: 相似度阈值,默认为0.95
:return: 去重后的文本
"""
# 使用行分隔符将文本按行分割
lines = [each for each in re.split('|'.join(map(re.escape, self.line_delimiter)), text) if each != '']
lineinfo, last = list(), {} # 存储每行的n元组信息,last用来保存上一行信息
# 遍历每一行,计算每行的n元组
for idx, line in enumerate(lines):
# 按照元组分隔符分割每行的词语,生成n元组
grams = [each for each in re.split('|'.join(map(re.escape, self.gram_delimiter)), line) if each != '']
computed_ngrams = list(ngrams(grams, min(len(grams), n))) # 生成n元组
lineinfo.append({
"lineno": idx, "text": line, "n": min(len(grams), n),
"ngrams": computed_ngrams, "keep": 0 # 默认不保留
})
# 过滤相邻行的重复内容
for idx, each in enumerate(lineinfo):
if last == {}:
# 第一行没有上一个句子,直接保留
each["keep"], last = 1, each
else:
# 计算相邻行之间的Jaccard相似度
ngrams_last, ngrams_cur = set(last["ngrams"]), set(each["ngrams"])
ngrams_intersection = len(ngrams_last.intersection(ngrams_cur)) # 交集的大小
ngrams_union = len(ngrams_last.union(ngrams_cur)) # 并集的大小
# 计算Jaccard相似度
jaccard_sim = ngrams_intersection / ngrams_union if ngrams_union != 0 else 0
if jaccard_sim < thre_sim:
each["keep"], last = 1, each # 如果相似度低于阈值,保留该行
# 将所有保留下来的行重新拼接为一个文本
text = self.line_delimiter[0].join([each["text"] for each in lineinfo if each["keep"] == 1])
return text
主要功能:
- 文本分割:首先,使用换行符分割文本,将每行作为一个独立的单位进行处理。
- n 元组生成:对每行的文本,基于设定的分隔符(包括英文标点和中文标点)生成 n 元组。
- 计算 Jaccard 相似度:通过 Jaccard 相似度来衡量相邻两行的相似性。如果相似度超过设定阈值,则认为这两行是重复的,保留其中的一行。
- 去重:根据相似度过滤掉重复行,只保留不重复的行。
示例使用:
假设我们有一段文本,其中包含重复的句子:
text = """这是第一行。
这是第二行。
这是一行重复的内容。
这是第四行。
这是第五行,和第一行重复。
"""
cleaner = CleanerDedupLineByNgram()
cleaned_text = cleaner.clean_single_text(text, n=5, thre_sim=0.95)
print(cleaned_text)
输出:
这是第一行。
这是第二行。
这是第四行。
向量去重例子
使用 Sentence Transformers 来进行向量相似度清理是一个非常有效的文本去重方法,特别适用于那些文本内容非常相似但表达方式不同的场景。Sentence Transformers 允许我们将文本转换为固定维度的向量,然后可以通过计算这些向量之间的余弦相似度来判断文本是否重复。
下面是一个使用 Sentence Transformers 来做向量相似度清理的函数示例:
安装依赖:
首先,你需要安装 sentence-transformers
库和其他可能的依赖:
pip install sentence-transformers scikit-learn
代码示例:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
class CleanerDedupByEmbedding:
def __init__(self, model_name='all-MiniLM-L6-v2', threshold=0.85):
"""
初始化去重工具,加载 SentenceTransformer 模型并设置阈值
:param model_name: 使用的 SentenceTransformer 模型名称
:param threshold: 判断相似度的阈值,默认是 0.85
"""
self.model = SentenceTransformer(model_name) # 加载 SentenceTransformer 模型
self.threshold = threshold # 设置相似度阈值,默认是 0.85
def clean_text(self, texts):
"""
根据向量相似度来去重文本,保留相似度小于阈值的文本。
:param texts: 文本列表
:return: 去重后的文本列表
"""
# 获取所有文本的嵌入向量(embedding)
embeddings = self.model.encode(texts)
# 计算文本之间的余弦相似度
cosine_similarities = cosine_similarity(embeddings)
# 记录要保留的文本行
keep = [True] * len(texts)
for i in range(len(texts)):
if keep[i]:
for j in range(i + 1, len(texts)):
# 如果文本i和文本j之间的相似度超过阈值,则认为是重复的
if cosine_similarities[i][j] > self.threshold:
keep[j] = False # 标记文本j为重复,不保留
# 返回去重后的文本列表
return [text for text, flag in zip(texts, keep) if flag]
# 示例使用
if __name__ == "__main__":
texts = [
"This is a test sentence.",
"This is an example of a test sentence.",
"Completely unrelated text.",
"This is a sample sentence for testing.",
"Test sentence example."
]
cleaner = CleanerDedupByEmbedding(threshold=0.85)
cleaned_texts = cleaner.clean_text(texts)
print("去重后的文本:")
for text in cleaned_texts:
print(text)
-
计算文本的嵌入向量:
- 在
clean_text
方法中,我们使用模型的encode
方法将文本列表转换为相应的向量(即嵌入)。这些向量代表了每个文本的语义信息。
- 在
-
计算余弦相似度:
- 使用
sklearn.metrics.pairwise.cosine_similarity
来计算所有文本对之间的余弦相似度。余弦相似度用于衡量两个向量的相似性,值范围是 [0, 1],1 表示完全相同,0 表示完全不同。
- 使用
-
去重逻辑:
- 对于每一对文本,如果它们的余弦相似度大于预设的阈值(默认是 0.85),则认为它们是重复的,后面的文本会被标记为
False
,不再保留。
- 对于每一对文本,如果它们的余弦相似度大于预设的阈值(默认是 0.85),则认为它们是重复的,后面的文本会被标记为
-
返回去重后的文本:
- 最后,我们返回一个新的文本列表,只包含那些未被标记为重复的文本。
示例输出:
假设我们有以下文本:
texts = [
"This is a test sentence.",
"This is an example of a test sentence.",
"Completely unrelated text.",
"This is a sample sentence for testing.",
"Test sentence example."
]
输出结果可能是:
去重后的文本:
This is a test sentence.
Completely unrelated text.
隐私过滤
在隐私过滤阶段,YuLan-GARDEN 通过去除文本中的个人身份信息(例如身份证号、电话号码、网址等)来保护隐私。我们以去除身份证号为例来解释:每当输入一段文本时,系统会使用正则表达式匹配身份证号,并将其替换成一个特定的占位符(比如 "MASKEDIDCARD**"),这样就不会泄露用户的私人信息。
import re
from utils.rules.regex import REGEX_IDCARD
from utils.cleaner.cleaner_base import CleanerBase
class CleanerSubstitutePassageIDCard(CleanerBase):
def __init__(self):
super().__init__()
def clean_single_text(self, text: str, repl_text: str = "**MASKED**IDCARD**") -> str:
"""
过滤掉文本中的身份证号,并替换为指定的字符串。
:param text: 输入的文本,可能包含身份证号
:param repl_text: 用于替代身份证号的字符串,默认是 "**MASKED**IDCARD**"
:return: 替换掉身份证号后的文本
"""
# 使用正则表达式 REGEX_IDCARD 匹配身份证号,并用 repl_text 替换
cleaned_text = re.sub(REGEX_IDCARD, repl_text, text)
return cleaned_text
假设我们有一段包含身份证号的文本:
# 示例输入文本
text = "张三,身份证号:41018119870101001X,联系电话:13800000000。"
cleaner = CleanerSubstitutePassageIDCard()
cleaned_text = cleaner.clean_single_text(text)
print(cleaned_text)
输出:
张三,身份证号:**MASKED**IDCARD**,联系电话:13800000000。
那么,如何自己自定义一个自己需要的隐私清理函数呢?我的方法是直接用re库来进行正则化匹配,给一个代码例子给你们看看:
import re
# 函数:清理隐私信息
def clean_private_info(text):
"""
该函数接收一个文本,清理其中的隐私信息,包括:
1. 姓名(假设为“姓名”是以大写字母开头的两个词,且词与词之间以空格分隔)
2. 电子邮件地址
3. 电话号码
清理后的信息将会被替换为 [REDACTED]
"""
# 1. 定义正则表达式:匹配电子邮件
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
# 匹配邮箱地址的正则表达式,匹配任何符合标准格式的邮箱
# 2. 定义正则表达式:匹配电话号码
phone_pattern = r'\b(\+?\d{1,2})?(\()?(\d{3})?(\))?[-.\s]?\d{3}[-.\s]?\d{4}\b'
# 匹配电话号码的正则表达式,支持多种格式(带或不带区号,带括号或不带)
# 3. 定义正则表达式:匹配姓名(假设姓名为两部分,且首字母大写)
name_pattern = r'\b[A-Z][a-z]+ [A-Z][a-z]+\b'
# 匹配姓名的正则表达式,假设名字是以大写字母开头的两部分组成
# 4. 替换电子邮件地址为 [REDACTED]
text = re.sub(email_pattern, '[REDACTED]', text)
# 5. 替换电话号码为 [REDACTED]
text = re.sub(phone_pattern, '[REDACTED]', text)
# 6. 替换姓名为 [REDACTED]
text = re.sub(name_pattern, '[REDACTED]', text)
# 返回清理后的文本
return text
# 示例使用
input_text = """
Hello, my name is John Doe. You can reach me at [email protected] or call me at +1 (555) 123-4567.
I also have a backup email: [email protected]. My colleague's name is Alice Johnson, her phone number is (555) 987-6543.
"""
# 清理隐私信息
cleaned_text = clean_private_info(input_text)
# 输出清理后的文本
print("清理后的文本:\n", cleaned_text)
# 清理后的文本:
# Hello, my name is [REDACTED]. You can reach me at [REDACTED] or call me at [REDACTED].
# I also have a backup email: [REDACTED]. My colleague's name is [REDACTED], her phone number is [REDACTED].
敏感词清理
import re
# 函数:处理敏感词
def handle_sensitive_words(text, sensitive_words=None):
"""
该函数接收一个文本,清理其中的敏感词。敏感词将被替换为 [SENSITIVE]。
你可以通过传入 sensitive_words 参数来指定自定义的敏感词列表。
参数:
- text: 需要处理的文本
- sensitive_words: 一个列表,包含所有敏感词(可选,如果没有传入则使用默认敏感词列表)
返回:
- 处理后的文本,敏感词已被替换为 [SENSITIVE]
"""
# 默认敏感词列表,如果没有传入自定义列表时使用
default_sensitive_words = ['badword1', 'badword2', 'example_sensitive']
# 使用传入的敏感词列表(如果有)或使用默认的敏感词列表
sensitive_words = sensitive_words if sensitive_words else default_sensitive_words
# 将敏感词列表转换为正则表达式模式
# 创建一个正则表达式,匹配任意一个敏感词(不区分大小写)
pattern = r'\b(' + '|'.join(map(re.escape, sensitive_words)) + r')\b'
# 使用 re.sub() 方法替换所有匹配的敏感词
cleaned_text = re.sub(pattern, '[SENSITIVE]', text, flags=re.IGNORECASE)
return cleaned_text
# 示例使用
input_text = """
This is a text that contains some sensitive words like badword1 and example_sensitive.
Some random text and another badword2 is here.
"""
# 自定义敏感词列表
custom_sensitive_words = ['badword1', 'badword2', 'example_sensitive']
# 处理敏感词
cleaned_text = handle_sensitive_words(input_text, custom_sensitive_words)
# 输出处理后的文本
print("处理后的文本:\n", cleaned_text)
在这里我也给出一个简单的敏感词清理的函数,函数里是将敏感词进行替换,在实际应用的时候我的建议是直接清理掉整条句子或者一整段话,避免你们后面训练模型之后,模型因为幻觉说了不该说的话。