streamsets利用jython實現(xiàn)數(shù)據校驗

一胶征、需求:

利用mongo元數(shù)據庫中提供的資源描述导犹,去校驗csv中的每條數(shù)據

二怨规、實現(xiàn)

首先配置好原數(shù)據,以及路徑傳遞锡足,還有jython模塊

注意,jython有很多第三方包是沒辦法直接使用的壳坪,需要用sys去加載舶得,這時我們會碰上一個最大的難題,就是第三方包的處理爽蝴。由于jython是運行在jvm上的沐批,所以,需要c語言運行環(huán)境的包在此時都無法調用成功蝎亚,例如pandas九孩,numpy等,但其他第三方包在sys成功加載后還是可以調用成功的发框,例如pymongo,要把這些包和其依賴包放在指定路徑下躺彬。

streamsets最惡心的一點就是云端調試,問題與bug都要放在records.output中去打印輸出

還有就是要非常注意streamsets本身的知識和結構,比如records是個list梅惯,而record是個對象宪拥;batch by batch ?和record by record是兩種不同的運行模式等,如何利用他們的性質進行編程仍是我們需要學習的

注意python格式的問題铣减,循環(huán)的問題她君,還有業(yè)務邏輯處理的問題

三、編程

import sys

#sys.path.append('D:\JavaWorkplace\jython\jpython')

sys.path.append("/home/fengwenke/usr/streamset/jar/JPS.jar")

sys.path.append("/home/fengwenke/usr/streamset/python")

sys.setrecursionlimit(1000000)

from pymongo import MongoClient

import datetime as dt

import re

import json

conn = MongoClient('114.115.156.237', 27027)

db = conn.bigdata?

db.authenticate("gwssi", "gwssi123")

res = db.resourceProfile

for record in records:

? name = record.value['filepath']

#這個6和0是寫死的葫哗,需要改缔刹,6需要根據csv路徑的不同進行修改 0可能不需要改

csvName = name.split('/')[6]

tableName = csvName.split('_')[0]

a =list(res.find({"essentialInfo.resCode":tableName}))

meteData = []

for i in a:


? ? for s in i['dataInfos']:


? ? ? ? meteData.append(s['isPrimaryKey'])

? ? ? ? meteData.append(s['dataName'])

? ? ? ? meteData.append(s['dataType'])

print dt

newDate = dt.datetime.utcnow().strftime("%Y-%m-%d")

meteNameCollection = []

meteTypeCollection = []

meteIsprikeyCollection = []

#從mongo里拿出元數(shù)據的名字

for meteNameIndex in range(len(meteData)):

? if (meteNameIndex+2)%3 ==0:

? ? meteName = meteData[meteNameIndex]

? ? meteNameCollection.append(meteName)

? ? meteType = meteData[meteNameIndex+1]

? ? meteTypeCollection.append(meteType)

? ? meteIspri = meteData[meteNameIndex -1]

? ? meteIsprikeyCollection.append(meteIspri)

dataNameCollection = []

for recordIndex in range(len(records)):

? try:


? ? # Create a string field to store the current date with the specified format

? ? #record.value["3"] = meteData[8]


? ? #從數(shù)據流里取出第一列

? ? if recordIndex == 0:

? ? ? #從第一列里拿出每個名字


? ? ? for dataNameIndex in range(len(records[0].value)):

? ? ? ? dataNameCollection.append(records[0].value['{0}'.format(dataNameIndex)])


? ? else:

? ? ? #利用這個數(shù)據匹配元數(shù)據球涛,并對其他的數(shù)據類型進行校驗? 為什么不拆成兩層循環(huán),因為record記錄會覆蓋

? ? ? for dataNameIndex2 in range(len(dataNameCollection)):

? ? ? ? for meteNameIndex in range(len(meteNameCollection)):

? ? ? ? ? if dataNameCollection[dataNameIndex2] == meteNameCollection[meteNameIndex]:

? ? ? ? ? ? if meteIsprikeyCollection[meteNameIndex] == 1:

? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 時間校镐。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "timestamp":

? ? ? ? ? ? ? ? matchRule = r'\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 字母數(shù)字混合數(shù)據亿扁。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "varchar":

? ? ? ? ? ? ? ? #字母數(shù)字混合數(shù)據

? ? ? ? ? ? ? ? mixedData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? mixRule = '^(?=.*\d)(?=.*[a-zA-Z])(?=.*[\u4E00-\u9FA5])[\u4E00-\u9FA5A-Za-z0-9]*$'

? ? ? ? ? ? ? ? rg = re.compile(mixRule,re.IGNORECASE|re.DOTALL)

? ? ? ? ? ? ? ? mixJudge = rg.search(mixedData)

? ? ? ? ? ? ? ? if mixJudge :

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)]= "true"

? ? ? ? ? ? ? ? #英文

? ? ? ? ? ? ? ? elif re.match('^[A-Za-z]+$',mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #中文

? ? ? ? ? ? ? ? elif re.match(u"[\u4e00-\u9fa5]+",mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #空值

? ? ? ? ? ? ? ? elif records[recordIndex].value['{0}'.format(dataNameIndex2)] == "":

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "zhujianweikong"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #整數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "integer" or meteTypeCollection[meteNameIndex] == "bigint":

? ? ? ? ? ? ? ? matchRule = '^-?\\d+$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #浮點數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "float" or meteTypeCollection[meteNameIndex] == "double":

? ? ? ? ? ? ? ? matchRule = '^(-?\\d+)(\\.\\d+)?$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? else:

? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 時間。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "timestamp":

? ? ? ? ? ? ? ? matchRule = r'\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 字母數(shù)字混合數(shù)據灭翔。測試完成

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "varchar":

? ? ? ? ? ? ? ? #字母數(shù)字混合數(shù)據

? ? ? ? ? ? ? ? mixedData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? mixRule = '^(?=.*\d)(?=.*[a-zA-Z])(?=.*[\u4E00-\u9FA5])[\u4E00-\u9FA5A-Za-z0-9]*$'

? ? ? ? ? ? ? ? rg = re.compile(mixRule,re.IGNORECASE|re.DOTALL)

? ? ? ? ? ? ? ? mixJudge = rg.search(mixedData)

? ? ? ? ? ? ? ? if mixJudge :

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)]= "true"

? ? ? ? ? ? ? ? #英文

? ? ? ? ? ? ? ? elif re.match('^[A-Za-z]+$',mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #中文

? ? ? ? ? ? ? ? elif re.match(u"[\u4e00-\u9fa5]+",mixedData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? #空值

? ? ? ? ? ? ? ? elif records[recordIndex].value['{0}'.format(dataNameIndex2)] == "":

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #整數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "integer" or meteTypeCollection[meteNameIndex] == "bigint":

? ? ? ? ? ? ? ? matchRule = '^-?\\d+$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"

? ? ? ? ? ? ? #浮點數(shù)

? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "float" or meteTypeCollection[meteNameIndex] == "double":

? ? ? ? ? ? ? ? matchRule = '^(-?\\d+)(\\.\\d+)?$'

? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]

? ? ? ? ? ? ? ? if re.match(matchRule,matchData):

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"

? ? ? ? ? ? ? ? else:

? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"




? ? # Write record to processor output

? ? output.write(records[recordIndex])

? ? conn.close()

? except Exception as e:

? ? # Send record to error

? ? error.write(records[recordIndex], str(e))

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末魏烫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子肝箱,更是在濱河造成了極大的恐慌哄褒,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件煌张,死亡現(xiàn)場離奇詭異呐赡,居然都是意外死亡,警方通過查閱死者的電腦和手機骏融,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門链嘀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人档玻,你說我怎么就攤上這事怀泊。” “怎么了误趴?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵霹琼,是天一觀的道長。 經常有香客問我凉当,道長枣申,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任看杭,我火速辦了婚禮忠藤,結果婚禮上,老公的妹妹穿的比我還像新娘楼雹。我一直安慰自己模孩,他們只是感情好,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布贮缅。 她就那樣靜靜地躺著瓜贾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪携悯。 梳的紋絲不亂的頭發(fā)上祭芦,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天,我揣著相機與錄音憔鬼,去河邊找鬼龟劲。 笑死胃夏,一個胖子當著我的面吹牛,可吹牛的內容都是我干的昌跌。 我是一名探鬼主播仰禀,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蚕愤!你這毒婦竟也來了答恶?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤萍诱,失蹤者是張志新(化名)和其女友劉穎悬嗓,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體裕坊,經...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡包竹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了籍凝。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片周瞎。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖饵蒂,靈堂內的尸體忽然破棺而出声诸,到底是詐尸還是另有隱情,我是刑警寧澤退盯,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布双絮,位于F島的核電站,受9級特大地震影響得问,放射性物質發(fā)生泄漏。R本人自食惡果不足惜软免,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一宫纬、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧膏萧,春花似錦漓骚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至曹锨,卻和暖如春孤个,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背沛简。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工齐鲤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留斥废,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓给郊,卻偏偏與公主長得像牡肉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子淆九,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內容

  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,453評論 0 13
  • "use strict";function _classCallCheck(e,t){if(!(e instanc...
    久些閱讀 2,030評論 0 2
  • 概念:邊際成本 邊際成本指的是每多生產或者每多賣一件產品统锤,所帶來的總成本的增加。邊際成本的結構性改變炭庙,是互聯(lián)網經濟...
    任性的Cissy閱讀 281評論 0 0
  • 新工作入職的第一天,對周遭的環(huán)境及同事難免感到陌生擦盾,小心翼翼的打量每一個人嘲驾,做每一件事 第一次接觸到秋秋是我在區(qū)域...
    竹與千尋閱讀 346評論 0 0
  • 訪妙玉乞紅梅原文 酒未開樽句未裁,尋春問臘到蓬萊迹卢。不求大士瓶中露辽故,為乞孀娥檻外梅。入世冷挑紅雪去腐碱,離塵香割紫云來誊垢。...
    如是緣起閱讀 451評論 0 0