Hike News
Hike News

深度學習-tensorflow基礎-讀取數據-異步讀取

Introduction

當數據量很大時,入隊列操作從硬盤讀取數據,放入內存中,主線程須等待入隊列操作完成,才能進行訓練,此時會話裡可以運行多個線程,實現異步讀取

隊列管理器(QueueRunner)

  • 其較為高效
  • 雖然叫做管理器,但其是用來創建線程
  • 不能與threading(偽多線程)庫混用
  • tf.train.QueueRunner(queue,enqueue_ops=None)創建一個QueueRunner

    • queue:先前定義的隊列
    • enqueue_ops:添加線程的隊列操作列表(指定開啟幾個線程,並指定這些線程去做什麼操作)
      • 列表[]中輸入線程將執行的操作
      • 例如[]*2:指定兩個線程進行此操作
        • 開啟的線程會有上限數
    • create_threads(sess,coord=None,start=False):創建線程來運行給定會話的入隊操作
      • start:bool值
        • True:啟動線程
        • False:調用者必須調用start()啟動線程
      • coord:線程協調器(後面線程管理需要用到)
      • 返回一個隊列管理器op
  • tf.train.start_queue_runners(sess=None, coord=None)函數會要求圖中的每個QueueRunner啟動它的運行隊列操作的線程

    • 須在會話中開啟
    • 收集圖中所有隊列線程,預設同時開啟線程
    • sess:所在的會話
    • coord:線程協調器
    • 返回所有的線程

實現異步操作

通過隊列管理器來實現變量+1、入隊、主線程出隊列的操作

  • 子線程不斷的往隊列放數據,主線程不斷的往裡頭取數據
  • 兩者操作彼此互不干擾
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    import tensorflow as tf

    # 模擬異步的過程
    # 子線程存入樣本,主線程讀取樣本

    # 定義一個隊列
    Q = tf.FIFOQueue(1000,dtypes=tf.float32)

    # 定義子線程要執行的任務(值+1,放入隊列當中)
    var = tf.Variable(0.0,dtype=tf.float32)

    # 實現一個自增加法(assign_add),把本身做加法並返回給自身
    data = tf.assign_add(var,tf.constant(1.0))
    en_q = Q.enqueue(data)

    # 定義隊列管理器op,指定多少個子線程,子線程該做什麼
    qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2) # 使用兩個線程完成en_q這個任務

    # 初始化變量op
    init_var_op = tf.global_variables_initializer()

    with tf.Session() as sess:
    # 初始化變量
    sess.run(init_var_op)

    # 真正的開啟子線程(必須在會話中開啟)
    threads = qr.create_threads(sess,start=True)

    # 主線程不斷的去讀取數據
    for i in range(300):
    print(sess.run(Q.dequeue()))

Result

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2.0
19.0
43.0
68.0
88.0
108.0
135.0
...
...
...
1298.0
1299.0
1300.0
tensorflow.python.framework.errors_impl.CancelledError: Enqueue operation was cancelled
[[Node: fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/device:CPU:0"](fifo_queue, AssignAdd)]]
  • 最後會報錯的原因是因為,主線程結束的情況下,Session被關閉,且資源被釋放,但子線程並未回收還在執行,因此報錯
  • 因此須管理子線程,並確實的回收子線程(線程協調器)

線程協調器

用作管理線程

  • tf.train.Coordinator():線程協調員,實現一個簡單的機制來協調一組線程的終止
    • 返回一個線程協調器的實例

method

  • request_stop():要求線程終止
  • should_stop():檢查是否要求停止
  • join(threads=None,stop_grace_period_secs=120):等待線程終止

改善後

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import tensorflow as tf

Q = tf.FIFOQueue(1000,dtypes=tf.float32)

var = tf.Variable(0.0,dtype=tf.float32)

data = tf.assign_add(var,tf.constant(1.0))
en_q = Q.enqueue(data)

qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)

init_var_op = tf.global_variables_initializer()

with tf.Session() as sess:

sess.run(init_var_op)

# 開啟線程管理器
coord = tf.train.Coordinator()

# coord須指定線程管理器的對象
threads = qr.create_threads(sess,coord=coord,start=True)

for i in range(300):
print(sess.run(Q.dequeue()))

# 回收子線程
coord.request_stop()
coord.join(threads)