Frequently Asked Questions
- When should I use InputMode.SPARK vs. InputMode.TENSORFLOW?
- Why use only one task (TF Node) per executor?
- Why doesn't algorithm X scale when I add more nodes?
- What does KeyError: 'input' error mean?
- Why does MNIST example hang?
- For InputMode.SPARK, how are epochs, steps, batch_size, and num_workers correlated and how should they be set?
- For InputMode.SPARK, can I pass in more than one RDD, e.g. for train and test data?
- For InputMode.TENSORFLOW, why does my training hang towards the end of an epoch?
- Why do I get pickling errors after converting TF code to TFoS?
When should I use InputMode.SPARK vs. InputMode.TENSORFLOW?
In general, InputMode.TENSORFLOW uses Spark only to schedule the TensorFlow nodes onto the Spark executors. Afterwards, it is essentially a pure distributed TensorFlow cluster. This gives you the best performance, since the TensorFlow nodes read data directly from disk using TensorFlow's native APIs.
InputMode.SPARK not only schedules the TensorFlow nodes onto the executors, but also sets up a queue for transferring data from Spark RDDs to the TensorFlow nodes. Data from the RDD partitions is pushed into the queue, while the TensorFlow nodes pull data from it. Due to this extra hop, data I/O performance can be significantly slower than with InputMode.TENSORFLOW.
In general, we recommend using InputMode.TENSORFLOW with native tf.data APIs for best performance, and using InputMode.SPARK only if you have existing upstream data pipelines written in Spark. Ultimately, the choice depends on your specific use case. For example, if you have an existing Spark data pipeline that processes large amounts of data, and the data format is not easily consumable by TensorFlow, you can either A) re-process the data, persist it in a form that is compatible with tf.data, and use InputMode.TENSORFLOW, or B) use InputMode.SPARK to transform and feed the data to TensorFlow. Option A requires storing another copy of the data, which can get expensive at larger scales. Option B essentially trades storage for additional compute. Additionally, if you are experimenting with new datasets and feature engineering, it can be easier to process the datasets dynamically in Spark code instead of re-processing and persisting the data to disk (in a TensorFlow-compatible format) each time you adjust a feature.
Why use only one task (TF Node) per executor?
This was a design choice, as it is the most obvious and easiest-to-reason-about mapping of a Spark resource to a TF node. The most visible benefit is that each executor's logs contain the logs of only a single TF node, which is much easier to debug than interspersed logs. Additionally, most Spark/YARN configurations (e.g. memory) can be mapped almost directly to the TF node. Finally, in Hadoop/YARN environments (which were our primary focus), you can still have multiple executors assigned to the same physical host if there are enough available resources.
Note: for YARN, spark.executor.cores usually defaults to 1, so we just set spark.task.cpus=1 as well. If your environment has a different setting for spark.executor.cores, you can set spark.task.cpus equal to that number to achieve the "one task per executor" goal.
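As a rough sketch of the note above, the helper below builds the matching pair of settings and renders them as spark-submit arguments. The function names one_task_per_executor_conf and to_submit_args are hypothetical and not part of TensorFlowOnSpark; only the two Spark property names come from the text.

```python
def one_task_per_executor_conf(executor_cores=1):
    """Return Spark settings so that each executor runs exactly one task.

    Hypothetical helper for illustration; not part of TensorFlowOnSpark.
    """
    if executor_cores < 1:
        raise ValueError("executor_cores must be at least 1")
    return {
        "spark.executor.cores": str(executor_cores),
        # One task claims every core of the executor, so no second task fits.
        "spark.task.cpus": str(executor_cores),
    }


def to_submit_args(conf):
    """Render the settings as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(one_task_per_executor_conf(4))))
# --conf spark.executor.cores=4 --conf spark.task.cpus=4
```

Keeping both values derived from a single number makes it harder for the two settings to drift apart when the cluster configuration changes.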
Why doesn't algorithm X scale when I add more nodes?
Different algorithms scale in different ways, depending on whether they are limited by compute, memory, or I/O. If an algorithm runs comfortably on a single node (completing in a few seconds or minutes), then the overhead of launching nodes in a cluster, coordinating work between them, and communicating gradients/weights across the cluster will generally make the distributed version slower. For example, the distributed MNIST example is provided to demonstrate the process of porting a simple, easy-to-understand, distributed TF application over to TFoS. It is not intended to run faster than a single-node TF instance, where you can actually load the entire training set in memory and finish 1000 steps in a few seconds.
Similarly, most of the other examples just demonstrate the minimal amount of code to convert a TF application, without trying to rewrite the code to be more scalable.
Finally, if an application takes a "painfully long" time to complete on a single node, then it is a good candidate for scaling. However, you will still need to understand which resource is the bottleneck and how to modify your code to scale for that resource in a distributed cluster.
What does KeyError: 'input' error mean?
Because we assume that one executor hosts only one TF node, you will encounter this error if your cluster is not configured accordingly. Specifically, it is seen when an RDD feeding task lands on an executor that doesn't contain a TF worker node, e.g. it lands on the PS node or even an idle executor. It can also occur if you allow Spark task retries on failures. In general, we recommend disabling task retries by setting --conf spark.task.maxFailures=1.
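The failure mode can be pictured with a toy model. This is not TensorFlowOnSpark's actual code: the executor names, the dictionary layout, and the feed function are all assumptions made for illustration. It only shows why a feeding task that lands on an executor without a worker's input queue ends in KeyError: 'input'.

```python
import queue

# Toy model (assumption): each executor exposes a dict of named queues,
# and only executors hosting a TF worker have an "input" queue.
executors = {
    "executor-0": {"role": "ps", "queues": {"control": queue.Queue()}},
    "executor-1": {"role": "worker", "queues": {"input": queue.Queue()}},
    "executor-2": {"role": None, "queues": {}},  # idle, no TF node
}


def feed(executor_id, records):
    """Push records into the executor's input queue; return the queue size."""
    q = executors[executor_id]["queues"]["input"]  # KeyError if no worker here
    for record in records:
        q.put(record)
    return q.qsize()


print(feed("executor-1", [1, 2, 3]))  # worker: all three records are queued
for unlucky in ("executor-0", "executor-2"):
    try:
        feed(unlucky, [1, 2, 3])
    except KeyError as err:
        print(f"{unlucky}: KeyError: {err}")
```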
Why does MNIST example hang?
There are several possible causes for this symptom:
- In Hadoop/YARN environments, you are likely missing the path to libhdfs.so in your spark.executorEnv.LD_LIBRARY_PATH.
- In Spark Standalone environments with multiple physical nodes, you need to have a distributed file system that is accessible from all nodes.
- If you are using different settings than the original example, you may need to increase the amount of data being fed to the cluster with the --epochs argument, and you may need to increase the --steps argument accordingly.
For InputMode.SPARK, how are epochs, steps, batch_size, and num_workers correlated and how should they be set?
In general, an epoch is one pass through all of the records in a dataset. For TensorFlow, one step is one pass of the execution graph, e.g. sess.run(), which is usually done on one batch of the dataset. So, the number of steps in one epoch is equal to the number of records divided by the batch size. And for distributed TensorFlow, these steps will be "sharded" across your workers. So, putting this all together, you'd have:
steps = epochs * num_records_per_epoch / batch_size
steps_per_worker = steps / num_workers
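To make the two formulas concrete, here is a small worked example. The dataset size, epoch count, batch size, and worker count are illustrative assumptions, and rounding down with integer division is a choice made for this sketch rather than something the formulas prescribe.

```python
def total_steps(epochs, num_records_per_epoch, batch_size):
    """steps = epochs * num_records_per_epoch / batch_size (rounded down)."""
    return epochs * num_records_per_epoch // batch_size


def steps_per_worker(steps, num_workers):
    """Each worker runs an equal share of the total steps (rounded down)."""
    return steps // num_workers


# Illustrative numbers (assumptions): 60,000 records, 3 epochs,
# batch size 100, 4 workers.
steps = total_steps(epochs=3, num_records_per_epoch=60000, batch_size=100)
print(steps)                       # 1800
print(steps_per_worker(steps, 4))  # 450
```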
For InputMode.SPARK, we use Spark's UnionRDD to simulate epochs when feeding a dataset as an RDD. However, TensorFlow applications can often finish before consuming all of the data, e.g. stopping at a maximum number of steps or when a metric reaches some value. To handle these cases, TensorFlowOnSpark provides a TFDataFeed.terminate() API to stop the Spark data feeding. Unfortunately, Spark must still fully consume and process all of the remaining partitions in order to consider that stage successful, so TensorFlowOnSpark just consumes the remaining Spark partitions without actually sending the data to the TensorFlow process. This can be expensive and time-consuming, so it is highly recommended that you closely match your epochs, batch_size, steps, and workers to avoid unnecessary overhead.
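The draining behavior described above can be sketched with a toy feeder. This is a simplified model and not TensorFlowOnSpark's implementation: the feed_partition function and the state dictionary with its "terminating" flag are hypothetical stand-ins for whatever the library does internally after terminate() is called.

```python
import queue


def feed_partition(records, input_queue, state):
    """Toy feeder for one partition.

    Returns (sent, skipped). Once state["terminating"] is set, the remaining
    records are still iterated (so the partition counts as processed) but are
    not placed on the queue.
    """
    sent = skipped = 0
    for record in records:
        if state.get("terminating"):
            skipped += 1
            continue
        input_queue.put(record)
        sent += 1
    return sent, skipped


q = queue.Queue()
state = {"terminating": False}
print(feed_partition(range(100), q, state))  # (100, 0)

state["terminating"] = True  # stands in for the consumer calling terminate()
print(feed_partition(range(100), q, state))  # (0, 100)
```

Even in this toy version, every skipped record still costs an iteration, which is why closely matched settings avoid wasted work.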
For example, using the formulas above, if you want more epochs, you should increase your steps accordingly, or if you increase the batch_size, you should decrease steps, etc.
For InputMode.SPARK, can I pass in more than one RDD, e.g. for train and test data?
Only one RDD can be passed into the TensorFlow processes at a time. The data from the Spark RDD is fed into a queue on each executor, from which the local TensorFlow process pulls data. If you have multiple input RDDs with the same cardinality (e.g. features + labels), you can simply zip the two RDDs together and then extract the components on the TensorFlow side. If you have two distinct RDDs (e.g. train vs. test), you can either run a separate evaluation job that periodically loads the latest checkpoint and evaluates on the test RDD, or you can persist the test RDD to disk and use a tf.data.Dataset to load that data during the evaluation phase.
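The zip-then-unpack pattern can be illustrated without a Spark cluster. In the sketch below, plain Python lists and the built-in zip stand in for the two RDDs and for PySpark's RDD.zip; the sample data and the split_batch helper are assumptions made for illustration.

```python
# Plain lists stand in for two RDDs with the same number of records (assumption).
features = [[0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]
labels = [1, 1, 0]

# Driver side: pair each feature vector with its label, as zipping two RDDs would.
zipped = list(zip(features, labels))


def split_batch(batch):
    """TensorFlow side: separate a batch of (features, label) pairs again."""
    batch_features = [pair[0] for pair in batch]
    batch_labels = [pair[1] for pair in batch]
    return batch_features, batch_labels


x, y = split_batch(zipped[:2])
print(x)  # [[0.0, 1.0], [1.0, 0.0]]
print(y)  # [1, 1]
```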
For InputMode.TENSORFLOW, why does my training hang towards the end of an epoch?
If your application hangs near the end of an epoch of data, you are most likely using a synchronous optimizer and running out of data on one of the workers. This can happen quite often in a Hadoop/YARN environment because upstream data processing can produce unevenly sized files. Depending on how you are reading those files and configuring your TF data pipeline, one worker can "finish" reading its shard of data while another worker is still "waiting" for more data. With synchronous optimizers, this results in a deadlock.
Note that this is difficult to resolve at large scales. For example, you could carefully repartition and reprocess your data into evenly sized partitions, but at large scales this might require a massive shuffle of data over the network, and it would generally result in yet another copy of your original dataset. There are essentially two workarounds: 1) convert your code to use an asynchronous optimizer so each worker can operate independently of the others (with any downsides from asynchronous training), or 2) limit the number of steps to some number less than the total number of steps in the dataset. For example, in most of the examples that use synchronous optimizers, we just train on 90% of the epoch (with any downsides from training on an incomplete dataset).
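As a minimal sketch of the second workaround, the helper below computes a per-worker step limit that covers only part of an epoch. The capped_steps function is hypothetical, the numbers are illustrative, and it assumes the ideal case where records would split evenly across workers; the unused margin is what absorbs uneven shards in practice.

```python
def capped_steps(num_records, batch_size, num_workers, percent=90):
    """Steps per worker when training on only `percent` of one epoch.

    Hypothetical helper; integer arithmetic keeps the result exact.
    """
    steps_per_epoch = num_records // batch_size
    per_worker = steps_per_epoch // num_workers
    return per_worker * percent // 100


# 600 steps per epoch, 150 per worker, capped at 90% of that.
print(capped_steps(num_records=60000, batch_size=100, num_workers=4))  # 135
```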
Why do I get pickling errors after converting TF code to TFoS?
First of all, please note that Windows is not supported due to known issues with pickling.
For other environments, these errors are most likely related to how Spark distributes your Python code to the executors. Spark tries its best to serialize map functions to the executors, including any scoped variables. Because of this, you will need to ensure that your code is cleanly serializable and doesn't reference non-serializable variables. A simple example of this would be a network socket variable. If your Spark driver code creates a socket, but your executor code tries to reference that socket, the socket is "meaningless" once serialized to another physical machine. For this reason, many of the examples define the executor code in one Python file and the driver code in another, just to make this separation clearer. If you do encounter pickling errors, try separating your code similarly.
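The socket case can be reproduced with Python's standard pickle module. PySpark uses its own serialization machinery for functions and closures, so this is only a stand-in for what happens when driver-side state is captured; the is_picklable helper and the host and port are hypothetical.

```python
import pickle
import socket


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


driver_socket = socket.socket()  # created on the "driver"; never connected
address = ("localhost", 9999)    # hypothetical host and port

print(is_picklable(driver_socket))  # False: a live socket cannot be shipped
print(is_picklable(address))        # True: ship the address, connect remotely
driver_socket.close()
```

The usual fix follows the same idea: pass plain data such as the address to the executor code and create the socket there, rather than referencing an object that was created on the driver.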