
The sample demo has some bugs. #401

Open
sosixyz opened this issue Apr 25, 2024 · 21 comments

@sosixyz

sosixyz commented Apr 25, 2024

tensorflow: 2.8.0
tfra: 0.6.0

  • I downloaded the demo at demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py.
    Following that file, I ran sh start.sh, and the following error appeared:
    [screenshot]
@rhdong
Member

rhdong commented Apr 25, 2024

Hi @MoFHeka, could you please help with this?

@sosixyz
Author

sosixyz commented Apr 28, 2024

It seems that user_embedding = de.keras.layers.SquashedEmbedding(user_embedding_size, initializer=embedding_initializer, devices=self.devices, name='user_embedding') has a bug: the embedding cannot identify which port the variables are on.

  • If I change the embedding in the demo to user_embedding = de.keras.layers.SquashedEmbedding(user_embedding_size, initializer=embedding_initializer, distribute_strategy=self.strategy, # devices=self.devices, name='user_embedding'), a new problem arises. I think the embedding variable updates are out of sync. Can anyone give me some advice?
    [screenshot]
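For reference, the "which port" question can be made concrete. In the demo, Runner builds ps_devices as "/job:ps/replica:0/task:{i}/device:CPU:0", and task i refers to the i-th address in --ps_list. The stdlib-only sketch below reproduces that naming from the script's default flag values. The helper names split_hosts and ps_device_to_address are made up for illustration, and the sketch does not check where TFRA actually places the tables.

```python
from typing import Dict, List


def split_hosts(hosts: str) -> List[str]:
    """Like main(): drop spaces and split on commas (empty entries skipped)."""
    return [h for h in hosts.replace(" ", "").split(",") if h]


def ps_device_to_address(ps_list: str) -> Dict[str, str]:
    """Map each PS device string (as built in Runner.__init__) to its host:port.

    Task index i in '/job:ps/replica:0/task:i/device:CPU:0' is the position
    of the address in the 'ps' job of the ClusterSpec, i.e. the i-th --ps_list item.
    """
    return {
        "/job:ps/replica:0/task:{}/device:CPU:0".format(i): addr
        for i, addr in enumerate(split_hosts(ps_list))
    }


if __name__ == "__main__":
    # Default --ps_list value from the demo script.
    for device, addr in ps_device_to_address("localhost:2220, localhost:2221").items():
        print(device, "->", addr)
```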

@alykhantejani

Any update on this? I am also facing this issue.

@MoFHeka
Collaborator

MoFHeka commented May 28, 2024

@sosixyz Could you please provide a minimal reproducible example?

@alykhantejani

@MoFHeka It's odd: I can't produce an MWE, as it only occurs in some settings for me. According to @rhdong in #414, is this an issue the dev team is aware of? (Although I don't really know what the issue is.)

@MoFHeka
Collaborator

MoFHeka commented May 28, 2024

@alykhantejani Most TFRA users run synchronous GPU training without a PS, so few people are aware of this issue.
If it occurs only some of the time, it may be caused by a faulty device setting, which we may be able to fix. Here is the key code that may cause the error.

@MoFHeka
Collaborator

MoFHeka commented May 28, 2024


@sosixyz This could be caused by the ShadowVariable not being created on the right device. There may be a way to fix this bug. Here is the key code:

with context.device_policy(context.DEVICE_PLACEMENT_SILENT):

@alykhantejani


Thanks for the response, I'll try to take a closer look. If you use synchronous training on GPUs, you would need GPUs with large on-device memory, correct? I thought that was why the PS strategy would be more common.
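A back-of-the-envelope way to size this: the sketch below estimates storage for one dynamic embedding table plus its optimizer slots. It assumes int64 keys, float32 values, and that each Adam slot (first and second moment) lives in its own key-value table of the same size; hash-table overhead and load-factor headroom are ignored. The 1 billion key count is a hypothetical example, and dim=32 matches the demo's embedding_size.

```python
def embedding_storage_bytes(num_keys: int, dim: int, num_slots: int = 2,
                            key_bytes: int = 8, value_bytes: int = 4) -> int:
    """Rough lower bound on storage for one embedding table plus optimizer slots.

    Assumption: every slot is a separate table holding the same keys, so there
    are (1 + num_slots) tables, each storing key_bytes + dim * value_bytes per key.
    """
    num_tables = 1 + num_slots
    return num_keys * num_tables * (key_bytes + dim * value_bytes)


if __name__ == "__main__":
    total = embedding_storage_bytes(num_keys=10**9, dim=32)
    # 1e9 keys x 3 tables x (8 + 32 * 4) bytes
    print(f"{total / 1e9:.0f} GB")
```

When the table is sharded across N ranks or PS tasks, each holds roughly total / N, so the 408 GB above becomes about 51 GB per rank with N = 8.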

@alykhantejani

I should note that I'm using an in-memory cluster for testing, with 2 PS, and passing these device names to BasicEmbedding.

@MoFHeka
Collaborator

MoFHeka commented May 28, 2024


@alykhantejani Don't worry about memory: the DE all-to-all embedding layer shards the entire embedding across worker ranks. You can also use a CPU embedding table, but the DE HKV backend is the best solution, since it can use both GPU memory and host memory for embedding storage. In most situations, 2 TB of host memory is enough.
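To illustrate what sharding the embedding across worker ranks means, here is a stdlib sketch of the dispatch step of an all-to-all lookup: each rank owns a disjoint slice of the key space, and a batch of IDs is bucketed by owner before being exchanged. The key % world_size ownership rule is an assumption for illustration, not necessarily the partition function TFRA uses.

```python
from collections import defaultdict
from typing import Dict, List


def owner_rank(key: int, world_size: int) -> int:
    # Assumed ownership rule: modulo partitioning of integer keys.
    return key % world_size


def group_by_owner(ids: List[int], world_size: int) -> Dict[int, List[int]]:
    """Bucket a batch of IDs by the rank that stores their embedding rows.

    In an all-to-all lookup, each rank would send bucket r to rank r,
    receive the looked-up rows back, and restore the original order.
    """
    buckets: Dict[int, List[int]] = defaultdict(list)
    for k in ids:
        buckets[owner_rank(k, world_size)].append(k)
    return dict(buckets)


if __name__ == "__main__":
    print(group_by_owner([1, 2, 3, 4, 5, 6, 7], world_size=3))
```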
What's more, if anyone has spare time, contributions of a fast-slow KV container are very welcome. It would allow users to, for example, evict keys from the faster HKV into slower Redis to extend storage. All the basic APIs have been developed; we are now waiting for someone to write a component that implements it.
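The fast-slow container described above might look roughly like the following: a bounded fast tier (standing in for HKV) that evicts its least recently used keys into an unbounded slow tier (standing in for Redis), promoting keys back on access. The class TieredKV and its LRU policy are hypothetical plain Python, not a TFRA API.

```python
from collections import OrderedDict
from typing import Dict, Hashable, List, Optional


class TieredKV:
    """Hypothetical two-tier store: a bounded LRU fast tier that spills into a slow tier."""

    def __init__(self, fast_capacity: int):
        self.fast_capacity = fast_capacity
        self.fast: "OrderedDict[Hashable, List[float]]" = OrderedDict()  # stands in for HKV
        self.slow: Dict[Hashable, List[float]] = {}  # stands in for Redis

    def put(self, key: Hashable, value: List[float]) -> None:
        self.slow.pop(key, None)  # keep each key in exactly one tier
        self.fast[key] = value
        self.fast.move_to_end(key)  # mark as most recently used
        while len(self.fast) > self.fast_capacity:
            old_key, old_value = self.fast.popitem(last=False)  # evict the LRU key
            self.slow[old_key] = old_value

    def get(self, key: Hashable) -> Optional[List[float]]:
        if key in self.fast:
            self.fast.move_to_end(key)
            return self.fast[key]
        if key in self.slow:
            value = self.slow[key]
            self.put(key, value)  # promote; may push another key down to the slow tier
            return value
        return None


if __name__ == "__main__":
    kv = TieredKV(fast_capacity=2)
    for k in (1, 2, 3):
        kv.put(k, [k / 10])
    print(sorted(kv.fast), sorted(kv.slow))  # key 1 was evicted to the slow tier
```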

@sosixyz
Author

sosixyz commented May 29, 2024


Thank you for your reply! The reproducible code is copied from https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py.

@sosixyz
Author

sosixyz commented May 29, 2024


Thank you for your reply! I will try the sample demo later. I found in the Keras guide that when using model.fit(), the dataset must be created with tf.keras.utils.experimental.DatasetCreator, but this sample demo does not do that: https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py, line 163.

@MoFHeka
Collaborator

MoFHeka commented May 29, 2024


Yes, you're right. Usually tf.keras.utils.experimental.DatasetCreator is used to dispatch input data to different workers, but this is a simple demo after all, so I got lazy. Could you please contribute a more complete demo, if it's convenient for you?
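As a sketch of why dataset_fn matters: each worker calls it with its own input_context, and dataset.shard(num_input_pipelines, input_pipeline_id) keeps every num_input_pipelines-th element starting at input_pipeline_id, so the workers read disjoint slices. The plain-Python shard function below only models that rule (no TensorFlow needed); the name is ours.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def shard(elements: Sequence[T], num_shards: int, index: int) -> List[T]:
    """Model of tf.data.Dataset.shard: keep elements at positions i with i % num_shards == index."""
    if not 0 <= index < num_shards:
        raise ValueError("index must satisfy 0 <= index < num_shards")
    return [e for i, e in enumerate(elements) if i % num_shards == index]


if __name__ == "__main__":
    records = list(range(10))  # stand-in for dataset records
    for pipeline_id in range(3):  # e.g. 3 input pipelines, one per worker
        print(pipeline_id, shard(records, 3, pipeline_id))
```

Sharding before shuffle and batch, as the dataset_fn in the script does, keeps the split at the record level.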

@alykhantejani


@MoFHeka is there an example anywhere of synchronous multi-worker training with large dynamic embeddings?

@MoFHeka
Collaborator

MoFHeka commented May 29, 2024


@alykhantejani
Meta: https://github.com/pytorch/torchrec, and the paper "Software-Hardware Co-design for Fast and Scalable Training of Deep Learning Recommendation Models"
Tencent: TFRA (this project) with a custom backend
Meituan: https://www.nvidia.com/en-us/on-demand/session/gtcspring22-s41370/ https://tech.meituan.com/2022/03/24/tensorflow-gpu-training-optimization-practice-in-meituan-waimai-recommendation-scenarios.html
Nvidia: https://github.com/NVIDIA-Merlin/HugeCTR
Alibaba: https://github.com/DeepRec-AI/DeepRec

@alykhantejani

@MoFHeka I meant using TFRA specifically, especially with host memory rather than GPU memory (since GPU devices are expensive).

@MoFHeka
Collaborator

MoFHeka commented May 30, 2024

@alykhantejani
Here is the demo: https://github.com/tensorflow/recommenders-addons/tree/master/demo/dynamic_embedding/movielens-1m-keras-with-horovod

If you want to place the embedding in host memory, set the parameter devices=["CPU"] when you create the embedding layer.

If you want to use both host memory and device memory for the embedding, use HKV: replace the code with the HKV creator when you assign the hash table backend.
https://github.com/tensorflow/recommenders-addons/blob/master/docs/api_docs/tfra/dynamic_embedding/HkvHashTableCreator.md

Here is an explanation of how synchronous distributed training works: #365

@sosixyz
Author

sosixyz commented Jun 1, 2024

@MoFHeka Hello, I tried to use tf.keras.utils.experimental.DatasetCreator, but a new bug arose. I only changed the data input function; with keras.Embedding the demo runs well. The demo is:

import os
import tensorflow as tf
import tensorflow_datasets as tfds
import sys

from tensorflow.keras import backend as K  # used by Runner.export()
from tensorflow_recommenders_addons import dynamic_embedding as de
try:
  from tensorflow.keras.optimizers.legacy import Adam
except ImportError:
  from tensorflow.keras.optimizers import Adam
# tf.compat.v1.disable_eager_execution()
flags = tf.compat.v1.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string(
    'ps_list', "localhost:2220, localhost:2221",
    'ps_list: a comma-separated string, '
    'like "localhost:2220, localhost:2221"')
flags.DEFINE_string(
    'worker_list', "localhost:2231",
    'worker_list: a comma-separated string, '
    'like "localhost:2231, localhost:2232"')
flags.DEFINE_string('chief', "localhost:2230", 'chief: like "localhost:2230"')
flags.DEFINE_string('task_mode', "worker",
                    'task_mode: ps, worker or chief.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')

input_spec = {
    'user_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='user_id'),
    'movie_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='movie_id')
}


class DualChannelsDeepModel(tf.keras.Model):

  def __init__(self,
               devices=[],
               strategy = None,
               user_embedding_size=1,
               movie_embedding_size=1,
               embedding_initializer=None,
               is_training=True):

    if not is_training:
      de.enable_inference_mode()

    super(DualChannelsDeepModel, self).__init__()
    self.user_embedding_size = user_embedding_size
    self.movie_embedding_size = movie_embedding_size
    self.devices = devices
    self.strategy = strategy

    if embedding_initializer is None:
      embedding_initializer = tf.keras.initializers.Zeros()

    self.user_embedding = de.keras.layers.SquashedEmbedding(
        user_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
        devices=self.devices,
        name='user_embedding')
           
    self.movie_embedding = de.keras.layers.SquashedEmbedding(
        movie_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
        devices=self.devices,
        name='movie_embedding')

    self.dnn1 = tf.keras.layers.Dense(
        64,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn2 = tf.keras.layers.Dense(
        16,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn3 = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.bias_net = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

  @tf.function
  def call(self, features):
    user_id = tf.reshape(features['user_id'], (-1, 1))
    movie_id = tf.reshape(features['movie_id'], (-1, 1))
    user_latent = self.user_embedding(user_id)
    movie_latent = self.movie_embedding(movie_id)
    latent = tf.concat([user_latent, movie_latent], axis=1)

    x = self.dnn1(latent)
    x = self.dnn2(x)
    x = self.dnn3(x)

    bias = self.bias_net(latent)
    x = 0.2 * x + 0.8 * bias
    return x


class Runner():

  def __init__(self, strategy, train_bs, test_bs, epochs, steps_per_epoch,
               model_dir, export_dir):
    self.strategy = strategy
    self.num_worker = strategy._num_workers
    self.num_ps = strategy._num_ps
    # print('######################',self.num_ps)
    self.ps_devices = [
        "/job:ps/replica:0/task:{}/device:CPU:0".format(idx)
        for idx in range(self.num_ps)
    ]
    print('##############', self.ps_devices)
    self.embedding_size = 32
    self.train_bs = train_bs
    self.test_bs = test_bs
    self.epochs = epochs
    self.steps_per_epoch = steps_per_epoch
    self.model_dir = model_dir
    self.export_dir = export_dir

  def get_dataset(self, batch_size=1):
    dataset = tfds.load('movielens/1m-ratings', split='train')
    features = dataset.map(
        lambda x: {
            "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
            "user_id": tf.strings.to_number(x["user_id"], tf.int64),
        })
    ratings = dataset.map(
        lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
    dataset = dataset.zip((features, ratings))
    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
    if batch_size > 1:
      dataset = dataset.batch(batch_size)
    return dataset

  def train(self):
    # dataset = self.get_dataset(batch_size=self.train_bs)
    # dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = DualChannelsDeepModel(
          self.ps_devices, self.strategy, self.embedding_size, self.embedding_size,
          tf.keras.initializers.RandomNormal(0.0, 0.5))
      optimizer = Adam(1E-3)
      optimizer = de.DynamicEmbeddingOptimizer(optimizer)

      auc = tf.keras.metrics.AUC(num_thresholds=1000)

      model.compile(optimizer=optimizer,
                    loss=tf.keras.losses.MeanSquaredError(),
                    metrics=[
                        auc,
                    ])
      
    if self.model_dir:
      if os.path.exists(self.model_dir):
        model.load_weights(self.model_dir)
    batch_size = self.train_bs
    def dataset_fn(input_context):
      dataset = tfds.load('movielens/1m-ratings', split='train')
      dataset = dataset.shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
      features = dataset.map(
          lambda x: {
              "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
              "user_id": tf.strings.to_number(x["user_id"], tf.int64),
          })
      ratings = dataset.map(
          lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
      dataset = dataset.zip((features, ratings))
      dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
      if batch_size > 1:
        dataset = dataset.batch(batch_size)
      return dataset
    input_options = tf.distribute.InputOptions(experimental_fetch_to_device=True)
    distributed_dataset = tf.keras.utils.experimental.DatasetCreator(dataset_fn, input_options=input_options)
    
    model.fit(distributed_dataset, epochs=self.epochs, steps_per_epoch=self.steps_per_epoch)

    if self.model_dir:
      save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
      model.save(self.model_dir, options=save_options)

  def export(self):
    with self.strategy.scope():
      # Arguments: devices, strategy, user/movie embedding sizes, initializer.
      model = DualChannelsDeepModel(
          self.ps_devices, self.strategy, self.embedding_size,
          self.embedding_size, tf.keras.initializers.RandomNormal(0.0, 0.5))

    def save_spec():
      if hasattr(model, 'save_spec'):
        return model.save_spec()
      else:
        arg_specs = list()
        kwarg_specs = dict()
        for i in model.inputs:
          arg_specs.append(i.type_spec)
        return [arg_specs], kwarg_specs

    @tf.function
    def serve(*args, **kwargs):
      return model(*args, **kwargs)

    # Input signature for the serving function below.
    arg_specs, kwarg_specs = save_spec()
    save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])

    # Only save the calculation graph
    from tensorflow.python.saved_model import save as tf_save
    K.clear_session()
    de.enable_inference_mode()
    # Overwrite saved_model.pb file with save_and_return_nodes function to rewrite the calculation graph
    tf_save.save_and_return_nodes(obj=model,
                                  export_dir=self.export_dir,
                                  signatures={
                                      'serving_default':
                                          serve.get_concrete_function(
                                              *arg_specs, **kwarg_specs)
                                  },
                                  options=save_options,
                                  experimental_skip_checkpoint=True)

  def test(self):
    de.enable_inference_mode()

    dataset = self.get_dataset(batch_size=self.test_bs)
    dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = tf.keras.models.load_model(self.export_dir)
    signature = model.signatures['serving_default']

    def get_close_or_equal_cnt(model, features, ratings):
      preds = model(features)
      preds = tf.math.argmax(preds, axis=1)
      ratings = tf.math.argmax(ratings, axis=1)
      close_cnt = tf.reduce_sum(
          tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32))
      equal_cnt = tf.reduce_sum(
          tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32))
      return close_cnt, equal_cnt

    it = iter(dataset)
    for step in range(self.test_steps):
      features, ratings = it.get_next()
      close_cnt, equal_cnt = get_close_or_equal_cnt(model, features, ratings)
      print(
          f'In batch prediction, step: {step}, {close_cnt}/{self.test_bs} are closely'
          f' accurate, {equal_cnt}/{self.test_bs} are absolutely accurate.')


def start_chief(config):
  print("chief config", config)

  cluster_spec = tf.train.ClusterSpec(config["cluster"])
  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_type="chief", task_id=0)
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver)
  runner = Runner(strategy=strategy,
                  train_bs=32,
                  test_bs=1,
                  epochs=2,
                  steps_per_epoch=10,
                  model_dir=None,
                  export_dir=None)
  runner.train()


def start_worker(task_id, config):
  print("worker config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])

  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="worker",
                                task_index=task_id)
  server.join()


def start_ps(task_id, config):
  print("ps config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])

  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="ps",
                                task_index=task_id)
  server.join()


def main(argv):
  ps_list = FLAGS.ps_list.replace(' ', '').split(',')
  worker_list = FLAGS.worker_list.replace(' ', '').split(',')
  task_mode = FLAGS.task_mode
  task_id = FLAGS.task_id

  print('ps_list: ', ps_list)
  print('worker_list: ', worker_list)

  cluster_config = {
      "cluster": {
          "chief": [FLAGS.chief],
          "ps": ps_list,
          "worker": worker_list
      }
  }
  print(cluster_config)
  if task_mode == 'ps':
    start_ps(task_id, cluster_config)
  elif task_mode == 'worker':
    start_worker(task_id, cluster_config)
  elif task_mode == 'chief':
    start_chief(cluster_config)
  else:
    print('invalid task_mode. Options include "ps", "worker" and "chief".')
    sys.exit(1)


if __name__ == "__main__":
  tf.compat.v1.app.run()

The error is:
[screenshot]

@kefault

kefault commented Jan 20, 2025

@MoFHeka Hello, I try to use tf.keras.utils.experimental.DatasetCreator , but new bug arised. I just change data input function, I use keras.embeding that demo is running well. The demo is

import os
import tensorflow as tf
import tensorflow_datasets as tfds
import sys

from absl import flags
from absl import app
from tensorflow_recommenders_addons import dynamic_embedding as de
import keras.models 
try:
  from tensorflow.keras.optimizers.legacy import Adam
except:
  from tensorflow.keras.optimizers import Adam
# tf.compat.v1.disable_eager_execution()
flags = tf.compat.v1.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string(
    'ps_list', "localhost:2220, localhost:2221",
    'ps_list: to be a comma seperated string, '
    'like "localhost:2220, localhost:2220"')
flags.DEFINE_string(
    'worker_list', "localhost:2231",
    'worker_list: to be a comma seperated string, '
    'like "localhost:2231, localhost:2232"')
flags.DEFINE_string('chief', "localhost:2230", 'chief: like "localhost:2230"')
flags.DEFINE_string('task_mode', "worker",
                    'runninig_mode: ps or worker or chief.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')

input_spec = {
    'user_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='user_id'),
    'movie_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='movie_id')
}


class DualChannelsDeepModel(tf.keras.Model):

  def __init__(self,
               devices=[],
               strategy = None,
               user_embedding_size=1,
               movie_embedding_size=1,
               embedding_initializer=None,
               is_training=True):

    if not is_training:
      de.enable_inference_mode()

    super(DualChannelsDeepModel, self).__init__()
    self.user_embedding_size = user_embedding_size
    self.movie_embedding_size = movie_embedding_size
    self.devices = devices
    self.strategy = strategy

    if embedding_initializer is None:
      embedding_initializer = tf.keras.initializers.Zeros()

    self.user_embedding = de.keras.layers.SquashedEmbedding(
        user_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
         devices=self.devices,
        name='user_embedding')
           
    self.movie_embedding = de.keras.layers.SquashedEmbedding(
        movie_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
        devices=self.devices,
        name='movie_embedding')

    self.dnn1 = tf.keras.layers.Dense(
        64,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn2 = tf.keras.layers.Dense(
        16,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn3 = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.bias_net = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

  @tf.function
  def call(self, features):
    user_id = tf.reshape(features['user_id'], (-1, 1))
    movie_id = tf.reshape(features['movie_id'], (-1, 1))
    user_latent = self.user_embedding(user_id)
    movie_latent = self.movie_embedding(movie_id)
    latent = tf.concat([user_latent, movie_latent], axis=1)

    x = self.dnn1(latent)
    x = self.dnn2(x)
    x = self.dnn3(x)

    bias = self.bias_net(latent)
    x = 0.2 * x + 0.8 * bias
    return x


class Runner():

  def __init__(self, strategy, train_bs, test_bs, epochs, steps_per_epoch,
               model_dir, export_dir):
    self.strategy = strategy
    self.num_worker = strategy._num_workers
    self.num_ps = strategy._num_ps
    # print('######################',self.num_ps)
    self.ps_devices = [
        "/job:ps/replica:0/task:{}/device:CPU:0".format(idx)
        for idx in range(self.num_ps)
    ]
    print('##############', self.ps_devices)
    self.embedding_size = 32
    self.train_bs = train_bs
    self.test_bs = test_bs
    self.epochs = epochs
    self.steps_per_epoch = steps_per_epoch
    self.model_dir = model_dir
    self.export_dir = export_dir

  def get_dataset(self, batch_size=1):
    dataset = tfds.load('movielens/1m-ratings', split='train')
    features = dataset.map(
        lambda x: {
            "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
            "user_id": tf.strings.to_number(x["user_id"], tf.int64),
        })
    ratings = dataset.map(
        lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
    dataset = dataset.zip((features, ratings))
    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
    if batch_size > 1:
      dataset = dataset.batch(batch_size)
    return dataset

  def train(self):
    # dataset = self.get_dataset(batch_size=self.train_bs)
    # dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = DualChannelsDeepModel(
          self.ps_devices, self.strategy, self.embedding_size, self.embedding_size,
          tf.keras.initializers.RandomNormal(0.0, 0.5))
      optimizer = Adam(1E-3)
      optimizer = de.DynamicEmbeddingOptimizer(optimizer)

      auc = tf.keras.metrics.AUC(num_thresholds=1000)

      model.compile(optimizer=optimizer,
                    loss=tf.keras.losses.MeanSquaredError(),
                    metrics=[
                        auc,
                    ])
      
    if self.model_dir:
        if os.path.exists(self.model_dir):
          model.load_weights(self.model_dir)
    batch_size = self.train_bs
    def dataset_fn(input_context):
      dataset = tfds.load('movielens/1m-ratings', split='train')
      dataset = dataset.shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
      features = dataset.map(
          lambda x: {
              "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
              "user_id": tf.strings.to_number(x["user_id"], tf.int64),
          })
      ratings = dataset.map(
          lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
      dataset = dataset.zip((features, ratings))
      dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
      if batch_size > 1:
        dataset = dataset.batch(batch_size)
      return dataset
    input_options = tf.distribute.InputOptions(experimental_fetch_to_device=True)
    distributed_dataset = tf.keras.utils.experimental.DatasetCreator(dataset_fn, input_options=input_options)
    
    model.fit(distributed_dataset, epochs=self.epochs, steps_per_epoch=self.steps_per_epoch)

    if self.model_dir:
        save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
        model.save(self.model_dir, options=save_options)

  def export(self):
    with self.strategy.scope():
      model = DualChannelsDeepModel(
          self.ps_devices, self.embedding_size, self.embedding_size,
          tf.keras.initializers.RandomNormal(0.0, 0.5))

    def save_spec():
      if hasattr(model, 'save_spec'):
        return model.save_spec()
      else:
        arg_specs = list()
        kwarg_specs = dict()
        for i in model.inputs:
          arg_specs.append(i.type_spec)
        return [arg_specs], kwarg_specs

    @tf.function
    def serve(*args, **kwargs):
      return model(*args, **kwargs)

    save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])

    # Only save the calculation graph
    from tensorflow.python.saved_model import save as tf_save
    K.clear_session()
    de.enable_inference_mode()
    # Overwrite saved_model.pb file with save_and_return_nodes function to rewrite the calculation graph
    tf_save.save_and_return_nodes(obj=model,
                                  export_dir=self.export_dir,
                                  signatures={
                                      'serving_default':
                                          serve.get_concrete_function(
                                              *arg_specs, **kwarg_specs)
                                  },
                                  options=save_options,
                                  experimental_skip_checkpoint=True)

  def test(self):
    de.enable_inference_mode()

    dataset = self.get_dataset(batch_size=self.test_bs)
    dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = tf.keras.models.load_model(self.export_dir)
    signature = model.signatures['serving_default']

    def get_close_or_equal_cnt(model, features, ratings):
      preds = model(features)
      preds = tf.math.argmax(preds, axis=1)
      ratings = tf.math.argmax(ratings, axis=1)
      close_cnt = tf.reduce_sum(
          tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32))
      equal_cnt = tf.reduce_sum(
          tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32))
      return close_cnt, equal_cnt

    it = iter(dataset)
    for step in range(self.test_steps):
      features, ratings = it.get_next()
      close_cnt, equal_cnt = get_close_or_equal_cnt(model, features, ratings)
      print(
          f'In batch prediction, step: {step}, {close_cnt}/{self.test_bs} are closely'
          f' accurate, {equal_cnt}/{self.test_bs} are absolutely accurate.')


def start_chief(config):
  print("chief config", config)

  cluster_spec = tf.train.ClusterSpec(config["cluster"])
  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_type="chief", task_id=0)
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver)
  runner = Runner(strategy=strategy,
                  train_bs=32,
                  test_bs=1,
                  epochs=2,
                  steps_per_epoch=10,
                  model_dir=None,
                  export_dir=None)
  runner.train()


def start_worker(task_id, config):
  print("worker config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])

  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="worker",
                                task_index=task_id)
  server.join()


def start_ps(task_id, config):
  print("ps config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])

  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="ps",
                                task_index=task_id)
  server.join()


def main(argv):
  ps_list = FLAGS.ps_list.replace(' ', '').split(',')
  worker_list = FLAGS.worker_list.replace(' ', '').split(',')
  task_mode = FLAGS.task_mode
  task_id = FLAGS.task_id

  print('ps_list: ', ps_list)
  print('worker_list: ', worker_list)

  cluster_config = {
      "cluster": {
          "chief": [FLAGS.chief],
          "ps": ps_list,
          "worker": worker_list
      }
  }
  print(cluster_config)
  if task_mode == 'ps':
    start_ps(task_id, cluster_config)
  elif task_mode == 'worker':
    start_worker(task_id, cluster_config)
  elif task_mode == 'chief':
    start_chief(cluster_config)
  else:
    print('invalid task_mode. Options include "ps", "worker" and "chief".')
    sys.exit(1)


if __name__ == "__main__":
  tf.compat.v1.app.run()

The resulting error is shown in the attached screenshot.
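The cluster dict that main() builds from the ps/worker flag strings can be checked without starting any servers. Below is a minimal sketch in plain Python; build_cluster_config, parse_hosts and task_address are hypothetical helpers (not part of the demo or of TFRA), and the localhost addresses are made up. Unlike the demo's plain split(','), it drops empty entries left by a trailing comma, and it rejects an out-of-range task_id before a server is started.

```python
from typing import Dict, List

ClusterConfig = Dict[str, Dict[str, List[str]]]


def parse_hosts(hosts: str) -> List[str]:
    # Same normalization as main(): remove spaces, split on commas.
    # Unlike main(), empty entries (e.g. from a trailing comma) are dropped.
    return [h for h in hosts.replace(' ', '').split(',') if h]


def build_cluster_config(chief: str, ps_list: str, worker_list: str) -> ClusterConfig:
    # Hypothetical helper mirroring the dict that main() passes to tf.train.ClusterSpec.
    return {
        "cluster": {
            "chief": [chief],
            "ps": parse_hosts(ps_list),
            "worker": parse_hosts(worker_list),
        }
    }


def task_address(config: ClusterConfig, task_mode: str, task_id: int) -> str:
    # Fail early on an unknown job name or an out-of-range task index,
    # instead of starting a server with a bad (job, index) pair.
    jobs = config["cluster"]
    if task_mode not in jobs:
        raise ValueError(f"invalid task_mode {task_mode!r}; options: {sorted(jobs)}")
    hosts = jobs[task_mode]
    if not 0 <= task_id < len(hosts):
        raise ValueError(f"task_id {task_id} out of range for {task_mode!r} ({len(hosts)} tasks)")
    return hosts[task_id]


# Usage with made-up addresses (note the trailing comma in the ps list).
config = build_cluster_config("localhost:2220", "localhost:2230, localhost:2231,", "localhost:2240")
print(task_address(config, "ps", 1))  # localhost:2231
```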

I ran into the same problem. Setting with_unique=False lets training run, but the results don't seem correct. Have you solved this problem?
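To reason about what with_unique changes, it helps to see the general "deduplicate, look up once, gather back" pattern. The sketch below assumes (not verified against TFRA internals) that with_unique=True deduplicates the input ids in the style of tf.unique before the lookup. In plain Python it shows that the forward lookup is identical with or without deduplication, and that the backward pass of the gather sums the gradients of a repeated id into one update per unique id. Whether the with_unique=False path handles duplicate keys the same way is not something this sketch can tell you.

```python
from typing import Dict, List, Sequence, Tuple

Table = Dict[int, List[float]]


def unique_with_inverse(ids: Sequence[int]) -> Tuple[List[int], List[int]]:
    # Like tf.unique: distinct ids in first-seen order, plus for each input
    # position the index of its id within the unique list.
    index_of: Dict[int, int] = {}
    unique: List[int] = []
    inverse: List[int] = []
    for i in ids:
        if i not in index_of:
            index_of[i] = len(unique)
            unique.append(i)
        inverse.append(index_of[i])
    return unique, inverse


def lookup_direct(table: Table, ids: Sequence[int]) -> List[List[float]]:
    # One lookup per input id, duplicates included.
    return [table[i] for i in ids]


def lookup_with_unique(table: Table, ids: Sequence[int]) -> List[List[float]]:
    # Look up each distinct id once, then gather rows back into input order.
    unique, inverse = unique_with_inverse(ids)
    rows = [table[u] for u in unique]
    return [rows[j] for j in inverse]


def accumulate_grads(ids: Sequence[int], grads: Sequence[List[float]]) -> Table:
    # Backward pass of the gather above: a segment sum over the inverse
    # indices, i.e. one summed gradient row per unique id.
    unique, inverse = unique_with_inverse(ids)
    summed = [[0.0] * len(grads[0]) for _ in unique]
    for g, j in zip(grads, inverse):
        summed[j] = [a + b for a, b in zip(summed[j], g)]
    return dict(zip(unique, summed))


table = {7: [0.1, 0.2], 3: [0.3, 0.4]}
ids = [7, 3, 7]
assert lookup_direct(table, ids) == lookup_with_unique(table, ids)
print(accumulate_grads(ids, [[1.0, 1.0], [2.0, 2.0], [0.5, 0.5]]))  # {7: [1.5, 1.5], 3: [2.0, 2.0]}
```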
When I start the job with 8 PS tasks, ps_0's CPU usage is very high, but the other PS tasks don't have this problem.
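One hypothesis for a single hot PS is that the keys themselves are unevenly spread across shards. A quick way to test that is to count how a sample of ids would be split. This is a minimal sketch that assumes ids are assigned to shards by id % num_shards; that rule is an assumption here, so check the partitioner your TFRA version actually uses. It only tests the key-distribution hypothesis and says nothing about other causes.

```python
from collections import Counter
from typing import Dict, Iterable


def shard_load(ids: Iterable[int], num_shards: int) -> Dict[int, int]:
    # Count ids per shard under the ASSUMED modulo partitioning rule.
    counts = Counter(i % num_shards for i in ids)
    # Report every shard, including empty ones.
    return {shard: counts.get(shard, 0) for shard in range(num_shards)}


def max_to_mean_ratio(load: Dict[int, int]) -> float:
    # 1.0 means perfectly balanced; num_shards means everything on one shard.
    total = sum(load.values())
    mean = total / len(load)
    return max(load.values()) / mean if mean else 0.0


uniform = shard_load(range(800), 8)        # 100 ids on each of the 8 shards
skewed = shard_load(range(0, 6400, 8), 8)  # every id is a multiple of 8, so all hit shard 0
print(max_to_mean_ratio(uniform), max_to_mean_ratio(skewed))  # 1.0 8.0
```

Running shard_load on a sample of real user or movie ids from the input pipeline would show whether ps_0 receives a disproportionate share under this assumed rule.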
cc @alykhantejani @MoFHeka

@MoFHeka
Collaborator

MoFHeka commented Jan 20, 2025

@kefault Sorry, I'll sort out PS support when I have time.

@kefault

kefault commented Jan 23, 2025

@MoFHeka Thank you very much! Do you have an official WeChat group? As a newcomer, I think a WeChat group would be a more efficient way for me to communicate with others.

5 participants