如何使用modelarts訓練海量數據

      網友投稿 761 2022-05-29

      ModelArts上使用notebook上使用evs空間默認大小是5G,能滿足大部分文本和圖片訓練模型的需求。如果訓練數據稍微超過這個限額,可以適當的擴增下空間。但如果訓練對象是視頻,或是實際生成過程中的海量數據,這個空間就顯得小了,這時候擴增evs空間就顯得很不經濟了。

      最近老山便碰到這樣的案例,客戶的訓練數據大約在1T的量級,在obs上存儲的數據結構大概如下圖所示。

      your-obs-name

      └── ...

      └── video

      ├── folder1

      │?? ├── text.txt

      │?? └── video.mp4

      ├── folder2

      │?? ├── text.txt

      │?? └── video.mp4

      ├── folder3

      │?? ├── text.txt

      │?? └── video.mp4

      ├── folder4

      │?? ├── text.txt

      │?? └── video.mp4

      ├── folder5

      │?? ├── text.txt

      │?? └── video.mp4

      ├── ...

      雖然使用華為云自帶的moxing模塊可以直接讀取obs的數據,但由于實質是通過http實時讀取數據,這個速度比從evs的ssd硬盤上讀取數據要慢得多。而解決方案也比較直接,在evs上開辟一個固定大小的空間作為緩存區,一方面不斷把obs數據讀入緩存區,如果緩存區滿了,就等待其騰出空間,另一方面訓練任務消費evs數據,當消費完后便刪除數據。

      程序上也自然選用生產者-消費者模型。程序定義了管道類Pipeline,有生產者線程producer用于將obs數據保存到evs;同時輸出evs數據用于外部模型的消費。由于每個視頻文件都單獨放在一個文件夾下,所以程序的輸出對象也是這個文件夾在evs上保存的地址,如folder1,folder2等。至于讀取文件夾內部文件信息等消費工作,由用戶自行定義。

      不多說,直接上代碼。

      import moxing as mox

      mox.file.shift('os', 'mox')

      import os, shutil

      from queue import Queue

      from time import sleep

      import threading

      import logging

      logging.basicConfig(level=logging.INFO,

      format="%(asctime)s %(name)s %(levelname)s %(message)s",)

      class ObsClient:

      def __init__(self, root):

      '''獲取obs路徑上需要讀取的文件夾的相關信息'''

      self.root = root

      self.directory = self.list_directory()

      self.maxSize = self.getMaxSize()

      def getMaxSize(self):

      '''最大的文件夾的大小'''

      return max([size for *_, size in self.directory])

      def list_directory(self):

      '''輸出用于訓練的文件夾的路徑,輸出directory:

      [(文件夾相對路徑,文件夾絕對路徑,文件夾大小), ...]

      '''

      directory = []

      folders = mox.file.list_directory(self.root)

      for folder in folders:

      folderPath = os.path.join(self.root, folder)

      if mox.file.is_directory(folderPath):

      size = self.get_size(folderPath)

      directory.append((folder, folderPath, size))

      return directory

      def get_size(self, path):

      '''獲取文件(夾)的大小'''

      if mox.file.is_directory(path):

      return self.get_size_folder(path)

      return self.get_size_file(path)

      def get_size_file(self, path):

      '''獲取文件的大小'''

      return mox.file.get_size(path)

      def get_size_folder(self, path):

      '''獲取文件夾的大小'''

      size = 0

      for filename in mox.file.list_directory(path, recursive=True):

      filepath = os.path.join(path, filename)

      if not mox.file.is_directory(filepath):

      size+= self.get_size_file(filepath)

      return size

      class EvsClient:

      def __init__(self, root, memory, queue, directory, interval = 0.1):

      self.root = root # evs緩存區根目錄

      self.directory = directory # obs文件夾信息

      self.size = 0 # evs緩存區已使用的空間

      self.memory = memory # evs上用于緩存的空間大小

      self.queue = queue # 隊列,存儲了evs緩存區文件夾的信息

      self.interval = interval # 如果緩存區滿后,查詢緩存大小的間隔時間

      def remove(self, folder, size):

      '''刪除evs文件夾,在文件夾被消費后調用'''

      logging.info(f"consumer: start removing folder {folder} with size {size}|{self.size}")

      shutil.rmtree(folder, True)

      self.size -= size

      logging.info(f"consumer: end removing folder {folder} with size -{size}|{self.size}")

      def work(self):

      '''生成者主程序,用于從obs中copy文件夾到evs'''

      如何使用modelarts訓練海量數據

      for relObsFolder, absObsFolder, size in self.directory:

      while True:

      # 緩存區沒滿,就copy文件

      if not self.waitOrDo(size):

      self.copy(relObsFolder, absObsFolder, size)

      break

      # 如果緩存區滿了,就等待

      sleep(self.interval)

      # 當所有文件都拷貝后,置入結束符(None, None)

      self.queue.put((None, None))

      def waitOrDo(self, size):

      '''返回True時等待,返回False時工作'''

      return self.size + size > self.memory

      def copy(self, relObsFolder, absObsFolder, size):

      '''從obs中copy文件夾到evs'''

      evsFolder = os.path.join(self.root, relObsFolder)

      logging.info(f"producer: start copying folder {relObsFolder} with size {size}|{self.size}")

      mox.file.copy_parallel(absObsFolder, evsFolder)

      self.queue.put((evsFolder, size))

      self.size += size

      logging.info(f"producer: end copying folder {relObsFolder} with size +{size}|{self.size}")

      class Pipeline:

      def __init__(self, evsRoot, obsRoot, memory = '1g', timeout = 300, interval = 0.1):

      self.memory = self.rescript(memory) # evs上用于緩存的空間大小

      self.timeout = timeout # 消費者獲取evs緩存區文件夾的最長等待時間

      self.queue = Queue() # 隊列,存儲了evs緩存區文件夾的信息

      self.obsClient = ObsClient(obsRoot) # 存儲obs上的文件夾信息

      # evs上的操作

      self.evsClient = EvsClient(evsRoot, self.memory, self.queue, self.obsClient.directory, interval)

      self.checkMemory() # 驗證evs上用于緩存的空間大小是否足夠大

      def checkMemory(self):

      '''evs上用于緩存的空間大小不能小于obs上最大文件夾大小'''

      if self.memory

      raise Exception("memory should bigger than maxFolderSize!")

      def rescript(self, memory):

      '''將文本或數值類型的memory轉寫成數值'''

      try:

      if isinstance(memory, str):

      if memory[-1].lower()=='g':

      return int(float(memory[:-1])*1024*1024*1024)

      elif memory[-1].lower()=='m':

      return int(float(memory[:-1])*1024*1024)

      elif memory[-1].lower()=='k':

      return int(float(memory[:-1])*1024)

      else:

      return int(float(memory))

      else:

      return int(float(memory))

      except:

      raise Exception("Error when rescripting memory!")

      def __iter__(self):

      '''生成器,yield輸出evs文件夾路徑和大小'''

      # 生產者線程

      producer = threading.Thread(target = self.evsClient.work)

      producer.start()

      # 主程序提供生成器用于消費,輸出evs文件夾路徑和大小

      while True:

      logging.info(f"consumer: start to get the queue")

      path, size = self.queue.get(timeout=self.timeout)

      logging.info(f"consumer: get the queue {path}, {size} ")

      if path is None and size is None:

      break

      yield path, size

      self.evsClient.remove(path, size)

      # 主程序等待

      producer.join()

      if __name__ == '__main__':

      # 使用示例

      for path, size in Pipeline('./video', 's3://your-obs-name/.../video'):

      do_job(path, size)

      如果你覺得老山的文章不錯,不妨點擊下關注。

      大數據開發 ModelArts

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:如何設計并實現傳統小區智能化
      下一篇:9個超級實用的 ES6 特性,超級實用哦!
      相關文章
      亚洲xxxx18| 久久久国产亚洲精品| 亚洲av中文无码| 欧美日韩亚洲精品| 国产精品亚洲а∨天堂2021| 亚洲色偷偷色噜噜狠狠99| 国产色在线|亚洲| 激情五月亚洲色图| 中文日韩亚洲欧美制服| 亚洲永久在线观看| 亚洲 欧洲 日韩 综合在线| 国产精品亚洲片夜色在线| 亚洲高清资源在线观看| 亚洲精品白色在线发布| 亚洲欧洲日本精品| 亚洲国产美女视频| 国产精品亚洲午夜一区二区三区| 亚洲人成小说网站色| 亚洲欧美自偷自拍另类视| 亚洲中文无码永久免| 亚洲精品乱码久久久久蜜桃| 亚洲AV无码AV日韩AV网站| 一本久久综合亚洲鲁鲁五月天| 亚洲中文字幕久久精品蜜桃| 亚洲粉嫩美白在线| 2020天堂在线亚洲精品专区| 久久影视综合亚洲| 久久精品国产亚洲一区二区三区| 亚洲精品国产精品乱码不99| 亚洲AV无码一区东京热| 亚洲首页在线观看| 亚洲人成网站看在线播放| 亚洲欧美日韩综合久久久| 国产亚洲情侣久久精品| 狠狠亚洲婷婷综合色香五月排名 | 在线精品亚洲一区二区小说| 亚洲精品中文字幕乱码三区| 亚洲欧洲日产国码久在线观看| 亚洲综合久久成人69| 亚洲色欲色欲www在线播放| 亚洲爆乳少妇无码激情|