logstash input插件開發(fā)
logstash作為一個數(shù)據(jù)管道中間件稿黍,支持對各種類型數(shù)據(jù)的采集與轉(zhuǎn)換详囤,并將數(shù)據(jù)發(fā)送到各種類型的存儲庫暴拄,比如實現(xiàn)消費kafka數(shù)據(jù)并且寫入到Elasticsearch, 日志文件同步到對象存儲S3等于个,mysql數(shù)據(jù)同步到Elasticsearch等锉试。
logstash內(nèi)部主要包含三個模塊:
input: 從數(shù)據(jù)源獲取數(shù)據(jù)
filter: 過濾、轉(zhuǎn)換數(shù)據(jù)
output: 輸出數(shù)據(jù)
不同類型的數(shù)據(jù)都可以通過對應(yīng)的input-plugin览濒, output-plugin完成數(shù)據(jù)的輸入與輸出。如需要消費kafka中的數(shù)據(jù)并寫入到Elasticsearch中拖云,則需要使用logstash的kafka-input-plugin完成數(shù)據(jù)輸入贷笛,logstash-output-elasticsearch完成數(shù)據(jù)輸出。如果需要對輸入數(shù)據(jù)進(jìn)行過濾或者轉(zhuǎn)換宙项,比如根據(jù)關(guān)鍵詞過濾掉不需要的內(nèi)容乏苦,或者時間字段的格式轉(zhuǎn)換,就需要又filter-plugin完成了。
logstash的input插件目前已經(jīng)有幾十種了汇荐,支持大多數(shù)比較通用或開源的數(shù)據(jù)源的輸入洞就。但如果公司內(nèi)部開發(fā)的數(shù)據(jù)庫或其它存儲類的服務(wù)不能和開源產(chǎn)品在接口協(xié)議上兼容,比如騰訊自研的消息隊列服務(wù)CMQ不依賴于其它的開源消息隊列產(chǎn)品掀淘,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的數(shù)據(jù)旬蟋;騰訊云對象存儲服務(wù)COS, 在鑒權(quán)方式上和AWS的S3存在差異革娄,也不能直接使用logstash-input-s3插件從COS中讀取數(shù)據(jù)倾贰,對于這種情況,就需要自己開發(fā)logstash的input插件了拦惋。
本文以開發(fā)logstash的cos input插件為例匆浙,介紹如何開發(fā)logstash的input插件。
logstash官方提供了有個簡單的input plugin example可供參考:
https://github.com/logstash-plugins/logstash-input-example/
環(huán)境準(zhǔn)備
logstash使用jruby開發(fā)厕妖,首先要配置jruby環(huán)境:
-
安裝rvm:
rvm是一個ruby管理器首尼,可以安裝并管理ruby環(huán)境,也可以通過命令行切換到不同的ruby版本言秸。
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
\curl -sSL https://get.rvm.io | bash -s stable
source /etc/profile.d/rvm.sh
-
安裝jruby
rvm install jruby
rvm use jruby
-
安裝包管理工具bundle和測試工具rspec
gem install bundle gem install rspec
從example開始
-
clone logstash-input-example
git clone https://github.com/logstash-plugins/logstash-input-example.git
-
將clone出來的logstash-input-example源碼copy到logstash-input-cos目錄软能,并刪除.git文件夾,目的是以logstash-input-example的源碼為參考進(jìn)行開發(fā)井仰,同時把需要改動名稱的地方修改一下:
mv logstash-input-example.gemspec logstash-input-cos.gemspec mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
建立的源碼目錄結(jié)構(gòu)如圖所示:
其中埋嵌,重要文件的作用說明如下:
- cos.rb: 主文件,在該文件中編寫logstash配置文件的讀寫與源數(shù)據(jù)獲取的代碼俱恶,需要繼承LogStash::Inputs::Base基類
- cos_spec.rb: 單元測試文件雹嗦,通過rspec可以對cos.rb中的代碼進(jìn)行測試
- logstash-input-cos.gemspec: 類似于maven中的pom.xml文件,配置工程的版本合是、名稱了罪、licene,包依賴等,通過bundle命令可以下載依賴包
配置并下載依賴
因為騰訊云COS服務(wù)沒有ruby sdk, 因為只能依賴其Java sdk進(jìn)行開發(fā)聪全,首先添加對cos java sdk的依賴泊藕。在logstash-input-cos.gemspec中Gem dependencies配置欄中增加以下內(nèi)容:
# Gem dependencies
s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22'
s.add_runtime_dependency 'jar-dependencies'
s.add_development_dependency 'logstash-devutils', '1.3.6'
相比logstash-input-example.gemspec,增加了對com.qcloud:cos_api包以及jar-dependencies包的依賴,jar-dependencies用于在ruby環(huán)境中管理jar包难礼,并且可以跟蹤jar包的加載狀態(tài)娃圆。
然后,在logstash-input-cos.gemspec中增加配置:
s.platform = 'java'
這樣可以成功下載java依賴包蛾茉,并且可以在ruby代碼中直接調(diào)用java代碼讼呢。
最后,執(zhí)行以下命令下載依賴:
bundle install
編寫代碼
logstash-input-cos的代碼邏輯其實比較簡單谦炬,主要是通過執(zhí)行定時任務(wù)悦屏,調(diào)用cos java sdk中的listObjects方法节沦,獲取到指定bucket里的數(shù)據(jù),并在每次定時任務(wù)執(zhí)行結(jié)束后設(shè)置marker保存在本地础爬,再次執(zhí)行時從marker位置獲取數(shù)據(jù)甫贯,以實現(xiàn)數(shù)據(jù)的增量同步。
jar包的引用
因為要調(diào)用cos java sdk中的代碼看蚜,先引用該jar包:
require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;
讀取配置文件
logstash配置文件讀取的代碼如圖所示:
config_name為cos,其它的配置項讀取代碼按照ruby的代碼規(guī)范編寫叫搁,添加類型校驗與默認(rèn)值,就可以從以下配置文件中讀取配置項:
input {
cos {
"endpoint" => "cos.ap-guangzhou.myqcloud.com"
"access_key_id" => "*****"
"access_key_secret" => "****"
"bucket" => "******"
"region" => "ap-guangzhou"
"appId" => "**********"
"interval" => 60
}
}
output {
stdout {
codec=>rubydebug
}
}
實現(xiàn)register方法
logstash input插件必須實現(xiàn)另個方法:register 和run
register方法類似于初始化方法失乾,在該方法中可以直接使用從配置文件讀取并賦值的變量常熙,完成cos client的初始化,代碼如下:
# 1 初始化用戶身份信息(appid, secretId, secretKey)
cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
# 2 設(shè)置bucket的區(qū)域, COS地域的簡稱請參照 https://www.qcloud.com/document/product/436/6224
clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
# 3 生成cos客戶端
@cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
# bucket名稱, 需包含appid
bucketName = @bucket + "-"+ @appId
@bucketName = bucketName
@listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
# 設(shè)置bucket名稱
@listObjectsRequest.setBucketName(bucketName)
# prefix表示列出的object的key以prefix開始
@listObjectsRequest.setPrefix(@prefix)
# 設(shè)置最大遍歷出多少個對象, 一次listobject最大支持1000
@listObjectsRequest.setMaxKeys(1000)
@listObjectsRequest.setMarker(@markerConfig.getMarker)
示例代碼中設(shè)置了@cosclient和@listObjectRequest為全局變量碱茁, 因為在run方法中會用到這兩個變量裸卫。
注意在ruby中調(diào)用java代碼的方式:沒有變量描述符;不能直接new Object()纽竣,而只能Object.new().
實現(xiàn)run方法
run方法獲取數(shù)據(jù)并將數(shù)據(jù)流轉(zhuǎn)換成event事件
最簡單的run方法為:
def run(queue)
Stud.interval(@interval) do
event = LogStash::Event.new("message" => @message, "host" => @host)
decorate(event)
queue << event
end # loop
end # def run
代碼說明:
- 通過Stud ruby模塊執(zhí)行定時任務(wù)墓贿,interval可自定義,從配置文件中讀取
- 生成event, 示例代碼生成了一個包含兩個字段數(shù)據(jù)的event
- 調(diào)用decorate()方法蜓氨, 給該event打上tag聋袋,如果配置的話
- queue<<event, 將event插入到數(shù)據(jù)管道中,發(fā)送給filter處理
logstash-input-cos的run方法實現(xiàn)為:
def run(queue)
@current_thread = Thread.current
Stud.interval(@interval) do
process(queue)
end
end
def process(queue)
@logger.info('Marker from: ' + @markerConfig.getMarker)
objectListing = @cosclient.listObjects(@listObjectsRequest)
nextMarker = objectListing.getNextMarker()
cosObjectSummaries = objectListing.getObjectSummaries()
cosObjectSummaries.each do |obj|
# 文件的路徑key
key = obj.getKey()
if stop?
@logger.info("stop while attempting to read log file")
break
end
# 根據(jù)key獲取內(nèi)容
getObject(key) { |log|
# 發(fā)送消息
@codec.decode(log) do |event|
decorate(event)
queue << event
end
}
#記錄 marker
@markerConfig.setMarker(key)
@logger.info('Marker end: ' + @markerConfig.getMarker)
end
end
# 獲取下載輸入流
def getObject(key, &block)
getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
cosObject = @cosclient.getObject(getObjectRequest)
cosObjectInput = cosObject.getObjectContent()
buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
while (line = buffered.readLine())
block.call(line)
end
end
測試代碼
在spec/inputs/cos_spec.rb中增加如下測試代碼:
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"
describe LogStash::Inputs::Cos do
it_behaves_like "an interruptible input plugin" do
let(:config) { {
"endpoint" => 'cos.ap-guangzhou.myqcloud.com',
"access_key_id" => '*',
"access_key_secret" => '*',
"bucket" => '*',
"region" => 'ap-guangzhou',
"appId" => '*',
"interval" => 60 } }
end
end
rspec是一個ruby測試庫穴吹,通過bundle命令執(zhí)行rspec:
bundle exec rspec
如果cos.rb中的代碼沒有語法或運(yùn)行時錯誤幽勒,則會出現(xiàn)如果信息表明測試成功:
Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures
構(gòu)建并測試input-plugin-cos
build
使用gem對input-plugin-cos插件源碼進(jìn)行build:
gem build logstash-input-cos.gemspec
構(gòu)建完成后會生成一個名為logstash-input-cos-0.0.1-java.gem的文件
test
在logstash的解壓目錄下,執(zhí)行一下命令安裝logstash-input-cos plugin:
./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
執(zhí)行結(jié)果為:
Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful
另外港令,可以通過./bin/logstash-plugin list命令查看logstash已經(jīng)安裝的所有input/output/filter/codec插件啥容。
生成配置文件cos.logstash.conf,內(nèi)容為:
input {
cos {
"endpoint" => "cos.ap-guangzhou.myqcloud.com"
"access_key_id" => "*****"
"access_key_secret" => "****"
"bucket" => "******"
"region" => "ap-guangzhou"
"appId" => "**********"
"interval" => 60
}
}
output {
stdout {
codec=>rubydebug
}
}
該配置文件使用騰訊云官網(wǎng)賬號的secret_id和secret_key進(jìn)行權(quán)限驗證,拉取指定bucket里的數(shù)據(jù)顷霹,為了測試咪惠,將output設(shè)置為標(biāo)準(zhǔn)輸出。
執(zhí)行l(wèi)ogstash:
./bin/logstash -f cos.logstash.conf
輸出結(jié)果為:
Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos ] Marker end: access.log
{
"message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
"@version" => "1",
"@timestamp" => 2018-07-30T11:26:17.710Z
}
{
"message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
"@version" => "1",
"@timestamp" => 2018-07-30T11:26:17.711Z
}
在cos中的bucket里上傳了名為access.log的nginx日志淋淀,上述輸出結(jié)果中最后打印出來的每個json結(jié)構(gòu)體構(gòu)成一個event遥昧, 其中message消息即為access.log中每一條日志。