这里仅仅选用了其中的唐诗部分进行训练:chinese-poetry/全唐诗/
原始数据:
在原始数据中包含作者,内容,标题,id四个部分,这里使用内容进行训练
数据预处理
首先将所有唐诗的诗句合并成一个长文本
import numpy as np
import json
import torch
import os
import pkuseg
# 读取 json 文件
def get_json(path):
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
# 对 ./tang/ 文件夹下的所有 json 文件进行遍历
# 获取文件夹下的所有文件名
def get_file_name(path):
file_name = []
for root, dirs, files in os.walk(path):
for file in files:
file_name.append(file)
return file_name
file_name_ls = get_file_name('./tang/')
ret_ls = []
for file_name in file_name_ls:
ls = get_json('./tang/' + file_name)
n_ls = len(ls)
for i in range(n_ls):
para = ls[i]['paragraphs']
para = ''.join(para)
ret_ls.append(para)
n_poet = len(ret_ls) # 一共 57607 首诗歌
# 全部合并成一个 string
str_all = ''.join(ret_ls)
接着对其分词,我这里里使用了torchtext库,在代码中使用了collections.Counter
和torchtext.vocab.Vocab
来构建中文字符级别的词汇表,并将文本转换为整数序列。
在这段代码中,首先定义了一个字符级别的分词器chinese_char_tokenizer
,它将文本拆分为单个字符。接下来,使用该分词器对所有文本进行分词处理,并使用Counter
统计字符出现的频率。然后,使用Vocab
基于计数器构建词汇表vocab
。最后,将文本转换为整数序列text_as_int
,其中每个字符被其在词汇表中的索引所代替。
(这仅仅是其中一种分词方式,由于中文的特殊性,对中文文本的分词工作较英文更加复杂,有兴趣的读者可以尝试下其他的分词方式,例如pyhanlp,pkuseg等)
from collections import Counter
from torchtext.vocab import Vocab
# 定义中文字符级别的分词器
def chinese_char_tokenizer(text):
return [char for char in text]
# 创建分词器
tokenizer = chinese_char_tokenizer
# 使用 tokenizer 对所有文本进行分词处理
counter = Counter()
counter.update(tokenizer(str_all))
# 创建词汇表,基于计数器
vocab = Vocab(counter)
# 将文本转换为整数序列
text_as_int = [vocab[token] for token in str_all]
print("text_as_int", max(text_as_int))
接着将分好的词表进行保存,便于后面调用
# 保存词表
word_dict = {}
for ii, (char, count) in enumerate(counter.items()):
print(f'Character: {char}, Frequency: {count}')
word_dict[count] = char
with open('word_dict.json', 'w', encoding='utf-8') as file:
json.dump(word_dict, file, ensure_ascii=False)
print("===================")
接着构建数据集,对唐诗内容进行采样,并且将所有数据以7:3的比例划分为训练集和测试集
# 制作数据集的方法:从 ret_ls 中,采样一首诗,然后从这首诗随机采样一个长度为 20 的子串,作为输入,然后预测下一个字符
# 采样方法是随机采样
import random
from tqdm import tqdm
from sklearn.model_selection import train_test_split
x_seq_ls = []
y_seq_ls = []
for i in tqdm(range(n_poet)):
if len(ret_ls[i]) - 21 <= 0: # 如果这首诗歌的长度小于等于 21,就跳过
continue
# 随机选一个子串
start = random.randint(0, len(ret_ls[i]) - 21)
end = start + 20
# 保存到 x_seq_ls 和 y_seq_ls 中
x_seq_ls.append(ret_ls[i][start:end])
y_seq_ls.append(ret_ls[i][end])
# 把 x_seq_ls 和 y_seq_ls 用 vocab 进行编码
x_token = [[vocab[char] for char in seq] for seq in x_seq_ls]
y_token = [vocab[char] for char in y_seq_ls]
# 转化为 numpy
x_mat = np.array(x_token)
y_mat = np.array(y_token)
# 划分为训练集和测试集(7:3)
x_train, x_test, y_train, y_test = train_test_split(x_mat, y_mat, test_size=0.3, random_state=42, shuffle=True)
# 先把 x_train, x_test, y_train, y_test 转化为 tensor
x_train = torch.tensor(x_train).to(device)
x_test = torch.tensor(x_test).to(device)
y_train = torch.tensor(y_train).to(device)
y_test = torch.tensor(y_test).to(device)
print(x_train.shape, y_train.shape)
构造网络
这里使用的是LSTM,根据子集选择的分词方式,若词表大小为len_word_dict,则设置nn.Embedding(len_word_dict, ***),最终输出张量的大小为[batch_size, len_word_dict]
import torch
import torch.nn as nn
import torch.nn.functional as F
# 设置 CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class LSTMNet(nn.Module):
def __init__(self):
super(LSTMNet, self).__init__()
self.embedding = nn.Embedding(268815, 128)
self.lstm1 = nn.LSTM(input_size=128, hidden_size=128, num_layers=1, batch_first=True)
self.dropout1 = nn.Dropout(0.2)
self.lstm2 = nn.LSTM(input_size=128, hidden_size=128, num_layers=1, batch_first=True)
self.dropout2 = nn.Dropout(0.2)
self.fc = nn.Linear(128, 268815)
def forward(self, x):
x = self.embedding(x) # [batch_size, seq_len, embedding_size]
x, _ = self.lstm1(x) # [batch_size, seq_len, hidden_size]
x = self.dropout1(x) # [batch_size, seq_len, hidden_size]
x, _ = self.lstm2(x) # [batch_size, seq_len, hidden_size]
x = self.dropout2(x) # [batch_size, seq_len, hidden_size]
x = x[:, -1, :] # 这里-1的意思是:取最后一个输出 [batch_size, hidden_size]
x = self.fc(x) # [batch_size, 268815]
return x
if __name__ == "__main__":
# 实例化模型
model = LSTMNet().to(device)
print(model)
将网络结构进行输出:
LSTMNet(
(embedding): Embedding(268815, 128)
(lstm1): LSTM(128, 128, batch_first=True)
(dropout1): Dropout(p=0.2, inplace=False)
(lstm2): LSTM(128, 128, batch_first=True)
(dropout2): Dropout(p=0.2, inplace=False)
(fc): Linear(in_features=128, out_features=268815, bias=True)
)
训练
首先导入相关包,指定训练显卡编号
import torch
import torch.optim as optim
from tqdm import tqdm
import torch.nn as nn
from model import LSTMNet
from data import x_train, x_test, y_train, y_test
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'
因为之前 y_train 和 y_test [batch, 1] 最后一个维度是没用的,所以要把它去掉,变成 [batch] 才能正常给交叉熵损失函数计算
y_train = y_train.squeeze()
y_test = y_test.squeeze()
# 转化成 Long
y_train = y_train.long()
y_test = y_test.long()
设置参数信息
model = LSTMNet().cuda()
optimizer = optim.Adam(model.parameters(), lr=0.001)
batch_size = 8
epochs = 100
loss_func = nn.CrossEntropyLoss()
开始训练,在训练过程中按照训练一轮,验证一轮的方式来进行训练。每当出现更高的精度时则将此时模型的权重保存下来,最终全部训练完成后将最后一次的模型权重保存下来。
for epoch in range(epochs):
print('Epoch: ', epoch)
test_acc = 0
correct_train, correct_test = 0, 0
for i in tqdm(range(0, len(x_train), batch_size)):
model.train()
x_batch = x_train[i:i+batch_size].cuda()
# print("x_batch:", x_batch)
y_batch = y_train[i:i+batch_size].cuda()
pred = model(x_batch)
loss = loss_func(pred, y_batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
pred = model(x_batch)
pred = torch.argmax(pred, dim=1)
correct_train += (pred == y_batch).sum().item()
print('Train acc: ', correct_train / len(x_train))
for i in tqdm(range(0, len(x_test), batch_size)):
model.eval()
x_batch = x_test[i:i + batch_size].cuda()
y_batch = y_test[i:i + batch_size].cuda()
pred = model(x_batch)
pred = torch.argmax(pred, dim=1)
correct_test += (pred == y_batch).sum().item()
t_acc = correct_test / len(y_test)
print('Test acc: ', t_acc)
if t_acc > test_acc:
test_acc = t_acc
torch.save(model, f"./lstm_{t_acc:.4f}.pth")
torch.save(model, "./lstm_last.pth")
预测
首先加载模型权重
from model import LSTMNet
import torch
import numpy as np
import json
import torch.nn as nn
model = LSTMNet().cuda()
# 加载模型权重
state_dict = torch.load("/home/quant/zhangenwei/project/tang_create/lstm_0.1815.pth")
model = nn.DataParallel(model)
model.load_state_dict(state_dict)
model.eval()
写出诗句的前段话,模型自动生成后面部分,这里设置一共预测20步,每一步都会生成一个字,最终将预测的字拼接到原有的句子中即得到了完整的诗句。
def get_keys_by_value(dictionary, value):
return [int(key) for key, val in dictionary.items() if val == value]
file = open('word_dict.json', 'r', encoding='utf-8')
for val in file.readlines():
val = json.loads(val)
word = val
test_string = '白日依山盡,黃河入海流,欲窮千里目,更上一'
for i in range(20):
# 循环 20 步,每步都要预测一个字
test_string_token = []
for val in test_string[-20:]:
word_token = get_keys_by_value(word, val)
# print(word_token)
test_string_token.extend(word_token)
test_string_mat = np.array(test_string_token)
input = torch.tensor(test_string_mat).cuda()
input = input.unsqueeze(0)
pred = model(input)
pred_argmax = torch.argmax(pred, dim=1).item()
# 把预测的字转化为文字
test_string = test_string + word[pred_argmax]
print(test_string)
写在最后
这种方式本质上是一种自回归模型,模型生成的每个事件都将依赖于之前生成的输出,这样的好处在于可以考虑到文本序列的上下文信息,每个事件不依赖于之前的内容,从而可以捕捉到文本序列中的上下文相关性,使生成的内容更加连贯。
然而,由于每次生成的内容都依赖于之前的输出,随着生成内容的增长,误差也会累积,一旦前面生成的内容出现错误就会导致后面误差越来越大。并且自回归模型的生成过程是串行的,每个时间步只能依赖前一个时间步的输出,无法并行生成多个时间步的输出,因此生成速度较慢。