To load data for 2 epochs with tf.train.string_input_producer, I used:
# Filename queue that produces each filename `num_epochs` (here 2) times,
# in shuffled order.
filename_queue = tf.train.string_input_producer(
    filenames=['data.csv'], num_epochs=2, shuffle=True)
# Group the parsed columns into shuffled batches; the final batch is allowed
# to contain fewer than `batch_size` rows.
col1_batch, col2_batch, col3_batch = tf.train.shuffle_batch(
    [col1, col2, col3],
    batch_size=batch_size,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    allow_smaller_final_batch=True)
But then I found that this op did not produce what I wanted.
It does produce each sample in data.csv exactly 2 times, but the order in which they are generated is not well defined. For example, with 3 lines of data in data.csv
[[1]
[2]
[3]]
it will produce the following (each sample appears exactly 2 times, but in arbitrary order):
[1]
[1]
[3]
[2]
[2]
[3]
but what I want is this (epochs kept separate, with shuffling inside each epoch):
(epoch 1:)
[1]
[2]
[3]
(epoch 2:)
[1]
[3]
[2]
In addition, how can I know when one epoch is done? Is there some flag variable? Thanks!
My code is here:
import tensorflow as tf
def read_my_file_format(filename_queue):
    """Read one text line from the queued files and split it into 3 columns.

    Args:
        filename_queue: Queue of filenames passed to `TextLineReader.read`.

    Returns:
        A `(col1, col2, col3)` tuple of the tensors returned by
        `tf.decode_csv`, parsed with '-' as the field delimiter and the
        string '1' as the default for every column.
    """
    line_reader = tf.TextLineReader()
    # `read` yields a (key, value) pair; only the value is parsed here.
    _, line = line_reader.read(filename_queue)
    string_defaults = [['1'] for _ in range(3)]
    columns = tf.decode_csv(
        line, record_defaults=string_defaults, field_delim='-')
    # The columns are returned as parsed by `decode_csv`; the comma-separated
    # numbers inside each column are not split any further in this function.
    col1, col2, col3 = columns
    return col1, col2, col3
def input_pipeline(filenames, batch_size, num_epochs=1):
    """Build a queue-based input pipeline that yields shuffled batches.

    Args:
        filenames: List of input file paths.
        batch_size: Number of rows requested per batch.
        num_epochs: Forwarded to `tf.train.string_input_producer`.

    Returns:
        A `(col1_batch, col2_batch, col3_batch)` tuple taken from
        `tf.train.shuffle_batch`.
    """
    name_queue = tf.train.string_input_producer(
        filenames, num_epochs=num_epochs, shuffle=True)
    columns = list(read_my_file_format(name_queue))

    shuffle_floor = 10
    batched = tf.train.shuffle_batch(
        columns,
        batch_size=batch_size,
        capacity=shuffle_floor + 3 * batch_size,
        min_after_dequeue=shuffle_floor,
        allow_smaller_final_batch=True)
    col1_batch, col2_batch, col3_batch = batched
    return col1_batch, col2_batch, col3_batch
# Pipeline configuration: one input file, batches of 3, a single epoch.
filenames = ['1.txt']
batch_size = 3
num_epochs = 1
a1, a2, a3 = input_pipeline(filenames, batch_size, num_epochs)

with tf.Session() as sess:
    # Local variables must be initialized before the queue runners start.
    sess.run(tf.local_variables_initializer())
    # Start populating the filename queue.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        while True:
            if coord.should_stop():
                break
            a, b, c = sess.run([a1, a2, a3])
            print(a, b, c)
    except tf.errors.OutOfRangeError:
        print('Done training, epoch reached')
    finally:
        # Always ask the runner threads to stop, then wait for them.
        coord.request_stop()
        coord.join(threads)
My data looks like this:
1,2-3,4-A
7,8-9,10-B
12,13-14,15-C
17,18-19,20-D
22,23-24,25-E
27,28-29,30-F
32,33-34,35-G
37,38-39,40-H
————————————————
As Nicolas observes, the tf.train.string_input_producer() API does not give you the ability to detect when the end of an epoch is reached; instead it concatenates together all epochs into one long batch. For this reason, we recently added (in TensorFlow 1.2) the tf.contrib.data API, which makes it possible to express more sophisticated pipelines, including your use case.
The following code snippet shows how you would write your program using tf.contrib.data:
import tensorflow as tf
def input_pipeline(filenames, batch_size):
    """Return an initializable iterator over one epoch of batched records.

    Args:
        filenames: List of text files to read line by line.
        batch_size: Number of records per batch.

    Returns:
        The result of `make_initializable_iterator()` on the dataset, so the
        caller can re-initialize it at the beginning of each epoch.
    """
    def parse_line(line):
        # Split one line into three string columns on '-'.
        return tf.decode_csv(
            line, record_defaults=[['1'], ['1'], ['1']], field_delim='-')

    # Define a `tf.contrib.data.Dataset` for one epoch of the data, built up
    # one stage at a time.
    lines = tf.contrib.data.TextLineDataset(filenames)
    records = lines.map(parse_line)
    shuffled = records.shuffle(buffer_size=10)  # Equivalent to min_after_dequeue=10.
    batches = shuffled.batch(batch_size)
    return batches.make_initializable_iterator()
filenames = ['1.txt']
batch_size = 3
num_epochs = 10
iterator = input_pipeline(filenames, batch_size)

# `a1`, `a2`, and `a3` stand for the next element fetched from the iterator.
a1, a2, a3 = iterator.get_next()

with tf.Session() as sess:
    for _ in range(num_epochs):
        # Reset the iterator at the start of every epoch.
        sess.run(iterator.initializer)

        while True:
            try:
                a, b, c = sess.run([a1, a2, a3])
            except tf.errors.OutOfRangeError:
                # Raised once the iterator has no more elements, i.e. the
                # current epoch is finished.
                break
            print(a, b, c)

        # Perform any end-of-epoch computation here.
        print('Done training, epoch reached')
————————————————
You might want to have a look at this answer to a similar question.
The short story is that if num_epochs > 1, all the data is enqueued at the same time and shuffled independently of the epoch, so you don't have the ability to monitor which epoch is being dequeued.
What you could do is follow the first suggestion in the linked answer, which is to work with num_epochs == 1 and reinitialise the local queue variables (and obviously not the model variables) in each run.
# Initializer for only the local variables under the 'input_producer' scope.
init_queue = tf.variables_initializer(
    tf.get_collection(tf.GraphKeys.LOCAL_VARIABLES, scope='input_producer'))

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    for e in range(num_epochs):
        # NOTE(review): this inner session rebinds `sess`, so the fetches
        # below run in a fresh session rather than the one initialized above.
        # Presumably intended to get fresh queues for every epoch; confirm,
        # since model variables initialized in the outer session would not be
        # shared with this one.
        with tf.Session() as sess:
            # Reinitialize the local variables in the input_producer scope.
            sess.run(init_queue)
            # Start populating the filename queue.
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(coord=coord)
            try:
                while True:
                    if coord.should_stop():
                        break
                    a, b, c = sess.run([a1, a2, a3])
                    print(a, b, c)
            except tf.errors.OutOfRangeError:
                print('Done training, epoch reached')
            finally:
                # Always ask the runner threads to stop, then wait for them.
                coord.request_stop()
                coord.join(threads)