MPI學習轉載

      網友投稿 1085 2025-03-31

      隨著分布式深度學習在工業界的普及,MPI(比我的年紀還要大兩歲)又迎來了新的活力。作為一個從沒有在 HPC 領域有過積累的小學生,學習了許多論文與博客,還是沒有理清 MPI,OpenMPI,AllReduce,ReduceScatter,RingAllReduce 等等概念之間的關系。在前段時間為了能夠更好地閱讀 Horovod 和 BytePS 的代碼,從零開始學習了一番。Horovod 本身的實現并不十分復雜,但是它的部分工作其實是借助 MPI 來實現的。


      這里拋磚引玉地介紹一下 MPI 與深度學習的關系,也留作最近業余時間學習過程的記錄。近來與朋友交流,有感于之前文章都過于陽春白雪,對于不熟悉這一領域的讀者來說不太友好。因此這一篇文章可能會有針對性地鋪墊一些背景知識,因此全文可能較長,對熟悉 MPI 的朋友來說可以跳過前面。

      MPI 是什么?這里引用一段?MPI 教程介紹?中的內容:

      這里十分推薦先閱讀完?MPI 教程?的全部內容,它是我在互聯網上能找到的所有關于 MPI 的公開材料中最為深入淺出的一個教程。

      簡單地來理解 MPI,它是一個定義了多個原語的消息傳遞接口,這一接口主要被用于多進程間的通信。它的競品包括 RPC,Distributed Shared Memory 等。關于它們的比較可以參考論文?Message Passing, Remote Procedure Calls and Distributed Shared Memory as Communication Paradigms for Distributed Systems。

      MPI 的詳細文檔可以參考?MPI Forum,這里有 MPI 各個版本的文檔(目前發布到了 3.1)。在當代的 MPI 中,接口已經有相當多。不過對我們而言最重要的只有三個部分,也對應著?MPI 文檔?中的第三和第五章節:端到端通信,數據類型,和集合通信(Collective Communication)。

      端到端通信部分主要實現了從一個進程到另一個進程的通信,核心功能由兩個原語提供:

      int MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)

      具體內容請參考?MPI 教程:MPI Send and Receive。

      集合通訊是建立在端到端通信的基礎上,在一組進程內的通訊原語。其中主要包括:

      // Broadcasts a message from the process with rank root to all other processes of the group. int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm) // Gathers values from a group of processes. int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) // Sends data from one task to all tasks in a group. int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) ...

      更多請參考?MPI 教程:廣播以及集體(collective)通信,MPI 教程:MPI Scatter, Gather, and Allgather?和?MPI 文檔。

      MPI 提出了這一系列為了解決進程間消息傳遞問題而存在的接口,但它們需要一個實現。OpenMPI 是 MPI 的常用實現之一。因此我們可以理解,MPI 是定義,是接口,而 OpenMPI 是這一接口的對應實現。這里還有一個容易混淆的概念,就是 OpenMP。OpenMP(Open Multi-Processing)與 OpenMPI,MPI 并無任何關系。它是一個針對共享內存并行編程的 API。這里特意提出,避免混淆。

      而既然 OpenMPI 是 MPI 的一種實現,那針對不同的原語,采用什么算法和數據結構來實現,是實現者的自由。我們應該可以輕易地想到,針對不同的情況(主要是消息的大小等),采用不同的算法會提高整體的性能。OpenMPI 也是這樣做的。

      回到 AllReduce,它是 MPI 定義的一個集合通信的原語,它的語義是:

      在語義上(注意,僅限于語義的角度,在實現角度并不一定如此),MPI_AllReduce 等于 MPI_Reduce 加 MPI_Bcast。MPI_Reduce 會把結果聚集在 Root 進程,而再利用 MPI_Bcast 把 Root 進程的結果分發給所有進程,就實現了跟 MPI_AllReduce 等價的功能。

      而從實現的角度,MPI_AllReduce 這一接口有非常多不同的算法實現,不同的算法在不同情景下具有不同的優劣勢。在介紹算法之前,先來介紹一下 MPI_AllReduce 跟深度學習有什么關系。

      要理解它們之間的關系,首先要介紹一下模型訓練。在一次模型訓練中,首先我們會利用數據對模型進行前向的計算。所謂的前向計算,就是將模型上一層的輸出作為下一層的輸入,并計算下一層的輸出,從輸入層一直算到輸出層為止。根據目標函數,我們將反向計算模型中每個參數的導數,并且結合學習率來更新模型的參數。在分布式訓練的場景中,這一問題就會更為復雜一些。比如我們利用 4 個 Worker,利用不同的數據同步地訓練相同結構的模型(數據并行的同步訓練),在每個 Worker 計算好梯度后,就涉及到一個梯度同步的問題。每個 Worker 都有根據自己的數據計算的梯度,如何能夠讓自己得到的梯度也能作用于其他的 Worker 呢?有一種方式,是引入一個中心化的組件,參數服務器。所有的參數都存儲在參數服務器中,而 Worker 是萬年打工仔。Worker 們只負責辛辛苦苦地計算梯度,并且把計算好的梯度發送給參數服務器。參數服務器收到梯度后,執行一定的計算(梯度平均等)后,更新其維護的參數,再把更新好的新參數返回給所有的 Worker。Worker 打工仔們會再進行下一輪的前后向計算。

      除了這樣的方式之外,我們發現,MPI_AllReduce 語義也可以很好地滿足這一需要。

      我們可以把每個 Worker 看作是 MPI 概念中的一個進程,4 個 Worker 組成了一個 4 個進程組成的組。我們在這四個進程中對梯度進行一次 MPI_AllReduce。根據 MPI_AllReduce 的語義,所有參與計算的進程都有結果,所以梯度就完成了分發。只要在初始化的時候,我們可以保證每個 Worker 的參數是一致的,那在后續的迭代計算中,參數會一直保持一致,因為梯度信息是一致的。

      所以,MPI_AllReduce 的語義可以很好地解決深度學習中梯度同步的問題。但是,到底能不能使用它,還是要看下層的實現對這一場景是否足夠友好。

      在 OpenMPI 的實現中,MPI_AllReduce 主要有 7 種算法,具體可以參考?ompi/mca/coll/tuned/coll_tuned_allreduce_decision.c

      /* valid values for coll_tuned_allreduce_forced_algorithm */ static mca_base_var_enum_value_t allreduce_algorithms[] = { {0, "ignore"}, {1, "basic_linear"}, {2, "nonoverlapping"}, {3, "recursive_doubling"}, {4, "ring"}, {5, "segmented_ring"}, {6, "rabenseifner"}, {0, NULL} };

      我們可以靜態地指定算法,也可以讓 OpenMPI 來決定。當然,這不是這篇文章的重點。在深度學習這一場景下,被最為廣泛應用的是 RingAllReduce 這一實現。在 OpenMPI 中,這一實現在?ompi/mca/coll/base/coll_base_allreduce.c。它的注釋非常簡潔明了地介紹了實現原理,建議閱讀。簡單來說,它利用了 MPI 的端到端通信的原語,實現了 RingAllReduce 的功能。將 RingAllReduce 分為了兩個階段。第一個階段等價于 MPI_ReduceScatter 的語義,是將結果計算到不同的進程。第二個階段等價于 MPI_AllGather 語義,將計算結果聚合到所有進程。

      MPI_ReduceScatter 這一接口,本身也對應著非常多的實現。如果先做一次 MPI_Reduce 再做一次 MPI_Scatter(對應?ompi_coll_base_reduce_scatter_intra_nonoverlapping),性能一定無法接受。所以這里的實現使用的是?ompi_coll_base_reduce_scatter_intra_ring。通過 N-1 步,我們可以實現 MPI_ReduceScatter 的語義。其中每步中每個進程的上下行通信量都是 M/N。其中個 M 是數組的長度,N 是進程的數量。數組會被分為 N 等分,所以每次通信量是 M/N。

      第二個階段,就是 MPI_AllGather 了。MPI_AllGather 本身也有非常多的算法實現。RingAllReduce 使用的是?ompi_coll_base_allgather_intra_ring。這一實現一共需要 N-1 步。在第 i 步的時候,Rank r 進程會收到來自 r-1 進程的信息,這一信息中包括了 r-i-1 進程的數據。同時,r 進程會給 r+1 進程發送包含 r-i 進程的數據。所以每步中每個進程的上下行通信量同樣都是 M/N。

      所以整體來看,單步中每個進程的上下行通信量為 M/N,而在整個過程中,每個進程的上下行通信量都是 2(N-1)*M/N。所以我們認為 RingAllReduce 對帶寬特別友好,能很好地解決參數服務器架構中的帶寬瓶頸問題。其實 MPI_AllGather 除了 Ring 之外還有很多更高效的實現,但由于 MPI_RingAllReduce 中對帶寬的要求至少是 M/N,因此 ompi_coll_base_allgather_intra_ring 的實現已經完全夠用,在任意時刻都占滿 M/N 的上下行。

      將 RingAllReduce 引入深度學習,是百度的工作,這一工作開源在?baidu-research/tensorflow-allreduce。百度利用了 MPI 端到端通信的原語,重新實現了 ompi_coll_base_allgather_intra_ring 和 ompi_coll_base_reduce_scatter_intra_ring。至于不直接使用 MPI_AllReduce 的原語,猜測應該是為了兼容更多的 MPI 實現,同時避免動態選擇算法導致沒有啟用 RingAllReduce 的可能(盡管 OpenMPI 可以靜態選擇算法,但可能其他實現不支持)。

      百度的這一實現非常易懂,總共只有 3000 行不到的代碼,其中相當部分是測試。百度提供了一個自己的 Optimizer,重載了 compute_gradients 的實現。

      class DistributedOptimizer(tf.train.Optimizer): """ An optimizer that wraps another tf.Optimizer, using an MPI allreduce to average gradient values before applying gradients to model weights. """ def __init__(self, optimizer, name=None, use_locking=False): """ Construct a new DistributedOptimizer, which uses another optimizer under the hood for computing single-process gradient values and applying gradient updates after the gradient values have been averaged across all the MPI ranks. Args: optimizer: Optimizer to use for computing gradients and applying updates. name: Optional name prefix for the operations created when applying gradients. Defaults to "Distributed" followed by the provided optimizer type. use_locking: Whether to use locking when updating variables. See Optimizer.__init__ for more info. """ if name is None: name = "Distributed{}".format(type(optimizer).__name__) self._optimizer = optimizer super(DistributedOptimizer, self).__init__( name=name, use_locking=use_locking) def compute_gradients(self, *args, **kwargs): """ Compute gradients of all trainable variables. See Optimizer.compute_gradients() for more info. In DistributedOptimizer, compute_gradients() is overriden to also allreduce the gradients before returning them. """ gradients = (super(DistributedOptimizer, self) .compute_gradients(*args, **kwargs)) return [(allreduce(gradient), var) for (gradient, var) in gradients] ... class Session(tf.Session): """ A class for running TensorFlow operations, with copies of the same graph running distributed across different MPI nodes. The primary difference between `tf.Session` and `tf.contrib.mpi.Session` is that the MPI `Session` ensures that the `Session` options are correct for use with `tf.contrib.mpi`, and initializes MPI immediately upon the start of the session. """ def __init__(self, target='', graph=None, config=None): """ Creates a new TensorFlow MPI session. Unlike a normal `tf.Session`, an MPI Session may only use a single GPU, which must be specified in advance before the session is initialized. In addition, it only uses a single graph evaluation thread, and initializes MPI immediately upon starting. If no `graph` argument is specified when constructing the session, the default graph will be launched in the session. If you are using more than one graph (created with `tf.Graph()` in the same process, you will have to use different sessions for each graph, but each graph can be used in multiple sessions. In this case, it is often clearer to pass the graph to be launched explicitly to the session constructor. Args: target: (Optional.) The execution engine to connect to. graph: (Optional.) The `Graph` to be launched (described above). config: (Optional.) A `ConfigProto` protocol buffer with configuration options for the session. """ super(Session, self).__init__(target, graph, config=config) # Initialize MPI on the relevant device. # TODO: Move this to library load and eliminate mpi.Session() self.run(init())

      在初始化 optimizer,和使用 session 的時候,語句如下:

      optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer()) with mpi.Session() as session:

      在 optimizer 調用 compute_gradients 的時候,首先會利用 TF 自己的 optimizer 計算出本地梯度,然后利用 AllReduce 來得到各個進程平均后的梯度。而在 Session 初始化的時候會預先執行 MPI_Init 進行 MPI 環境的初始化。

      在底層,AllReduce 被注冊為 Op,在 ComputeAsync 中,計算請求被入隊到一個隊列中。這一隊列會被一個統一的后臺線程處理。之所以引入這樣一個后臺線程,在注釋中有詳細的介紹。

      在百度的實現中,不同 Rank 的角色是不一樣的,Rank 0 會充當 coordinator 的角色。它會協調來自其他 Rank 的 MPI 請求,起到一個調度協調的作用。這是一個工程上的考量,具體可以參考注釋。順便一提,百度的這個工作注釋非常詳盡,真乃學術界的典范。這一設計也被后來的 Horovod 采用。

      Horovod 相比于百度的工作,并無學術上的貢獻。但是 Horovod 扎實的工程實現,使得它受到了更多的關注。它最大的優勢在于對 RingAllReduce 進行了更高層次的抽象,使其支持多種不同的框架。同時引入了 Nvidia NCCL,對 GPU 更加友好。

      與百度的實現類似,Horovod 也需要先進行初始化。只不過百度把這個過程放在了 Session 構建的時候,而 Horovod 提供了顯式初始化的函數。在初始化的時候,Horovod 會調用 MPI_Comm_dup 獲取一個 Communicator。之所以不直接使用默認的?MPI_COMM_WORLD,參考這里的文檔:

      除此之外,在初始化的時候,Horovod 還會創建一個后臺線程。這里的后臺線程的作用與百度的實現類似。

      void horovod_init_comm(MPI_Comm comm) { MPI_Comm_dup(comm, &mpi_context.mpi_comm); InitializeHorovodOnce(nullptr, 0); } // Start Horovod background thread. Ensure that this is // only done once no matter how many times this function is called. void InitializeHorovodOnce(const int* ranks, int nranks) { // Ensure background thread is only started once. if (!horovod_global.initialize_flag.test_and_set()) { horovod_global.control_operation = ParseControllerOpsFromEnv(); horovod_global.cpu_operation = ParseCPUOpsFromEnv(); #if HAVE_MPI // Enable mpi is it's used either in cpu data transfer or controller if (horovod_global.cpu_operation == LibType::MPI || horovod_global.control_operation == LibType::MPI) { mpi_context.Enable(); } if (horovod_global.control_operation == LibType::MPI){ horovod_global.controller.reset(new MPIController( horovod_global.response_cache, horovod_global.tensor_queue, horovod_global.timeline, horovod_global.parameter_manager, mpi_context)); horovod_global.controller->SetRanks(ranks, nranks); } #endif // Reset initialization flag horovod_global.initialization_done = false; horovod_global.background_thread = std::thread( BackgroundThreadLoop, std::ref(horovod_global)); } // Wait to ensure that the background thread has finished initializing MPI. while (!horovod_global.initialization_done) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } LOG(DEBUG) << "Background thread init done"; }

      在這個后臺線程的初始化過程中,它會利用進程內共享的全局狀態在自己的內存里創建一些對象,以及一些邏輯判斷。比如要不要進行 Hierarchical AllReduce,要不要 AutoTune(后面會詳細介紹)等。這里是初始化階段的日志。

      $ horovodrun -np 2 python hvd.py [1,1]:[2020-07-09 10:27:48.952760: D horovod/common/utils/env_parser.cc:106] Using MPI to perform controller operations. [1,1]:[2020-07-09 10:27:48.952813: D horovod/common/utils/env_parser.cc:72] Using MPI to perform CPU operations. [1,1]:[2020-07-09 10:27:48.952922: D horovod/common/mpi/mpi_context.h:46] MPI context enabled. [1,1]:[2020-07-09 10:27:48.952968: D horovod/common/mpi/mpi_controller.h:32] MPI Controller Initialized. [1,0]:[2020-07-09 10:27:49. 27002: D horovod/common/utils/env_parser.cc:106] Using MPI to perform controller operations. [1,0]:[2020-07-09 10:27:49. 27064: D horovod/common/utils/env_parser.cc:72] Using MPI to perform CPU operations. [1,0]:[2020-07-09 10:27:49. 27094: D horovod/common/mpi/mpi_context.h:46] MPI context enabled. [1,0]:[2020-07-09 10:27:49. 27118: D horovod/common/mpi/mpi_controller.h:32] MPI Controller Initialized. [1,0]:[2020-07-09 10:27:49. 88254: D horovod/common/mpi/mpi_context.cc:142] Using MPI_COMM_WORLD as a communicator. [1,1]:[2020-07-09 10:27:49. 88459: D horovod/common/mpi/mpi_context.cc:142] Using MPI_COMM_WORLD as a communicator. [1,0]:[2020-07-09 10:27:49. 88947: D horovod/common/mpi/mpi_controller.cc:39] Started Horovod with 2 processes [1,0]:[2020-07-09 10:27:49. 89143: D horovod/common/mpi/mpi_controller.cc:80] MPI controller initialized. [1,0]:[2020-07-09 10:27:49. 89195: I horovod/common/operations.cc:506] [0]: Horovod Initialized [1,1]:[2020-07-09 10:27:49. 89147: D horovod/common/mpi/mpi_controller.cc:80] MPI controller initialized. [1,1]:[2020-07-09 10:27:49. 89489: I horovod/common/operations.cc:506] [1]: Horovod Initialized [1,0]:[2020-07-09 10:27:49. 89945: D horovod/common/operations.cc:649] Background thread init done [1,1]:[2020-07-09 10:27:49. 91335: D horovod/common/operations.cc:649] Background thread init done

      在初始化的過程中,有一些比較重要的對象會被構造出來。不過這里暫且按下不表,后續再介紹。在初始化好之后,我們利用下面的代碼進行模型的訓練:

      @tf.function def training_step(images, labels, first_batch): with tf.GradientTape() as tape: probs = mnist_model(images, training=True) loss_value = loss(labels, probs) # Horovod: add Horovod Distributed GradientTape. tape = hvd.DistributedGradientTape(tape) grads = tape.gradient(loss_value, mnist_model.trainable_variables) opt.apply_gradients(zip(grads, mnist_model.trainable_variables)) # Horovod: broadcast initial variable states from rank 0 to all other processes. # This is necessary to ensure consistent initialization of all workers when # training is started with random weights or restored from a checkpoint. # # Note: broadcast should be done after the first gradient step to ensure optimizer # initialization. if first_batch: hvd.broadcast_variables(mnist_model.variables, root_rank=0) hvd.broadcast_variables(opt.variables(), root_rank=0) return loss_value

      首先會利用 Bcast 來同步 Rank 0 進程的初始化參數給所有的進程,這里是為了保證初始參數一致。

      def broadcast_variables(variables, root_rank): """Broadcasts variables from root rank to all other processes. Arguments: variables: variables for broadcast root_rank: rank of the process from which global variables will be broadcasted to all other processes. """ broadcast_group = _make_broadcast_group_fn() return broadcast_group(variables, root_rank)

      由于我們是利用 TensorFlow 2 來進行訓練。所以梯度更新部分的實現不是基于計算圖的實現,而是使用?hvd.DistributedGradientTape。它的實現如下所示,當調用?gradient?的時候,首先會調用 tf.GradientTape 的同名函數,同時進行 AllReduce。這里的邏輯與百度實現中的 Optimizer 是否似曾相識:

      class _DistributedGradientTape(tf.GradientTape): def gradient(self, target, sources, output_gradients=None): gradients = super(self.__class__, self).gradient(target, sources, output_gradients) return self._allreduce_grads(gradients) @_cache def _make_allreduce_grads_fn(name, device_dense, device_sparse, compression, sparse_as_dense, op): def allreduce_grads(grads): with tf.name_scope(name + "_Allreduce"): if sparse_as_dense: grads = [tf.convert_to_tensor(grad) if grad is not None and isinstance(grad, tf.IndexedSlices) else grad for grad in grads] return [_allreduce_cond(grad, device_dense=device_dense, device_sparse=device_sparse, compression=compression, op=op) if grad is not None else grad for grad in grads] def _allreduce_cond(tensor, *args, **kwargs): def allreduce_fn(): return allreduce(tensor, *args, **kwargs) def id_fn(): return tensor return tf.cond(size_op() > 1, allreduce_fn, id_fn) def _allreduce(tensor, name=None, op=Sum): """An op which reduces an input tensor over all the Horovod processes. The default reduction is a sum. The reduction operation is keyed by the name of the op. The tensor type and shape must be the same on all Horovod processes for a given name. The reduction will not start until all processes are ready to send and receive the tensor. Returns: A tensor of the same shape and type as `tensor`, summed across all processes. """ if name is None and not _executing_eagerly(): name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name) return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op)

      allreduce_grads?會修改 name scope,添加后綴 _Allreduce。在后續的調用中,進行了一些復雜但不核心的邏輯,如壓縮等。最后調用?_allreduce。在這一函數中,會直接調用 C++ 實現的 Kernel。

      void ComputeAsync(OpKernelContext* context, DoneCallback done) override { OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()), done); ... auto enqueue_result = EnqueueTensorAllreduce( hvd_context, hvd_tensor, hvd_output, ready_event, node_name, device, [context, done](const common::Status& status) { context->SetStatus(ConvertStatus(status)); done(); }, reduce_op); ... }

      在?ComputeAsync?里,會把這一 AllReduce 的請求入隊。可以看到,在 TensorFlow 支持的實現上,Horovod 與百度大同小異。都是自定義了 AllReduce Op,在 Op 中把請求入隊。

      所以在 Horovod 的日志中,我們可以看到這樣的日志(當然要設置?HOROVOD_LOG_LEVEL=trace?環境變量)。DistributedGradientTape 的 name scope 被改寫成了 DistributedGradientTape_Allreduce,名字被加上了 HorovodAllreduce_ 的前綴。

      [1,1]:[2020-07-09 10:27:56.839122: T horovod/common/operations.cc:849] [1]: Enqueued DistributedGradientTape_Allreduce/HorovodAllreduce_gradient_tape_sequential_dense_1_BiasAdd_BiasAddGrad_0 [1,1]:[2020-07-09 10:27:56.839176: T horovod/common/operations.cc:849] [1]: Enqueued DistributedGradientTape_Allreduce/HorovodAllreduce_gradient_tape_sequential_dense_1_MatMul_1_0 [1,1]:[2020-07-09 10:27:56.839280: T horovod/common/operations.cc:849] [1]: Enqueued DistributedGradientTape_Allreduce/HorovodAllreduce_gradient_tape_sequential_dense_BiasAdd_BiasAddGrad_0

      EnqueueTensorAllreduce?是進入了一個進程內共享的全局對象維護的一個隊列中。之前提到的后臺進程,會一直在執行一個循環?RunLoopOnce。在其中,后臺線程會利用?MPIController?來處理入隊的請求。MPIController?可以理解為是協調不同的 Rank 進程,處理請求的對象。這個抽象是百度所不具備的,主要是為了支持 Facebook gloo 等其他的集合計算庫。因此 Horovod 也有 GlooController 等等實現。

      MPI學習(轉載)

      在后臺線程里,最重要的一個函數調用是?ComputeResponseList。Horovod 也遵循著 Coordinator 的設計,與百度類似。無論是百度還是 Horovod 中的 Coordinator 都類似是 Actor 模式,主要起來協調多個進程工作的作用。在真正執行計算的時候,Horovod 同樣引入了一個新的抽象 op_manager。從某種程度來說,我們可以把 controller 看做是對通信和協調管理能力的抽象,而 op_manager 是對實際計算的抽象。

      class OperationManager { public: OperationManager(ParameterManager* param_manager, std::vector> allreduce_ops, std::vector> allgather_ops, std::vector> broadcast_ops, std::shared_ptr join_op, std::vector> adasum_ops, std::shared_ptr error_op); virtual ~OperationManager() = default; Status ExecuteAllreduce(std::vector& entries, const Response& response) const; ... }

      總結來說,Horovod 的設計與實現都與百度的工作并無二致,只是進行了更多的抽象,支持更多的通信庫,更多的訓練框架。這些工作雖然都是 dirty work,但也是它受歡迎的最大原因。Horovod 可以說是 RingAllReduce 數據并行訓練框架方面的 State-of-art 了,不過最近還有一個工作,同樣受到了很多的關注,那就是字節跳動的?BytePS,甚至發了 SOSP(羨慕到變形!)。論文在此處可預覽。不過論文和實現有挺大不同的,這里我們以開源實現為準,介紹一下 byteps。

      BytePS 的代碼目錄結構跟 Horovod 很像,在 TensorFlow 的支持上,做法與百度和 Horovod 并無二致。對?Optimizer?進行了包裝,實現了自定義的 Op。

      def push_pull_grads(grads): with tf.name_scope(self._name + "_Push_Pull") as scope: if self._sparse_as_dense: grads = [tf.convert_to_tensor(grad) if grad is not None and isinstance(grad, tf.IndexedSlices) else grad for grad in grads] return [push_pull(grad, scope, device_dense=self._device_dense, device_sparse=self._device_sparse, compression=self._compression, enable_async=self._enable_async) if grad is not None else grad for grad in grads] if _executing_eagerly(): self._push_pull_grads = tf.contrib.eager.defun(push_pull_grads) else: self._push_pull_grads = push_pull_grads def compute_gradients(self, *args, **kwargs): """Compute gradients of all trainable variables. See Optimizer.compute_gradients() for more info. In DistributedOptimizer, compute_gradients() is overriden to also push_pull the gradients before returning them. """ gradients = self._optimizer.compute_gradients(*args, **kwargs) if size() > 1 and not self._enable_async: grads, vars = zip(*gradients) avg_grads = self._push_pull_grads(grads) return list(zip(avg_grads, vars)) else: return gradients

      在 Horovod 里,C++ 的 Op 會把請求入隊到全局的隊列中,被后臺進程中。而在 BytePS 里,邏輯也類似。

      void ComputeAsync(::tensorflow::OpKernelContext* context, DoneCallback done) override { ... if (bps_context.initialized) { StartTask(context, done, tmp_name, bps_input, bps_output, ready_event); } else { std::thread t(StartTask, context, done, tmp_name, bps_input, bps_output, ready_event); t.detach(); } } void StartTask(::tensorflow::OpKernelContext* context, ::tensorflow::AsyncOpKernel::DoneCallback done, std::string node_name, std::shared_ptr byteps_input, std::shared_ptr byteps_output, std::shared_ptr ready_event) { ... auto queue_list = common::GetPushQueueList(device); auto queue_list_pull = common::GetPullQueueList(device); queue_list->insert(queue_list->end(), queue_list_pull->begin(), queue_list_pull->end()); // TODO: assign priority based on topological sort auto enqueue_result = EnqueueTensor(byteps_context, byteps_input, byteps_output, ready_event, device, -byteps_context.declared_key, 0, [context, done](const common::Status& status) { context->SetStatus(ConvertStatus(status)); done(); }, queue_list); OP_REQUIRES_OK_ASYNC(context, ConvertStatus(enqueue_result), done); }

      代碼注釋里寫到需要給 Tensor 根據拓撲序設定優先級,這個是在 BytePS 的論文中提到的一個非常重要的優化,看代碼這部分的邏輯應該已經實現了,具體可以見這里的討論。至于這里的注釋是什么意思,還需要問一下上游才能確定。

      最終請求會在 Partition 后入隊。EnqueueTensor?與 Horvod 雖然類似,但是它會劃分 Partition,默認是 4096000 字節一個 Task,這個優化在論文中也有提到,不過在開源實現中沒有找尋到論文中提到的基于貝葉斯優化的 AutoTune 的痕跡。

      跟 Horovod 相比,還有一個比較大的不同。BytePS 為了能夠流水線地處理 Push 和 Pull,引入了?QueueType?這個概念。上述代碼中的?queue_list?就是將為了處理 Push 和 Pull 的不同事件組成了一個事件隊列。后續 BytePS 會按照這一隊列依次處理事件,處理完里面的所有事件后,就完成了 PushPullGradients 的過程。

      Status EnqueueTensor(BPSContext &context, std::shared_ptr input, std::shared_ptr output, std::shared_ptr ready_event, const int device, const int priority, const int version, StatusCallback callback, std::shared_ptr> queue_list) { ... std::vector> partitions; PartitionTensor(e, partitions); ... unsigned int accumulated = 0; for (size_t i = 0; i < partitions.size(); ++i) { auto task = partitions[i]; task->key = context.key_list[i]; // assign the key now BPS_CHECK(task->tensor_name != ""); BPS_LOG(TRACE) << "EnqueueTensor: " << (task->tensor_name) << ", key=" << (task->key) << ", offset=" << (task->offset) << ", len=" << (task->len) << ", device=" << (task->device) << " rank=" << BytePSGlobal::GetLocalRank(); BytePSGlobal::GetScheduledQueue(e->queue_list[0])->addTask(task); accumulated += task->len; } auto tensor = (e->tensor ? e->tensor : e->output); BPS_CHECK(tensor); BPS_CHECK_EQ(accumulated, tensor->size()) << "accumulated partition size not equal to original tensor size"; BPS_LOG(TRACE) << "EnqueueTensor finished: " << name << ", rank=" << BytePSGlobal::GetLocalRank(); return Status::OK(); }

      BytePS 的 PS 部分是利用?dmlc/ps-lite?實現的,dmlc/ps-lite?也被用于 MXNet,因此 BytePS 的分布式訓練中也有三個角色,Server,Worker 和 Scheduler。其中的 Server 并不是傳統意義上的 Parameter Server,而是一個具備一定的計算能力和 KV 存儲能力的,只使用 CPU 的普通 Server。為了加法做的足夠好,Server 這邊對加法操作也有一個抽象,那就是?CPUReducer。從這個角度來理解,BytePS 是采用了 Server-Worker 這種通信的模型實現了 AllReduce 的語義,并不是傳統意義上的 PS。從這樣的設計來講,確實可以通過 Tensor 的分區分段把流水線跑起來。就像知乎老師木的回答一樣。不過我對老師木說的這是把 ReduceScatter 和 AllGather 流水起來還是不太理解。

      總體來看,自從百度的 RingAllReduce 以來,后續越來越多的工作是關注在怎么樣能夠把計算和通信重疊起來,通過類似于流水線的方式隱藏掉一部分的成本。這里我很贊同 BytePS 在文檔里的兩點觀察:

      AllReduce 來自 HPC,如果在真實的集群環境里不能做到機架感知,會帶來一定的影響。BytePS 同樣對調度提出了新要求,但這種拿 CPU 和一點點的網絡換訓練速度的事情,非常值得一試。

      參考文獻

      分布式訓練的方案和效率對比

      License

      This article is licensed under?CC BY-NC-SA 3.0.

      Please contact me for commercial use.

      任務調度 機器學習

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:數字化企業無代碼開發平臺(數字化信息代碼
      下一篇:excel表格中表格線如何變細
      相關文章
      国产人成亚洲第一网站在线播放| 亚洲AV永久无码精品放毛片| 国产精品无码亚洲精品2021| 亚洲精品无码久久久久牙蜜区| 亚洲精品美女久久久久9999| 亚洲欧洲国产日韩精品| 亚洲αv在线精品糸列| 亚洲AV永久无码精品水牛影视| 国产亚洲综合色就色| 亚洲欧洲日产国码av系列天堂| 国产午夜亚洲不卡| 伊人久久综在合线亚洲91| 久久亚洲高清综合| 亚洲色大成网站www永久一区| 亚洲一区二区精品视频| 黑人大战亚洲人精品一区 | 亚洲熟妇av一区二区三区漫画| 国产日韩成人亚洲丁香婷婷| 国产综合成人亚洲区| 国产精品亚洲а∨天堂2021| 青青青国产色视频在线观看国产亚洲欧洲国产综合 | 亚洲综合精品网站在线观看| 久久国产成人精品国产成人亚洲| 中文字幕精品无码亚洲字| 国产亚洲成AV人片在线观黄桃| 亚洲成在人线av| 在线电影你懂的亚洲| 亚洲国产日韩在线人成下载| 亚洲三级在线观看| 亚洲中文字幕久久精品蜜桃| 亚洲乱码国产乱码精华| 国产大陆亚洲精品国产| 国产国拍亚洲精品福利| 亚洲精品乱码久久久久久按摩| 亚洲精品线在线观看| 亚洲91精品麻豆国产系列在线| 亚洲综合av一区二区三区| 狠狠入ady亚洲精品| 国产成人亚洲精品狼色在线| 久久综合图区亚洲综合图区| 亚洲第一成年网站大全亚洲|