tensorflowonspark.TFCluster module
This module provides a high-level API to manage the TensorFlowOnSpark cluster.
There are three main phases of operation:
Reservation/Startup - reserves a port for the TensorFlow process on each executor, starts a multiprocessing.Manager to listen for data/control messages, and then launches the TensorFlow main function on the executors.
Data feeding - For InputMode.SPARK only. Sends RDD data to the TensorFlow nodes via each executor’s multiprocessing.Manager. PS nodes will tie up their executors, so they won’t receive any subsequent data feeding tasks.
Shutdown - sends a shutdown control message to the multiprocessing.Managers of the PS nodes and pushes end-of-feed markers into the data queues of the worker nodes.
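A minimal driver-side sketch of these phases for InputMode.SPARK (main_fun, args, and dataRDD are placeholders for your own TensorFlow “main” function, its arguments, and your input RDD; executor counts are illustrative):

from tensorflowonspark import TFCluster

# Reservation/Startup: reserve ports and launch main_fun on the executors.
cluster = TFCluster.run(sc, main_fun, args, num_executors=4, num_ps=1,
                        input_mode=TFCluster.InputMode.SPARK)

# Data feeding: push RDD partitions to the TF worker nodes.
cluster.train(dataRDD, num_epochs=1)

# Shutdown: signal the PS nodes and mark end-of-feed for the workers.
cluster.shutdown()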
- class InputMode[source]
Bases: object
Enum for the input modes of data feeding.
- SPARK = 1
Spark is responsible for feeding data to the TensorFlow application via an RDD.
- TENSORFLOW = 0
TensorFlow application is responsible for reading any data.
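With InputMode.TENSORFLOW, the driver does not feed data at all; each node reads its own input (e.g. from files on HDFS), so the driver-side flow reduces to the following sketch (main_fun, args, and the executor counts are placeholders):

cluster = TFCluster.run(sc, main_fun, args, num_executors=4, num_ps=1,
                        input_mode=TFCluster.InputMode.TENSORFLOW)
# No train()/inference() calls; shutdown() waits for the TF nodes to finish.
cluster.shutdown()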
- class TFCluster[source]
Bases: object
- cluster_id = None
Unique ID for this cluster, used to invalidate state for new clusters.
- cluster_info = None
Cluster node reservations
- cluster_meta = None
Cluster metadata dictionary, e.g. cluster_id, defaultFS, reservation.Server address, etc.
- defaultFS = None
Default FileSystem string, e.g. file:// or hdfs://<namenode>/
- inference(dataRDD, feed_timeout=600, qname='input')[source]
For InputMode.SPARK only: Feeds Spark RDD partitions into the TensorFlow worker nodes and returns an RDD of results
It is the responsibility of the TensorFlow “main” function to interpret the rows of the RDD and provide valid data for the output RDD.
This will use the distributed TensorFlow cluster for inferencing, so the TensorFlow “main” function should be capable of inferencing. Per Spark design, the output RDD will be lazily-executed only when a Spark action is invoked on the RDD.
- Args:
- dataRDD
input data as a Spark RDD
- feed_timeout
number of seconds after which data feeding times out (600 sec default)
- qname
INTERNAL_USE
- Returns:
A Spark RDD representing the output of the TensorFlow inferencing
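A short usage sketch (output_path is an illustrative placeholder):

# Feed an RDD of input rows to the TF worker nodes for inference.
resultRDD = cluster.inference(dataRDD)

# The output RDD is lazy; nothing executes until a Spark action is invoked.
resultRDD.saveAsTextFile(output_path)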
- input_mode = None
TFCluster.InputMode for this cluster
- nodeRDD = None
RDD representing the nodes of the cluster, i.e. sc.parallelize(range(num_executors), num_executors)
- num_executors = None
Number of executors in the Spark job (and therefore, the number of nodes in the TensorFlow cluster).
- queues = None
INTERNAL_USE
- sc = None
SparkContext
- server = None
reservation.Server for this cluster
- shutdown(ssc=None, grace_secs=0, timeout=259200)[source]
Stops the distributed TensorFlow cluster.
For InputMode.SPARK, this will be executed AFTER the TFCluster.train() or TFCluster.inference() method completes. For InputMode.TENSORFLOW, this will be executed IMMEDIATELY after TFCluster.run() and will wait until the TF worker nodes complete.
- Args:
- ssc
For Streaming applications only. Spark StreamingContext
- grace_secs
Grace period to wait after all executors have completed their tasks before terminating the Spark application, e.g. to allow the chief worker to perform any final/cleanup duties like exporting or evaluating the model. Default is 0.
- timeout
Time in seconds to wait for TF cluster to complete before terminating the Spark application. This can be useful if the TF code hangs for any reason. Default is 3 days. Use -1 to disable timeout.
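Usage sketches for the two common cases (the grace period value is illustrative):

# Batch job: allow the chief worker 30s to export the model before exit.
cluster.shutdown(grace_secs=30)

# Spark Streaming job: pass the StreamingContext so it is stopped cleanly.
cluster.shutdown(ssc)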
- train(dataRDD, num_epochs=0, feed_timeout=600, qname='input')[source]
For InputMode.SPARK only. Feeds Spark RDD partitions into the TensorFlow worker nodes
It is the responsibility of the TensorFlow “main” function to interpret the rows of the RDD.
Since epochs are implemented via RDD.union() and the entire RDD must generally be processed in full, it is recommended to set num_epochs to closely match your training termination condition (e.g. steps or accuracy). See TFNode.DataFeed for more details.
- Args:
- dataRDD
input data as a Spark RDD.
- num_epochs
number of times to repeat the dataset during training.
- feed_timeout
number of seconds after which data feeding times out (600 sec default)
- qname
INTERNAL USE.
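On the executor side, a TensorFlow “main” function for this mode typically consumes the fed partitions via TFNode.DataFeed, as referenced above; a minimal sketch, where the batch size and the per-batch training step are application-specific placeholders:

from tensorflowonspark import TFNode

def main_fun(args, ctx):
    # Read rows that TFCluster.train() pushed into this executor's queue.
    tf_feed = TFNode.DataFeed(ctx.mgr)
    while not tf_feed.should_stop():
        batch = tf_feed.next_batch(100)
        if len(batch) == 0:
            break
        # ... convert `batch` rows to tensors and run a training step ...
    # Tell the feeder this node has finished consuming data.
    tf_feed.terminate()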
- working_dir = None
Current working directory
- run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=0, log_dir=None, driver_ps_nodes=False, master_node=None, reservation_timeout=600, queues=['input', 'output', 'error'], eval_node=False, release_port=True)[source]
Starts the TensorFlowOnSpark cluster and runs the TensorFlow “main” function on the Spark executors
- Args:
- sc
SparkContext
- map_fun
user-supplied TensorFlow “main” function
- tf_args
argparse args, or command-line ARGV. These will be passed to the map_fun.
- num_executors
number of Spark executors. This should match your Spark job’s --num_executors.
- num_ps
number of Spark executors which are reserved for TensorFlow PS nodes. All other executors will be used as TensorFlow worker nodes.
- tensorboard
boolean indicating if the chief worker should spawn a TensorBoard server.
- input_mode
TFCluster.InputMode
- log_dir
directory in which to save TensorBoard event logs. If None, defaults to a fixed path on the local filesystem.
- driver_ps_nodes
run the PS nodes on the driver locally instead of on the Spark executors; this helps maximize computing resources (esp. GPU). You will need to set cluster_size = num_executors + num_ps.
- master_node
name of the “master” or “chief” node in the cluster_template, used for tf.estimator applications.
- reservation_timeout
number of seconds after which cluster reservation times out (600 sec default)
- queues
INTERNAL_USE
- eval_node
run an evaluator node for distributed TensorFlow
- release_port
automatically release the reserved port prior to invoking the user’s map_fun. If False, the user’s map_fun must invoke ctx.release_port() prior to starting the TF gRPC server.
- Returns:
A TFCluster object representing the started cluster.
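For instance, a tf.estimator-style application might be launched as follows (a sketch; the argument values, including the log_dir path, are illustrative):

cluster = TFCluster.run(sc, main_fun, args,
                        num_executors=5, num_ps=1,
                        tensorboard=True,
                        input_mode=TFCluster.InputMode.TENSORFLOW,
                        log_dir='hdfs://namenode/path/to/logs',
                        master_node='chief',
                        eval_node=True)
cluster.shutdown()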