tensorflowonspark.TFCluster module

This module provides a high-level API to manage the TensorFlowOnSpark cluster.

There are three main phases of operation:

  1. Reservation/Startup - reserves a port for the TensorFlow process on each executor, starts a multiprocessing.Manager to listen for data/control messages, and then launches the TensorFlow main function on the executors.

  2. Data feeding - For InputMode.SPARK only. Sends RDD data to the TensorFlow nodes via each executor’s multiprocessing.Manager. PS nodes will tie up their executors, so they won’t receive any subsequent data feeding tasks.

  3. Shutdown - sends a shutdown control message to the multiprocessing.Managers of the PS nodes and pushes end-of-feed markers into the data queues of the worker nodes.
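A typical InputMode.SPARK application drives these three phases from the driver. A minimal sketch (main_fun, args, and dataRDD are placeholders for your own TensorFlow "main" function, its arguments, and your input data):

    from tensorflowonspark import TFCluster

    # Phase 1: reserve ports and launch the TensorFlow main function on the executors
    cluster = TFCluster.run(sc, main_fun, args, num_executors=4, num_ps=1,
                            input_mode=TFCluster.InputMode.SPARK)

    # Phase 2: feed RDD partitions to the TensorFlow worker nodes
    cluster.train(dataRDD, num_epochs=1)

    # Phase 3: signal shutdown to the PS nodes and push end-of-feed markers to the workers
    cluster.shutdown()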

class InputMode

Bases: object

Enum for the input modes of data feeding.

SPARK = 1

Spark is responsible for feeding data to the TensorFlow application via an RDD.

TENSORFLOW = 0

TensorFlow application is responsible for reading any data.
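The mode is passed to run() below and determines whether your "main" function consumes data from Spark or reads it directly. A brief sketch (main_fun and args are placeholders):

    # Spark feeds the data: pair run() with train() or inference()
    cluster = TFCluster.run(sc, main_fun, args, num_executors, num_ps,
                            input_mode=TFCluster.InputMode.SPARK)

    # TensorFlow reads its own data, e.g. TFRecords on HDFS: run(), then shutdown()
    cluster = TFCluster.run(sc, main_fun, args, num_executors, num_ps,
                            input_mode=TFCluster.InputMode.TENSORFLOW)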

class TFCluster

Bases: object

cluster_id = None

Unique ID for this cluster, used to invalidate state for new clusters.

cluster_info = None

Cluster node reservations

cluster_meta = None

Cluster metadata dictionary, e.g. cluster_id, defaultFS, reservation.Server address, etc.

defaultFS = None

Default FileSystem string, e.g. file:// or hdfs://<namenode>/

inference(dataRDD, feed_timeout=600, qname='input')

For InputMode.SPARK only. Feeds Spark RDD partitions into the TensorFlow worker nodes and returns an RDD of results.

It is the responsibility of the TensorFlow “main” function to interpret the rows of the RDD and provide valid data for the output RDD.

This will use the distributed TensorFlow cluster for inferencing, so the TensorFlow “main” function should be capable of inferencing. Per Spark design, the output RDD is lazily executed, i.e. it is only computed when a Spark action is invoked on the RDD.

Args:
  dataRDD: input data as a Spark RDD
  feed_timeout: number of seconds after which data feeding times out (default: 600)
  qname: INTERNAL_USE

Returns:
  A Spark RDD representing the output of the TensorFlow inferencing
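For example, assuming the cluster was started with InputMode.SPARK and a “main” function that writes predictions to the output queue (a sketch; the input RDD and output path are illustrative):

    predictions = cluster.inference(imagesRDD)
    # lazily executed: this Spark action triggers the actual feeding/inferencing
    predictions.saveAsTextFile("hdfs:///user/me/predictions")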

input_mode = None

TFCluster.InputMode for this cluster

nodeRDD = None

RDD representing the nodes of the cluster, i.e. sc.parallelize(range(num_executors), num_executors)

num_executors = None

Number of executors in the Spark job (and therefore, the number of nodes in the TensorFlow cluster).

queues = None

INTERNAL_USE

sc = None

SparkContext

server = None

reservation.Server for this cluster

shutdown(ssc=None, grace_secs=0, timeout=259200)

Stops the distributed TensorFlow cluster.

For InputMode.SPARK, this will be executed AFTER the TFCluster.train() or TFCluster.inference() method completes. For InputMode.TENSORFLOW, this will be executed IMMEDIATELY after TFCluster.run() and will wait until the TF worker nodes complete.

Args:
  ssc: for Spark Streaming applications only; the StreamingContext
  grace_secs: grace period to wait after all executors have completed their tasks before terminating the Spark application, e.g. to allow the chief worker to perform any final/cleanup duties like exporting or evaluating the model (default: 0)
  timeout: time in seconds to wait for the TF cluster to complete before terminating the Spark application; this can be useful if the TF code hangs for any reason (default: 3 days; use -1 to disable)
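For example (a sketch; the streaming variant assumes an application driven by a Spark StreamingContext):

    # batch application: allow the chief 30s for final export/evaluation
    cluster.shutdown(grace_secs=30)

    # streaming application: pass the StreamingContext so it is stopped cleanly
    cluster.shutdown(ssc)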

tensorboard_url()

Utility function to get the TensorBoard URL.
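For example, if the cluster was started with tensorboard=True (a sketch):

    print("TensorBoard running at: {}".format(cluster.tensorboard_url()))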

train(dataRDD, num_epochs=0, feed_timeout=600, qname='input')

For InputMode.SPARK only. Feeds Spark RDD partitions into the TensorFlow worker nodes.

It is the responsibility of the TensorFlow “main” function to interpret the rows of the RDD.

Since epochs are implemented via RDD.union() and the entire RDD must generally be processed in full, it is recommended to set num_epochs to closely match your training termination condition (e.g. steps or accuracy). See TFNode.DataFeed for more details.

Args:
  dataRDD: input data as a Spark RDD
  num_epochs: number of times to repeat the dataset during training
  feed_timeout: number of seconds after which data feeding times out (default: 600)
  qname: INTERNAL_USE
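Both sides of the feed, as a minimal sketch (batch size and step logic are placeholders; see TFNode.DataFeed for the executor-side API):

    # driver side: feed one epoch of data
    cluster.train(dataRDD, num_epochs=1)

    # executor side: the TensorFlow "main" function consumes the queue
    def main_fun(args, ctx):
        from tensorflowonspark import TFNode
        tf_feed = TFNode.DataFeed(ctx.mgr)
        while not tf_feed.should_stop():
            batch = tf_feed.next_batch(100)  # list of rows from the RDD
            if not batch:
                break
            # ... run one training step on the batch ...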

working_dir = None

Current working directory

run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=0, log_dir=None, driver_ps_nodes=False, master_node=None, reservation_timeout=600, queues=['input', 'output', 'error'], eval_node=False, release_port=True)

Starts the TensorFlowOnSpark cluster and runs the TensorFlow “main” function on the Spark executors.

Args:
  sc: SparkContext
  map_fun: user-supplied TensorFlow “main” function
  tf_args: argparse args or command-line ARGV; these will be passed to map_fun
  num_executors: number of Spark executors; this should match your Spark job’s --num-executors
  num_ps: number of Spark executors reserved for TensorFlow PS nodes; all other executors will be used as TensorFlow worker nodes
  tensorboard: boolean indicating if the chief worker should spawn a TensorBoard server
  input_mode: TFCluster.InputMode
  log_dir: directory for saving TensorBoard event logs; if None, defaults to a fixed path on the local filesystem
  driver_ps_nodes: run the PS nodes locally on the driver instead of on the Spark executors; this helps maximize computing resources (esp. GPUs), but you will need to set cluster_size = num_executors + num_ps
  master_node: name of the “master” or “chief” node in the cluster_template, used for tf.estimator applications
  reservation_timeout: number of seconds after which cluster reservation times out (default: 600)
  queues: INTERNAL_USE
  eval_node: run an evaluator node for distributed TensorFlow
  release_port: automatically release the reserved port prior to invoking the user’s map_fun; if False, the user’s map_fun must invoke ctx.release_port() prior to starting the TF gRPC server

Returns:
  A TFCluster object representing the started cluster.
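For example, a tf.estimator-style startup with TensorBoard enabled (a sketch; argument values are illustrative):

    cluster = TFCluster.run(sc, main_fun, args,
                            num_executors=4, num_ps=1,
                            tensorboard=True,
                            input_mode=TFCluster.InputMode.TENSORFLOW,
                            master_node='chief',
                            log_dir=args.model_dir)
    cluster.shutdown()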