HTTP 狀態消息
832
2022-05-30
終于開始了這個很感興趣但是一直覺得困難重重的源碼解析工作,也算是一個好的開端。既然開始了,就認真耐心的看下去吧。廢話不多說,開始!
HDFS源碼解析之客戶端寫數據(一)
hdfs源碼解析之客戶端寫數據(二)
在我們客戶端寫數據的代碼大致如下:
Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://172.16.40.119:8020"); String a = "This is my first hdfs file!"; //① 得到DistributedFileSystem FileSystem filesytem = FileSystem.get(conf); //② 得到輸出流FSDataOutputStream FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); //③ 開始寫數據 fs.write(a.getBytes()); fs.flush();
最重要的三步已經在上面標注,通過源碼分析每一步所發生的細節是什么?
FileSystem filesytem = FileSystem.get(conf);
其中conf是一個Configuration對象。執行這行代碼后就進入到FileSystem.get(Configuration conf)方法中,可以看到,在這個方法中先通過getDefaultUri()方法獲取文件系統對應的的URI,該URI保存了與文件系統對應的協議和授權信息,如:hdfs://localhost:9000。這個URI又是如何得到的呢?是在CLASSPATH中的配置文件中取得的,看getDefaultUri()方法中有conf.get(FS_DEFAULT_NAME_KEY, "file:///") 這么一個實參,在筆者項目的CLASSPATH中的core-site.xml文件中有這么一個配置:
而常量FS_DEFAULT_NAME_KEY對應的值是fs.default.name,所以conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是hdfs://localhost:9000。
URI創建完成之后就進入到FileSystem.get(URI uri, Configuration conf)方法。在這個方法中,先執行一些檢查,檢查URI的協議和授權信息是否為空,然后再直接或簡介調用該方法,最重要的是執行
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的文件系統 return createFileSystem(uri, conf); } return CACHE.get(uri, conf);
常量CACHE用于緩存已經打開的、可共享的文件系統,它是FileSystem類的靜態內部類FileSystem.Cache的對象,在其內部使用一個Map存儲文件系統
private final Map
這個鍵值對映射的鍵是FileSystem.Cache.Key類型,它有三個成員變量:
/**URI模式**/
final String scheme;
/**URI的授權部分**/
final String authority;
/**保存了打開具體文件系統的本地用戶信息,不同本地用戶打開的具體文件系統也是不能共享的**/
final UserGroupInformation ugi;
由于FileSystem.Cache表示可共享的文件系統,所以這個Key就用于區別不同的文件系統對象,如一個一個文件系統對象可共享,那么FileSystem.Cache.Key的三個成員變量相等,在這個類中重寫了hashCode()方法和equals()方法,就是用于判斷這三個變量是否相等。根據《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》這本書的介紹,在Hadoop1.0版本中FileSystem.Cache.Key類還有一個unique字段,這個字段表示,如果其他3個字段相等的情況,下如果用戶不想共享這個文件系統,就設置這個值(默認為0),但是不知道現在為什么去除了,還沒搞清楚,有哪位同學知道的話麻煩告知,謝謝。
回到FileSystem.get(final URI uri, final Configuration conf)方法的最后一行語句return CACHE.get(uri, conf),調用了FileSystem.Cahce.get()方法獲取具體的文件系統對象,該方法代碼如下:
FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); FileSystem fs = null; synchronized (this) { fs = map.get(key); } if (fs != null) { return fs; } fs = createFileSystem(uri, conf); synchronized (this) { // refetch the lock again FileSystem oldfs = map.get(key); if (oldfs != null) { // a file system is created while lock is releasing fs.close(); // close the new file system return oldfs; // return the old file system } // now insert the new file system into the map if (map.isEmpty() && !clientFinalizer.isAlive()) { Runtime.getRuntime().addShutdownHook(clientFinalizer); } fs.key = key; map.put(key, fs); return fs; } }
在這個方法中先查看已經map中是否已經緩存了要獲取的文件系統對象,如果已經有了,直接從集合中去除,如果沒有才進行創建,由于FileSystem.CACHE為static類型,所以在同一時刻可能有多個線程在訪問,所以需要在Cache類的方法中使用同步的操作來取值和設置值。這個方法比較簡單,最核心的就是
fs = createFileSystem(uri, conf);
這行語句,它執行了具體的文件系統對象的創建的功能。createFileSystem()方法是FileSystem的一個私有方法,其代碼如下:
private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException { Class> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); LOG.debug("Creating filesystem for " + uri); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs; }
其實現就是先從配置文件取得URI對應的類,如在core-default.xml文件中屬性(鍵)fs.hdfs.impl對應的值是org.apache.hadoop.hdfs.DistributedFileSystem,相應的XML代碼如下:
所以若uri對應fs.hdfs.impl,那么createFileSystem中的clazz就是org.apache.hadoop.hdfs.DistributedFileSystem的Class對象。然后再利用反射,創建org.apache.hadoop.hdfs.DistributedFileSystem的對象fs。然后執行fs.initialize(uri, conf);初始化fs對象。DistributedFileSystem是Hadoop分布式文件系統的實現類,實現了Hadoop文件系統的界面,提供了處理HDFS文件和目錄的相關事務。
這行代碼
FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true);
主要做了兩件事:
①通過rpc調用在namenode命名空間創建文件條目;
②創建該文件對應的輸出流。
filesytem.create()最終調用的是DistributedFileSystem的create方法
@Override //返回HdfsDataOutputStream對象,繼承FSDataOutputStream public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSet
在上面代碼中首先構建DFSOutputStream,然后傳給dfs.createWrappedOutputStream構建HdfsDataOutputStream,看下dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize, checksumOpt)是如何構建輸出流DFSOutputStream的。
public DFSOutputStream create(String src, FsPermission permission, EnumSet
再進到DFSOutputStream.newStreamForCreate方法中
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet
在newStreamForCreate方法中,先定義一個文件狀態變量stat,然后不停的嘗試通過namenode創建文件條目,創建成功后再創建改文件的輸出流,然后通過out.start()啟動DataQueue線程開始發送數據。我們重點看一下namenode是怎么創建文件條目的。打開dfsClient.namenode.create方法,dfsClient.namenode是在dfsClient中聲明的ClientProtocol對象。ClientProtocol是客戶端協議接口,namenode端需要實現該接口的create方法,通過動態代理的方式把結果返回給客戶端,即是rpc遠程調用。那么看下namenode端是怎么實現這個create方法的,打開這個方法的實現類我們發現了NameNodeRpcServer這個類,這個類是實現namenode rpc機制的核心類,繼承了各種協議接口并實現。
打開NameNodeRpcServer的create方法:
@Override // ClientProtocol public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable
打開namesystem.startFile,namesystem是NameNodeRpcServer中聲明的FSNamesystem對象:
/** * Create a new file entry in the namespace. * 在命名空間創建一個文件條目 * * For description of parameters and exceptions thrown see * {@link ClientProtocol#create}, except it returns valid file status upon * success */ HdfsFileStatus startFile(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet * For description of parameters and exceptions thrown see * {@link ClientProtocol#create} */ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, boolean create, boolean overwrite, boolean createParent, short replication, long blockSize, boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version, EncryptedKeyVersion edek, boolean logRetryEntry) throws FileAlreadyExistsException, AccessControlException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, RetryStartFileException, IOException { //檢查當前線程是否有寫鎖,沒有退出 assert hasWriteLock(); // Verify that the destination does not exist as a directory already. //判斷文件是否已經作為目錄存在 //INodesInPath:包含從給定路徑解析的INode信息。 //獲取給定文件或目錄的inode信息 final INodesInPath iip = dir.getINodesInPath4Write(src); final INode inode = iip.getLastINode(); if (inode != null && inode.isDirectory()) { throw new FileAlreadyExistsException(src + " already exists as a directory"); } //FileEncryptionInfo封裝加密文件的所有加密相關信息 FileEncryptionInfo feInfo = null; if (dir.isInAnEZ(iip)) { // The path is now within an EZ, but we're missing encryption parameters if (suite == null || edek == null) { throw new RetryStartFileException(); } // Path is within an EZ and we have provided encryption parameters. // Make sure that the generated EDEK matches the settings of the EZ. String ezKeyName = dir.getKeyName(iip); if (!ezKeyName.equals(edek.getEncryptionKeyName())) { throw new RetryStartFileException(); } feInfo = new FileEncryptionInfo(suite, version, edek.getEncryptedKeyVersion().getMaterial(), edek.getEncryptedKeyIv(), ezKeyName, edek.getEncryptionKeyVersionName()); Preconditions.checkNotNull(feInfo); } final INodeFile myFile = INodeFile.valueOf(inode, src, true); if (isPermissionEnabled) { if (overwrite && myFile != null) { checkPathAccess(pc, src, FsAction.WRITE); } /* * To overwrite existing file, need to check 'w' permission * of parent (equals to ancestor in this case) */ checkAncestorAccess(pc, src, FsAction.WRITE); } if (!createParent) { verifyParentDir(src); } try { BlocksMapUpdateInfo toRemoveBlocks = null; if (myFile == null) { if (!create) { throw new FileNotFoundException("Can't overwrite non-existent " + src + " for client " + clientMachine); } } else { if (overwrite) { toRemoveBlocks = new BlocksMapUpdateInfo(); List 這個方法就是生成文件條目的核心方法,首先判斷檢查當前線程是否有寫鎖,沒有退出。FSDirectory dir是一個命名空間的內存樹。 hdfs源碼解析之客戶端寫數據(二) EI企業智能 可信智能計算服務 TICS 數據湖治理中心 DGC 智能數據
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
* * Once the file is create the client then allocates a new block with the next * call using {@link ClientProtocol#addBlock}. *