


















# Couplet corpus preprocessing: load the corpus, build a character
# vocabulary, map every couplet to integer ids and cut fixed-size,
# padded training batches.
import collections

import numpy as np

couplet_file = "couplet.txt"

# NOTE(review): the two length bounds were lost in extraction (the source
# shows "< *" and "> *"). 5 and 79 are placeholders -- confirm against the
# original script before training.
MIN_COUPLET_LEN = 5
MAX_COUPLET_LEN = 79

# Lines containing any of these characters are skipped.
# NOTE(review): the source tests '(' twice; the second test presumably was the
# full-width parenthesis before text normalisation, so both are listed.
_SKIP_MARKERS = (u'_', u'(', u'（', u'《', u'[')

couplets = []
# Read bytes and decode per line so that a single badly encoded line is
# skipped instead of aborting the whole load.
with open(couplet_file, 'rb') as f:
    for raw_line in f:
        try:
            content = raw_line.decode('utf-8')
        except UnicodeDecodeError:
            continue
        # Drop the line terminator and inner spaces so that neither becomes a
        # vocabulary symbol and lengths are counted in characters, not bytes.
        content = content.strip().replace(u' ', u'')
        if any(marker in content for marker in _SKIP_MARKERS):
            continue
        if len(content) < MIN_COUPLET_LEN or len(content) > MAX_COUPLET_LEN:
            continue
        # '[' and ']' act as begin/end markers of one couplet.
        couplets.append(u'[' + content + u']')

# Sort by length so that each batch holds couplets of similar length and
# needs little padding.
couplets = sorted(couplets, key=len)
print('对联总数: %d' % (len(couplets)))
if not couplets:
    raise ValueError("no usable couplets found in %s" % couplet_file)

# Count how often every character occurs; the most frequent gets id 0.
all_words = [word for couplet in couplets for word in couplet]
counter = collections.Counter(all_words)
count_pairs = counter.most_common()
words, _ = zip(*count_pairs)
# The space character is appended as the padding symbol (spaces were removed
# from the text above, so it cannot collide with a real character).
words = words + (u' ',)

# Map every character to an integer id; unseen characters map to len(words).
word_num_map = dict(zip(words, range(len(words))))


def to_num(word):
    """Return the integer id of `word`, or len(words) if it is unknown."""
    return word_num_map.get(word, len(words))


couplets_vector = [list(map(to_num, couplet)) for couplet in couplets]

# Number of couplets per training batch; tunable.
batch_size = 64
n_chunk = len(couplets_vector) // batch_size
x_batches = []
y_batches = []
pad_id = word_num_map[u' ']
for i in range(n_chunk):
    start_index = i * batch_size
    end_index = start_index + batch_size
    batches = couplets_vector[start_index:end_index]
    # Pad every row to the longest couplet of this batch.
    length = max(map(len, batches))
    xdata = np.full((batch_size, length), pad_id, np.int32)
    for row, vector in enumerate(batches):
        xdata[row, :len(vector)] = vector
    # Targets are the inputs shifted left by one position; the last column
    # keeps its input value.
    ydata = np.copy(xdata)
    ydata[:, :-1] = xdata[:, 1:]
    x_batches.append(xdata)
    y_batches.append(ydata)

定义LSTM模型(定义cell为一个128维的ht的cell。并使用MultiRNNCell 定义为两层的LSTM)

def neural_network(rnn_size=128, num_layers=2):
    """Build the stacked-LSTM language model graph.

    Reads the module-level names `batch_size`, `words` and `input_data`
    (`input_data` is not defined in the visible part of this file; it is
    presumably an int32 placeholder of shape [batch_size, None] -- confirm).

    Args:
        rnn_size: number of units of every LSTM layer (also the embedding
            width).
        num_layers: number of stacked LSTM layers.

    Returns:
        A tuple (logits, last_state, probs, cell, initial_state).
    """
    # One cell object per layer: repeating a single instance with
    # `[cell] * num_layers` makes the layers share one object, which later
    # TensorFlow 1.x releases reject.
    layers = [tf.nn.rnn_cell.BasicLSTMCell(rnn_size, state_is_tuple=True)
              for _ in range(num_layers)]
    cell = tf.nn.rnn_cell.MultiRNNCell(layers, state_is_tuple=True)
    initial_state = cell.zero_state(batch_size, tf.float32)

    # Ids run from 0 to len(words) inclusive: len(words) is the id that
    # `to_num` assigns to unknown characters, hence the extra slot.
    vocab_size = len(words) + 1

    with tf.variable_scope('rnnlm'):
        softmax_w = tf.get_variable("softmax_w", [rnn_size, vocab_size])
        softmax_b = tf.get_variable("softmax_b", [vocab_size])
        with tf.device("/cpu:0"):
            embedding = tf.get_variable("embedding", [vocab_size, rnn_size])
            inputs = tf.nn.embedding_lookup(embedding, input_data)

    outputs, last_state = tf.nn.dynamic_rnn(
        cell, inputs, initial_state=initial_state, scope='rnnlm')
    # Flatten (batch, time) into one axis so a single matmul yields the
    # logits of every time step.
    output = tf.reshape(outputs, [-1, rnn_size])

    logits = tf.matmul(output, softmax_w) + softmax_b
    probs = tf.nn.softmax(logits)
    return logits, last_state, probs, cell, initial_state


def train_neural_network(num_epochs=100, save_every=7):
    """Train the couplet model and write periodic checkpoints.

    Reads the module-level names `output_targets`, `input_data`, `n_chunk`,
    `x_batches` and `y_batches`.

    Args:
        num_epochs: number of passes over all batches.
        save_every: a checkpoint is written whenever `epoch % save_every == 0`.

    NOTE(review): the epoch count, the save interval and the clipping norm
    were lost in extraction. 100 and 7 are assumptions chosen so that the
    checkpoint 'couplet.module-98' restored elsewhere in this document is
    actually produced; the clipping norm 5 is likewise an assumption --
    confirm all three.
    """
    logits, last_state, _, _, _ = neural_network()
    targets = tf.reshape(output_targets, [-1])
    # Every position, padding included, has weight 1. The original passed
    # len(words) as a 4th positional argument, which this API interprets as
    # the `average_across_timesteps` flag (truthy == the default), so it is
    # dropped without changing the result.
    loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
        [logits], [targets], [tf.ones_like(targets, dtype=tf.float32)])
    cost = tf.reduce_mean(loss)
    learning_rate = tf.Variable(0.0, trainable=False)
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), 5)
    optimizer = tf.train.AdamOptimizer(learning_rate)
    train_op = optimizer.apply_gradients(zip(grads, tvars))

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver(tf.global_variables())

        for epoch in range(num_epochs):
            # Exponentially decaying learning rate.
            sess.run(tf.assign(learning_rate, 0.01 * (0.97 ** epoch)))
            for batche in range(n_chunk):
                train_loss, _, _ = sess.run(
                    [cost, last_state, train_op],
                    feed_dict={input_data: x_batches[batche],
                               output_targets: y_batches[batche]})
                print(epoch, batche, train_loss)
            if epoch % save_every == 0:
                saver.save(sess, './couplet.module', global_step=epoch)


# Load the weights of the checkpoint saved with global step 98 (the training
# loop saves as './couplet.module' with global_step=epoch). `saver` and `sess`
# are not defined in this snippet and must already exist.
saver.restore(sess, 'couplet.module-98')



格律诗训练语料来自互联网,其中包括《 唐诗》、《 全唐诗》、《 全台词》等文献,以及从各大诗词论坛(例如诗词在线、天涯论坛诗词比兴等)抓取并筛选后的格律诗,总计287000多首。


例如:给定主题词“春日”,根据它在潜在主题空间中的分布向量,可以找出 “玉魄”、“红泥”和 “燕”等空间距离比较近的语义相关词。




我们采用基于短语的统计机器翻译(PBSMT)技术。PBSMT 是目前一种主流的机器翻译技术,它的优势在于短语翻译结果的选词准确。由于诗词的生成讲求对仗,不涉及远距离语序调整问题,因此,诗词的生成非常适合采用基于短语的机器翻译算法来解决。


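BLEU 的直观思想是翻译结果越接近参考答案则翻译质量越好。相应的,我们认为如果根据给定上句生成的下句能够更贴近已有的参考下句,则系统的生成质量越好;但由于诗词在内容表现上丰富多样,所以需要搜集拥有多个参考下句的数据样本加入答案集。BLEU 通过对生成候选句与源语句的参考句进行 1 元词到 N 元词的重合度统计,结合下式衡量生成结果的好坏:

$$\mathrm{BLEU} = \mathrm{BP} \cdot \exp\left(\sum_{n=1}^{N} w_n \log p_n\right)$$

其中 $p_n$ 为候选句与参考句的 n 元词匹配精度,$w_n$ 为各阶 n 元词的权重(通常取 $1/N$),$\mathrm{BP}$ 为对过短候选句的长度惩罚因子。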




关于RNN和LSTM原理的说明: http://www.jianshu.com/p/9dc9f41f0b29


基于主题模型和统计机器翻译方法的中文格律诗自动生成:蒋锐滢,崔 磊,何 晶,周 明,潘志庚


数据来自于http://www16.zzu.edu.cn/qts/ ,总共4万多首唐诗。
  • tensorflow 1.0
  • python2





from __future__ import print_function
import numpy as np
import tensorflow as tf import argparse
import time
import os,sys
from six.moves import cPickle from utils import TextLoader
from model import Model def main():
parser = argparse.ArgumentParser()
parser.add_argument('--save_dir', type=str, default='save',
help='directory to store checkpointed models')
parser.add_argument('--rnn_size', type=int, default=,
help='size of RNN hidden state')
parser.add_argument('--num_layers', type=int, default=,
help='number of layers in the RNN')
parser.add_argument('--model', type=str, default='lstm',
help='rnn, gru, or lstm')
parser.add_argument('--batch_size', type=int, default=,
help='minibatch size')
parser.add_argument('--num_epochs', type=int, default=,
help='number of epochs')
parser.add_argument('--save_every', type=int, default=,
help='save frequency')
parser.add_argument('--grad_clip', type=float, default=.,
help='clip gradients at this value')
parser.add_argument('--learning_rate', type=float, default=0.002,
help='learning rate')
parser.add_argument('--decay_rate', type=float, default=0.97,
help='decay rate for rmsprop')
parser.add_argument('--init_from', type=str, default=None,
help="""continue training from saved model at this path. Path must contain files saved by previous training process:
'config.pkl' : configuration;
'chars_vocab.pkl' : vocabulary definitions;
'iterations' : number of trained iterations;
'losses-*' : train loss;
'checkpoint' : paths to model file(s) (created by tf).
Note: this file contains absolute paths, be careful when moving files around;
'model.ckpt-*' : file(s) with model definition (created by tf)
args = parser.parse_args()
train(args) def train(args):
data_loader = TextLoader(args.batch_size)
args.vocab_size = data_loader.vocab_size # check compatibility if training is continued from previously saved model
if args.init_from is not None:
# check if all necessary files exist
assert os.path.isdir(args.init_from)," %s must be a a path" % args.init_from
assert os.path.isfile(os.path.join(args.init_from,"config.pkl")),"config.pkl file does not exist in path %s"%args.init_from
assert os.path.isfile(os.path.join(args.init_from,"chars_vocab.pkl")),"chars_vocab.pkl.pkl file does not exist in path %s" % args.init_from
ckpt = tf.train.get_checkpoint_state(args.init_from)
assert ckpt,"No checkpoint found"
assert ckpt.model_checkpoint_path,"No model path found in checkpoint"
assert os.path.isfile(os.path.join(args.init_from,"iterations")),"iterations file does not exist in path %s " % args.init_from # open old config and check if models are compatible
with open(os.path.join(args.init_from, 'config.pkl'),'rb') as f:
saved_model_args = cPickle.load(f)
for checkme in need_be_same:
assert vars(saved_model_args)[checkme]==vars(args)[checkme],"Command line argument and saved model disagree on '%s' "%checkme # open saved vocab/dict and check if vocabs/dicts are compatible
with open(os.path.join(args.init_from, 'chars_vocab.pkl'),'rb') as f:
saved_chars, saved_vocab = cPickle.load(f)
assert saved_chars==data_loader.chars, "Data and loaded model disagree on character set!"
assert saved_vocab==data_loader.vocab, "Data and loaded model disagree on dictionary mappings!" with open(os.path.join(args.save_dir, 'config.pkl'), 'wb') as f:
cPickle.dump(args, f)
with open(os.path.join(args.save_dir, 'chars_vocab.pkl'), 'wb') as f:
cPickle.dump((data_loader.chars, data_loader.vocab), f) model = Model(args) with tf.Session() as sess:
saver = tf.train.Saver(tf.global_variables())
iterations =
# restore model and number of iterations
if args.init_from is not None:
saver.restore(sess, ckpt.model_checkpoint_path)
with open(os.path.join(args.save_dir, 'iterations'),'rb') as f:
iterations = cPickle.load(f)
losses = []
for e in range(args.num_epochs):
sess.run(tf.assign(model.lr, args.learning_rate * (args.decay_rate ** e)))
for b in range(data_loader.num_batches):
iterations +=
start = time.time()
x, y = data_loader.next_batch()
feed = {model.input_data: x, model.targets: y}
train_loss, _ , _ = sess.run([model.cost, model.final_state, model.train_op], feed)
end = time.time()
info = "{}/{} (epoch {}), train_loss = {:.3f}, time/batch = {:.3f}" \
.format(e * data_loader.num_batches + b,
args.num_epochs * data_loader.num_batches,
e, train_loss, end - start)
if (e * data_loader.num_batches + b) % args.save_every == \
or (e==args.num_epochs- and b == data_loader.num_batches-): # save for the last result
checkpoint_path = os.path.join(args.save_dir, 'model.ckpt')
saver.save(sess, checkpoint_path, global_step = iterations)
with open(os.path.join(args.save_dir,"iterations"),'wb') as f:
with open(os.path.join(args.save_dir,"losses-"+str(iterations)),'wb') as f:
losses = []
print("model saved to {}".format(checkpoint_path))
sys.stdout.write('\n') if __name__ == '__main__':


# -*- coding: utf-8 -*-

import tensorflow as tf
from tensorflow.contrib import rnn
from tensorflow.contrib import legacy_seq2seq
import numpy as np


class Model():
    """Character-level recurrent language model (TensorFlow 1.x graph mode).

    Attributes created by `__init__` that callers use: `input_data`,
    `targets`, `initial_state`, `logits`, `probs`, `cost`, `final_state`,
    `lr` and `train_op`.
    """

    def __init__(self, args, infer=False):
        """Build the graph.

        Args:
            args: namespace providing `model` ('rnn', 'gru' or 'lstm'),
                `rnn_size`, `num_layers`, `batch_size`, `vocab_size` and
                `grad_clip`.
            infer: when True the graph is built for sampling, which feeds a
                single sequence at a time, so `args.batch_size` is set to 1.

        Raises:
            Exception: if `args.model` is not a supported cell type.
        """
        self.args = args
        if infer:
            args.batch_size = 1

        if args.model == 'rnn':
            cell_fn = rnn.BasicRNNCell
        elif args.model == 'gru':
            cell_fn = rnn.GRUCell
        elif args.model == 'lstm':
            cell_fn = rnn.BasicLSTMCell
        else:
            raise Exception("model type not supported: {}".format(args.model))

        def make_cell():
            # Only the LSTM cell takes `state_is_tuple`; the RNN and GRU cell
            # constructors do not accept that keyword.
            if args.model == 'lstm':
                return cell_fn(args.rnn_size, state_is_tuple=False)
            return cell_fn(args.rnn_size)

        # One cell object per layer (a repeated single instance is rejected
        # by later TensorFlow 1.x releases). The state is kept as one
        # concatenated tensor so that sample() can feed it back directly.
        self.cell = cell = rnn.MultiRNNCell(
            [make_cell() for _ in range(args.num_layers)],
            state_is_tuple=False)

        # The length of the input sequence is variable.
        self.input_data = tf.placeholder(tf.int32, [args.batch_size, None])
        self.targets = tf.placeholder(tf.int32, [args.batch_size, None])
        self.initial_state = cell.zero_state(args.batch_size, tf.float32)

        with tf.variable_scope('rnnlm'):
            softmax_w = tf.get_variable("softmax_w", [args.rnn_size, args.vocab_size])
            softmax_b = tf.get_variable("softmax_b", [args.vocab_size])
            with tf.device("/cpu:0"):
                embedding = tf.get_variable("embedding", [args.vocab_size, args.rnn_size])
                inputs = tf.nn.embedding_lookup(embedding, self.input_data)

        outputs, last_state = tf.nn.dynamic_rnn(
            cell, inputs, initial_state=self.initial_state, scope='rnnlm')
        # Flatten (batch, time) so one matmul yields all logits.
        output = tf.reshape(outputs, [-1, args.rnn_size])
        self.logits = tf.matmul(output, softmax_w) + softmax_b
        self.probs = tf.nn.softmax(self.logits)
        targets = tf.reshape(self.targets, [-1])
        # Every position (padding included) is weighted 1.
        loss = legacy_seq2seq.sequence_loss_by_example(
            [self.logits],
            [targets],
            [tf.ones_like(targets, dtype=tf.float32)])
        self.cost = tf.reduce_mean(loss)
        self.final_state = last_state
        self.lr = tf.Variable(0.0, trainable=False)
        tvars = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars),
                                          args.grad_clip)
        optimizer = tf.train.AdamOptimizer(self.lr)
        self.train_op = optimizer.apply_gradients(zip(grads, tvars))

    def sample(self, sess, chars, vocab, prime=u'', sampling_type=1):
        """Generate a poem.

        Args:
            sess: session holding the trained variables.
            chars: sequence mapping id -> character.
            vocab: dict mapping character -> id.
            prime: when empty, one poem is generated freely until the end
                marker '$'; otherwise every character of `prime` starts one
                line (acrostic), each line ending at the first comma or
                full stop the model emits.
            sampling_type: 0 picks the most likely character at every step,
                any other value samples from the predicted distribution.
                NOTE(review): the default was lost in extraction; 1 is an
                assumption.

        Returns:
            The generated text, or an error message if a character of
            `prime` is not in `vocab`.
        """
        def pick_char(weights):
            if sampling_type == 0:
                index = np.argmax(weights)
            else:
                # Inverse-CDF sampling over the (unnormalised) weights.
                cumulative = np.cumsum(weights)
                total = np.sum(weights)
                index = int(np.searchsorted(cumulative, np.random.rand() * total))
            return chars[index]

        def step(x, state):
            # Run the network on `x` and pick a character from the
            # distribution predicted after the last time step.
            probs, new_state = sess.run(
                [self.probs, self.final_state],
                {self.input_data: x, self.initial_state: state})
            return pick_char(probs[-1]), new_state

        def encode(text):
            # One row (batch of 1) holding the ids of `text`.
            return np.array([[vocab[c] for c in text]], dtype=np.int32)

        for char in prime:
            if char not in vocab:
                return u"{} is not in charset!".format(char)

        # Evaluate the zero state once instead of adding a new zero_state op
        # to the graph for every line.
        zero_state = sess.run(self.cell.zero_state(1, tf.float32))

        if not prime:
            result = u''
            char, state = step(encode(u'^'), zero_state)
            while char != u'$':
                result += char
                char, state = step(encode(char), state)
            return result

        # NOTE(review): the source compares against ',' -- the corpus
        # presumably uses the full-width comma, so both forms end a line.
        line_end = (u',', u'，', u'。')
        result = u'^'
        for prime_char in prime:
            result += prime_char
            # Replay everything generated so far from a zero state, then
            # continue character by character until the line ends.
            char, state = step(encode(result), zero_state)
            while char not in line_end:
                result += char
                char, state = step(encode(char), state)
            result += char
        # Drop the leading begin marker.
        return result[1:]


# -*- coding: utf-8 -*-

import codecs
import os
import collections
from six.moves import cPickle,reduce,map
import numpy as np

BEGIN_CHAR = '^'
END_CHAR = '$'
# NOTE(review): UNKNOWN_CHAR is referenced below but its definition, the value
# of MAX_LENGTH and the default of `max_vocabsize` were lost in extraction;
# '*', 100 and 3000 are assumptions -- confirm.
UNKNOWN_CHAR = '*'
MAX_LENGTH = 100


class TextLoader():
    """Load a poem corpus and serve it as padded integer batches."""

    def __init__(self, batch_size, max_vocabsize=3000, encoding='utf-8'):
        """Prepare the data found under ./data.

        Args:
            batch_size: number of poems per batch.
            max_vocabsize: upper bound of the vocabulary size, including the
                unknown-character symbol.
            encoding: encoding of the input text file.
        """
        self.batch_size = batch_size
        self.max_vocabsize = max_vocabsize
        self.encoding = encoding

        data_dir = './data'

        input_file = os.path.join(data_dir, "shijing.txt")
        vocab_file = os.path.join(data_dir, "vocab.pkl")
        tensor_file = os.path.join(data_dir, "data.npy")

        if not (os.path.exists(vocab_file) and os.path.exists(tensor_file)):
            print("reading text file")
            self.preprocess(input_file, vocab_file, tensor_file)
        else:
            print("loading preprocessed files")
            self.load_preprocessed(vocab_file, tensor_file)
        self.create_batches()
        self.reset_batch_pointer()

    def preprocess(self, input_file, vocab_file, tensor_file):
        """Build vocabulary and id sequences from `input_file` and cache them.

        Sets `vocab_size`, `chars`, `vocab` and `tensor`, and pickles `chars`
        to `vocab_file` and `tensor` to `tensor_file`.
        """
        def handle_poem(line):
            line = line.replace(' ', '')
            if len(line) >= MAX_LENGTH:
                # Cut after the last full stop inside the first MAX_LENGTH
                # characters, or hard-cut when there is none.
                index_end = line.rfind(u'。', 0, MAX_LENGTH)
                index_end = index_end if index_end > 0 else MAX_LENGTH
                line = line[:index_end + 1]
            return BEGIN_CHAR + line + END_CHAR

        with codecs.open(input_file, "r", encoding=self.encoding) as f:
            lines = [handle_poem(line) for line in f.read().strip().split('\n')]

        # Count per line instead of concatenating the whole corpus into one
        # string first (quadratic). Lines are visited last-to-first to keep
        # the order in which characters are first seen, and thus the order of
        # equally frequent characters, the same as with the concatenation.
        counter = collections.Counter()
        for line in reversed(lines):
            counter.update(line)
        count_pairs = sorted(counter.items(), key=lambda x: -x[1])
        chars, _ = zip(*count_pairs)
        # The last vocabulary slot is reserved for the unknown character.
        self.vocab_size = min(len(chars), self.max_vocabsize - 1) + 1
        self.chars = chars[:self.vocab_size - 1] + (UNKNOWN_CHAR,)
        self.vocab = dict(zip(self.chars, range(len(self.chars))))
        unknown_char_int = self.vocab.get(UNKNOWN_CHAR)
        with open(vocab_file, 'wb') as f:
            cPickle.dump(self.chars, f)
        # Sort by length so each batch needs little padding.
        lines = sorted(lines, key=len)
        self.tensor = [[self.vocab.get(char, unknown_char_int) for char in line]
                       for line in lines]
        with open(tensor_file, 'wb') as f:
            cPickle.dump(self.tensor, f)

    def load_preprocessed(self, vocab_file, tensor_file):
        """Load the cached vocabulary and id sequences.

        NOTE: both files are pickles; only load files from a trusted source.
        """
        with open(vocab_file, 'rb') as f:
            self.chars = cPickle.load(f)
        with open(tensor_file, 'rb') as f:
            self.tensor = cPickle.load(f)
        self.vocab_size = len(self.chars)
        self.vocab = dict(zip(self.chars, range(len(self.chars))))

    def create_batches(self):
        """Cut `tensor` into `num_batches` padded (input, target) batches.

        Sequences that do not fill a whole batch are dropped. Rows are padded
        with the id of the unknown character.
        """
        self.num_batches = len(self.tensor) // self.batch_size
        self.tensor = self.tensor[:self.num_batches * self.batch_size]
        unknown_char_int = self.vocab.get(UNKNOWN_CHAR)
        self.x_batches = []
        self.y_batches = []
        for i in range(self.num_batches):
            from_index = i * self.batch_size
            to_index = from_index + self.batch_size
            batches = self.tensor[from_index:to_index]
            seq_length = max(map(len, batches))
            xdata = np.full((self.batch_size, seq_length), unknown_char_int, np.int32)
            for row, sequence in enumerate(batches):
                xdata[row, :len(sequence)] = sequence
            # Targets are the inputs shifted left by one position; the last
            # column keeps its input value.
            ydata = np.copy(xdata)
            ydata[:, :-1] = xdata[:, 1:]
            self.x_batches.append(xdata)
            self.y_batches.append(ydata)

    def next_batch(self):
        """Return the batch at the pointer as (x, y) and advance the pointer.

        The pointer does not wrap around; call `reset_batch_pointer` before
        each pass over the data.
        """
        x, y = self.x_batches[self.pointer], self.y_batches[self.pointer]
        self.pointer += 1
        return x, y

    def reset_batch_pointer(self):
        """Rewind to the first batch."""
        self.pointer = 0


# -*- coding: utf-8 -*-

from __future__ import print_function
import numpy as np
import tensorflow as tf
import argparse
import time
import os
from six.moves import cPickle

from utils import TextLoader
from model import Model

from six import text_type


def main():
    """Parse the command line and print one generated poem."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--save_dir', type=str, default='save',
                        help='model directory to store checkpointed models')
    parser.add_argument('--prime', type=str, default='',
                        help='characters that start each line of the poem '
                             '(acrostic); empty generates a free poem')
    # NOTE(review): the default was lost in extraction; 1 is an assumption.
    parser.add_argument('--sample', type=int, default=1,
                        help='0 to use max at each timestep, 1 to sample at each timestep')

    args = parser.parse_args()
    sample(args)


def sample(args):
    """Restore the latest checkpoint from `args.save_dir` and print a poem.

    Args:
        args: namespace with `save_dir`, `prime` and `sample`.
    """
    # NOTE: both files are pickles; only load files from a trusted source.
    with open(os.path.join(args.save_dir, 'config.pkl'), 'rb') as f:
        saved_args = cPickle.load(f)
    with open(os.path.join(args.save_dir, 'chars_vocab.pkl'), 'rb') as f:
        chars, vocab = cPickle.load(f)

    # The command line yields bytes on Python 2 and text on Python 3; only
    # bytes need (and support) decoding.
    prime = args.prime
    if not isinstance(prime, text_type):
        prime = prime.decode('utf-8', errors='ignore')

    model = Model(saved_args, True)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver(tf.global_variables())
        ckpt = tf.train.get_checkpoint_state(args.save_dir)
        if ckpt and ckpt.model_checkpoint_path:
            saver.restore(sess, ckpt.model_checkpoint_path)
            print(model.sample(sess, chars, vocab, prime, args.sample))
        else:
            print("no checkpoint found in {}".format(args.save_dir))


if __name__ == '__main__':
    main()
  • python sample.py rnn神经网络会生成一首全新的古诗。例如: ”帝以诚求备,堪留百勇杯。教官日与失,共恨五毛宣。鸡唇春疏叶,空衣滴舞衣。丑夫归晚里,此地几何人。”
  • python sample.py --prime <这里输入指定汉字> rnn神经网络会利用输入的汉字生成一首藏头诗。例如: python sample.py --prime 如花似月 会得到 “如尔残回号,花枝误晚声。似君星度上,月满二秋寒。”


