logstash input插件開發(fā)

logstash input插件開發(fā)

logstash作為一個數(shù)據(jù)管道中間件稿黍,支持對各種類型數(shù)據(jù)的采集與轉(zhuǎn)換详囤,并將數(shù)據(jù)發(fā)送到各種類型的存儲庫暴拄,比如實現(xiàn)消費kafka數(shù)據(jù)并且寫入到Elasticsearch, 日志文件同步到對象存儲S3等于个,mysql數(shù)據(jù)同步到Elasticsearch等锉试。

logstash內(nèi)部主要包含三個模塊:

input: 從數(shù)據(jù)源獲取數(shù)據(jù)
filter: 過濾、轉(zhuǎn)換數(shù)據(jù)
output: 輸出數(shù)據(jù)
image

不同類型的數(shù)據(jù)都可以通過對應(yīng)的input-plugin览濒, output-plugin完成數(shù)據(jù)的輸入與輸出。如需要消費kafka中的數(shù)據(jù)并寫入到Elasticsearch中拖云,則需要使用logstash的kafka-input-plugin完成數(shù)據(jù)輸入贷笛,logstash-output-elasticsearch完成數(shù)據(jù)輸出。如果需要對輸入數(shù)據(jù)進(jìn)行過濾或者轉(zhuǎn)換宙项,比如根據(jù)關(guān)鍵詞過濾掉不需要的內(nèi)容乏苦,或者時間字段的格式轉(zhuǎn)換,就需要又filter-plugin完成了。

logstash的input插件目前已經(jīng)有幾十種了汇荐,支持大多數(shù)比較通用或開源的數(shù)據(jù)源的輸入洞就。但如果公司內(nèi)部開發(fā)的數(shù)據(jù)庫或其它存儲類的服務(wù)不能和開源產(chǎn)品在接口協(xié)議上兼容,比如騰訊自研的消息隊列服務(wù)CMQ不依賴于其它的開源消息隊列產(chǎn)品掀淘,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的數(shù)據(jù)旬蟋;騰訊云對象存儲服務(wù)COS, 在鑒權(quán)方式上和AWS的S3存在差異革娄,也不能直接使用logstash-input-s3插件從COS中讀取數(shù)據(jù)倾贰,對于這種情況,就需要自己開發(fā)logstash的input插件了拦惋。

本文以開發(fā)logstash的cos input插件為例匆浙,介紹如何開發(fā)logstash的input插件。

logstash官方提供了有個簡單的input plugin example可供參考:
https://github.com/logstash-plugins/logstash-input-example/

環(huán)境準(zhǔn)備

logstash使用jruby開發(fā)厕妖,首先要配置jruby環(huán)境:

  1. 安裝rvm:

    rvm是一個ruby管理器首尼,可以安裝并管理ruby環(huán)境,也可以通過命令行切換到不同的ruby版本言秸。

    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    
    \curl -sSL https://get.rvm.io | bash -s stable
    
    source /etc/profile.d/rvm.sh
    
  2. 安裝jruby

    rvm install jruby
    
    rvm use jruby
    
  3. 安裝包管理工具bundle和測試工具rspec

    gem install bundle
    gem install rspec
    

從example開始

  1. clone logstash-input-example

    git clone https://github.com/logstash-plugins/logstash-input-example.git
    
  2. 將clone出來的logstash-input-example源碼copy到logstash-input-cos目錄软能,并刪除.git文件夾,目的是以logstash-input-example的源碼為參考進(jìn)行開發(fā)井仰,同時把需要改動名稱的地方修改一下:

     mv logstash-input-example.gemspec logstash-input-cos.gemspec
     mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
     mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
    
  3. 建立的源碼目錄結(jié)構(gòu)如圖所示:

image

其中埋嵌,重要文件的作用說明如下:

  • cos.rb: 主文件,在該文件中編寫logstash配置文件的讀寫與源數(shù)據(jù)獲取的代碼俱恶,需要繼承LogStash::Inputs::Base基類
  • cos_spec.rb: 單元測試文件雹嗦,通過rspec可以對cos.rb中的代碼進(jìn)行測試
  • logstash-input-cos.gemspec: 類似于maven中的pom.xml文件,配置工程的版本合是、名稱了罪、licene,包依賴等,通過bundle命令可以下載依賴包

配置并下載依賴

因為騰訊云COS服務(wù)沒有ruby sdk, 因為只能依賴其Java sdk進(jìn)行開發(fā)聪全,首先添加對cos java sdk的依賴泊藕。在logstash-input-cos.gemspec中Gem dependencies配置欄中增加以下內(nèi)容:

# Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'

相比logstash-input-example.gemspec,增加了對com.qcloud:cos_api包以及jar-dependencies包的依賴,jar-dependencies用于在ruby環(huán)境中管理jar包难礼,并且可以跟蹤jar包的加載狀態(tài)娃圆。

然后,在logstash-input-cos.gemspec中增加配置:

s.platform = 'java'

這樣可以成功下載java依賴包蛾茉,并且可以在ruby代碼中直接調(diào)用java代碼讼呢。

最后,執(zhí)行以下命令下載依賴:

bundle install

編寫代碼

logstash-input-cos的代碼邏輯其實比較簡單谦炬,主要是通過執(zhí)行定時任務(wù)悦屏,調(diào)用cos java sdk中的listObjects方法节沦,獲取到指定bucket里的數(shù)據(jù),并在每次定時任務(wù)執(zhí)行結(jié)束后設(shè)置marker保存在本地础爬,再次執(zhí)行時從marker位置獲取數(shù)據(jù)甫贯,以實現(xiàn)數(shù)據(jù)的增量同步。

jar包的引用

因為要調(diào)用cos java sdk中的代碼看蚜,先引用該jar包:

require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;

讀取配置文件

logstash配置文件讀取的代碼如圖所示:


image

config_name為cos,其它的配置項讀取代碼按照ruby的代碼規(guī)范編寫叫搁,添加類型校驗與默認(rèn)值,就可以從以下配置文件中讀取配置項:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

實現(xiàn)register方法

logstash input插件必須實現(xiàn)另個方法:register 和run

register方法類似于初始化方法失乾,在該方法中可以直接使用從配置文件讀取并賦值的變量常熙,完成cos client的初始化,代碼如下:

    # 1 初始化用戶身份信息(appid, secretId, secretKey)
    cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
    # 2 設(shè)置bucket的區(qū)域, COS地域的簡稱請參照 https://www.qcloud.com/document/product/436/6224
    clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
    # 3 生成cos客戶端
    @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
    # bucket名稱, 需包含appid
    bucketName = @bucket + "-"+ @appId
    @bucketName = bucketName

    @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
    # 設(shè)置bucket名稱
    @listObjectsRequest.setBucketName(bucketName)
    # prefix表示列出的object的key以prefix開始
    @listObjectsRequest.setPrefix(@prefix)
    # 設(shè)置最大遍歷出多少個對象, 一次listobject最大支持1000
    @listObjectsRequest.setMaxKeys(1000)
    @listObjectsRequest.setMarker(@markerConfig.getMarker)

示例代碼中設(shè)置了@cosclient和@listObjectRequest為全局變量碱茁, 因為在run方法中會用到這兩個變量裸卫。

注意在ruby中調(diào)用java代碼的方式:沒有變量描述符;不能直接new Object()纽竣,而只能Object.new().

實現(xiàn)run方法

run方法獲取數(shù)據(jù)并將數(shù)據(jù)流轉(zhuǎn)換成event事件

最簡單的run方法為:

def run(queue)
    Stud.interval(@interval) do
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
    end # loop
  end # def run

代碼說明:

  • 通過Stud ruby模塊執(zhí)行定時任務(wù)墓贿,interval可自定義,從配置文件中讀取
  • 生成event, 示例代碼生成了一個包含兩個字段數(shù)據(jù)的event
  • 調(diào)用decorate()方法蜓氨, 給該event打上tag聋袋,如果配置的話
  • queue<<event, 將event插入到數(shù)據(jù)管道中,發(fā)送給filter處理

logstash-input-cos的run方法實現(xiàn)為:

def run(queue)
    @current_thread = Thread.current
    Stud.interval(@interval) do
      process(queue)
    end
end
 
def process(queue)
    @logger.info('Marker from: ' + @markerConfig.getMarker)
    
    objectListing = @cosclient.listObjects(@listObjectsRequest)
    nextMarker = objectListing.getNextMarker()
    cosObjectSummaries = objectListing.getObjectSummaries()
    cosObjectSummaries.each do |obj|
       # 文件的路徑key
       key = obj.getKey()
    
       if stop?
         @logger.info("stop while attempting to read log file")
         break
       end
       # 根據(jù)key獲取內(nèi)容
       getObject(key) { |log|
         # 發(fā)送消息
         @codec.decode(log) do |event|
           decorate(event)
           queue << event
         end
       }

       #記錄 marker
       @markerConfig.setMarker(key)
       @logger.info('Marker end: ' + @markerConfig.getMarker)
    end
  end


  # 獲取下載輸入流
 def getObject(key, &block)
    getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
    cosObject = @cosclient.getObject(getObjectRequest)
    cosObjectInput = cosObject.getObjectContent()
    buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
    while (line = buffered.readLine())
      block.call(line)
    end
  end

測試代碼

在spec/inputs/cos_spec.rb中增加如下測試代碼:

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do

  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '*',
        "access_key_secret" => '*',
        "bucket" => '*',
         "region" => 'ap-guangzhou',
         "appId" => '*',
        "interval" => 60 } }
  end
end

rspec是一個ruby測試庫穴吹,通過bundle命令執(zhí)行rspec:

bundle exec rspec

如果cos.rb中的代碼沒有語法或運(yùn)行時錯誤幽勒,則會出現(xiàn)如果信息表明測試成功:

Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures

構(gòu)建并測試input-plugin-cos

build

使用gem對input-plugin-cos插件源碼進(jìn)行build:

gem build logstash-input-cos.gemspec

構(gòu)建完成后會生成一個名為logstash-input-cos-0.0.1-java.gem的文件

test

在logstash的解壓目錄下,執(zhí)行一下命令安裝logstash-input-cos plugin:

./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem

執(zhí)行結(jié)果為:

Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful

另外港令,可以通過./bin/logstash-plugin list命令查看logstash已經(jīng)安裝的所有input/output/filter/codec插件啥容。

生成配置文件cos.logstash.conf,內(nèi)容為:

input {
    cos {
        "endpoint" => "cos.ap-guangzhou.myqcloud.com"
        "access_key_id" => "*****"
        "access_key_secret" => "****"
        "bucket" => "******"
        "region" => "ap-guangzhou"
        "appId" => "**********"
        "interval" => 60
    }
}

output {
    stdout {
        codec=>rubydebug
    }
}

該配置文件使用騰訊云官網(wǎng)賬號的secret_id和secret_key進(jìn)行權(quán)限驗證,拉取指定bucket里的數(shù)據(jù)顷霹,為了測試咪惠,將output設(shè)置為標(biāo)準(zhǔn)輸出。

執(zhí)行l(wèi)ogstash:

./bin/logstash -f cos.logstash.conf

輸出結(jié)果為:

Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}

在cos中的bucket里上傳了名為access.log的nginx日志淋淀,上述輸出結(jié)果中最后打印出來的每個json結(jié)構(gòu)體構(gòu)成一個event遥昧, 其中message消息即為access.log中每一條日志。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末朵纷,一起剝皮案震驚了整個濱河市炭臭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌袍辞,老刑警劉巖鞋仍,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異革屠,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進(jìn)店門似芝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事熄赡℃⒍妫” “怎么了?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵寞奸,是天一觀的道長呛谜。 經(jīng)常有香客問我,道長枪萄,這世上最難降的妖魔是什么隐岛? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮瓷翻,結(jié)果婚禮上聚凹,老公的妹妹穿的比我還像新娘。我一直安慰自己齐帚,他們只是感情好妒牙,可當(dāng)我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著对妄,像睡著了一般湘今。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上剪菱,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天摩瞎,我揣著相機(jī)與錄音,去河邊找鬼琅豆。 笑死愉豺,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的茫因。 我是一名探鬼主播蚪拦,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼冻押!你這毒婦竟也來了驰贷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤洛巢,失蹤者是張志新(化名)和其女友劉穎括袒,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體稿茉,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡锹锰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年芥炭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恃慧。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡园蝠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出痢士,到底是詐尸還是另有隱情彪薛,我是刑警寧澤,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布怠蹂,位于F島的核電站善延,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏城侧。R本人自食惡果不足惜易遣,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望赞庶。 院中可真熱鬧训挡,春花似錦、人聲如沸歧强。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽摊册。三九已至肤京,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間茅特,已是汗流浹背忘分。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留白修,地道東北人妒峦。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像兵睛,于是被迫代替她去往敵國和親肯骇。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理祖很,服務(wù)發(fā)現(xiàn)笛丙,斷路器,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • Logstash是一個具有實時管線能力的開源數(shù)據(jù)收集引擎假颇。在ELK Stack中胚鸯,通常選擇更輕量級的Filebea...
    steanxy閱讀 3,525評論 1 6
  • 概述 監(jiān)控預(yù)警平臺, eagle + eye (鷹眼)的合體詞, 寓意可以快速發(fā)現(xiàn)問題, 并及時作出響應(yīng),Eagl...
    Kungfu貓熊閱讀 7,389評論 0 52
  • 0.摘要 這是開智學(xué)堂「信息分析」課程第1周基礎(chǔ)任務(wù),包含分析背景笨鸡、思路與分析步驟姜钳、主要結(jié)論坦冠、進(jìn)一步討論等內(nèi)容,完...
    空靈一月閱讀 357評論 0 1
  • 《呂氏春秋》里有一段哥桥,講孔子周游列國蓝牲,曾因兵荒馬亂,旅途困頓泰讽,三餐以野菜果腹,大家已七日沒吃下一粒米飯昔期。 一天已卸,顏...
    劉現(xiàn)輝民俗畫閱讀 1,867評論 0 1