如何使用modelarts訓練海量數據
在ModelArts上使用notebook上使用evs空間默認大小是5G,能滿足大部分文本和圖片訓練模型的需求。如果訓練數據稍微超過這個限額,可以適當的擴增下空間。但如果訓練對象是視頻,或是實際生成過程中的海量數據,這個空間就顯得小了,這時候擴增evs空間就顯得很不經濟了。
最近老山便碰到這樣的案例,客戶的訓練數據大約在1T的量級,在obs上存儲的數據結構大概如下圖所示。
your-obs-name
└── ...
└── video
├── folder1
│?? ├── text.txt
│?? └── video.mp4
├── folder2
│?? ├── text.txt
│?? └── video.mp4
├── folder3
│?? ├── text.txt
│?? └── video.mp4
├── folder4
│?? ├── text.txt
│?? └── video.mp4
├── folder5
│?? ├── text.txt
│?? └── video.mp4
├── ...
雖然使用華為云自帶的moxing模塊可以直接讀取obs的數據,但由于實質是通過http實時讀取數據,這個速度比從evs的ssd硬盤上讀取數據要慢得多。而解決方案也比較直接,在evs上開辟一個固定大小的空間作為緩存區,一方面不斷把obs數據讀入緩存區,如果緩存區滿了,就等待其騰出空間,另一方面訓練任務消費evs數據,當消費完后便刪除數據。
程序上也自然選用生產者-消費者模型。程序定義了管道類Pipeline,有生產者線程producer用于將obs數據保存到evs;同時輸出evs數據用于外部模型的消費。由于每個視頻文件都單獨放在一個文件夾下,所以程序的輸出對象也是這個文件夾在evs上保存的地址,如folder1,folder2等。至于讀取文件夾內部文件信息等消費工作,由用戶自行定義。
不多說,直接上代碼。
import moxing as mox
mox.file.shift('os', 'mox')
import os, shutil
from queue import Queue
from time import sleep
import threading
import logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",)
class ObsClient:
def __init__(self, root):
'''獲取obs路徑上需要讀取的文件夾的相關信息'''
self.root = root
self.directory = self.list_directory()
self.maxSize = self.getMaxSize()
def getMaxSize(self):
'''最大的文件夾的大小'''
return max([size for *_, size in self.directory])
def list_directory(self):
'''輸出用于訓練的文件夾的路徑,輸出directory:
[(文件夾相對路徑,文件夾絕對路徑,文件夾大小), ...]
'''
directory = []
folders = mox.file.list_directory(self.root)
for folder in folders:
folderPath = os.path.join(self.root, folder)
if mox.file.is_directory(folderPath):
size = self.get_size(folderPath)
directory.append((folder, folderPath, size))
return directory
def get_size(self, path):
'''獲取文件(夾)的大小'''
if mox.file.is_directory(path):
return self.get_size_folder(path)
return self.get_size_file(path)
def get_size_file(self, path):
'''獲取文件的大小'''
return mox.file.get_size(path)
def get_size_folder(self, path):
'''獲取文件夾的大小'''
size = 0
for filename in mox.file.list_directory(path, recursive=True):
filepath = os.path.join(path, filename)
if not mox.file.is_directory(filepath):
size+= self.get_size_file(filepath)
return size
class EvsClient:
def __init__(self, root, memory, queue, directory, interval = 0.1):
self.root = root # evs緩存區根目錄
self.directory = directory # obs文件夾信息
self.size = 0 # evs緩存區已使用的空間
self.memory = memory # evs上用于緩存的空間大小
self.queue = queue # 隊列,存儲了evs緩存區文件夾的信息
self.interval = interval # 如果緩存區滿后,查詢緩存大小的間隔時間
def remove(self, folder, size):
'''刪除evs文件夾,在文件夾被消費后調用'''
logging.info(f"consumer: start removing folder {folder} with size {size}|{self.size}")
shutil.rmtree(folder, True)
self.size -= size
logging.info(f"consumer: end removing folder {folder} with size -{size}|{self.size}")
def work(self):
'''生成者主程序,用于從obs中copy文件夾到evs'''
for relObsFolder, absObsFolder, size in self.directory:
while True:
# 緩存區沒滿,就copy文件
if not self.waitOrDo(size):
self.copy(relObsFolder, absObsFolder, size)
break
# 如果緩存區滿了,就等待
sleep(self.interval)
# 當所有文件都拷貝后,置入結束符(None, None)
self.queue.put((None, None))
def waitOrDo(self, size):
'''返回True時等待,返回False時工作'''
return self.size + size > self.memory
def copy(self, relObsFolder, absObsFolder, size):
'''從obs中copy文件夾到evs'''
evsFolder = os.path.join(self.root, relObsFolder)
logging.info(f"producer: start copying folder {relObsFolder} with size {size}|{self.size}")
mox.file.copy_parallel(absObsFolder, evsFolder)
self.queue.put((evsFolder, size))
self.size += size
logging.info(f"producer: end copying folder {relObsFolder} with size +{size}|{self.size}")
class Pipeline:
def __init__(self, evsRoot, obsRoot, memory = '1g', timeout = 300, interval = 0.1):
self.memory = self.rescript(memory) # evs上用于緩存的空間大小
self.timeout = timeout # 消費者獲取evs緩存區文件夾的最長等待時間
self.queue = Queue() # 隊列,存儲了evs緩存區文件夾的信息
self.obsClient = ObsClient(obsRoot) # 存儲obs上的文件夾信息
# evs上的操作
self.evsClient = EvsClient(evsRoot, self.memory, self.queue, self.obsClient.directory, interval)
self.checkMemory() # 驗證evs上用于緩存的空間大小是否足夠大
def checkMemory(self):
'''evs上用于緩存的空間大小不能小于obs上最大文件夾大小'''
if self.memory raise Exception("memory should bigger than maxFolderSize!") def rescript(self, memory): '''將文本或數值類型的memory轉寫成數值''' try: if isinstance(memory, str): if memory[-1].lower()=='g': return int(float(memory[:-1])*1024*1024*1024) elif memory[-1].lower()=='m': return int(float(memory[:-1])*1024*1024) elif memory[-1].lower()=='k': return int(float(memory[:-1])*1024) else: return int(float(memory)) else: return int(float(memory)) except: raise Exception("Error when rescripting memory!") def __iter__(self): '''生成器,yield輸出evs文件夾路徑和大小''' # 生產者線程 producer = threading.Thread(target = self.evsClient.work) producer.start() # 主程序提供生成器用于消費,輸出evs文件夾路徑和大小 while True: logging.info(f"consumer: start to get the queue") path, size = self.queue.get(timeout=self.timeout) logging.info(f"consumer: get the queue {path}, {size} ") if path is None and size is None: break yield path, size self.evsClient.remove(path, size) # 主程序等待 producer.join() if __name__ == '__main__': # 使用示例 for path, size in Pipeline('./video', 's3://your-obs-name/.../video'): do_job(path, size) 如果你覺得老山的文章不錯,不妨點擊下關注。 大數據開發 ModelArts
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。