Loading and Preparing OFRecord Dataset
In [data input] (... /basics_topics/data_input.md) we learned that it is usually more efficient to load data using DataLoader and related operators. Also, we learned how to use DataLoader and related operators.
In article OFRecord, we learn about the storage format of OFRecord files.
In this article, we will focus on the loading and generating of OneFlow's OFRecord dataset, which mainly includes:
- 
The hierarchy of OFRecord dataset 
- 
Multiple ways of loading OFRecord dataset 
- 
The transition between OFRecord dataset and other data formats 
What is OFRecord Dataset¶
In article OFRecord, we introduce what OFRecord file is and the storage format of OFRecord file.
OFRecord dataset is the collection of OFRecord files. The collection of mutiple files that named by OneFlow convention, and that stored in the same directory, is an OFRecord dataset.
By default, The files in OFRecord dataset directory are uniformly named in the way of part-xxx, where "xxx" is the file id starting from zero, and there can be choices about padding or non-padding.
These are the examples of using non-padding name style:
mnist_kaggle/train/
├── part-0
├── part-1
├── part-10
├── part-11
├── part-12
├── part-13
├── part-14
├── part-15
├── part-2
├── part-3
├── part-4
├── part-5
├── part-6
├── part-7
├── part-8
└── part-9
These are the examples of using padding name style:
mnist_kaggle/train/
├── part-00000
├── part-00001
├── part-00002
├── part-00003
├── part-00004
├── part-00005
├── part-00006
├── part-00007
├── part-00008
├── part-00009
├── part-00010
├── part-00011
├── part-00012
├── part-00013
├── part-00014
├── part-00015
OneFlow adopts this convention, which is consistent with the default storage filename in spark, so it is convenient to prepare OFRecord data by spark.
Actually, we can specify the filename prefix part-, whether we pad the filename id and how many bits to pad. We just need to keep the same parameters when loading dataset, which will be described below.
OneFlow provides the API interface to load OFRecord dataset by specifying the path of dataset directory, so that we can have the multi-threading, pipelining and some other advantages brought by OneFlow framework.
The Method to Load OFRecord Dataset¶
We use ofrecord_reader to load and preprocess dataset.
In article Data Input, we have shown how to use ofrecord_reader API to load OFRecord data and preprocess it.
Code: of_data_pipeline.py
The prototype of ofrecord_reader is as follows:
def ofrecord_reader(
    ofrecord_dir,
    batch_size=1,
    data_part_num=1,
    part_name_prefix="part-",
    part_name_suffix_length=-1,
    random_shuffle=False,
    shuffle_buffer_size=1024,
    shuffle_after_epoch=False,
    name=None,
)
- ofrecord_diris the directory which stored the dataset
- batchsizeassign the batch size in each epoch
- data_part_numassign the number of ofrecord data format file in the directory which stored the dataset. It will raise an error if the parameter is greater than the number of the existed files
- part_name_prefixassign the filename prefix of ofrecord files. Oneflow locates the ofrecord files according to the prefix + index in the dataset directory
- part_name_suffix_lengthassigns the padding of ofrecord file index, -1 represents no padding
- random_shuffleassign whether shuffle the sample order randomly when reading data
- shuffle_buffer_sizeassign the buffer size when reading data
- shuffle_after_epochassign whether shuffle the sample order after each epoch
The benefit of using ofrecord_reader is that ofrecord_reader acts as a normal operator which participates in OneFlow composition optimization and enjoys OneFlow pipeline acceleration.
For flexibility and extensibility of the code, we can define a preprocessing OP for ofrecord_reader to deal with specific data formats which are coupled with operational logic (e.g. decoding, decompression and etc.).
- For more information on DataLoader and related operator usage refer to Data input .
- For more information on customized OP please refer to User op.
The transition between other data format data and OFRecord dataset¶
According to the storage format of OFRecord file in article OFRecord and the filename format convention of OFRecord dataset introduced at the beginning, we can prepare OFRecord dataset by ourselves.
To prepare dataset easier, we provide jar package from Spark, which is convenient to the interconversion between OFRecord and common data formats (such as TFRecord and JSON).
The installation and launch of Spark¶
At first, we should download Spark and Spark-oneflow-connector:
- 
Download the spark-2.4.7-bin-hadoop2.7.tgz from the official website of Spark 
- 
Download jar package here, which is needed by Spark to support the ofrecord file format 
Then unzip the spark-2.4.7-bin-hadoop2.7.tgz and configure the environment variable SPARK_HOME:
export SPARK_HOME=path/to/spark-2.4.7-bin-hadoop2.7
export PATH=$SPARK_HOME/bin:$PATH
We can launch the pyspark shell with the following command:
pyspark --master "local[*]"\
 --jars spark-oneflow-connector-assembly-0.1.0_int64.jar\
 --packages org.tensorflow:spark-tensorflow-connector_2.11:1.13.1
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/
Using Python version 3.6.10 (default, Mar 25 2020 18:53:43)
SparkSession available as 'spark'.
We can complete the data conversion between OFRecord dataset and other formats in pyspark shell.
Use Spark to view OFRecord dataset¶
We can view OFRecord data with following code:
spark.read.format("ofrecord").load("file:///path/to/ofrecord_file").show()
The first 20 rows are displayed by default:
+--------------------+------+
|              images|labels|
+--------------------+------+
|[0.33967614, 0.87...|     2|
|[0.266905, 0.9730...|     3|
|[0.66661334, 0.67...|     1|
|[0.91943026, 0.89...|     6|
|[0.014844197, 0.0...|     6|
|[0.5366513, 0.748...|     4|
|[0.055148937, 0.7...|     7|
|[0.7814437, 0.228...|     4|
|[0.31193638, 0.55...|     3|
|[0.20034336, 0.24...|     4|
|[0.09441255, 0.07...|     3|
|[0.5177533, 0.397...|     0|
|[0.23703437, 0.44...|     9|
|[0.9425567, 0.859...|     9|
|[0.017339867, 0.0...|     3|
|[0.827106, 0.3122...|     0|
|[0.8641392, 0.194...|     2|
|[0.95585227, 0.29...|     3|
|[0.7508129, 0.464...|     4|
|[0.035597708, 0.3...|     9|
+--------------------+------+
only showing top 20 rows
The interconversion with TFRecord dataset¶
we can convert TFRecord to OFRecord with the following command:
reader = spark.read.format("tfrecords")
dataframe = reader.load("file:///path/to/tfrecord_file")
writer = dataframe.write.format("ofrecord")
writer.save("file:///path/to/outputdir")
In the above code, the outputdir directory will be created automatically, and ofrecord files will be saved into this directory. Make sure that the "outputdir" directory does not exist before executing the command.
In addition, we can use the following command to split data into multiple ofrecord files.
reader = spark.read.format("tfrecords")
dataframe = reader.load("file:///path/to/tfrecord_file")
writer = dataframe.repartition(10).write.format("ofrecord")
writer.save("file://path/to/outputdir")
After executing the above commands, 10 ofrecord files of part-xxx format will be generated in "outputdir" directory.
The process of converting OFRecord file to TFRecord file is similar. we just need to change the format of read/write side:
reader = spark.read.format("ofrecord")
dataframe = reader.load("file:///path/to/ofrecord_file")
writer = dataframe.write.format("tfrecords")
writer.save("file:///path/to/outputdir")
The interconversion with JSON format¶
We can convert JSON to OFRecord with the following command:
dataframe = spark.read.json("file:///path/to/json_file")
writer = dataframe.write.format("ofrecord")
writer.save("file:///path/to/outputdir")
The following command will convert OFRecord data to JSON files:
reader = spark.read.format("ofrecord")
dataframe = reader.load("file:///path/to/ofrecord_file")
dataframe.write.json("file://path/to/outputdir")