基于Flume的美團(tuán)日志收集系統(tǒng)二_第1頁
基于Flume的美團(tuán)日志收集系統(tǒng)二_第2頁
基于Flume的美團(tuán)日志收集系統(tǒng)二_第3頁
基于Flume的美團(tuán)日志收集系統(tǒng)二_第4頁
基于Flume的美團(tuán)日志收集系統(tǒng)二_第5頁
已閱讀5頁,還剩3頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、基于Flume的美團(tuán)日志收集系統(tǒng)(二)改進(jìn)和優(yōu)化在基于Flume的美團(tuán)日志收集系統(tǒng)(一)架構(gòu)和設(shè)計中,我們詳述了基于Flume的美團(tuán)日志收集系統(tǒng)的架構(gòu)設(shè)計,以及為什么做這樣的設(shè)計。在本節(jié)中,我們將會講述在實(shí)際部署和使用過程中遇到的問題,對Flume的功能改進(jìn)和對系統(tǒng)做的優(yōu)化。1 Flume的問題總結(jié)在Flume的使用過程中,遇到的主要問題如下:a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰時常報隊列大小不夠的異常;使用FileChannel又導(dǎo)致IO繁忙的問題;b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日志,在高峰時間速度較慢;c.

2、 系統(tǒng)的管理問題:配置升級,模塊重啟等;2 Flume的功能改進(jìn)和優(yōu)化點(diǎn)從上面的問題中可以看到,有一些需求是原生Flume無法滿足的,因此,基于開源的Flume我們增加了許多功能,修改了一些Bug,并且進(jìn)行一些調(diào)優(yōu)。下面將對一些主要的方面做一些說明。2.1 增加Zabbix monitor服務(wù)一方面,F(xiàn)lume本身提供了http, ganglia的監(jiān)控服務(wù),而我們目前主要使用zabbix做監(jiān)控。因此,我們?yōu)镕lume添加了zabbix監(jiān)控模塊,和sa的監(jiān)控服務(wù)無縫融合。另一方面,凈化Flume的metrics。只將我們需要的metrics發(fā)送給zabbix,避免 zabbix server造成

3、壓力。目前我們最為關(guān)心的是Flume能否及時把應(yīng)用端發(fā)送過來的日志寫到Hdfs上, 對應(yīng)關(guān)注的metrics為:· Source : 接收的event數(shù)和處理的event數(shù)· Channel : Channel中擁堵的event數(shù)· Sink : 已經(jīng)處理的event數(shù)2.2 為HdfsSink增加自動創(chuàng)建index功能首先,我們的HdfsSink寫到hadoop的文件采用lzo壓縮存儲。 HdfsSink可以讀取hadoop配置文件中提供的編碼類列表,然后通過配置的方式獲取使用何種壓縮編碼,我們目前使用lzo壓縮數(shù)據(jù)。采用lzo壓縮而非bz2壓縮,是基于以下測試

4、數(shù)據(jù):event大小(Byte)sink.batch-sizehdfs.batchSize壓縮格式總數(shù)據(jù)大小(G)耗時(s)平均events/s壓縮后大小(G)54430010000bz29.1244868331.3654430010000lzo9.1612273333.49其次,我們的HdfsSink增加了創(chuàng)建lzo文件后自動創(chuàng)建index功能。Hadoop提供了對lzo創(chuàng)建索引,使得壓縮文件是可切分的,這樣 Hadoop Job可以并行處理數(shù)據(jù)文件。HdfsSink本身lzo壓縮,但寫完lzo文件并不會建索引,我們在close文件之后添加了建索引功能。 /* * Rename bucket

5、Path file from .tmp to permanent location. */ private void renameBucket() throws IOException, InterruptedException if(bucketPath.equals(targetPath) return; final Path srcPath = new Path(bucketPath); final Path dstPath = new Path(targetPath); callWithTimeout(new CallRunner<Object>() Override pu

6、blic Object call() throws Exception if(fileSystem.exists(srcPath) / could block LOG.info("Renaming " + srcPath + " to " + dstPath); fileSystem.rename(srcPath, dstPath); / could block /index the dstPath lzo file if (codeC != null && ".lzo".equals(codeC.getDefault

7、Extension() ) LzoIndexer lzoIndexer = new LzoIndexer(new Configuration(); lzoIndexer.index(dstPath); return null; );2.3 增加HdfsSink的開關(guān)我們在HdfsSink和DualChannel中增加開關(guān),當(dāng)開關(guān)打開的情況下,HdfsSink不再往Hdfs上寫數(shù)據(jù),并且數(shù)據(jù)只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機(jī)維護(hù)。2.4 增加DualChannelFlume本身提供了MemoryChannel和FileChannel。Memo

8、ryChannel處理速度快,但緩存大小有限,且沒有持久 化;FileChannel則剛好相反。我們希望利用兩者的優(yōu)勢,在Sink處理速度夠快,Channel沒有緩存過多日志的時候,就使用 MemoryChannel,當(dāng)Sink處理速度跟不上,又需要Channel能夠緩存下應(yīng)用端發(fā)送過來的日志時,就使用FileChannel,由此我 們開發(fā)了DualChannel,能夠智能的在兩個Channel之間切換。其具體的邏輯如下:/* * putToMemChannel indicate put event to memChannel or fileChannel * takeFromMemChann

9、el indicate take event from memChannel or fileChannel * */private AtomicBoolean putToMemChannel = new AtomicBoolean(true);private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);void doPut(Event event) if (switchon && putToMemChannel.get() /往memChannel中寫數(shù)據(jù) memTransaction.put(event

10、); if ( memChannel.isFull() | fileChannel.getQueueSize() > 100) putToMemChannel.set(false); else /往fileChannel中寫數(shù)據(jù) fileTransaction.put(event); Event doTake() Event event = null; if ( takeFromMemChannel.get() ) /從memChannel中取數(shù)據(jù) event = memTransaction.take(); if (event = null) takeFromMemChannel.se

11、t(false); else /從fileChannel中取數(shù)據(jù) event = fileTransaction.take(); if (event = null) takeFromMemChannel.set(true); putToMemChannel.set(true); return event;2.5 增加NullChannelFlume提供了NullSink,可以把不需要的日志通過NullSink直接丟棄,不進(jìn)行存儲。然而,Source需要先將events存放到 Channel中,NullSink再將events取出扔掉。為了提升性能,我們把這一步移到了Channel里面做,所以開

12、發(fā)了 NullChannel。2.6 增加KafkaSink為支持向Storm提供實(shí)時數(shù)據(jù)流,我們增加了KafkaSink用來向Kafka寫實(shí)時數(shù)據(jù)流。其基本的邏輯如下:public class KafkaSink extends AbstractSink implements Configurable private String zkConnect; private Integer zkTimeout; private Integer batchSize; private Integer queueSize; private String serializerClass; private

13、String producerType; private String topicPrefix; private Producer<String, String> producer; public void configure(Context context) /讀取配置,并檢查配置 Override public synchronized void start() /初始化producer Override public synchronized void stop() /關(guān)閉producer Override public Status process() throws Eve

14、ntDeliveryException Status status = Status.READY; Channel channel = getChannel(); Transaction tx = channel.getTransaction(); try tx.begin(); /將日志按category分隊列存放 Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>(); /從channel中取batchSize大小的日志,從header中獲

15、取category,生成topic,并存放于上述的Map中; /將Map中的數(shù)據(jù)通過producer發(fā)送給kafka mit(); catch (Exception e) tx.rollback(); throw new EventDeliveryException(e); finally tx.close(); return status; 2.7 修復(fù)和scribe的兼容問題Scribed在通過ScribeSource發(fā)送數(shù)據(jù)包給Flume時,大于4096字節(jié)的包,會先發(fā)送一個Dummy包檢查服務(wù)器的反應(yīng),而 Flume的ScribeSource對于logentry.size()=0的包返

16、回TRY_LATER,此時Scribed就認(rèn)為出錯,斷開連接。這 樣循環(huán)反復(fù)嘗試,無法真正發(fā)送數(shù)據(jù)。現(xiàn)在在ScribeSource的Thrift接口中,對size為0的情況返回OK,保證后續(xù)正常發(fā)送數(shù)據(jù)。3. Flume系統(tǒng)調(diào)優(yōu)經(jīng)驗總結(jié)3.1 基礎(chǔ)參數(shù)調(diào)優(yōu)經(jīng)驗HdfsSink中默認(rèn)的serializer會每寫一行在行尾添加一個換行符,我們?nèi)罩颈旧韼в袚Q行符,這樣會導(dǎo)致每條日志后面多一個空行,修改配置不要自動添加換行符;lc.sinks.sink_hdfs.serializer.appendNewline = false調(diào)大MemoryChannel的capacity,盡量利用MemoryCha

17、nnel快速的處理能力;調(diào)大HdfsSink的batchSize,增加吞吐量,減少hdfs的flush次數(shù);適當(dāng)調(diào)大HdfsSink的callTimeout,避免不必要的超時錯誤;3.2 HdfsSink獲取Filename的優(yōu)化HdfsSink的path參數(shù)指明了日志被寫到Hdfs的位置,該參數(shù)中可以引用格式化的參數(shù),將日志寫到一個動態(tài)的目錄中。這方便了日志的管理。例如我們可以將日志寫到category分類的目錄,并且按天和按小時存放:lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%category/dt=%Y%m%d/ho

18、ur=%HHdfsS ink中處理每條event時,都要根據(jù)配置獲取此event應(yīng)該寫入的Hdfs path和filename,默認(rèn)的獲取方法是通過正則表達(dá)式替換配置中的變量,獲取真實(shí)的path和filename。因為此過程是每條event都要 做的操作,耗時很長。通過我們的測試,20萬條日志,這個操作要耗時6-8s左右。由于我們目前的path和filename有固定的模式,可以通過字符串拼接獲得。而后者比正則匹配快幾十倍。拼接定符串的方式,20萬條日志的操作只需要幾百毫秒。3.3 HdfsSink的b/m/s優(yōu)化在我們初始的設(shè)計中,所有的日志都通過一個Channel和一個HdfsSink寫到Hdfs上。我們來看一看這樣做有什么問題。首先,我們來看一下HdfsSink在發(fā)送數(shù)據(jù)的邏輯:/從Channel中取batchSize大小的eventsfor (txnEventCount = 0; txnEventCount < batchSize; txnEventCount+) /對每條日志根據(jù)category append到相應(yīng)的bucketWriter上; bucketWriter.app

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論