- 1 Introduction
- 2 Steps
- 2.1 Dataset preprocessing
- 2.2 Pretraining BERT on the corpus built from the preprocessed dataset
- 2.3 Loading the corpus and vocabulary, then training the BERT model on the raw data
- 2.4 Model testing
1 Introduction
(1) Article series: 【NLP-新闻文本分类】1 数据分析和探索, 【NLP-新闻文本分类】2 特征工程
(2) Overview: the BERT approach is special in that there is no separate feature-engineering step; a corpus and a vocabulary are generated directly from the dataset and are then used to pretrain the BERT model.
- Source code for the current model
- Environment
TensorFlow == 1.14.0, Keras == 2.3.1, bert4keras == 0.8.4
2 Steps
2.1 Dataset preprocessing
This step does two things: it produces the vocabulary file vocab.txt that BERT needs, and it gathers the text fields of train_set, test_a and test_b, shuffles the combined texts ten times, and writes ten files whose contents appear in different orders (the first one is named corpus.0.tfrecord, for example). Each file is made up of many lists; one list covers ten documents, and every element of a list is stored as at most 510 characters. A sample list is shown below, followed by a rough sketch of this preprocessing step.
[‘5399 3117 1070 4321 … 3659 1141’, ‘7543 3750 1511 7467 …15 922 885’, ‘2491 4109 1757 7539 … 5787 2717’, ‘7349 5681 6093 5998 … 5028 1985’, ‘7194 1767 5915 1080 …23 408 671’, ‘6560 648 1667 1099 3… 5791 2662’, ‘2673 5076 6835 2835 … 4396 3750’, ‘4811 648 1679 4811 2…5 900 2465’, ‘4562 4893 2210 4761 … 7377 5977’, ‘3750 1866 307 5949 3… 1734 5598’, ‘4464 3370 6734 4583 … 6887 4811’, ‘2541 910 1582 2899 2…5 465 2252’, ‘6407 900 3659 3370 3… 2073 4811’, ‘3272 5254 2130 900 3… 2717 5619’, …]
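As a minimal sketch of this step (assuming the raw files are tab-separated CSVs whose text column holds space-separated anonymized token ids; build_vocab and shuffled_corpus are illustrative names, not the author's code):
# Hypothetical preprocessing sketch: build vocab.txt and the shuffled corpus
# lists described above (helper names are illustrative).
import random
import pandas as pd

def build_vocab(files, vocab_path='data/vocab.txt'):
    """Collect every anonymized token and write a BERT-style vocab.txt,
    placing the special tokens first."""
    tokens = set()
    for f in files:
        df = pd.read_csv(f, sep='\t')
        for text in df['text']:
            tokens.update(text.split())
    specials = ['[PAD]', '[UNK]', '[CLS]', '[SEP]', '[MASK]']
    with open(vocab_path, 'w', encoding='utf-8') as fout:
        for tok in specials + sorted(tokens, key=int):  # tokens are numeric ids
            fout.write(tok + '\n')

def shuffled_corpus(files, docs_per_list=10, max_chars=510, seed=0):
    """One shuffled pass over all texts: documents are shuffled, grouped ten
    at a time, and each document is cut into pieces of at most max_chars
    characters, giving lists in the format shown above."""
    texts = []
    for f in files:
        df = pd.read_csv(f, sep='\t')
        texts.extend(df['text'].tolist())
    random.Random(seed).shuffle(texts)
    corpus = []
    for i in range(0, len(texts), docs_per_list):
        group = []
        for text in texts[i:i + docs_per_list]:
            group.extend(text[j:j + max_chars]
                         for j in range(0, len(text), max_chars))
        corpus.append(group)
    return corpus
Running shuffled_corpus with ten different seeds would give the ten differently ordered corpora; each returned list of lists is what the TFRecord writer below consumes via TrainingDatasetRoBERTa.process.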
# 预训练语料构建
import glob
import os
os.environ['TF_KERAS'] = '1' # 必须使用tf.keras
import numpy as np
import pandas as pd
import tensorflow as tf
from bert4keras.backend import K
from bert4keras.snippets import parallel_apply
from bert4keras.tokenizers import Tokenizer
from tqdm import tqdm
class TrainingDataset(object):
"""预训练数据集生成器。"""
def __init__(self, tokenizer, sequence_length=512):
"""参数说明:tokenizer必须是bert4keras自带的tokenizer类;"""
self.tokenizer = tokenizer
self.sequence_length = sequence_length
self.token_pad_id = tokenizer._token_pad_id
self.token_cls_id = tokenizer._token_start_id
self.token_sep_id = tokenizer._token_end_id
self.token_mask_id = tokenizer._token_mask_id
self.vocab_size = tokenizer._vocab_size
def padding(self, sequence, padding_value=None):
"""对单个序列进行补0。"""
if padding_value is None:
padding_value = self.token_pad_id
sequence = sequence[:self.sequence_length]
padding_length = self.sequence_length - len(sequence)
return sequence + [padding_value] * padding_length
def sentence_process(self, text):
"""单个文本的处理函数,返回处理后的instance。"""
raise NotImplementedError
def paragraph_process(self, texts, starts, ends, paddings):
"""单个段落(多个文本)的处理函数
说明:texts是单句组成的list;starts是每个instance的起始id;
ends是每个instance的终止id;paddings是每个instance的填充id。
做法:不断塞句子,直到长度最接近sequence_length,然后padding。
"""
instances, instance = [], [[start] for start in starts]
for text in texts:
# 处理单个句子
sub_instance = self.sentence_process(text)
sub_instance = [i[:self.sequence_length - 2] for i in sub_instance]
new_length = len(instance[0]) + len(sub_instance[0])
# 如果长度即将溢出
if new_length > self.sequence_length - 1:
# 插入终止符,并padding
complete_instance = []
for item, end, pad in zip(instance, ends, paddings):
item.append(end)
item = self.padding(item, pad)
complete_instance.append(item)
# 存储结果,并构建新样本
instances.append(complete_instance)
instance = [[start] for start in starts]
# 样本续接
for item, sub_item in zip(instance, sub_instance):
item.extend(sub_item)
# 插入终止符,并padding
complete_instance = []
for item, end, pad in zip(instance, ends, paddings):
item.append(end)
item = self.padding(item, pad)
complete_instance.append(item)
# 存储最后的instance
instances.append(complete_instance)
return instances
def tfrecord_serialize(self, instances, instance_keys):
"""转为tfrecord的字符串,等待写入到文件。"""
def create_feature(x):
return tf.train.Feature(int64_list=tf.train.Int64List(value=x))
serialized_instances = []
for instance in instances:
features = {
k: create_feature(v)
for k, v in zip(instance_keys, instance)
}
tf_features = tf.train.Features(feature=features)
tf_example = tf.train.Example(features=tf_features)
serialized_instance = tf_example.SerializeToString()
serialized_instances.append(serialized_instance)
return serialized_instances
def process(self, corpus, record_name, workers=8, max_queue_size=2000):
"""处理输入语料(corpus),最终转为tfrecord格式(record_name)
自带多进程支持,如果cpu核心数多,请加大workers和max_queue_size。
"""
writer = tf.io.TFRecordWriter(record_name)
globals()['count'] = 0
def write_to_tfrecord(serialized_instances):
globals()['count'] += len(serialized_instances)
for serialized_instance in serialized_instances:
writer.write(serialized_instance)
def paragraph_process(texts):
instances = self.paragraph_process(texts)
serialized_instances = self.tfrecord_serialize(instances)
return serialized_instances
parallel_apply(
func=paragraph_process,
iterable=corpus,
workers=workers,
max_queue_size=max_queue_size,
callback=write_to_tfrecord,
)
writer.close()
print('write %s examples into %s' % (globals()['count'], record_name))
@staticmethod
def load_tfrecord(record_names, batch_size, parse_function):
"""加载处理成tfrecord格式的语料。"""
if not isinstance(record_names, list):
record_names = [record_names]
dataset = tf.data.TFRecordDataset(record_names) # 加载
dataset = dataset.map(parse_function) # 解析
dataset = dataset.repeat() # 循环
dataset = dataset.shuffle(batch_size * 1000) # 打乱
dataset = dataset.batch(batch_size) # 成批
return dataset
class TrainingDatasetRoBERTa(TrainingDataset):
"""预训练数据集生成器(RoBERTa模式)。"""
def __init__(
self, tokenizer, word_segment, mask_rate=0.15, sequence_length=512
):
"""参数说明:
tokenizer必须是bert4keras自带的tokenizer类;
word_segment是任意分词函数。
"""
super(TrainingDatasetRoBERTa, self).__init__(tokenizer, sequence_length)
self.word_segment = word_segment
self.mask_rate = mask_rate
def token_process(self, token_id):
"""
以80%的几率替换为[MASK],以10%的几率保持不变,以10%的几率替换为一个随机token。
"""
rand = np.random.random()
        if rand <= 0.8:
            return self.token_mask_id
        elif rand <= 0.9:
            return token_id
        else:
            return np.random.randint(0, self.vocab_size)
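    # --- Added sketch, not the author's original code ----------------------
    # The rest of the RoBERTa-mode generator plus a driver that writes the
    # corpus.N.tfrecord files, completed here following the standard
    # bert4keras pretraining example; treat it as an assumption about how the
    # classes above are driven.
    def sentence_process(self, text):
        """Tokenize one text and return [token_ids, mask_ids]; in mask_ids,
        0 marks an unmasked position and i + 1 stores the token_process
        result i for a masked one."""
        words = self.word_segment(text)
        rands = np.random.random(len(words))
        token_ids, mask_ids = [], []
        for rand, word in zip(rands, words):
            word_tokens = self.tokenizer.tokenize(text=word)[1:-1]
            word_token_ids = self.tokenizer.tokens_to_ids(word_tokens)
            token_ids.extend(word_token_ids)
            if rand < self.mask_rate:
                word_mask_ids = [self.token_process(i) + 1 for i in word_token_ids]
            else:
                word_mask_ids = [0] * len(word_tokens)
            mask_ids.extend(word_mask_ids)
        return [token_ids, mask_ids]
    def paragraph_process(self, texts):
        """Supply the starts/ends/paddings expected by the base class."""
        starts = [self.token_cls_id, 0]
        ends = [self.token_sep_id, 0]
        paddings = [self.token_pad_id, 0]
        return super(TrainingDatasetRoBERTa, self).paragraph_process(
            texts, starts, ends, paddings)
    def tfrecord_serialize(self, instances):
        """Supply the instance_keys expected by the base class."""
        instance_keys = ['token_ids', 'mask_ids']
        return super(TrainingDatasetRoBERTa, self).tfrecord_serialize(
            instances, instance_keys)
if __name__ == '__main__':
    # Assumed driver: the vocabulary is the vocab.txt produced in 2.1, texts
    # are whitespace-separated anonymized tokens, and corpus_lists stands for
    # one shuffled pass over the documents (ten passes -> corpus.0 ... corpus.9).
    tokenizer = Tokenizer('data/vocab.txt', do_lower_case=True)
    def word_segment(text):
        return text.split()
    TD = TrainingDatasetRoBERTa(
        tokenizer, word_segment, mask_rate=0.15, sequence_length=512
    )
    corpus_lists = [[
        '5399 3117 1070 4321 3659 1141',
        '7543 3750 1511 7467 15 922 885',
    ]]  # placeholder; pass the shuffled ten-document lists from 2.1 here
    TD.process(corpus=corpus_lists, record_name='corpus.0.tfrecord')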
2.3 Loading the corpus and vocabulary, then training the BERT model on the raw data
The fine-tuning script below shares essentially the same preamble as the prediction script listed in section 2.4: the same configuration constants (maxlen = 512, max_segment = 2, num_classes = 14, batch_size, and so on), the same Tokenizer built from vocab.txt, and the same load_data helper; in addition it uses sklearn's StratifiedKFold and f1_score, the Keras callbacks, and bert4keras's Adam wrapped with gradient accumulation. Each document is first cut by sentence_split into segments of at most maxlen - 2 tokens; if a document yields more than max_segment segments, only the first and last max_segment / 2 segments are kept (with max_segment = 2, that is the first 510 and the last 510 tokens of the document).
def sentence_split(words):
    """句子截断。"""
    document_len = len(words)
    index = list(range(0, document_len, maxlen-2))
    index.append(document_len)
    segments = []
    for i in range(len(index) - 1):
        segment = words[index[i]: index[i + 1]]
        assert len(segment) > 0
        segment = tokenizer.tokens_to_ids(['[CLS]'] + segment + ['[SEP]'])
        segments.append(segment)
    assert len(segments) > 0
    if len(segments) > max_segment:
        segment_ = int(max_segment / 2)
        return segments[:segment_] + segments[-segment_:]
    else:
        return segments
class data_generator(DataGenerator):
"""数据生成器。"""
def __init__(self, data, batch_size=32, buffer_size=None, random=False):
super().__init__(data, batch_size, buffer_size)
self.random = random
def __iter__(self, random=False):
batch_token_ids, batch_segment_ids, batch_labels = [], [], []
for is_end, (text, label) in self.sample(random):
token_ids = sentence_split(text)
token_ids = sequence_padding(token_ids, length=maxlen)
segment_ids = np.zeros_like(token_ids)
batch_token_ids.append(token_ids)
batch_segment_ids.append(segment_ids)
batch_labels.append([label])
if len(batch_token_ids) == self.batch_size or is_end:
batch_token_ids = sequence_padding(
batch_token_ids, length=max_segment)
batch_segment_ids = sequence_padding(
batch_segment_ids, length=max_segment)
batch_labels = sequence_padding(batch_labels)
yield [batch_token_ids, batch_segment_ids], batch_labels
batch_token_ids, batch_segment_ids, batch_labels = [], [], []
def forfit(self):
while True:
for d in self.__iter__(self.random):
yield d
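# Note on data_generator: sentence_split returns up to max_segment lists of
# token ids per document; the inner sequence_padding pads each of them to
# maxlen, and the outer sequence_padding pads every document to max_segment
# segments, so batch_token_ids has shape (batch_size, max_segment, maxlen),
# matching the model inputs defined in build_model below.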
class Attention(Layer):
"""注意力层。"""
def __init__(self, hidden_size, **kwargs):
self.hidden_size = hidden_size
super().__init__(**kwargs)
def build(self, input_shape):
initializer = keras.initializers.truncated_normal(mean=0.0, stddev=0.05)
# 为该层创建一个可训练的权重
self.weight = self.add_weight(
name='weight',
shape=(self.hidden_size, self.hidden_size),
initializer=initializer,
trainable=True)
self.bias = self.add_weight(
name='bias',
shape=(self.hidden_size,),
initializer='zero',
trainable=True)
self.query = self.add_weight(
name='query',
shape=(self.hidden_size, 1),
initializer=initializer,
trainable=True)
super().build(input_shape) # 一定要在最后调用它
def call(self, x):
x, mask = x
mask = K.squeeze(mask, axis=2)
# linear
key = K.bias_add(K.dot(x, self.weight), self.bias)
# compute attention
outputs = K.squeeze(K.dot(key, self.query), axis=2)
outputs -= 1e32 * (1 - mask)
attn_scores = K.softmax(outputs)
attn_scores *= mask
attn_scores = K.reshape(
attn_scores, shape=(-1, 1, attn_scores.shape[-1])
)
outputs = K.squeeze(K.batch_dot(attn_scores, key), axis=1)
return outputs
def compute_output_shape(self, input_shape):
return input_shape[0][0], self.hidden_size
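# Added note on the architecture built below: inputs have shape
# (batch, max_segment, maxlen), i.e. each document arrives as at most
# max_segment token-id segments. Masking + K.any derive a 0/1 flag per
# non-empty segment, the segments are reshaped to (batch * max_segment, maxlen)
# and encoded by one shared BERT, the [CLS] vectors are reshaped back to
# (batch, max_segment, hidden) and masked, the Attention layer above pools
# them into a single document vector, and a softmax Dense layer predicts
# num_classes labels. The optimizer is Adam extended with gradient
# accumulation, so the effective batch size is batch_size * grad_accum_steps.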
def build_model():
"""构建模型。"""
token_ids = Input(shape=(max_segment, maxlen), dtype='int32')
segment_ids = Input(shape=(max_segment, maxlen), dtype='int32')
input_mask = Masking(mask_value=0)(token_ids)
input_mask = Lambda(
lambda x: K.cast(K.any(x, axis=2, keepdims=True), 'float32')
)(input_mask)
token_ids1 = Lambda(
lambda x: K.reshape(x, shape=(-1, maxlen))
)(token_ids)
segment_ids1 = Lambda(
lambda x: K.reshape(x, shape=(-1, maxlen))
)(segment_ids)
# 加载预训练模型
bert = build_transformer_model(
config_path=config_path,
checkpoint_path=checkpoint_path,
return_keras_model=False,
)
output = bert.model([token_ids1, segment_ids1])
output = Lambda(lambda x: x[:, 0])(output)
output = Lambda(
lambda x: K.reshape(x, shape=(-1, max_segment, output.shape[-1]))
)(output)
output = Multiply()([output, input_mask])
output = Dropout(drop)(output)
output = Attention(output.shape[-1].value)([output, input_mask])
output = Dropout(drop)(output)
output = Dense(
units=num_classes,
activation='softmax',
kernel_initializer=bert.initializer
)(output)
model = keras.models.Model([token_ids, segment_ids], output)
optimizer_params = {
# 'learning_rate': lr,
'grad_accum_steps': grad_accum_steps
}
optimizer = extend_with_gradient_accumulation(Adam)
optimizer = optimizer(**optimizer_params)
model.compile(
loss='sparse_categorical_crossentropy',
optimizer=optimizer,
metrics=['sparse_categorical_accuracy'],
)
return model
def adversarial_training(model, embedding_name, epsilon=1.):
"""给模型添加对抗训练
其中model是需要添加对抗训练的keras模型,embedding_name
则是model里边Embedding层的名字。要在模型compile之后使用。
"""
if model.train_function is None: # 如果还没有训练函数
model._make_train_function() # 手动make
old_train_function = model.train_function # 备份旧的训练函数
# 查找Embedding层
for output in model.outputs:
embedding_layer = search_layer(output, embedding_name)
if embedding_layer is not None:
break
if embedding_layer is None:
raise Exception('Embedding layer not found')
# 求Embedding梯度
embeddings = embedding_layer.embeddings # Embedding矩阵
gradients = K.gradients(model.total_loss, [embeddings]) # Embedding梯度
gradients = K.zeros_like(embeddings) + gradients[0] # 转为dense tensor
# 封装为函数
inputs = (
model._feed_inputs + model._feed_targets + model._feed_sample_weights
) # 所有输入层
embedding_gradients = K.function(
inputs=inputs,
outputs=[gradients],
name='embedding_gradients',
) # 封装为函数
def train_function(inputs): # 重新定义训练函数
grads = embedding_gradients(inputs)[0] # Embedding梯度
delta = epsilon * grads / (np.sqrt((grads**2).sum()) + 1e-8) # 计算扰动
K.set_value(embeddings, K.eval(embeddings) + delta) # 注入扰动
outputs = old_train_function(inputs) # 梯度下降
K.set_value(embeddings, K.eval(embeddings) - delta) # 删除扰动
return outputs
model.train_function = train_function # 覆盖原训练函数
class Evaluator(Callback):
def __init__(self, valid_generator):
super().__init__()
self.valid_generator = valid_generator
self.best_val_f1 = 0.
def evaluate(self):
y_true, y_pred = list(), list()
for x, y in self.valid_generator:
y_true.append(y)
y_pred.append(self.model.predict(x).argmax(axis=1))
y_true = np.concatenate(y_true)
y_pred = np.concatenate(y_pred)
f1 = f1_score(y_true, y_pred, average='macro')
return f1
def on_epoch_end(self, epoch, logs=None):
val_f1 = self.evaluate()
if val_f1 > self.best_val_f1:
self.best_val_f1 = val_f1
logs['val_f1'] = val_f1
print(f'val_f1: {val_f1:.5f}, best_val_f1: {self.best_val_f1:.5f}')
def do_train(df_train):
skf = StratifiedKFold(n_splits=n, random_state=SEED, shuffle=True)
for fold, (trn_idx, val_idx) in enumerate(skf.split(df_train['text'], df_train['label']), 1):
print(f'Fold {fold}')
train_data = load_data(df_train.iloc[trn_idx])
valid_data = load_data(df_train.iloc[val_idx])
train_generator = data_generator(train_data, batch_size, random=True)
valid_generator = data_generator(valid_data, batch_size)
model = build_model()
adversarial_training(model, 'Embedding-Token', 0.5)
callbacks = [
Evaluator(valid_generator),
EarlyStopping(
monitor='val_f1',
patience=5,
verbose=1,
mode='max'),
ReduceLROnPlateau(
monitor='val_f1',
factor=0.5,
patience=2,
verbose=1,
mode='max'),
ModelCheckpoint(
f'weights-{fold}.h5',
monitor='val_f1',
save_weights_only=True,
save_best_only=True,
verbose=1,
mode='max'),
]
model.fit_generator(
train_generator.forfit(),
steps_per_epoch=len(train_generator),
epochs=epochs,
callbacks=callbacks,
validation_data=valid_generator.forfit(),
validation_steps=len(valid_generator)
)
del model
K.clear_session()
if __name__ == '__main__':
df_train = pd.read_csv('data/train_set.csv', sep='\t')
df_train['text'] = df_train['text'].apply(lambda x: x.strip().split())
do_train(df_train)
2.4 Model testing
The test set is test_a.csv.
# 模型预测脚本
import numpy as np
import pandas as pd
from bert4keras.backend import keras, K
from bert4keras.models import build_transformer_model
from bert4keras.snippets import sequence_padding, DataGenerator
from bert4keras.tokenizers import Tokenizer
from keras.layers import *
# BERT base
config_path = 'pre_models/bert_config.json'
checkpoint_path = 'pre_models/bert_model.ckpt'
dict_path = 'pre_models/vocab.txt'
n = 5 # Cross-validation
SEED = 2020
num_classes = 14
maxlen = 512
max_segment = 2
batch_size = 4
grad_accum_steps = 64
drop = 0.2
lr = 2e-5
epochs = 100
def load_data(df):
"""加载数据。"""
D = list()
for _, row in df.iterrows():
text = row['text']
label = row['label']
D.append((text, int(label)))
return D
# 建立分词器
tokenizer = Tokenizer(dict_path, do_lower_case=True)
def sentence_split(words):
"""句子截断。"""
document_len = len(words)
index = list(range(0, document_len, maxlen-2))
index.append(document_len)
segments = []
for i in range(len(index) - 1):
segment = words[index[i]: index[i + 1]]
assert len(segment) > 0
segment = tokenizer.tokens_to_ids(['[CLS]'] + segment + ['[SEP]'])
segments.append(segment)
assert len(segments) > 0
if len(segments) > max_segment:
segment_ = int(max_segment / 2)
return segments[:segment_] + segments[-segment_:]
else:
return segments
class data_generator(DataGenerator):
"""数据生成器。"""
def __init__(self, data, batch_size=32, buffer_size=None, random=False):
super().__init__(data, batch_size, buffer_size)
self.random = random
def __iter__(self, random=False):
batch_token_ids, batch_segment_ids, batch_labels = [], [], []
for is_end, (text, label) in self.sample(random):
token_ids = sentence_split(text)
token_ids = sequence_padding(token_ids, length=maxlen)
segment_ids = np.zeros_like(token_ids)
batch_token_ids.append(token_ids)
batch_segment_ids.append(segment_ids)
batch_labels.append([label])
if len(batch_token_ids) == self.batch_size or is_end:
batch_token_ids = sequence_padding(
batch_token_ids, length=max_segment)
batch_segment_ids = sequence_padding(
batch_segment_ids, length=max_segment)
batch_labels = sequence_padding(batch_labels)
yield [batch_token_ids, batch_segment_ids], batch_labels
batch_token_ids, batch_segment_ids, batch_labels = [], [], []
def forfit(self):
while True:
for d in self.__iter__(self.random):
yield d
class Attention(Layer):
"""注意力层。"""
def __init__(self, hidden_size, **kwargs):
self.hidden_size = hidden_size
super().__init__(**kwargs)
def build(self, input_shape):
initializer = keras.initializers.truncated_normal(mean=0.0, stddev=0.05)
# 为该层创建一个可训练的权重
self.weight = self.add_weight(
name='weight',
shape=(self.hidden_size, self.hidden_size),
initializer=initializer,
trainable=True)
self.bias = self.add_weight(
name='bias',
shape=(self.hidden_size,),
initializer='zero',
trainable=True)
self.query = self.add_weight(
name='query',
shape=(self.hidden_size, 1),
initializer=initializer,
trainable=True)
super().build(input_shape) # 一定要在最后调用它
def call(self, x):
x, mask = x
mask = K.squeeze(mask, axis=2)
# linear
key = K.bias_add(K.dot(x, self.weight), self.bias)
# compute attention
outputs = K.squeeze(K.dot(key, self.query), axis=2)
outputs -= 1e32 * (1 - mask)
attn_scores = K.softmax(outputs)
attn_scores *= mask
attn_scores = K.reshape(
attn_scores, shape=(-1, 1, attn_scores.shape[-1]))
outputs = K.squeeze(K.batch_dot(attn_scores, key), axis=1)
return outputs
def compute_output_shape(self, input_shape):
return input_shape[0][0], self.hidden_size
def build_model():
"""构建模型。"""
token_ids = Input(shape=(max_segment, maxlen), dtype='int32')
segment_ids = Input(shape=(max_segment, maxlen), dtype='int32')
input_mask = Masking(mask_value=0)(token_ids)
input_mask = Lambda(
lambda x: K.cast(K.any(x, axis=2, keepdims=True), 'float32')
)(input_mask)
token_ids1 = Lambda(
lambda x: K.reshape(x, shape=(-1, maxlen))
)(token_ids)
segment_ids1 = Lambda(
lambda x: K.reshape(x, shape=(-1, maxlen))
)(segment_ids)
# 加载预训练模型
bert = build_transformer_model(
config_path=config_path,
checkpoint_path=checkpoint_path,
return_keras_model=False,
)
output = bert.model([token_ids1, segment_ids1])
output = Lambda(lambda x: x[:, 0])(output)
output = Lambda(
lambda x: K.reshape(x, shape=(-1, max_segment, output.shape[-1]))
)(output)
output = Multiply()([output, input_mask])
output = Dropout(drop)(output)
output = Attention(output.shape[-1].value)([output, input_mask])
output = Dropout(drop)(output)
output = Dense(
units=num_classes,
activation='softmax',
kernel_initializer=bert.initializer
)(output)
model = keras.models.Model([token_ids, segment_ids], output)
return model
def do_predict(df_test):
test_data = load_data(df_test)
test_generator = data_generator(test_data, batch_size)
model = build_model()
res = np.zeros((len(test_data), num_classes))
for i in range(1, n+1):
model.load_weights(f'weights-{i}.h5')
pred = model.predict_generator(
test_generator.forfit(), steps=len(test_generator))
res += pred / n
return res
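# Note: do_predict ensembles the n = 5 fold checkpoints (weights-1.h5 ...
# weights-5.h5) by averaging their softmax outputs; the final label written
# to submission.csv is the argmax of the averaged probabilities.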
if __name__ == '__main__':
df_test = pd.read_csv('data/test_a.csv', sep='\t')
df_test['label'] = 0
df_test['text'] = df_test['text'].apply(lambda x: x.strip().split())
res = do_predict(df_test)
df_test['label'] = res.argmax(axis=1)
df_test.to_csv('submission.csv', index=False, columns=['label'])