Commit 7adb536

Sync OSS keras to head.

PiperOrigin-RevId: 363683813

qlzh727 authored and tensorflower-gardener committed Mar 18, 2021
1 parent 2b0bcb8 commit 7adb536
Showing 63 changed files with 1,945 additions and 1,736 deletions.
3 changes: 1 addition & 2 deletions keras/backend.py
@@ -48,7 +48,6 @@
from keras.utils import tf_inspect
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import moving_averages
from tensorflow.python.training.tracking import util as tracking_util
from tensorflow.python.util import keras_deps
from tensorflow.python.util.tf_export import keras_export
from tensorflow.tools.docs import doc_controls
@@ -721,7 +720,7 @@ def get_session(op_input_list=()):

# Inject the get_session function to tracking_util to avoid the backward
# dependency from TF to Keras.
tracking_util.register_session_provider(get_session)
tf.__internal__.tracking.register_session_provider(get_session)


def get_graph():
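This hunk swaps a deep private import (tensorflow.python.training.tracking.util) for the tf.__internal__ facade, so the session-provider hookup no longer reaches into TF internals. A minimal sketch of the indirection being registered here, assuming a simplified provider registry (the names below are illustrative, not the real tracking module):

    # Hypothetical sketch of the session-provider pattern used above.
    _session_provider = None

    def register_session_provider(provider):
      # Keras passes in `get_session`; the TF side stores the callable
      # without ever importing Keras, breaking the circular dependency.
      global _session_provider
      _session_provider = provider

    def get_session_for_saving():
      # Checkpointing code resolves the session lazily at call time.
      return _session_provider() if _session_provider is not None else None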
26 changes: 23 additions & 3 deletions keras/distribute/BUILD
@@ -33,7 +33,6 @@ py_library(
deps = [
":sidecar_evaluator",
"//:expect_tensorflow_installed",
"//keras:activations",
"//keras:backend",
"//keras:callbacks",
"//keras:callbacks_v1",
@@ -42,9 +41,7 @@
"//keras:losses",
"//keras:optimizers",
"//keras:regularizers",
"//keras/mixed_precision:autocast_variable",
"//keras/mixed_precision:policy",
"//keras/saving",
"//keras/utils:engine_utils",
"//keras/utils:mode_keys",
],
@@ -730,10 +727,33 @@ tf_py_test(
"no_tfrt", # TODO(b/180537361): Reenable TFRT after the issue is resolved.
],
deps = [
":multi_worker_testing_utils",
"//:expect_absl_installed",
"//:expect_portpicker_installed",
"//:expect_tensorflow_installed",
"//keras",
"//keras/utils:dataset_creator",
],
)

distribute_py_test(
name = "dataset_creator_model_fit_test",
srcs = ["dataset_creator_model_fit_test.py"],
disable_mlir_bridge = True, # TODO(b/170352626)
full_precision = True,
main = "dataset_creator_model_fit_test.py",
shard_count = 50,
tags = [
"multi_gpu",
"no_tfrt", # TODO(b/180537361): Reenable TFRT after the issue is resolved.
"nomultivm", # TODO(b/170502145)
],
deps = [
":multi_worker_testing_utils",
":strategy_combinations",
"//:expect_portpicker_installed",
"//:expect_tensorflow_installed",
"//keras",
"//keras:callbacks",
"//keras/engine",
"//keras/engine:base_layer",
206 changes: 206 additions & 0 deletions keras/distribute/dataset_creator_model_fit_test.py
@@ -0,0 +1,206 @@
# Lint as: python3
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for `DatasetCreator` with `Model.fit` across usages and strategies."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow.compat.v2 as tf
from absl import logging
from absl.testing import parameterized
import numpy as np
import keras
from keras import callbacks as callbacks_lib
from keras.distribute import multi_worker_testing_utils
from keras.distribute import strategy_combinations
from keras.engine import sequential
from keras.layers import core as core_layers
from keras.optimizer_v2 import gradient_descent
from keras.utils import dataset_creator


class DatasetCreatorModelFitTestBase(tf.test.TestCase, parameterized.TestCase):

def _model_compile(self,
strategy,
steps_per_execution=1,
run_eagerly=False,
with_normalization_layer=False):

class ResultAssertingCallback(callbacks_lib.Callback):

def __init__(self):
self._prev_epoch = -1
self._loss_to_compare_against = 2 # Empirical initial value

def on_epoch_end(self, epoch, logs=None):
logging.info("testModelFit: epoch=%r, logs=%r", epoch, logs)
if epoch <= self._prev_epoch:
raise RuntimeError("Epoch is supposed to be larger than previous.")
self._prev_epoch = epoch
is_loss_float = (
logs.get("loss", None) is not None and
isinstance(logs["loss"], (float, np.floating)))
if not is_loss_float:
raise RuntimeError("loss is supposed to be in the logs and float.")
if epoch == 0 or epoch == 9:
          # Making sure the loss of the first epoch is below the empirical
          # initial value of 2, and that the loss of the last epoch is
          # smaller than that of the first.
if logs["loss"] > self._loss_to_compare_against:
raise RuntimeError(
"loss at epoch {} is larger than previous.".format(epoch))
self._loss_to_compare_against = logs["loss"]

def on_train_end(self, logs=None):
if self._prev_epoch != 9:
raise RuntimeError("Unexpected last epoch: {}".format(
self._prev_epoch))

# TODO(b/182193218): Use ParameterServerStrategy as a proper strategy
# combination.
if strategy == "ParameterServerStrategy":
gpu_devices = tf.config.list_physical_devices("GPU")
if len(gpu_devices) > 1:
self.skipTest("b/178452835: Multi-GPUs not supported in "
"ParameterServerStrategy.")
strategy = tf.distribute.experimental.ParameterServerStrategy(
multi_worker_testing_utils.make_parameter_server_cluster(3, 2),
variable_partitioner=tf.distribute.experimental.partitioners.FixedShardsPartitioner(2))

with strategy.scope():
model = sequential.Sequential([core_layers.Dense(10)])
if with_normalization_layer:
norm = keras.layers.BatchNormalization(
axis=-1, input_shape=(4, 4, 3), momentum=0.8)
model.add(norm)

model.compile(
gradient_descent.SGD(),
loss="mse",
steps_per_execution=steps_per_execution,
run_eagerly=run_eagerly)
return model, [ResultAssertingCallback()]

def _model_fit(self,
strategy,
steps_per_execution=1,
validation_data=None,
x=None,
steps_per_epoch=10,
run_eagerly=False,
with_normalization_layer=False):
model, callbacks = self._model_compile(strategy, steps_per_execution,
run_eagerly,
with_normalization_layer)

def dataset_fn(input_context):
del input_context
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
return tf.data.Dataset.from_tensor_slices(
(x, y)).shuffle(10).repeat().batch(2)

x = x or dataset_creator.DatasetCreator(dataset_fn)

model.fit(
x,
epochs=10,
steps_per_epoch=steps_per_epoch,
verbose=0,
callbacks=callbacks,
validation_data=validation_data)
return model


@tf.__internal__.distribute.combinations.generate(
tf.__internal__.test.combinations.combine(
strategy=strategy_combinations.all_strategies +
strategy_combinations.multi_worker_mirrored_strategies +
["ParameterServerStrategy"],
mode="eager"))
class DatasetCreatorModelFitTest(DatasetCreatorModelFitTestBase):

def testModelFit(self, strategy):
model = self._model_fit(strategy)
self.assertEqual(model.optimizer.iterations, 100)
return model

def testModelFitWithNormalizationLayer(self, strategy):
model = self._model_fit(strategy, with_normalization_layer=True)
self.assertEqual(model.optimizer.iterations, 100)

def testModelFitWithStepsPerExecution(self, strategy):
model = self._model_fit(strategy, steps_per_execution=10)
self.assertEqual(model.optimizer.iterations, 100)


@tf.__internal__.distribute.combinations.generate(
    tf.__internal__.test.combinations.combine(
        strategy=["ParameterServerStrategy"], mode="eager"))
class DatasetCreatorModelFitParameterServerStrategyOnlyTest(
DatasetCreatorModelFitTestBase):

def testModelFitWithNoStepsPerEpoch(self, strategy):
with self.assertRaisesRegex(
ValueError, "`steps_per_epoch` must be specified with "
"`ParameterServerStrategy`."):
self._model_fit(strategy, steps_per_epoch=None)

def testModelFitWithRunEagerly(self, strategy):
with self.assertRaisesRegex(
ValueError, "When using `Model` with `ParameterServerStrategy`, "
"`run_eagerly` is not supported."):
self._model_fit(strategy, run_eagerly=True)

def testModelFitWithValidationData(self, strategy):
with self.assertRaisesRegex(
NotImplementedError, "Evaluation in `model.fit` with "
"`ParameterServerStrategy` is not yet supported."):
self._model_fit(
strategy,
validation_data=tf.data.Dataset.from_tensor_slices([1, 1]))

def testModelFitWithDatasetInstance(self, strategy):
with self.assertRaisesRegex(
NotImplementedError, "Only `DatasetCreator` input is supported in "
"`ParameterServerStrategy` at this time."):
self._model_fit(
strategy, x=tf.data.Dataset.from_tensor_slices([1, 1]))

def testModelEvaluate(self, strategy):
model, _ = self._model_compile(strategy)
with self.assertRaisesRegex(
NotImplementedError, "`model.evaluate` is not yet supported with "
"`ParameterServerStrategy`."):
model.evaluate(x=tf.data.Dataset.from_tensor_slices([1, 1]))

def testModelPredict(self, strategy):
model, _ = self._model_compile(strategy)
with self.assertRaisesRegex(
NotImplementedError, "`model.predict` is not yet supported with "
"`ParameterServerStrategy`."):
model.predict(x=tf.data.Dataset.from_tensor_slices([1, 1]))

def testClusterCoordinatorSingleInstance(self, strategy):
model = self._model_fit(strategy)
strategy = model.distribute_strategy
self.assertIs(strategy._cluster_coordinator,
tf.distribute.experimental.coordinator.ClusterCoordinator(strategy))


if __name__ == "__main__":
tf.compat.v1.enable_v2_behavior()
tf.__internal__.distribute.multi_process_runner.test_main()
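Outside the test harness, the pattern this new file exercises reduces to the following hedged sketch (cluster setup elided; tf.keras.utils.experimental.DatasetCreator is the public export in later TF releases and is assumed here for illustration):

    import tensorflow as tf

    def dataset_fn(input_context):
      # Each worker builds its own dataset from this function; the test
      # ignores input_context, which would otherwise drive sharding.
      x = tf.random.uniform((10, 10))
      y = tf.random.uniform((10,))
      return tf.data.Dataset.from_tensor_slices(
          (x, y)).shuffle(10).repeat().batch(2)

    # Assumes `strategy` is a ParameterServerStrategy built from a cluster
    # resolver, as in _model_compile above.
    with strategy.scope():
      model = tf.keras.Sequential([tf.keras.layers.Dense(10)])
    model.compile(tf.keras.optimizers.SGD(), loss="mse")

    # Model.fit with ParameterServerStrategy accepts only a DatasetCreator
    # and requires an explicit steps_per_epoch -- exactly the error paths
    # the ParameterServerStrategyOnly tests above assert.
    model.fit(
        tf.keras.utils.experimental.DatasetCreator(dataset_fn),
        epochs=10,
        steps_per_epoch=10)

Note that testClusterCoordinatorSingleInstance relies on ClusterCoordinator(strategy) returning the same cached instance per strategy; the assertIs check would fail otherwise.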
7 changes: 4 additions & 3 deletions keras/distribute/distribute_strategy_test.py
@@ -25,7 +25,6 @@
import numpy as np

import keras
from tensorflow.python.distribute import distribute_utils
from tensorflow.python.distribute import multi_worker_test_base
from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver
from keras import backend
@@ -2666,15 +2665,17 @@ def create_model():
model.load_weights(temp_dir)
self.assertNotEmpty(model.optimizer.weights)
self.assertTrue(
distribute_utils.is_distributed_variable(model.optimizer.weights[0]))
distributed_training_utils.is_distributed_variable(
model.optimizer.weights[0]))

with distribution.scope():
model = create_model()
# create/restore slot variables outside of scope is fine.
model.load_weights(temp_dir)
self.assertNotEmpty(model.optimizer.weights)
self.assertTrue(
distribute_utils.is_distributed_variable(model.optimizer.weights[0]))
distributed_training_utils.is_distributed_variable(
model.optimizer.weights[0]))


if __name__ == '__main__':
6 changes: 6 additions & 0 deletions keras/distribute/distributed_training_utils.py
@@ -58,3 +58,9 @@ def call_replica_local_fn(fn, *args, **kwargs):
with strategy.scope():
return strategy.extended.call_for_each_replica(fn, args, kwargs)
return fn(*args, **kwargs)


def is_distributed_variable(v):
"""Returns whether `v` is a distributed variable."""
return (isinstance(v, tf.distribute.DistributedValues) and
isinstance(v, tf.Variable))
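A quick hedged illustration of the new helper (MirroredStrategy is used only because it is easy to construct in a single process; the check itself is strategy-agnostic):

    import tensorflow as tf
    from keras.distribute import distributed_training_utils

    strategy = tf.distribute.MirroredStrategy()
    with strategy.scope():
      v = tf.Variable(1.0)  # Created as a distributed (mirrored) variable.

    # A mirrored variable is both a tf.distribute.DistributedValues and a
    # tf.Variable, so the check passes; a plain variable fails it.
    print(distributed_training_utils.is_distributed_variable(v))                # True
    print(distributed_training_utils.is_distributed_variable(tf.Variable(0.)))  # False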
12 changes: 12 additions & 0 deletions keras/distribute/multi_worker_testing_utils.py
@@ -20,7 +20,10 @@

import tensorflow.compat.v2 as tf
import keras
from tensorflow.python.distribute import multi_worker_test_base
from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver
from keras.optimizer_v2 import gradient_descent
from tensorflow.python.training.server_lib import ClusterSpec


def mnist_synthetic_dataset(batch_size, steps_per_epoch):
@@ -71,3 +74,12 @@ def get_mnist_model(input_shape):
optimizer=gradient_descent.SGD(learning_rate=0.001),
metrics=["accuracy"])
return model


def make_parameter_server_cluster(num_workers, num_ps):
cluster_def = multi_worker_test_base.create_in_process_cluster(
num_workers=num_workers, num_ps=num_ps, rpc_layer="grpc")
cluster_def["chief"] = [
"localhost:%d" % multi_worker_test_base.pick_unused_port()
]
return SimpleClusterResolver(ClusterSpec(cluster_def), rpc_layer="grpc")
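How the new helper is consumed by the ParameterServerStrategy tests above, as a short sketch (3 workers and 2 parameter servers, matching the call in _model_compile):

    import tensorflow as tf
    from keras.distribute import multi_worker_testing_utils

    # Starts an in-process cluster (plus a chief on an unused port) and wraps
    # it in a SimpleClusterResolver for the strategy to consume.
    resolver = multi_worker_testing_utils.make_parameter_server_cluster(3, 2)
    strategy = tf.distribute.experimental.ParameterServerStrategy(resolver)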
(The remaining 57 of the 63 changed files are not shown.)