淘先锋技术网

首页 1 2 3 4 5 6 7

数据集来源:GitHub - chinese-poetry/chinese-poetry: The most comprehensive database of Chinese poetry 🧶最全中华古诗词数据库, 唐宋两朝近一万四千古诗人, 接近5.5万首唐诗加26万宋诗. 两宋时期1564位词人,21050首词。

 这里仅仅选用了其中的唐诗部分进行训练:chinese-poetry/全唐诗/

原始数据:

在原始数据中包含作者,内容,标题,id四个部分,这里使用内容进行训练

 

 数据预处理

首先将所有唐诗的诗句合并成一个长文本

import numpy as np
import json
import torch
import os
import pkuseg


# 读取 json 文件
def get_json(path):
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data

# 对 ./tang/ 文件夹下的所有 json 文件进行遍历
# 获取文件夹下的所有文件名
def get_file_name(path):
    file_name = []
    for root, dirs, files in os.walk(path):
        for file in files:
            file_name.append(file)
    return file_name


file_name_ls = get_file_name('./tang/')

ret_ls = []

for file_name in file_name_ls:
    ls = get_json('./tang/' + file_name)
    n_ls = len(ls)

    for i in range(n_ls):
        para = ls[i]['paragraphs']
        para = ''.join(para)
        ret_ls.append(para)

n_poet = len(ret_ls)  # 一共 57607 首诗歌
# 全部合并成一个 string
str_all = ''.join(ret_ls)

接着对其分词,我这里里使用了torchtext库,在代码中使用了collections.Countertorchtext.vocab.Vocab来构建中文字符级别的词汇表,并将文本转换为整数序列。

在这段代码中,首先定义了一个字符级别的分词器chinese_char_tokenizer,它将文本拆分为单个字符。接下来,使用该分词器对所有文本进行分词处理,并使用Counter统计字符出现的频率。然后,使用Vocab基于计数器构建词汇表vocab。最后,将文本转换为整数序列text_as_int,其中每个字符被其在词汇表中的索引所代替。

(这仅仅是其中一种分词方式,由于中文的特殊性,对中文文本的分词工作较英文更加复杂,有兴趣的读者可以尝试下其他的分词方式,例如pyhanlp,pkuseg等)

from collections import Counter
from torchtext.vocab import Vocab

# 定义中文字符级别的分词器
def chinese_char_tokenizer(text):
    return [char for char in text]

# 创建分词器
tokenizer = chinese_char_tokenizer

# 使用 tokenizer 对所有文本进行分词处理
counter = Counter()
counter.update(tokenizer(str_all))

# 创建词汇表,基于计数器
vocab = Vocab(counter)

# 将文本转换为整数序列
text_as_int = [vocab[token] for token in str_all]
print("text_as_int", max(text_as_int))

接着将分好的词表进行保存,便于后面调用

# 保存词表
word_dict = {}
for ii, (char, count) in enumerate(counter.items()):
        print(f'Character: {char}, Frequency: {count}')
        word_dict[count] = char

with open('word_dict.json', 'w', encoding='utf-8') as file:
    json.dump(word_dict, file, ensure_ascii=False)
print("===================")

接着构建数据集,对唐诗内容进行采样,并且将所有数据以7:3的比例划分为训练集和测试集

# 制作数据集的方法:从 ret_ls 中,采样一首诗,然后从这首诗随机采样一个长度为 20 的子串,作为输入,然后预测下一个字符
# 采样方法是随机采样
import random
from tqdm import tqdm
from sklearn.model_selection import train_test_split

x_seq_ls = []
y_seq_ls = []

for i in tqdm(range(n_poet)):

    if len(ret_ls[i]) - 21 <= 0:  # 如果这首诗歌的长度小于等于 21,就跳过
        continue

    # 随机选一个子串
    start = random.randint(0, len(ret_ls[i]) - 21)
    end = start + 20
    # 保存到 x_seq_ls 和 y_seq_ls 中
    x_seq_ls.append(ret_ls[i][start:end])
    y_seq_ls.append(ret_ls[i][end])

# 把 x_seq_ls 和 y_seq_ls 用 vocab 进行编码

x_token = [[vocab[char] for char in seq] for seq in x_seq_ls]
y_token = [vocab[char] for char in y_seq_ls]

# 转化为 numpy
x_mat = np.array(x_token)
y_mat = np.array(y_token)

# 划分为训练集和测试集(7:3)
x_train, x_test, y_train, y_test = train_test_split(x_mat, y_mat, test_size=0.3, random_state=42, shuffle=True)
# 先把 x_train, x_test, y_train, y_test 转化为 tensor
x_train = torch.tensor(x_train).to(device)
x_test = torch.tensor(x_test).to(device)
y_train = torch.tensor(y_train).to(device)
y_test = torch.tensor(y_test).to(device)

print(x_train.shape, y_train.shape)

构造网络

这里使用的是LSTM,根据子集选择的分词方式,若词表大小为len_word_dict,则设置nn.Embedding(len_word_dict, ***),最终输出张量的大小为[batch_size, len_word_dict]

import torch
import torch.nn as nn
import torch.nn.functional as F

# 设置 CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class LSTMNet(nn.Module):
    def __init__(self):
        super(LSTMNet, self).__init__()
        self.embedding = nn.Embedding(268815, 128)
        self.lstm1 = nn.LSTM(input_size=128, hidden_size=128, num_layers=1, batch_first=True)
        self.dropout1 = nn.Dropout(0.2)
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=128, num_layers=1, batch_first=True)
        self.dropout2 = nn.Dropout(0.2)
        self.fc = nn.Linear(128, 268815)

    def forward(self, x):
        x = self.embedding(x)  # [batch_size, seq_len, embedding_size]
        x, _ = self.lstm1(x)  # [batch_size, seq_len, hidden_size]
        x = self.dropout1(x)  # [batch_size, seq_len, hidden_size]
        x, _ = self.lstm2(x)  # [batch_size, seq_len, hidden_size]
        x = self.dropout2(x)  # [batch_size, seq_len, hidden_size]
        x = x[:, -1, :]  # 这里-1的意思是:取最后一个输出 [batch_size, hidden_size]
        x = self.fc(x)  # [batch_size, 268815]
        return x


if __name__ == "__main__":
    # 实例化模型
    model = LSTMNet().to(device)
    print(model)

将网络结构进行输出:

LSTMNet(
  (embedding): Embedding(268815, 128)
  (lstm1): LSTM(128, 128, batch_first=True)
  (dropout1): Dropout(p=0.2, inplace=False)
  (lstm2): LSTM(128, 128, batch_first=True)
  (dropout2): Dropout(p=0.2, inplace=False)
  (fc): Linear(in_features=128, out_features=268815, bias=True)
)

训练

首先导入相关包,指定训练显卡编号

import torch
import torch.optim as optim
from tqdm import tqdm
import torch.nn as nn
from model import LSTMNet
from data import x_train, x_test, y_train, y_test
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'

因为之前 y_train 和 y_test [batch, 1] 最后一个维度是没用的,所以要把它去掉,变成 [batch] 才能正常给交叉熵损失函数计算

y_train = y_train.squeeze()
y_test = y_test.squeeze()

# 转化成 Long
y_train = y_train.long()
y_test = y_test.long()

 设置参数信息

model = LSTMNet().cuda()
optimizer = optim.Adam(model.parameters(), lr=0.001)

batch_size = 8
epochs = 100

loss_func = nn.CrossEntropyLoss()

开始训练,在训练过程中按照训练一轮,验证一轮的方式来进行训练。每当出现更高的精度时则将此时模型的权重保存下来,最终全部训练完成后将最后一次的模型权重保存下来。

for epoch in range(epochs):
    print('Epoch: ', epoch)
    test_acc = 0
    correct_train, correct_test = 0, 0
    for i in tqdm(range(0, len(x_train), batch_size)):
        model.train()
        x_batch = x_train[i:i+batch_size].cuda()
        # print("x_batch:", x_batch)
        y_batch = y_train[i:i+batch_size].cuda()
        pred = model(x_batch)
        loss = loss_func(pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        pred = model(x_batch)
        pred = torch.argmax(pred, dim=1)
        correct_train += (pred == y_batch).sum().item()
    print('Train acc: ', correct_train / len(x_train))

    for i in tqdm(range(0, len(x_test), batch_size)):
        model.eval()
        x_batch = x_test[i:i + batch_size].cuda()
        y_batch = y_test[i:i + batch_size].cuda()
        pred = model(x_batch)
        pred = torch.argmax(pred, dim=1)
        correct_test += (pred == y_batch).sum().item()
    t_acc = correct_test / len(y_test)
    print('Test acc: ', t_acc)
    if t_acc > test_acc:
        test_acc = t_acc
        torch.save(model, f"./lstm_{t_acc:.4f}.pth")

torch.save(model, "./lstm_last.pth")

预测

首先加载模型权重

from model import LSTMNet
import torch
import numpy as np
import json
import torch.nn as nn


model = LSTMNet().cuda()
# 加载模型权重
state_dict = torch.load("/home/quant/zhangenwei/project/tang_create/lstm_0.1815.pth")
model = nn.DataParallel(model)
model.load_state_dict(state_dict)
model.eval()

写出诗句的前段话,模型自动生成后面部分,这里设置一共预测20步,每一步都会生成一个字,最终将预测的字拼接到原有的句子中即得到了完整的诗句。

def get_keys_by_value(dictionary, value):
    return [int(key) for key, val in dictionary.items() if val == value]

file = open('word_dict.json', 'r', encoding='utf-8')
for val in file.readlines():
    val = json.loads(val)
    word = val

test_string = '白日依山盡,黃河入海流,欲窮千里目,更上一'

for i in range(20):
    # 循环 20 步,每步都要预测一个字
    test_string_token = []
    for val in test_string[-20:]:
        word_token = get_keys_by_value(word, val)
        # print(word_token)
        test_string_token.extend(word_token)
    test_string_mat = np.array(test_string_token)

    input = torch.tensor(test_string_mat).cuda()
    input = input.unsqueeze(0)
    pred = model(input)
    pred_argmax = torch.argmax(pred, dim=1).item()
    # 把预测的字转化为文字
    test_string = test_string + word[pred_argmax]

print(test_string)

写在最后

这种方式本质上是一种自回归模型,模型生成的每个事件都将依赖于之前生成的输出,这样的好处在于可以考虑到文本序列的上下文信息,每个事件不依赖于之前的内容,从而可以捕捉到文本序列中的上下文相关性,使生成的内容更加连贯。

然而,由于每次生成的内容都依赖于之前的输出,随着生成内容的增长,误差也会累积,一旦前面生成的内容出现错误就会导致后面误差越来越大。并且自回归模型的生成过程是串行的,每个时间步只能依赖前一个时间步的输出,无法并行生成多个时间步的输出,因此生成速度较慢。