(cluener 03)run_lstm_crf.py数据处理部分解读
本部分解读 run_lstm_crf.py 中的代码为
processor = CluenerProcessor(data_dir=config.data_dir)
processor.get_vocab()
一、processor = CluenerProcessor(data_dir=config.data_dir)代码解读
项目结构调用关系示意图如下(本示意图仅表现调用文件和函数名称)
本语句对应代码重构如下(仅包含涉及到的代码)。
'''
1. 本部分代码对应run_lstm_crf.py中 "processor = CluenerProcessor(data_dir=config.data_dir)"
2. 本部分代码涉及文件包括:run_lstm_crf.py、data_processor.py、vocabulary.py
3. 整体的代码构建顺序实际是倒叙的,理解的时候推荐由下至上阅读
'''
import json
from pathlib import Path
from collections import Counter
class Vocabulary(object):
def __init__(self, max_size=None, min_freq=None, add_unused=False,
pad_token='[PAD]', unk_token='[UNK]', cls_token='[CLS]', sep_token='[SEP]', mask_token='[MASK]'):
self.max_size = max_size; self.min_freq = min_freq; self.add_unused = add_unused # 以上参数未被使用
self.pad_token = pad_token; self.unk_token = unk_token; self.cls_token = cls_token; self.sep_token = sep_token; self.mask_token = mask_token # 定义特殊字符
self.word2idx = {}
self.idx2word = None
self.rebuild = True # 决定是否要重新构建词表(此处用不到)
self.word_counter = Counter()
self.reset()
def reset(self): # 将特殊字符加入word2idx字典,如果需要过滤则可以添加不常用字符(本次未用到)。
ctrl_symbols = [self.pad_token, self.unk_token, self.cls_token, self.sep_token, self.mask_token]
for index, syb in enumerate(ctrl_symbols): # word2idx添加特殊字符
self.word2idx[syb] = index
if self.add_unused: # word2idx是否添加不常用词字符
for i in range(20):
self.word2idx[f"UNUSED{i}"] = len(self.word2idx)
class CluenerProcessor:
def __init__(self, data_dir):
self.vocab = Vocabulary()
self.data_dir = data_dir
processor = CluenerProcessor(data_dir=Path('./dataset/cluener'))
# 本语句仅作为processor的初始化,在初始化的过程中设置了特殊字符(和不常用字符)的word2idx
二、processor.get_vocab()代码解读
项目结构调用关系示意图如下(本示意图仅表现调用文件和函数名称)
2.1 重新构建词表vocab.pkl
从Vocabulary(object)的初始化中可以看到,在选择重新构建词表时是可以通过max_size、min_freq和add_unused来人工控制词表的。
本部分仅重构全量词表的构建代码,人工控制的逻辑后面会贴出。
'''
1. 本部分代码对应run_lstm_crf.py中 "processor.get_vocab()"的重新构建全量词表逻辑(全部模式包括:加载已存在词表;构建全量词表;构建人工挑选词表)
2. 本部分代码涉及文件包括:run_lstm_crf.py、data_processor.py、vocabulary.py、common.py(common.py中pickle相关)
3. 整体的代码构建顺序实际是倒叙的,理解的时候推荐由下至上阅读
'''
import json
import pickle
from pathlib import Path
from collections import Counter
class Vocabulary(object):
def __init__(self, max_size=None, min_freq=None, add_unused=False,
pad_token='[PAD]', unk_token='[UNK]', cls_token='[CLS]', sep_token='[SEP]', mask_token='[MASK]'):
self.max_size = max_size; self.min_freq = min_freq; self.add_unused = add_unused
self.pad_token = pad_token; self.unk_token = unk_token; self.cls_token = cls_token; self.sep_token = sep_token; self.mask_token = mask_token
self.word2idx = {}
self.idx2word = None
self.rebuild = True
self.word_counter = Counter()
self.reset()
def reset(self): # 将特殊字符加入word2idx字典,如果需要过滤则可以添加不常用字符(本次未用到)。
ctrl_symbols = [self.pad_token, self.unk_token, self.cls_token, self.sep_token, self.mask_token]
for index, syb in enumerate(ctrl_symbols):
self.word2idx[syb] = index
if self.add_unused:
for i in range(20):
self.word2idx[f"UNUSED{i}"] = len(self.word2idx)
# ------------- 以上代码皆为 processor = CluenerProcessor(data_dir=Path('./dataset/cluener')) 相关代码
def build_vocab(self):
words = self.word_counter.most_common(None) # 将Counter中字符按个数降序排列表示为words
if self.word2idx:
words = filter(lambda kv: kv[0] not in self.word2idx, words) # 过滤words中已有的特殊字符
start_idx = len(self.word2idx) # 计算已有特殊字符长度,此处设置为5
self.word2idx.update({w: i + start_idx for i, (w, _) in enumerate(words)}) # 将words中字符按修正后的index添加
self.idx2word = {i: w for w, i in self.word2idx.items()} # 构建word2idx倒排表idx2word
self.rebuild = False # 将重新构建参数设置为False
def save(self, file_path):
mappings = {'word2idx': self.word2idx, 'idx2word': self.idx2word} # 将word2idx和idx2word存入字典
with open(file_path, 'wb') as f:
pickle.dump(mappings, f) # 将mappings字典写入'./dataset/cluener/vocab.pkl'
class CluenerProcessor:
def __init__(self, data_dir):
self.vocab = Vocabulary()
self.data_dir = data_dir
def get_vocab(self):
vocab_path = self.data_dir / 'vocab.pkl'
if vocab_path.exists():
self.vocab.load_from_file(str(vocab_path)) # 如果词表存在则加载已存在词表,这部分先不说
else:
files = ['train.json', 'dev.json', 'test.json'] # 全量词表一定是涵盖train、dev和test所有的词(避免未知)
for file in files:
with open(str(self.data_dir / file), 'r') as fr:
for line in fr: # line为str字符串
line = json.loads(line.strip()) # 解析json,line为字典,句子关键词为'text'
text = line['text']
self.vocab.word_counter.update(list(text)) # 将list(text)加入self.word_counter,即按字符统计
self.vocab.build_vocab() # 跳至Vocabulary().build_vocab(),即构建词表
self.vocab.save(vocab_path) # 跳至Vocabulary().save(vocab_path),即保存词表
processor = CluenerProcessor(data_dir=Path('./dataset/cluener')) # 此部分解读见上一part
processor.get_vocab() # 获取词表,即word2idx和idx2word表
2.2 构建人工控制词表
人工控制是通过设置max_size、min_freq和add_unused参数来实现的。
根据构建全量词表的逻辑可以看到,控制逻辑应当添加入 "self.vocab.build_vocab()" 中,所以改动仅出现在这部分。
class Vocabulary(object):
...
def build_vocab(self):
max_size = min(self.max_size, len(self.word_counter)) if self.max_size else None
words = self.word_counter.most_common(max_size)
if self.min_freq is not None:
words = filter(lambda kv: kv[1] >= self.min_freq, words)
if self.word2idx:
words = filter(lambda kv: kv[0] not in self.word2idx, words)
start_idx = len(self.word2idx)
self.word2idx.update({w: i + start_idx for i, (w, _) in enumerate(words)})
self.idx2word = {i: w for w, i in self.word2idx.items()}
self.rebuild = False
2.3 加载已存在词表
通过验证 vocab_path即'./dataset/cluener/vocab.pkl' 是否存在来选择切换加载已存在词表模式。
self.word2idx和self.idx2word通过关键词加载字典即可。
'''
1. 本部分代码对应run_lstm_crf.py中 "processor = CluenerProcessor(data_dir=config.data_dir)"
2. 本部分代码涉及文件包括:run_lstm_crf.py、data_processor.py、vocabulary.py、common.py(common.py中pickle相关)
3. 整体的代码构建顺序实际是倒叙的,理解的时候推荐由下至上阅读
'''
import pickle
from pathlib import Path
from collections import Counter
class Vocabulary(object):
def __init__(self, max_size=None, min_freq=None, add_unused=False,
pad_token='[PAD]', unk_token='[UNK]', cls_token='[CLS]', sep_token='[SEP]', mask_token='[MASK]'):
self.max_size = max_size; self.min_freq = min_freq; self.add_unused = add_unused
self.pad_token = pad_token; self.unk_token = unk_token; self.cls_token = cls_token; self.sep_token = sep_token; self.mask_token = mask_token
self.word2idx = {}
self.idx2word = None
self.rebuild = True
self.word_counter = Counter()
self.reset()
def reset(self): # 将特殊字符加入word2idx字典,如果需要过滤则可以添加不常用字符(本次未用到)。
ctrl_symbols = [self.pad_token, self.unk_token, self.cls_token, self.sep_token, self.mask_token]
for index, syb in enumerate(ctrl_symbols):
self.word2idx[syb] = index
if self.add_unused:
for i in range(20):
self.word2idx[f"UNUSED{i}"] = len(self.word2idx)
# ------------- 以上代码皆为 processor = CluenerProcessor(data_dir=Path('./dataset/cluener')) 相关代码
def load_from_file(self, file_path):
with open(str(file_path), 'rb') as f: # 打开'./dataset/cluener/vocab.pkl'文件
mappings = pickle.load(f) # 加载文件中的字典,关键词分别为'word2idx'和'idx2word'
self.idx2word = mappings['idx2word']
self.word2idx = mappings['word2idx']
class CluenerProcessor:
def __init__(self, data_dir):
self.vocab = Vocabulary()
self.data_dir = data_dir
def get_vocab(self):
vocab_path = self.data_dir / 'vocab.pkl'
if vocab_path.exists():
self.vocab.load_from_file(str(vocab_path)) # './dataset/cluener/vocab.pkl'如果已经存在,加载里面字典即可
# 重点在于将数据以属性的形式集成到self即processor中
processor = CluenerProcessor(data_dir=Path('./dataset/cluener'))
processor.get_vocab()
其中,项目所用库的使用说明如下:
collections.Counter.most_common()源码如下
import heapq as _heapq def most_common(self, n=None): if n is None: return sorted(self.iteritems(), key=_itemgetter(1), reverse=True) return _heapq.nlargest(n, self.iteritems(), key=_itemgetter(1))
当most_common()不传参数值时,使用sort()函数排序,排序的根据是第2个元素数值进行排序的。
当most_common()传入参数n时,则返回排序后的前n个元素结果,且结果可以是并列的(如果n,n+1一样大,则都返回)。
filter()函数用于过滤序列,过滤掉不符合条件的元素,返回由符合条件元素组成的新列表
其中,filter(function, iterable),function为判断函数,iterable为可迭代对象。
pickle库,实现序列化和反序列化操作。
使用pickle库可以将python对象直接保存到文件,而不需要先转换为字符串,也不用底层的文件访问操作将其写入二进制文件。
pickle会创建一个python语言专用的二进制格式。
pickle.dump(obj, file, [, protocol]):将obj对象序列存入已经打开的file中(注意,要求已经打开)。
obj:想要序列化的obj对象 file:序列化文件名称 protocol:序列化使用的协议
- pickle.load(file):将file中的对象序列化读出。
- pickle.dumps(obj)和pickle.loads(string)分别代表将obj对象序列化为string形式和从string中读出序列化前obj对象。
评论已关闭