# Copyright 2017 Yahoo Inc.
# Licensed under the terms of the Apache 2.0 license.
# Please see LICENSE file in the project root for terms.
"""This module provides helper functions for the TensorFlow application.
Primarily, these functions help with:
* starting the TensorFlow ``tf.train.Server`` for the node (allocating GPUs as desired, and determining the node's role in the cluster).
* managing input/output data for *InputMode.SPARK*.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import nested_scopes
from __future__ import print_function
import getpass
import logging
import pkg_resources
from packaging import version
from six.moves.queue import Empty
from . import compat, marker
logger = logging.getLogger(__name__)
# Look up the installed TensorFlow version string from distribution metadata.
try:
    TF_VERSION = pkg_resources.get_distribution('tensorflow').version
except pkg_resources.DistributionNotFound:
    # No 'tensorflow' distribution was found; fall back to the 'tensorflow-cpu' distribution.
    # NOTE(review): if that one is missing as well, DistributionNotFound propagates at import time.
    TF_VERSION = pkg_resources.get_distribution('tensorflow-cpu').version
def hdfs_path(ctx, path):
    """Convenience function to create a TensorFlow-compatible absolute path for a Hadoop-compatible filesystem.

    Resolution rules:

    * a path that already starts with a known Hadoop filesystem scheme is returned unchanged;
    * an absolute path without a scheme (leading ``/``) is prefixed with ``ctx.defaultFS``;
    * a relative path is placed under ``<defaultFS>/user/<current user>`` for ``hdfs://`` and ``viewfs://``,
      under ``ctx.working_dir`` for ``file://``, and directly under ``ctx.defaultFS`` (with a warning) otherwise.

    Args:
      :ctx: TFNodeContext containing the metadata specific to this node in the cluster.
      :path: path to convert

    Returns:
      An absolute path prefixed with the correct filesystem scheme.
    """
    # All Hadoop-Compatible File System Schemes (as of Hadoop 3.0.x).
    # A tuple, so it can be passed directly to ``str.startswith``.
    HADOOP_SCHEMES = (
        'adl://',
        'file://',
        'hdfs://',
        'oss://',
        's3://',
        's3a://',
        's3n://',
        'swift://',
        'viewfs://',
        'wasb://',
    )

    if path.startswith(HADOOP_SCHEMES):
        # absolute path w/ scheme, just return as-is
        return path

    if path.startswith("/"):
        # absolute path w/o scheme, just prepend w/ defaultFS
        return ctx.defaultFS + path

    # relative path, prepend defaultFS + standard working dir
    default_fs = ctx.defaultFS
    if default_fs.startswith(("hdfs://", "viewfs://")):
        return "{0}/user/{1}/{2}".format(default_fs, getpass.getuser(), path)
    if default_fs.startswith("file://"):
        # working_dir is joined without its first character (its leading "/"),
        # since the format string supplies the separator.
        return "{0}/{1}/{2}".format(default_fs, ctx.working_dir[1:], path)

    # ``Logger.warn`` is a deprecated alias (removed in Python 3.13); use ``warning``.
    logger.warning("Unknown scheme {0} with relative path: {1}".format(default_fs, path))
    return "{0}/{1}".format(default_fs, path)
def start_cluster_server(ctx, num_gpus=1, rdma=False):
    """Function that wraps the creation of TensorFlow ``tf.train.Server`` for a node in a distributed TensorFlow cluster.

    This is intended to be invoked from within the TF ``map_fun``, replacing explicit code to instantiate ``tf.train.ClusterSpec``
    and ``tf.train.Server`` objects.

    DEPRECATED for TensorFlow 2.x+

    Args:
      :ctx: TFNodeContext containing the metadata specific to this node in the cluster.
      :num_gpus: number of GPUs desired
      :rdma: boolean indicating if RDMA 'iverbs' should be used for cluster communications.

    Returns:
      A tuple of (cluster_spec, server)

    Raises:
      Exception: if the installed TensorFlow is 2.0.0 or newer, or if the GPU-backed server
        could not be started after all retries.
    """
    import os
    import time
    from . import gpu_info

    if version.parse(TF_VERSION) >= version.parse("2.0.0"):
        raise Exception("DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`")

    logger.info("{0}: ======== {1}:{2} ========".format(ctx.worker_num, ctx.job_name, ctx.task_index))
    cluster_spec = ctx.cluster_spec
    logger.info("{0}: Cluster spec: {1}".format(ctx.worker_num, cluster_spec))

    if compat.is_gpu_available() and num_gpus > 0:
        # compute my index relative to other nodes placed on the same host (for GPU allocation)
        my_addr = cluster_spec[ctx.job_name][ctx.task_index]
        my_host = my_addr.split(':')[0]
        flattened = [v for sublist in cluster_spec.values() for v in sublist]
        # Compare the host part exactly: a prefix match would wrongly treat
        # e.g. "10.0.0.10:2222" as a peer of host "10.0.0.1".
        local_peers = [p for p in flattened if p.split(':')[0] == my_host]
        my_index = local_peers.index(my_addr)

        # override PS jobs to only reserve one GPU
        if ctx.job_name == 'ps':
            num_gpus = 1

        # GPU
        gpu_initialized = False
        retries = 3
        while not gpu_initialized and retries > 0:
            try:
                # Find a free gpu(s) to use
                gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
                gpu_prompt = "GPU" if num_gpus == 1 else "GPUs"
                logger.info("{0}: Using {1}: {2}".format(ctx.worker_num, gpu_prompt, gpus_to_use))

                # Set GPU device to use for TensorFlow
                os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use

                # Import tensorflow after gpu allocation
                import tensorflow as tf

                # Create a cluster from the parameter server and worker hosts.
                cluster = tf.train.ClusterSpec(cluster_spec)

                # Create and start a server for the local task.
                if rdma:
                    server = tf.train.Server(cluster, ctx.job_name, ctx.task_index, protocol="grpc+verbs")
                else:
                    server = tf.train.Server(cluster, ctx.job_name, ctx.task_index)
                gpu_initialized = True
            except Exception:
                # Record the failure (with traceback) through the logger instead of printing it.
                logger.exception("{0}: Failed to allocate GPU, trying again...".format(ctx.worker_num))
                retries -= 1
                if retries > 0:
                    # Only wait when another attempt will actually follow.
                    time.sleep(10)
        if not gpu_initialized:
            raise Exception("Failed to allocate GPU")
    else:
        # CPU
        # Hide all GPUs before TensorFlow is imported, mirroring the ordering used in the GPU branch.
        os.environ['CUDA_VISIBLE_DEVICES'] = ''
        import tensorflow as tf
        logger.info("{0}: Using CPU".format(ctx.worker_num))

        # Create a cluster from the parameter server and worker hosts.
        cluster = tf.train.ClusterSpec(cluster_spec)

        # Create and start a server for the local task.
        server = tf.train.Server(cluster, ctx.job_name, ctx.task_index)

    return (cluster, server)
def next_batch(mgr, batch_size, qname='input'):
    """*DEPRECATED*. Always raises; use the TFNode.DataFeed class instead."""
    message = "DEPRECATED: Use TFNode.DataFeed class instead"
    raise Exception(message)
def export_saved_model(sess, export_dir, tag_set, signatures):
    """Convenience function to export a saved_model using provided arguments

    The caller specifies the saved_model signatures in a simplified python dictionary form, as follows::

      signatures = {
        'signature_def_key': {
          'inputs': { 'input_tensor_alias': input_tensor_name },
          'outputs': { 'output_tensor_alias': output_tensor_name },
          'method_name': 'method'
        }
      }

    And this function will generate the `signature_def_map` and export the saved_model.
    If a signature omits ``'method_name'``, its dictionary key is used as the method name.

    DEPRECATED for TensorFlow 2.x+.

    Args:
      :sess: a tf.Session instance
      :export_dir: path to save exported saved_model
      :tag_set: string tag_set to identify the exported graph (comma-separated for multiple tags)
      :signatures: simplified dictionary representation of a TensorFlow signature_def_map

    Returns:
      A saved_model exported to disk at ``export_dir``.

    Raises:
      Exception: if the imported TensorFlow is version 2.0.0 or newer.
    """
    import tensorflow as tf

    if version.parse(tf.__version__) >= version.parse("2.0.0"):
        raise Exception("DEPRECATED: Use TF provided APIs instead.")

    g = sess.graph
    g._unsafe_unfinalize()  # https://github.com/tensorflow/serving/issues/363
    try:
        builder = tf.saved_model.builder.SavedModelBuilder(export_dir)

        logger.info("===== signatures: {}".format(signatures))
        signature_def_map = {}
        for key, sig in signatures.items():
            signature_def_map[key] = tf.saved_model.signature_def_utils.build_signature_def(
                inputs={name: tf.saved_model.utils.build_tensor_info(tensor) for name, tensor in sig['inputs'].items()},
                outputs={name: tf.saved_model.utils.build_tensor_info(tensor) for name, tensor in sig['outputs'].items()},
                method_name=sig.get('method_name', key))
        logger.info("===== signature_def_map: {}".format(signature_def_map))

        builder.add_meta_graph_and_variables(
            sess,
            tag_set.split(','),
            signature_def_map=signature_def_map,
            clear_devices=True)
    finally:
        # Always re-finalize, so a failed export does not leave the session graph mutable.
        g.finalize()
    builder.save()
def release_port(ctx):
    """Close the temporary socket held in ``ctx.tmp_socket`` and clear the reference.

    If ``ctx.tmp_socket`` is already ``None``, only a warning is logged.

    Args:
      :ctx: node context object exposing a ``tmp_socket`` attribute.
    """
    sock = ctx.tmp_socket
    if sock is None:
        logger.warning("release_port() invoked with no bound socket.")
        return

    logger.info("Releasing assigned port: {}".format(sock.getsockname()))
    sock.close()
    ctx.tmp_socket = None
def batch_results(mgr, results, qname='output'):
    """*DEPRECATED*. Always raises; use the TFNode.DataFeed class instead."""
    message = "DEPRECATED: Use TFNode.DataFeed class instead"
    raise Exception(message)
def terminate(mgr, qname='input'):
    """*DEPRECATED*. Always raises; use the TFNode.DataFeed class instead."""
    message = "DEPRECATED: Use TFNode.DataFeed class instead"
    raise Exception(message)
class DataFeed(object):
    """Manages the *InputMode.SPARK* data feeding process from the perspective of the TensorFlow application.

    Items are read from the manager queue named ``qname_in`` and results are written to the queue named ``qname_out``.

    Args:
      :mgr: TFManager instance associated with this Python worker process.
      :train_mode: set to False when an output response is expected (e.g. inferencing); in that case
        ``next_batch`` ends a non-empty batch at each partition boundary.
      :qname_in: *INTERNAL_USE*
      :qname_out: *INTERNAL_USE*
      :input_mapping: *For Spark ML Pipelines only*. Dictionary of input DataFrame columns to input TensorFlow tensors.
    """

    def __init__(self, mgr, train_mode=True, qname_in='input', qname_out='output', input_mapping=None):
        self.mgr = mgr
        self.train_mode = train_mode
        self.qname_in = qname_in
        self.qname_out = qname_out
        self.done_feeding = False
        if input_mapping is None:
            self.input_tensors = None
        else:
            # Tensor names, ordered by sorting the (column, tensor) pairs of the mapping.
            self.input_tensors = [tensor for _, tensor in sorted(input_mapping.items())]
        self.queue_in = mgr.get_queue(qname_in)
        self.queue_out = mgr.get_queue(qname_out)

    def next_batch(self, batch_size):
        """Gets a batch of items from the input RDD.

        If multiple tensors are provided per row in the input RDD, e.g. tuple of (tensor1, tensor2, ..., tensorN) and:

        * no ``input_mapping`` was provided to the DataFeed constructor, this will return an array of ``batch_size`` tuples,
          and the caller is responsible for separating the tensors.
        * an ``input_mapping`` was provided to the DataFeed constructor, this will return a dictionary of N tensors,
          with tensor names as keys and arrays of length ``batch_size`` as values.

        Note: if the end of the data is reached, this may return with fewer than ``batch_size`` items.
        The same holds at a partition boundary when ``train_mode`` is False and the batch is non-empty.

        Args:
          :batch_size: number of items to retrieve.

        Returns:
          A batch of items or a dictionary of tensors.
        """
        names = self.input_tensors
        as_rows = names is None
        batch = [] if as_rows else {name: [] for name in names}
        source = self.queue_in
        fetched = 0

        while fetched < batch_size:
            item = source.get(block=True)

            if item is None:
                # End of Feed: remember it so should_stop() reports True.
                logger.info("next_batch() got None")
                source.task_done()
                self.done_feeding = True
                break

            if type(item) is marker.EndPartition:
                # End of Partition
                logger.info("next_batch() got EndPartition")
                source.task_done()
                if fetched > 0 and not self.train_mode:
                    # Outside train mode, hand back the partial batch collected so far.
                    break
                continue

            # Normal item
            if as_rows:
                batch.append(item)
            else:
                # Route the i-th field of the row to the i-th mapped tensor name.
                for position, name in enumerate(names):
                    batch[name].append(item[position])
            fetched += 1
            source.task_done()

        return batch

    def should_stop(self):
        """Return True once ``next_batch`` has received the end-of-feed sentinel (``None``)."""
        return self.done_feeding

    def batch_results(self, results):
        """Push a batch of output results to the Spark output RDD of ``TFCluster.inference()``.

        Note: this currently expects a one-to-one mapping of input to output data, so the length of the ``results`` array should match the length of
        the previously retrieved batch of input data.

        Args:
          :results: array of output data for the equivalent batch of input data.
        """
        sink = self.queue_out
        for result in results:
            sink.put(result, block=True)

    def terminate(self):
        """Terminate data feeding early.

        Since TensorFlow applications can often terminate on conditions unrelated to the training data (e.g. steps, accuracy, etc),
        this method signals the data feeding process to ignore any further incoming data. Note that Spark itself does not have a mechanism
        to terminate an RDD operation early, so the extra partitions will still be sent to the executors (but will be ignored). Because
        of this, you should size your input data accordingly to avoid excessive overhead.
        """
        logger.info("terminate() invoked")
        self.mgr.set('state', 'terminating')

        # Drop remaining items until the input queue stays empty for the 5 second timeout.
        pending = self.mgr.get_queue(self.qname_in)
        dropped = 0
        while True:
            try:
                pending.get(block=True, timeout=5)
                pending.task_done()
                dropped += 1
            except Empty:
                logger.info("dropped {0} items from queue".format(dropped))
                break