cut_sentence.py

"""
实现句子的分词
注意点:
1. 实现单个字分词 2. 实现按照词语分词
2.1 加载词典 3. 使用停用词
""" import string
import jieba
import jieba.posseg as psg
import logging stopwords_path = "../corpus/stopwords.txt" stopwords = [i.strip() for i in open(stopwords_path,encoding="utf-8").readlines()] #关闭jieba日志
jieba.setLogLevel(logging.INFO) #加载词典
jieba.load_userdict("../corpus/keywords.txt") continue_words = string.ascii_lowercase def _cut_sentence_by_word(sentence):
"""
按照单个字进行分词,eg:python 可 以 做 人 工 智 能 么 ? jave
:param sentence:str
:return: [str,str,str]
"""
temp = ""
result = []
for word in sentence:
if word in continue_words:
temp += word
else:
if len(temp)>0:
result.append(temp)
temp = ""
result.append(word) if len(temp)>0:
result.append(temp)
return result def _cut_sentence(sentence,use_stopwords,use_seg):
"""
按照词语进行分词
:param sentence:str
:return: 【str,str,str】
"""
if not use_seg:
result = jieba.lcut(sentence)
else:
result = [(i.word,i.flag) for i in psg.cut(sentence)]
if use_stopwords:
if not use_seg:
result = [i for i in result if i not in stopwords]
else:
result = [i for i in result if i[0] not in stopwords]
return result def cut(sentence,by_word=False,use_stopwords=False,use_seg=False):
"""
封装上述的方法
:param sentence:str
:param by_word: bool,是否按照单个字分词
:param use_stopwords: 是否使用停用词
:param use_seg: 是否返回词性
:return: [(str,seg),str]
"""
sentence = sentence.lower()
if by_word:
return _cut_sentence_by_word(sentence)
else:
return _cut_sentence(sentence,use_stopwords,use_seg)

word_sequence.py

"""
文本序列化
""" class WordSequence:
UNK_TAG = "<UNK>" #表示未知字符
PAD_TAG = "<PAD>" #填充符
SOS_TAG = "<SOS>"
EOS_TAG = "<EOS>"
PAD = 0
UNK = 1
SOS = 2
EOS = 3 def __init__(self):
self.dict = { #保存词语和对应的数字
self.UNK_TAG:self.UNK,
self.PAD_TAG:self.PAD,
self.SOS_TAG:self.SOS,
self.EOS_TAG:self.EOS
}
self.count = {} #统计词频的 def fit(self,sentence):
"""
接受句子,统计词频
:param sentence:[str,str,str]
:return:None
"""
for word in sentence:
self.count[word] = self.count.get(word,0) + 1 #所有的句子fit之后,self.count就有了所有词语的词频 def build_vocab(self,min_count=5,max_count=None,max_features=None):
"""
根据条件构造 词典
:param min_count:最小词频
:param max_count: 最大词频
:param max_features: 最大词语数
:return:
"""
if min_count is not None:
self.count = {word:count for word,count in self.count.items() if count >= min_count}
if max_count is not None:
self.count = {word:count for word,count in self.count.items() if count <= max_count}
if max_features is not None:
#[(k,v),(k,v)....] --->{k:v,k:v}
self.count = dict(sorted(self.count.items(),lambda x:x[-1],reverse=True)[:max_features]) for word in self.count:
self.dict[word] = len(self.dict) #每次word对应一个数字 #把dict进行翻转
self.inverse_dict = dict(zip(self.dict.values(),self.dict.keys())) def transform(self,sentence,max_len=None,add_eos=False):
"""
把句子转化为数字序列
:param sentence:[str,str,str]
:return: [int,int,int]
"""
if add_eos and max_len is not None:
max_len = max_len-1 if len(sentence) > max_len:
sentence = sentence[:max_len]
else:
sentence = sentence + [self.PAD_TAG] *(max_len- len(sentence)) #填充PAD if add_eos:
if self.PAD_TAG in sentence:
index = sentence.index(self.PAD_TAG)
sentence.insert(index,self.EOS_TAG)
else:
sentence += [self.EOS_TAG] return [self.dict.get(i,1) for i in sentence] def inverse_transform(self,incides):
"""
把数字序列转化为字符
:param incides: [int,int,int]
:return: [str,str,str]
"""
result = []
for i in incides:
temp = self.inverse_dict.get(i, "<UNK>")
if temp != self.EOS_TAG:
result.append(temp)
else:
break
return "".join(result) def __len__(self):
return len(self.dict) if __name__ == '__main__':
sentences = [["今天","天气","很","好"],
["今天","去","吃","什么"]]
ws = WordSequence()
for sentence in sentences:
ws.fit(sentence)
ws.build_vocab(min_count=1)
print(ws.dict)
ret = ws.transform(["好","好","好","好","好","好","好","热","呀"],max_len=3)
print(ret)
ret = ws.inverse_transform(ret)
print(ret)
pass

  

dataset.py

"""
准备数据集
"""
import random
from tqdm import tqdm
import config
import torch
from torch.utils.data import DataLoader,Dataset #1. 进行数据集的切分
def chatbot_data_split():
input = open("../corpus/chatbot/input.txt",encoding="utf-8").readlines()
target = open("../corpus/chatbot/target.txt",encoding="utf-8").readlines()
f_train_input = open("../corpus/chatbot/train_input.txt","a",encoding="utf-8")
f_train_target = open("../corpus/chatbot/train_target.txt","a",encoding="utf-8")
f_test_input = open("../corpus/chatbot/test_input.txt","a",encoding="utf-8")
f_test_target = open("../corpus/chatbot/test_target.txt","a",encoding="utf-8")
for input,target in tqdm(zip(input,target),total=len(input)):
if random.random()>0.8:
#放入test
f_test_input.write(input)
f_test_target.write(target)
else:
f_train_input.write(input)
f_train_target.write(target)
f_train_input.close()
f_train_target.close()
f_test_input.close()
f_test_target.close() #2. 准备dataset class ChatDataset(Dataset):
def __init__(self,train=True):
input_path = "../corpus/chatbot/train_input.txt" if train else "../corpus/chatbot/test_input.txt"
target_path = "../corpus/chatbot/train_target.txt" if train else "../corpus/chatbot/test_target.txt"
self.input_data = open(input_path,encoding="utf-8").readlines()
self.target_data = open(target_path,encoding="utf-8").readlines()
assert len(self.input_data) == len(self.target_data),"input target长度不一致!!!" def __getitem__(self, idx):
input = self.input_data[idx].strip().split()
target = self.target_data[idx].strip().split()
#获取真实长度
input_len = len(input) if len(input)<config.chatbot_input_max_len else config.chatbot_input_max_len
target_len = len(target) if len(target)<config.chatbot_target_max_len else config.chatbot_target_max_len input = config.input_ws.transform(input,max_len=config.chatbot_input_max_len)
target = config.target_ws.transform(target,max_len=config.chatbot_target_max_len,add_eos=True)
return input,target,input_len,target_len def __len__(self):
return len(self.input_data) # 3. 准备dataloader
def collate_fn(batch):
"""
:param batch:【(input,target,input_len,target_len),(),(一个getitem的结果)】
:return:
"""
#1. 对batch按照input的长度进行排序
batch = sorted(batch,key=lambda x:x[-2],reverse=True)
#2. 进行batch操作
input, target, input_len, target_len = zip(*batch)
#3. 把输入处理成LongTensor
input = torch.LongTensor(input)
target = torch.LongTensor(target)
input_len = torch.LongTensor(input_len)
target_len = torch.LongTensor(target_len)
return input, target, input_len, target_len def get_dataloader(train=True):
batch_size = config.chatbot_train_batch_size if train else config.chatbot_test_batch_size
return DataLoader(ChatDataset(train),batch_size=batch_size,collate_fn=collate_fn,shuffle=True) if __name__ == '__main__':
loader = get_dataloader()
for idx,(input, target, input_len, target_len) in enumerate(loader):
print(idx)
print(input)
print(target)
print(input_len)
print(target_len)
break

config.py

"""
项目配置
"""
import pickle
import torch device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = ("cpu") ################# classify 相关的配置 ###############
predict_ratio = 0.98 #预测可能性的阈值 ################# chatbot相关的配置 #################
chatbot_train_batch_size = 400
chatbot_test_batch_size = 500 input_ws = pickle.load(open("../chatbot/models/ws_input.pkl","rb"))
target_ws = pickle.load(open("../chatbot/models/ws_target.pkl","rb"))
chatbot_input_max_len = 20
chatbot_target_max_len = 30 chatbot_encoder_embedding_dim = 300
chatbot_encoder_hidden_size = 128
chatbot_encoder_number_layer = 2
chatbot_encoder_bidirectional = True
chatbot_encoder_dropout = 0.3 chatbot_decoder_embedding_dim = 300
chatbot_decoder_hidden_size = 128*2
chatbot_decoder_number_layer = 1
chatbot_decoder_dropout = 0

  

  

encoder.py

"""
进行编码
""" import torch.nn as nn
from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence
import config
import torch class Encoder(nn.Module):
def __init__(self):
super(Encoder,self).__init__()
self.embedding = nn.Embedding(num_embeddings=len(config.input_ws),
embedding_dim=config.chatbot_encoder_embedding_dim,
padding_idx=config.input_ws.PAD
)
# 2层双向,每层hidden_size 128
self.gru = nn.GRU(input_size=config.chatbot_encoder_embedding_dim,
hidden_size=config.chatbot_encoder_hidden_size,
num_layers=config.chatbot_encoder_number_layer,
batch_first=True,
bidirectional=config.chatbot_encoder_bidirectional,
dropout=config.chatbot_encoder_dropout) def forward(self, input,input_len):
input_embeded = self.embedding(input) #对输入进行打包
input_packed = pack_padded_sequence(input_embeded,input_len,batch_first=True)
#经过GRU处理
output,hidden = self.gru(input_packed)
# print("encoder gru hidden:",hidden.size())
#进行解包
output_paded,seq_len = pad_packed_sequence(output,batch_first=True,padding_value=config.input_ws.PAD)
#获取最上层的正向和反向最后一个时间步的输出,表示整个句子
encoder_hidden = torch.cat([hidden[-2],hidden[-1]],dim=-1).unsqueeze(0) #[1,batch_size,128*2]
return output_paded,encoder_hidden #[1,batch_size,128*2]

  decoder.py

"""
实现解码器
"""
import torch.nn as nn
import config
import torch
import torch.nn.functional as F
import numpy as np
import random class Decoder(nn.Module):
def __init__(self):
super(Decoder,self).__init__() self.embedding = nn.Embedding(num_embeddings=len(config.target_ws),
embedding_dim=config.chatbot_decoder_embedding_dim,
padding_idx=config.target_ws.PAD) #需要的hidden_state形状:[1,batch_size,64]
self.gru = nn.GRU(input_size=config.chatbot_decoder_embedding_dim,
hidden_size=config.chatbot_decoder_hidden_size,
num_layers=config.chatbot_decoder_number_layer,
bidirectional=False,
batch_first=True,
dropout=config.chatbot_decoder_dropout) #假如encoder的hidden_size=64,num_layer=1 encoder_hidden :[2,batch_sizee,64] self.fc = nn.Linear(config.chatbot_decoder_hidden_size,len(config.target_ws)) def forward(self, encoder_hidden,target):
# print("target size:",target.size())
#第一个时间步的输入的hidden_state
decoder_hidden = encoder_hidden #[1,batch_size,128*2]
#第一个时间步的输入的input
batch_size = encoder_hidden.size(1)
decoder_input = torch.LongTensor([[config.target_ws.SOS]]*batch_size).to(config.device) #[batch_size,1]
# print("decoder_input:",decoder_input.size()) #使用全为0的数组保存数据,[batch_size,max_len,vocab_size]
decoder_outputs = torch.zeros([batch_size,config.chatbot_target_max_len,len(config.target_ws)]).to(config.device) if random.random()>0.5: #teacher_forcing机制 for t in range(config.chatbot_target_max_len):
decoder_output_t,decoder_hidden = self.forward_step(decoder_input,decoder_hidden)
decoder_outputs[:,t,:] = decoder_output_t #获取当前时间步的预测值
value,index = decoder_output_t.max(dim=-1)
decoder_input = index.unsqueeze(-1) #[batch_size,1]
# print("decoder_input:",decoder_input.size())
else:
for t in range(config.chatbot_target_max_len):
decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
decoder_outputs[:, t, :] = decoder_output_t
#把真实值作为下一步的输入
decoder_input = target[:,t].unsqueeze(-1)
# print("decoder_input size:",decoder_input.size())
return decoder_outputs,decoder_hidden def forward_step(self,decoder_input,decoder_hidden):
'''
计算一个时间步的结果
:param decoder_input: [batch_size,1]
:param decoder_hidden: [1,batch_size,128*2]
:return:
''' decoder_input_embeded = self.embedding(decoder_input)
# print("decoder_input_embeded:",decoder_input_embeded.size()) #out:[batch_size,1,128*2]
#decoder_hidden :[1,bathc_size,128*2]
out,decoder_hidden = self.gru(decoder_input_embeded,decoder_hidden)
# print("decoder_hidden size:",decoder_hidden.size())
#out :【batch_size,1,hidden_size】 out_squeezed = out.squeeze(dim=1) #去掉为1的维度
out_fc = F.log_softmax(self.fc(out_squeezed),dim=-1) #[bathc_size,vocab_size]
# out_fc.unsqueeze_(dim=1) #[bathc_size,1,vocab_size]
# print("out_fc:",out_fc.size())
return out_fc,decoder_hidden def evaluate(self,encoder_hidden): # 第一个时间步的输入的hidden_state
decoder_hidden = encoder_hidden # [1,batch_size,128*2]
# 第一个时间步的输入的input
batch_size = encoder_hidden.size(1)
decoder_input = torch.LongTensor([[config.target_ws.SOS]] * batch_size).to(config.device) # [batch_size,1]
# print("decoder_input:",decoder_input.size()) # 使用全为0的数组保存数据,[batch_size,max_len,vocab_size]
decoder_outputs = torch.zeros([batch_size, config.chatbot_target_max_len, len(config.target_ws)]).to(
config.device) predict_result = []
for t in range(config.chatbot_target_max_len):
decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
decoder_outputs[:, t, :] = decoder_output_t # 获取当前时间步的预测值
value, index = decoder_output_t.max(dim=-1)
predict_result.append(index.cpu().detach().numpy()) #[[batch],[batch]...]
decoder_input = index.unsqueeze(-1) # [batch_size,1]
# print("decoder_input:",decoder_input.size())
# predict_result.append(decoder_input)
#把结果转化为ndarray,每一行是一条预测结果
predict_result = np.array(predict_result).transpose()
return decoder_outputs, predict_result

  seq2seq.py

"""
完成seq2seq模型
"""
import torch.nn as nn
from chatbot.encoder import Encoder
from chatbot.decoder import Decoder class Seq2Seq(nn.Module):
def __init__(self):
super(Seq2Seq,self).__init__()
self.encoder = Encoder()
self.decoder = Decoder() def forward(self, input,input_len,target):
encoder_outputs,encoder_hidden = self.encoder(input,input_len)
decoder_outputs,decoder_hidden = self.decoder(encoder_hidden,target)
return decoder_outputs def evaluate(self,input,input_len):
encoder_outputs, encoder_hidden = self.encoder(input, input_len)
decoder_outputs, predict_result = self.decoder.evaluate(encoder_hidden)
return decoder_outputs,predict_result

  train.py

"""
进行模型的训练
"""
import torch
import torch.nn.functional as F
from chatbot.seq2seq import Seq2Seq
from torch.optim import Adam
from chatbot.dataset import get_dataloader
from tqdm import tqdm
import config
import numpy as np
import pickle
from matplotlib import pyplot as plt
# from eval import eval model = Seq2Seq().to(config.device) optimizer = Adam(model.parameters()) loss_list = [] def train(epoch):
data_loader = get_dataloader(train=True)
bar = tqdm(data_loader,total=len(data_loader)) for idx,(input,target,input_len,target_len) in enumerate(bar):
input = input.to(config.device)
target = target.to(config.device)
input_len = input_len.to(config.device)
optimizer.zero_grad()
decoder_outputs = model(input,input_len,target) #[batch_Size,max_len,vocab_size]
loss = F.nll_loss(decoder_outputs.view(-1,len(config.target_ws)),target.view(-1),ignore_index=config.input_ws.PAD)
loss.backward()
optimizer.step()
loss_list.append(loss.item())
bar.set_description("epoch:{} idx:{} loss:{:.6f}".format(epoch,idx,np.mean(loss_list))) if idx%100 == 0:
torch.save(model.state_dict(),"../chatbot/models/model.pkl")
torch.save(optimizer.state_dict(),"../chatbot/models/optimizer.pkl")
pickle.dump(loss_list,open("../chatbot/models/loss_list.pkl","wb")) if __name__ == '__main__':
for i in range(5):
train(i)
# eval() # plt.figure(figsize=(50,8))
# plt.plot(range(len(loss_list)),loss_list)
# plt.show()

  eval.py

"""
进行模型的评估
""" import torch
import torch.nn.functional as F
from chatbot.dataset import get_dataloader
from tqdm import tqdm
import config
import numpy as np
import pickle
from chatbot.seq2seq import Seq2Seq def eval():
model = Seq2Seq().to(config.device)
model.eval()
model.load_state_dict(torch.load("./models/model.pkl")) loss_list = []
data_loader = get_dataloader(train=False)
bar = tqdm(data_loader,total=len(data_loader),desc="当前进行评估")
with torch.no_grad():
for idx,(input,target,input_len,target_len) in enumerate(bar):
input = input.to(config.device)
target = target.to(config.device)
input_len = input_len.to(config.device) decoder_outputs,predict_result = model.evaluate(input,input_len) #[batch_Size,max_len,vocab_size]
loss = F.nll_loss(decoder_outputs.view(-1,len(config.target_ws)),target.view(-1),ignore_index=config.input_ws.PAD)
loss_list.append(loss.item())
bar.set_description("idx:{} loss:{:.6f}".format(idx,np.mean(loss_list)))
print("当前的平均损失为:",np.mean(loss_list)) def interface():
from chatbot.cut_sentence import cut
import config
#加载模型
model = Seq2Seq().to(config.device)
model.eval()
model.load_state_dict(torch.load("./models/model.pkl")) #准备待预测的数据
while True:
origin_input =input("me>>:")
# if "你是谁" in origin_input or "你叫什么" in origin_input:
# result = "我是小智。"
# elif "你好" in origin_input or "hello" in origin_input:
# result = "Hello"
# else:
_input = cut(origin_input, by_word=True)
input_len = torch.LongTensor([len(_input)]).to(config.device)
_input = torch.LongTensor([config.input_ws.transform(_input,max_len=config.chatbot_input_max_len)]).to(config.device) outputs,predict = model.evaluate(_input,input_len)
result = config.target_ws.inverse_transform(predict[0])
print("chatbot>>:",result) if __name__ == '__main__':
interface()

  

最新文章

  1. IOS 消息机制(NSNotificationCenter)
  2. Android--UI之Spinner
  3. C# 多线程防止卡死
  4. 開始Unity3D的学习之旅
  5. mysql insert语句错误问题解决
  6. ng的数据绑定
  7. Struts2中OGNL
  8. ListView分页显示
  9. iOS 热更新插件
  10. HTML5基本标签
  11. Quartz源码——scheduler.start()启动源码分析(二)
  12. Python 日志模块logging
  13. mycat练习笔记
  14. AIX查看系统版本
  15. myBatis:not bind 问题
  16. P1903 [国家集训队]数颜色 / 维护队列
  17. NioEventLoopGroup的构造函数
  18. normalization 阅读笔记
  19. IOTA price analysis
  20. spring boot 使用静态资源

热门文章

  1. (note)从小白到产品经理之路
  2. Jmeter 注册多个用户 之 CSV Data set Config
  3. Reactor模式和Proactor模式
  4. 基于 HTML5 WebGL 的楼宇智能化集成系统(一)
  5. vue实现选项卡切换--不用ui库
  6. 2288: 【基础】小X转进制
  7. 如何让Java应用成为杀不死的小强?(下篇)
  8. 001_Chrome 76支持原生HTML 图片懒加载Lazy loading
  9. 【JAVA进阶架构师指南】之四:垃圾回收GC
  10. 使用appium框架测试安卓app时,获取toast弹框文字时,前一步千万不要加time.sleep等等待时间。