Introduction
當數據量很大時,入隊列操作從硬盤讀取數據,放入內存中,主線程須等待入隊列操作完成,才能進行訓練,此時會話裡可以運行多個線程,實現異步讀取
隊列管理器(QueueRunner)
- 其較為高效
- 雖然叫做管理器,但其是用來創建線程
- 不能與
threading
(偽多線程)庫混用 tf.train.QueueRunner(queue,enqueue_ops=None)
創建一個QueueRunnerqueue
:先前定義的隊列enqueue_ops
:添加線程的隊列操作列表(指定開啟幾個線程,並指定這些線程去做什麼操作)- 列表
[]
中輸入線程將執行的操作 - 例如
[]*2
:指定兩個線程進行此操作- 開啟的線程會有上限數
- 列表
create_threads(sess,coord=None,start=False)
:創建線程來運行給定會話的入隊操作start
:bool值- True:啟動線程
- False:調用者必須調用
start()
啟動線程
coord
:線程協調器(後面線程管理需要用到)- 返回一個隊列管理器op
tf.train.start_queue_runners(sess=None, coord=None)
函數會要求圖中的每個QueueRunner啟動它的運行隊列操作的線程- 須在會話中開啟
- 收集圖中所有隊列線程,預設同時開啟線程
sess
:所在的會話coord
:線程協調器- 返回所有的線程
實現異步操作
通過隊列管理器來實現變量+1、入隊、主線程出隊列的操作
- 子線程不斷的往隊列放數據,主線程不斷的往裡頭取數據
- 兩者操作彼此互不干擾
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import tensorflow as tf
# 模擬異步的過程
# 子線程存入樣本,主線程讀取樣本
# 定義一個隊列
Q = tf.FIFOQueue(1000,dtypes=tf.float32)
# 定義子線程要執行的任務(值+1,放入隊列當中)
var = tf.Variable(0.0,dtype=tf.float32)
# 實現一個自增加法(assign_add),把本身做加法並返回給自身
data = tf.assign_add(var,tf.constant(1.0))
en_q = Q.enqueue(data)
# 定義隊列管理器op,指定多少個子線程,子線程該做什麼
qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2) # 使用兩個線程完成en_q這個任務
# 初始化變量op
init_var_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化變量
sess.run(init_var_op)
# 真正的開啟子線程(必須在會話中開啟)
threads = qr.create_threads(sess,start=True)
# 主線程不斷的去讀取數據
for i in range(300):
print(sess.run(Q.dequeue()))
Result
1 | 2.0 |
- 最後會報錯的原因是因為,主線程結束的情況下,Session被關閉,且資源被釋放,但子線程並未回收還在執行,因此報錯
- 因此須管理子線程,並確實的回收子線程(線程協調器)
線程協調器
用作管理線程
tf.train.Coordinator()
:線程協調員,實現一個簡單的機制來協調一組線程的終止- 返回一個線程協調器的實例
method
request_stop()
:要求線程終止should_stop()
:檢查是否要求停止join(threads=None,stop_grace_period_secs=120)
:等待線程終止
改善後
1 | import tensorflow as tf |