hdfs源碼解析客戶端寫數據(一)

      網友投稿 832 2022-05-30

      終于開始了這個很感興趣但是一直覺得困難重重的源碼解析工作,也算是一個好的開端。既然開始了,就認真耐心的看下去吧。廢話不多說,開始!

      HDFS源碼解析之客戶端寫數據(一)

      hdfs源碼解析之客戶端寫數據(二)

      在我們客戶端寫數據的代碼大致如下:

      Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://172.16.40.119:8020"); String a = "This is my first hdfs file!"; //① 得到DistributedFileSystem FileSystem filesytem = FileSystem.get(conf); //② 得到輸出流FSDataOutputStream FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); //③ 開始寫數據 fs.write(a.getBytes()); fs.flush();

      最重要的三步已經在上面標注,通過源碼分析每一步所發生的細節是什么?

      FileSystem filesytem = FileSystem.get(conf);

      其中conf是一個Configuration對象。執行這行代碼后就進入到FileSystem.get(Configuration conf)方法中,可以看到,在這個方法中先通過getDefaultUri()方法獲取文件系統對應的的URI,該URI保存了與文件系統對應的協議和授權信息,如:hdfs://localhost:9000。這個URI又是如何得到的呢?是在CLASSPATH中的配置文件中取得的,看getDefaultUri()方法中有conf.get(FS_DEFAULT_NAME_KEY, "file:///") 這么一個實參,在筆者項目的CLASSPATH中的core-site.xml文件中有這么一個配置:

      fs.default.name hdfs://localhost:9000

      而常量FS_DEFAULT_NAME_KEY對應的值是fs.default.name,所以conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是hdfs://localhost:9000。

      URI創建完成之后就進入到FileSystem.get(URI uri, Configuration conf)方法。在這個方法中,先執行一些檢查,檢查URI的協議和授權信息是否為空,然后再直接或簡介調用該方法,最重要的是執行

      String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的文件系統 return createFileSystem(uri, conf); } return CACHE.get(uri, conf);

      常量CACHE用于緩存已經打開的、可共享的文件系統,它是FileSystem類的靜態內部類FileSystem.Cache的對象,在其內部使用一個Map存儲文件系統

      private final Map map = new HashMap();

      這個鍵值對映射的鍵是FileSystem.Cache.Key類型,它有三個成員變量:

      /**URI模式**/

      final String scheme;

      /**URI的授權部分**/

      final String authority;

      /**保存了打開具體文件系統的本地用戶信息,不同本地用戶打開的具體文件系統也是不能共享的**/

      final UserGroupInformation ugi;

      由于FileSystem.Cache表示可共享的文件系統,所以這個Key就用于區別不同的文件系統對象,如一個一個文件系統對象可共享,那么FileSystem.Cache.Key的三個成員變量相等,在這個類中重寫了hashCode()方法和equals()方法,就是用于判斷這三個變量是否相等。根據《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》這本書的介紹,在Hadoop1.0版本中FileSystem.Cache.Key類還有一個unique字段,這個字段表示,如果其他3個字段相等的情況,下如果用戶不想共享這個文件系統,就設置這個值(默認為0),但是不知道現在為什么去除了,還沒搞清楚,有哪位同學知道的話麻煩告知,謝謝。

      回到FileSystem.get(final URI uri, final Configuration conf)方法的最后一行語句return CACHE.get(uri, conf),調用了FileSystem.Cahce.get()方法獲取具體的文件系統對象,該方法代碼如下:

      FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); FileSystem fs = null; synchronized (this) { fs = map.get(key); } if (fs != null) { return fs; } fs = createFileSystem(uri, conf); synchronized (this) { // refetch the lock again FileSystem oldfs = map.get(key); if (oldfs != null) { // a file system is created while lock is releasing fs.close(); // close the new file system return oldfs; // return the old file system } // now insert the new file system into the map if (map.isEmpty() && !clientFinalizer.isAlive()) { Runtime.getRuntime().addShutdownHook(clientFinalizer); } fs.key = key; map.put(key, fs); return fs; } }

      在這個方法中先查看已經map中是否已經緩存了要獲取的文件系統對象,如果已經有了,直接從集合中去除,如果沒有才進行創建,由于FileSystem.CACHE為static類型,所以在同一時刻可能有多個線程在訪問,所以需要在Cache類的方法中使用同步的操作來取值和設置值。這個方法比較簡單,最核心的就是

      fs = createFileSystem(uri, conf);

      hdfs源碼解析之客戶端寫數據(一)

      這行語句,它執行了具體的文件系統對象的創建的功能。createFileSystem()方法是FileSystem的一個私有方法,其代碼如下:

      private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException { Class clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); LOG.debug("Creating filesystem for " + uri); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs; }

      其實現就是先從配置文件取得URI對應的類,如在core-default.xml文件中屬性(鍵)fs.hdfs.impl對應的值是org.apache.hadoop.hdfs.DistributedFileSystem,相應的XML代碼如下:

      fs.hdfs.impl org.apache.hadoop.hdfs.DistributedFileSystem The FileSystem for hdfs: uris.

      所以若uri對應fs.hdfs.impl,那么createFileSystem中的clazz就是org.apache.hadoop.hdfs.DistributedFileSystem的Class對象。然后再利用反射,創建org.apache.hadoop.hdfs.DistributedFileSystem的對象fs。然后執行fs.initialize(uri, conf);初始化fs對象。DistributedFileSystem是Hadoop分布式文件系統的實現類,實現了Hadoop文件系統的界面,提供了處理HDFS文件和目錄的相關事務。

      這行代碼

      FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true);

      主要做了兩件事:

      ①通過rpc調用在namenode命名空間創建文件條目;

      ②創建該文件對應的輸出流。

      filesytem.create()最終調用的是DistributedFileSystem的create方法

      @Override //返回HdfsDataOutputStream對象,繼承FSDataOutputStream public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSet cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { //此文件系統的統計信息,每次寫操作增加1 /* 跟蹤有關在FileSystem中完成了多少次讀取,寫入等操作的統計信息。 由于每個FileSystem只有一個這樣的對象, 因此通常會有許多線程寫入此對象。 幾乎打開文件上的每個操作都將涉及對該對象的寫入。 相比之下,大多數程序不經常閱讀統計數據, 而其他程序則根本不這樣做。 因此,這針對寫入進行了優化。 每個線程都寫入自己的線程本地內存區域。 這消除了爭用, 并允許我們擴展到許多線程。 為了讀取統計信息,讀者線程總計了所有線程本地數據區域的內容。*/ statistics.incrementWriteOps(1); //獲取絕對路徑 Path absF = fixRelativePart(f); /* 嘗試使用指定的FileSystem和Path調用重寫的doCall(Path)方法。 如果調用因UnresolvedLinkException失敗, 它將嘗試解析路徑并通過調用next(FileSystem,Path)重試該調用。*/ return new FileSystemLinkResolver() { @Override public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException { final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt); //返回HdfsDataOutputStream對象,并傳入DFSOutputStream對象 return dfs.createWrappedOutputStream(dfsos, statistics); } @Override public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException { return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt); } }.resolve(this, absF); }

      在上面代碼中首先構建DFSOutputStream,然后傳給dfs.createWrappedOutputStream構建HdfsDataOutputStream,看下dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize, checksumOpt)是如何構建輸出流DFSOutputStream的。

      public DFSOutputStream create(String src, FsPermission permission, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getFileDefault(); } FsPermission masked = permission.applyUMask(dfsClientConf.uMask); if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + masked); } String[] favoredNodeStrs = null; if (favoredNodes != null) { favoredNodeStrs = new String[favoredNodes.length]; for (int i = 0; i < favoredNodes.length; i++) { favoredNodeStrs[i] = favoredNodes[i].getHostName() + ":" + favoredNodes[i].getPort(); } } //DFSOutputStream.newStreamForCreate構建DFSOutputStream final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs); beginFileLease(result.getFileId(), result); return result; }

      再進到DFSOutputStream.newStreamForCreate方法中

      static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException { HdfsFileStatus stat = null; // Retry the create if we get a RetryStartFileException up to a maximum // number of times boolean shouldRetry = true; int retryCount = CREATE_RETRY_COUNT; while (shouldRetry) { shouldRetry = false; try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS); break; } catch (RemoteException re) { IOException e = re.unwrapRemoteException( AccessControlException.class, DSQuotaExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class); if (e instanceof RetryStartFileException) { if (retryCount > 0) { shouldRetry = true; retryCount--; } else { throw new IOException("Too many retries because of encryption" + " zone operations", e); } } else { throw e; } } } Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes); out.start(); return out; }

      在newStreamForCreate方法中,先定義一個文件狀態變量stat,然后不停的嘗試通過namenode創建文件條目,創建成功后再創建改文件的輸出流,然后通過out.start()啟動DataQueue線程開始發送數據。我們重點看一下namenode是怎么創建文件條目的。打開dfsClient.namenode.create方法,dfsClient.namenode是在dfsClient中聲明的ClientProtocol對象。ClientProtocol是客戶端協議接口,namenode端需要實現該接口的create方法,通過動態代理的方式把結果返回給客戶端,即是rpc遠程調用。那么看下namenode端是怎么實現這個create方法的,打開這個方法的實現類我們發現了NameNodeRpcServer這個類,這個類是實現namenode rpc機制的核心類,繼承了各種協議接口并實現。

      打開NameNodeRpcServer的create方法:

      @Override // ClientProtocol public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions) throws IOException { String clientMachine = getClientMachine(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.create: file " +src+" for "+clientName+" at "+clientMachine); } if (!checkPathLength(src)) { throw new IOException("create: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } //經過一系列的檢查最終調用了namesystem.startFile方法, HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus( getRemoteUser().getShortUserName(), null, masked), clientName, clientMachine, flag.get(), createParent, replication, blockSize, supportedVersions); metrics.incrFilesCreated(); metrics.incrCreateFileOps(); return fileStatus; }

      打開namesystem.startFile,namesystem是NameNodeRpcServer中聲明的FSNamesystem對象:

      /** * Create a new file entry in the namespace. * 在命名空間創建一個文件條目 * * For description of parameters and exceptions thrown see * {@link ClientProtocol#create}, except it returns valid file status upon * success */ HdfsFileStatus startFile(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions) throws AccessControlException, SafeModeException, FileAlreadyExistsException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, IOException { HdfsFileStatus status = null; CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (HdfsFileStatus) cacheEntry.getPayload(); } try { //調用的startFileInt status = startFileInt(src, permissions, holder, clientMachine, flag, createParent, replication, blockSize, supportedVersions, cacheEntry != null); } catch (AccessControlException e) { logAuditEvent(false, "create", src); throw e; } finally { RetryCache.setState(cacheEntry, status != null, status); } return status; } 最后打開startFileInt方法中,可以看到又調用了startFileInternal方法: try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot create file" + src); src = resolvePath(src, pathComponents); toRemoveBlocks = startFileInternal(pc, src, permissions, holder, clientMachine, create, overwrite, createParent, replication, blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache); stat = dir.getFileInfo(src, false, FSDirectory.isReservedRawName(srcArg), true); } catch (StandbyException se) { skipSync = true; throw se; 打開startFileInternal: /** * Create a new file or overwrite an existing file
      * * Once the file is create the client then allocates a new block with the next * call using {@link ClientProtocol#addBlock}. *

      * For description of parameters and exceptions thrown see * {@link ClientProtocol#create} */ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, boolean create, boolean overwrite, boolean createParent, short replication, long blockSize, boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version, EncryptedKeyVersion edek, boolean logRetryEntry) throws FileAlreadyExistsException, AccessControlException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, RetryStartFileException, IOException { //檢查當前線程是否有寫鎖,沒有退出 assert hasWriteLock(); // Verify that the destination does not exist as a directory already. //判斷文件是否已經作為目錄存在 //INodesInPath:包含從給定路徑解析的INode信息。 //獲取給定文件或目錄的inode信息 final INodesInPath iip = dir.getINodesInPath4Write(src); final INode inode = iip.getLastINode(); if (inode != null && inode.isDirectory()) { throw new FileAlreadyExistsException(src + " already exists as a directory"); } //FileEncryptionInfo封裝加密文件的所有加密相關信息 FileEncryptionInfo feInfo = null; if (dir.isInAnEZ(iip)) { // The path is now within an EZ, but we're missing encryption parameters if (suite == null || edek == null) { throw new RetryStartFileException(); } // Path is within an EZ and we have provided encryption parameters. // Make sure that the generated EDEK matches the settings of the EZ. String ezKeyName = dir.getKeyName(iip); if (!ezKeyName.equals(edek.getEncryptionKeyName())) { throw new RetryStartFileException(); } feInfo = new FileEncryptionInfo(suite, version, edek.getEncryptedKeyVersion().getMaterial(), edek.getEncryptedKeyIv(), ezKeyName, edek.getEncryptionKeyVersionName()); Preconditions.checkNotNull(feInfo); } final INodeFile myFile = INodeFile.valueOf(inode, src, true); if (isPermissionEnabled) { if (overwrite && myFile != null) { checkPathAccess(pc, src, FsAction.WRITE); } /* * To overwrite existing file, need to check 'w' permission * of parent (equals to ancestor in this case) */ checkAncestorAccess(pc, src, FsAction.WRITE); } if (!createParent) { verifyParentDir(src); } try { BlocksMapUpdateInfo toRemoveBlocks = null; if (myFile == null) { if (!create) { throw new FileNotFoundException("Can't overwrite non-existent " + src + " for client " + clientMachine); } } else { if (overwrite) { toRemoveBlocks = new BlocksMapUpdateInfo(); List toRemoveINodes = new ChunkedArrayList(); long ret = dir.delete(src, toRemoveBlocks, toRemoveINodes, now()); if (ret >= 0) { incrDeletedFileCount(ret); removePathAndBlocks(src, null, toRemoveINodes, true); } } else { // If lease soft limit time is expired, recover the lease //如果租約軟限制時間到期,則恢復租約 recoverLeaseInternal(myFile, src, holder, clientMachine, false); throw new FileAlreadyExistsException(src + " for client " + clientMachine + " already exists"); } } checkFsObjectLimit(); INodeFile newNode = null; // Always do an implicit mkdirs for parent directory tree. Path parent = new Path(src).getParent(); if (parent != null && mkdirsRecursively(parent.toString(), permissions, true, now())) { //獲取文件的inode newNode = dir.addFile(src, permissions, replication, blockSize, holder, clientMachine); } if (newNode == null) { throw new IOException("Unable to add " + src + " to namespace"); } leaseManager.addLease(newNode.getFileUnderConstructionFeature() .getClientName(), src); // Set encryption attributes if necessary if (feInfo != null) { dir.setFileEncryptionInfo(src, feInfo); newNode = dir.getInode(newNode.getId()).asFile(); } //設置存儲策略 setNewINodeStoragePolicy(newNode, iip, isLazyPersist); // record file record in log, record new generation stamp //把操作寫入到EditLog getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " + src + " inode " + newNode.getId() + " " + holder); } return toRemoveBlocks; } catch (IOException ie) { NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " + ie.getMessage()); throw ie; } }

      這個方法就是生成文件條目的核心方法,首先判斷檢查當前線程是否有寫鎖,沒有退出。FSDirectory dir是一個命名空間的內存樹。

      hdfs源碼解析之客戶端寫數據(二)

      EI企業智能 可信智能計算服務 TICS 數據湖治理中心 DGC 智能數據

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:成員變量和方法的區別?
      下一篇:XML DOM 遍歷節點樹
      相關文章
      亚洲av永久无码精品古装片| 亚洲欧洲国产经精品香蕉网| 亚洲白嫩在线观看| 久久亚洲中文字幕精品一区| 亚洲精华国产精华精华液网站| 亚洲国产精品久久网午夜| 亚洲精品国产成人99久久| 亚洲一区二区三区国产精品| 亚洲国产中文字幕在线观看| 亚洲成人高清在线| 亚洲日本在线观看视频| 久久久久亚洲爆乳少妇无| 久久影院亚洲一区| 亚洲色成人中文字幕网站| 亚洲人成在线播放网站| 亚洲热妇无码AV在线播放| 精品亚洲永久免费精品| 久久精品国产99精品国产亚洲性色| 亚洲AV无一区二区三区久久| 亚洲av鲁丝一区二区三区| 亚洲人成依人成综合网| 亚洲专区在线视频| 亚洲宅男天堂a在线| 33333在线亚洲| 亚洲欧美国产国产一区二区三区| 久久无码av亚洲精品色午夜| 亚洲&#228;v永久无码精品天堂久久| gogo全球高清大胆亚洲| 精品亚洲一区二区三区在线播放| 激情97综合亚洲色婷婷五 | 亚洲国产精品一区二区成人片国内 | 亚洲日本在线免费观看| 亚洲不卡视频在线观看| 亚洲色偷偷综合亚洲AV伊人蜜桃 | 亚洲高清在线观看| 亚洲影视一区二区| 亚洲爆乳成av人在线视菜奈实| 国产亚洲精品91| 亚洲乱码精品久久久久..| 亚洲va在线va天堂va888www| 亚洲最大在线观看|