线程和队列

在使用TensorFlow进行异步计算时,队列是一种强大的机制。

为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。

TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常,QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。

队列概述

队列,如FIFOQueue和RandomShuffleQueue,在TensorFlow的张量异步计算时都非常重要。

例如,一个典型的输入结构:是使用一个RandomShuffleQueue来作为模型训练的输入:

  • 多个线程准备训练样本,并且把这些样本推入队列。
  • 一个训练线程执行一个训练操作

同步执行队列

# 创建一个队列
Q = tf.FIFOQueue(3, dtypes=tf.float32) # 数据进队列
init = Q.enqueue_many(([0.1, 0.2, 0.3],)) # 定义操作,op,出队列,+1,进队列,注意返回的都是op
out_q = Q.dequeue()
data = out_q + 1
en_q = Q.enqueue(data) with tf.Session() as sess: # 初始化队列,是数据进入
sess.run(init) # 执行两次入队加1
for i in range(2):
sess.run(en_q) # 循环取队列
for i in range(3):
print(sess.run(Q.dequeue()))

tf.QueueRunner

QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。

您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。

tf.Coordinator

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:

  • should_stop():如果线程应该停止则返回True。
  • request_stop(): 请求该线程停止。
  • join():等待被指定的线程终止。

首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。

异步执行队列:

#主线程,不断的去取数据,开启其它线程来进行增加计数,入队
#主线程结束了,队列线程没有结束,就会抛出异常
#主线程没有结束,需要将队列线程关闭,防止主线程等待 Q = tf.FIFOQueue(1000,dtypes=tf.float32) # 定义操作
var = tf.Variable(0.0)
increment_op = tf.assign_add(var,tf.constant(1.0))
en_op = Q.enqueue(increment_op) # 创建一个队列管理器,指定线程数,执行队列的操作
qr = tf.train.QueueRunner(Q,enqueue_ops=[increment_op,en_op]*3) with tf.Session() as sess:
tf.global_variables_initializer().run() # 生成一个线程协调器
coord = tf.train.Coordinator() # 启动线程执行操作
threads_list = qr.create_threads(sess,coord=coord,start=True) print(len(threads_list),"----------")
# 主线程去取数据
for i in range(20):
print(sess.run(Q.dequeue())) # 请求其它线程终止
coord.request_stop() # 关闭线程
coord.join(threads_list)

 

最新文章

  1. 更改默认alert框体
  2. hibernate笔记--组件映射方法
  3. MYSQL 函数 字符串到整数
  4. swift项目中引入OC框架
  5. Hill Climber and Random Walk
  6. Windows 8 和 Windows 8.1 中对插件和 ActiveX 的支持
  7. Android -> 怎样避免Handler引起内存泄露
  8. 利用squid 搭建简单的代理服务器
  9. try、catch、finally 块的关系
  10. Eclipse 中构建 Maven 项目的完整过程 - 动态 Web 项目
  11. c++中sizeof的理解
  12. 性能测试四十八:Jenkins+Ant+Jmeter系统部署
  13. 【原创 Hadoop&Spark 动手实践 12】Spark MLLib 基础、应用与信用卡欺诈检测系统动手实践
  14. Python学习之旅(二十九)
  15. 网络编程并发 多进程 进程池,互斥锁,信号量,IO模型
  16. SQL 加载
  17. js,JavaScript,a标签onclick传递参数不对,A标签调用js函数写法总结
  18. [RN] 05 - Let's start with UI Design
  19. 为什么不取消注册BroadcastReceiver会导致内存泄漏
  20. scala 学习笔记四 匿名函数

热门文章

  1. HPU 1437: 王小二的求值问题
  2. 最短路--spfa+队列优化模板
  3. python 取整
  4. js 每隔四位加一个空格
  5. TensorFlow笔记-06-神经网络优化-损失函数,自定义损失函数,交叉熵
  6. WebApi_使用技巧
  7. 【MVC】Controller的使用
  8. hadoop零基础入门之DKH安装准备
  9. OpenWrt的web服务器
  10. Windows10安装.net3.5