How the Thread Pool Model Works in HBase Reads and Writes

      Reader contribution, 2022-05-30

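      [Introduction]

      With the growth of big data, HBase has become an indispensable component for high-performance read/write workloads. HBase also offers a rich API, and developing applications against it is relatively simple, so using the API correctly matters a great deal. Used incorrectly, it tends to work against the designers' intent and causes problems such as access latency jitter and resource leaks.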

      HBase socket connection pool: how it works

      Let's first look at how the HBase API documentation defines Connection: "A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper." In other words, a Connection wraps the connections to the RegionServers (including HMaster) and to ZooKeeper.

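      So for a single Connection instance kept as a singleton, there are at most five socket connections to each RegionServer. Once created, a socket connection is not created again unless it goes a long time (two minutes by default) without any interaction with the RegionServer, in which case the client closes it automatically. Applications usually create the Connection as follows: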

      connection = ConnectionFactory.createConnection(conf);

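      Besides this approach, HBase can also obtain a connection through ConnectionManager. With this approach, be sure to keep the conf object a singleton; otherwise new connections will keep being created.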

      conn = HConnectionManager.getConnection(conf);

      static ClusterConnection getConnectionInternal(final Configuration conf) throws IOException {
        HConnectionKey connectionKey = new HConnectionKey(conf);
        synchronized (CONNECTION_INSTANCES) {
          HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
          if (connection == null) {
            connection = (HConnectionImplementation) createConnection(conf, true);
            CONNECTION_INSTANCES.put(connectionKey, connection);
          } else if (connection.isClosed()) {
            ConnectionManager.deleteConnection(connectionKey, true);
            connection = (HConnectionImplementation) createConnection(conf, true);
            CONNECTION_INSTANCES.put(connectionKey, connection);
          }
          connection.incCount();
          return connection;
        }
      }

      HConnectionKey(Configuration conf) {
        Map<String, String> m = new HashMap<String, String>();
        if (conf != null) {
          for (String property : CONNECTION_PROPERTIES) {
            String value = conf.get(property);
            if (value != null) {
              m.put(property, value);
            }
          }
        }
        this.properties = Collections.unmodifiableMap(m);
        try {
          UserProvider provider = UserProvider.instantiate(conf);
          User currentUser = provider.getCurrent();
          if (currentUser != null) {
            username = currentUser.getName();
          }
        } catch (IOException ioe) {
          ConnectionManager.LOG.warn(
              "Error obtaining current user, skipping username in HConnectionKey", ioe);
        }
      }

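      These two pieces of code build an HConnectionKey from the conf passed in, then use that HConnectionKey instance as the key to look up a connection in CONNECTION_INSTANCES, the Map that serves as the connection pool. If a connection is found, it is returned; if none is found, a new one is created; if one is found but has already been closed, it is deleted and a new one is created. When an HConnectionKey is constructed from conf, the properties in the configuration are copied into the key's own properties, so with this approach the conf object must stay unchanged for newly built HConnectionKey instances to carry the same properties.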
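The lookup described above can be illustrated with a small stand-alone sketch. It is not HBase code: the class name `KeyedConnectionCacheSketch`, the two entries in `KEY_PROPERTIES`, and the integer "connection ids" are all hypothetical, and `java.util.Properties` stands in for the HBase `Configuration`. It only shows the idea of building a key from selected property values and reusing the cached entry when the key matches.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Simplified stand-in for a keyed connection cache; not HBase code. */
final class KeyedConnectionCacheSketch {
    // Hypothetical subset of properties that identify a connection.
    static final String[] KEY_PROPERTIES = {
        "hbase.zookeeper.quorum", "hbase.zookeeper.property.clientPort"
    };

    // Maps a key (snapshot of identifying properties) to a fake connection id.
    private static final Map<Map<String, String>, Integer> CACHE = new HashMap<>();
    private static int created = 0;

    /** Builds the cache key by copying only the identifying properties. */
    static Map<String, String> keyOf(Properties conf) {
        Map<String, String> m = new HashMap<>();
        for (String property : KEY_PROPERTIES) {
            String value = conf.getProperty(property);
            if (value != null) {
                m.put(property, value);
            }
        }
        return Collections.unmodifiableMap(m);
    }

    /** Returns the cached connection id for this conf, creating one on a miss. */
    static synchronized int getConnection(Properties conf) {
        return CACHE.computeIfAbsent(keyOf(conf), k -> ++created);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("hbase.zookeeper.quorum", "zk1");
        int first = getConnection(conf);
        int second = getConnection(conf);
        System.out.println(first == second); // same key, same cached entry

        conf.setProperty("hbase.zookeeper.quorum", "zk2");
        System.out.println(getConnection(conf) == first); // key changed, new entry
    }
}
```

In this sketch, changing an identifying property between calls produces a different key and therefore a second cache entry, which is the situation the text warns about.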

      HBase table-operation thread pool: how it works

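      When an application reads or writes HBase, the current implementation wraps each read or write operation in a task and submits it to the client-side table-operation pool for execution. By default, if the Connection is kept as a singleton, the application has 256 threads and 5 socket connections per RegionServer (set by the parameter hbase.client.ipc.pool.size). Some may wonder whether this creates a performance bottleneck. The 256 threads serve all operations, that is, the whole cluster, while the 5 socket connections are per RegionServer, and each socket exchange with a RegionServer is very short. In general, this default configuration is entirely sufficient.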

      The relevant code is as follows:

      public HTableInterface getTable(TableName tableName) throws IOException {
        return getTable(tableName, getBatchPool());
      }

      private ExecutorService getBatchPool() {
        if (batchPool == null) {
          synchronized (this) {
            if (batchPool == null) {
              this.batchPool = getThreadPool(
                  conf.getInt("hbase.hconnection.threads.max", 256),
                  conf.getInt("hbase.hconnection.threads.core", 256),
                  "-shared-", null);
              this.cleanupPool = true;
            }
          }
        }
        return this.batchPool;
      }

      Note that for operations on the meta table, the default is 10 core threads, and the thread names are different as well:

      private ExecutorService getMetaLookupPool() {
        if (this.metaLookupPool == null) {
          synchronized (this) {
            if (this.metaLookupPool == null) {
              // Some of the threads would be used for meta replicas
              // To start with, threads.max.core threads can hit the meta (including replicas).
              // After that, requests will get queued up in the passed queue, and only after
              // the queue is full, a new thread will be started
              this.metaLookupPool = getThreadPool(
                  conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
                  conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
                  "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
            }
          }
        }
        return this.metaLookupPool;
      }
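The comment in the code above ("only after the queue is full, a new thread will be started") matches how the JDK's `java.util.concurrent.ThreadPoolExecutor` behaves. The following sketch uses only the JDK and assumes that the pool is ultimately a plain `ThreadPoolExecutor`; the class name `PoolGrowthSketch` and the small core/max values are hypothetical, chosen only to keep the demonstration short. It reports how many worker threads a pool actually starts when all submitted tasks are blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows when a ThreadPoolExecutor grows beyond its core size; not HBase code. */
final class PoolGrowthSketch {
    /**
     * Submits `tasks` blocking tasks and returns the peak number of worker threads.
     * A queueCapacity of 0 or less selects an unbounded LinkedBlockingQueue.
     */
    static int largestPoolSize(int core, int max, int queueCapacity, int tasks)
            throws InterruptedException {
        LinkedBlockingQueue<Runnable> queue = queueCapacity > 0
                ? new LinkedBlockingQueue<>(queueCapacity)
                : new LinkedBlockingQueue<>();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every task blocks until released, so all workers stay busy.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Unbounded queue: extra tasks are queued, the pool stays at the core size.
        System.out.println(largestPoolSize(2, 4, 0, 10)); // prints 2
        // Bounded queue of 3: once it is full, threads are added up to the maximum.
        System.out.println(largestPoolSize(2, 4, 3, 7));  // prints 4
    }
}
```

Under that assumption, a pool given an unbounded queue, as the meta lookup pool is here, would be expected to stay at its core thread count, with the maximum acting only as an upper bound that is not reached in practice.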

      Direct memory leak caused by the HBase thread pool

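      For every thread that performs NIO, the JDK keeps a cache object (off-heap). This cache has no size limit and is released only when the thread dies. Each time the thread requests direct memory, space is added on top of the existing object, which means the cache can grow without bound. Threads in HBase's table-operation pool do not die, so this direct memory keeps growing until a Full GC is triggered, which may release it; otherwise the application reports a direct memory overflow error. The relevant source code is shown below: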

      // Per-thread cache of temporary direct buffers
      private static ThreadLocal<BufferCache> bufferCache = new ThreadLocal<BufferCache>() {
        @Override
        protected BufferCache initialValue() {
          return new BufferCache();
        }
      };

      /**
       * Returns the max size allowed for a cached temp buffers, in
       * bytes. It defaults to Long.MAX_VALUE. It can be set with the
       * jdk.nio.maxCachedBufferSize property. Even though
       * ByteBuffer.capacity() returns an int, we're using a long here
       * for potential future-proofing.
       */
      private static long getMaxCachedBufferSize() {
        String s = java.security.AccessController.doPrivileged(
            new PrivilegedAction<String>() {
              @Override
              public String run() {
                return System.getProperty("jdk.nio.maxCachedBufferSize");
              }
            });
        if (s != null) {
          try {
            long m = Long.parseLong(s);
            if (m >= 0) {
              return m;
            } else {
              // if it's negative, ignore the system property
            }
          } catch (NumberFormatException e) {
            // if the string is not well formed, ignore the system property
          }
        }
        return Long.MAX_VALUE;
      }

      /**
       * Returns a temporary buffer of at least the given size
       */
      public static ByteBuffer getTemporaryDirectBuffer(int size) {
        // If a buffer of this size is too large for the cache, there
        // should not be a buffer in the cache that is at least as
        // large. So we'll just create a new one. Also, we don't have
        // to remove the buffer from the cache (as this method does
        // below) given that we won't put the new buffer in the cache.
        if (isBufferTooLarge(size)) {
          return ByteBuffer.allocateDirect(size);
        }

        BufferCache cache = bufferCache.get();
        ByteBuffer buf = cache.get(size);
        if (buf != null) {
          return buf;
        } else {
          // No suitable buffer in the cache so we need to allocate a new
          // one. To avoid the cache growing then we remove the first
          // buffer from the cache and free it.
          if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
          }
          return ByteBuffer.allocateDirect(size);
        }
      }

      The HBase open-source community's fix for this problem is to set -Djdk.nio.maxCachedBufferSize, so that the per-thread buffer cache cannot keep growing. The upstream issue is: https://issues.apache.org/jira/browse/HBASE-19320
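To make the effect of a per-thread cache and of a size cap concrete, here is a deliberately simplified simulation. It is not the JDK implementation: the class `ThreadLocalBufferCacheSketch` is hypothetical, it caches a single buffer per thread rather than several, and it uses heap buffers in place of direct ones so it can run without touching off-heap memory. The `maxCached` parameter plays the role that `jdk.nio.maxCachedBufferSize` plays in the JDK code quoted earlier.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified model of a per-thread temporary buffer cache; not the JDK code. */
final class ThreadLocalBufferCacheSketch {
    // One cached buffer per thread (the real cache can hold several direct buffers).
    private static final ThreadLocal<ByteBuffer> CACHE = new ThreadLocal<>();

    /** Returns a buffer of at least `size` bytes, caching it unless it exceeds maxCached. */
    static ByteBuffer getTemporaryBuffer(int size, long maxCached) {
        if (size > maxCached) {
            return ByteBuffer.allocate(size); // too large for the cache: never retained
        }
        ByteBuffer cached = CACHE.get();
        if (cached != null && cached.capacity() >= size) {
            cached.clear();
            return cached; // reuse the buffer already held by this thread
        }
        ByteBuffer fresh = ByteBuffer.allocate(size); // heap buffer stands in for a direct one
        CACHE.set(fresh); // replaces the smaller cached buffer
        return fresh;
    }

    /** Runs the requests on one long-lived pool thread and returns the bytes it still caches. */
    static int retainedAfter(int[] requestSizes, long maxCached) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (int size : requestSizes) {
                pool.submit(() -> getTemporaryBuffer(size, maxCached)).get();
            }
            return pool.submit(() -> {
                ByteBuffer b = CACHE.get();
                return b == null ? 0 : b.capacity();
            }).get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] sizes = {1024, 8192, 64};
        // No cap: the thread keeps the largest buffer it ever needed.
        System.out.println(retainedAfter(sizes, Long.MAX_VALUE)); // prints 8192
        // Cap of 4096: the 8192-byte request is served but not cached.
        System.out.println(retainedAfter(sizes, 4096));           // prints 1024
    }
}
```

In the simulation, a thread that never exits keeps holding whatever it last cached, so a single large request raises its retained memory for as long as the pool lives. With a cap, oversized requests are allocated and dropped instead of being kept.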

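      Note that HBase 2.x and later use the Netty framework for RPC, so this problem does not arise there. Also, the parameter above can only be set on jdk8u102, jdk9, and later versions, where the JDK fix is available.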

      The JDK bug is described here: https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
