主要参考:https://www.tensorflow.org/api_guides/python/threading_and_queues#Queue_usage_overview

自动方式

For most use cases, the automatic thread startup and management provided by tf.train.MonitoredSession is sufficient. In the rare case that it is not, TensorFlow provides tools for manually managing your threads and queues.

与tf.read_file()、tf.image.decode_jpeg()、tfrecord API等函数配合,可以实现自动图片流并行读取

import tensorflow as tf

def simple_shuffle_batch(source, capacity, batch_size=10):
# Create a random shuffle queue.
queue = tf.RandomShuffleQueue(capacity=capacity,
min_after_dequeue=int(0.9*capacity),
shapes=source.shape, dtypes=source.dtype) # Create an op to enqueue one item.
enqueue = queue.enqueue(source) # Create a queue runner that, when started, will launch 4 threads applying
# that enqueue op.
num_threads = 4
qr = tf.train.QueueRunner(queue, [enqueue] * num_threads) # Register the queue runner so it can be found and started by
# <a href="../../api_docs/python/tf/train/start_queue_runners"><code>tf.train.start_queue_runners</code></a> later (the threads are not launched yet).
tf.train.add_queue_runner(qr) # Create an op to dequeue a batch
return queue.dequeue_many(batch_size) # create a dataset that counts from 0 to 99
input = tf.constant(list(range(100)))
input = tf.data.Dataset.from_tensor_slices(input)
input = input.make_one_shot_iterator().get_next() # Create a slightly shuffled batch from the sorted elements
get_batch = simple_shuffle_batch(input, capacity=20) # `MonitoredSession` will start and manage the `QueueRunner` threads.
with tf.train.MonitoredSession() as sess:
# Since the `QueueRunners` have been started, data is available in the
# queue, so the `sess.run(get_batch)` call will not hang.
while not sess.should_stop():
print(sess.run(get_batch))

手动方式

通过官方例程微调(以便能正常运行)得到,目前能运行,结果也正确,但是运行警告,尚未解决。

WARNING:tensorflow:From /home/work/Downloads/python_scripts/tensorflow_example/test_tf_queue_manual.py:52: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.
Instructions for updating:
To construct input pipelines, use the `tf.data` module.

import tensorflow as tf
# Using Python's threading library.
import threading
import time batch_size = 10
thread_num = 3 print("-" * 50)
def MyLoop(coord, id):
step = 0
while not coord.should_stop():
step += 1
print("thread id: %02d, step: %02d, ...do something..." %(id, step))
time.sleep(0.01)
if step >= 5:
coord.request_stop() # Main thread: create a coordinator.
coord = tf.train.Coordinator() # Create thread_num threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,i)) for i in range(thread_num)] # Start the threads and wait for all of them to stop.
for t in threads:
t.start()
coord.join(threads) print("-" * 50) # create a dataset that counts from 0 to 99
example = tf.constant(list(range(100)))
example = tf.data.Dataset.from_tensor_slices(example)
example = example.make_one_shot_iterator().get_next() # Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(capacity=20,
min_after_dequeue=int(0.9*20),
shapes=example.shape,
dtypes=example.dtype)
enqueue_op = queue.enqueue(example) # Create a training graph that starts by dequeueing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = inputs # ...use 'inputs' to build the training part of the graph... # Create a queue runner that will run thread_num threads in parallel to enqueue examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * thread_num) # Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # Run the training loop, controlling termination with the coordinator.
try:
for step in range(1000000):
if coord.should_stop():
break
y = sess.run(train_op)
print(step, ", y =", y)
except Exception as e:
# Report exceptions to the coordinator.
coord.request_stop(e)
finally:
# Terminate as usual. It is safe to call `coord.request_stop()` twice.
coord.request_stop()
coord.join(threads)

最新文章

  1. JS中函数的调用和this的值
  2. PHP MYSQL单向同步方案
  3. .net Session 超时控制
  4. iOS UISlider的使用
  5. PV公式
  6. C++临时变量的生命周期
  7. DELPHI下读取与设置系统时钟
  8. 使用visual studio 2013 快速搭建phonegap开发环境
  9. android的个人代码总结
  10. iOS设备的硬件适配 (关于armv6, armv7, armv7s ) &lt;转&gt;
  11. TreeView控件绑定数据库
  12. Math.random引发的骗术,绝对是用随机数骗前端妹纸的最佳方法
  13. Mangos笔记
  14. 51nod_1490: 多重游戏(树上博弈)
  15. LoadRunner【第四篇】参数化
  16. php判断浏览器还是微信打开
  17. Centos7_64环境搭建
  18. CF 672D Robin Hood(二分答案)
  19. 【PAT】1018 锤子剪刀布 (20)(20 分)
  20. 本机ip和127.0.0.1的区别

热门文章

  1. Selenium3自动化测试【20】CSS定位元素
  2. 网络损伤仪WANsim中关于丢包的介绍
  3. CSS 四种样式表 六种规则选择器 五种常用样式属性
  4. P5591 小猪佩奇学数学
  5. informix错误代码小结
  6. C语言学习之基本数据类型【一】
  7. PostgreSQL-WITH AS短语
  8. Ubuntu 20.10安装WPS Office、更新Visual Studio Code以及卸载LibreOffice
  9. 【SpringCloud微服务实战】搭建企业级应用开发框架(一):架构说明
  10. 单片机学习(一)项目的建立和vscode代码编辑环境的设置