一胶征、需求:
利用mongo元數(shù)據庫中提供的資源描述导犹,去校驗csv中的每條數(shù)據
二怨规、實現(xiàn)
首先配置好原數(shù)據,以及路徑傳遞锡足,還有jython模塊
注意,jython有很多第三方包是沒辦法直接使用的壳坪,需要用sys去加載舶得,這時我們會碰上一個最大的難題,就是第三方包的處理爽蝴。由于jython是運行在jvm上的沐批,所以,需要c語言運行環(huán)境的包在此時都無法調用成功蝎亚,例如pandas九孩,numpy等,但其他第三方包在sys成功加載后還是可以調用成功的发框,例如pymongo,要把這些包和其依賴包放在指定路徑下躺彬。
streamsets最惡心的一點就是云端調試,問題與bug都要放在records.output中去打印輸出
還有就是要非常注意streamsets本身的知識和結構,比如records是個list梅惯,而record是個對象宪拥;batch by batch ?和record by record是兩種不同的運行模式等,如何利用他們的性質進行編程仍是我們需要學習的
注意python格式的問題铣减,循環(huán)的問題她君,還有業(yè)務邏輯處理的問題
三、編程
import sys
#sys.path.append('D:\JavaWorkplace\jython\jpython')
sys.path.append("/home/fengwenke/usr/streamset/jar/JPS.jar")
sys.path.append("/home/fengwenke/usr/streamset/python")
sys.setrecursionlimit(1000000)
from pymongo import MongoClient
import datetime as dt
import re
import json
conn = MongoClient('114.115.156.237', 27027)
db = conn.bigdata?
db.authenticate("gwssi", "gwssi123")
res = db.resourceProfile
for record in records:
? name = record.value['filepath']
#這個6和0是寫死的葫哗,需要改缔刹,6需要根據csv路徑的不同進行修改 0可能不需要改
csvName = name.split('/')[6]
tableName = csvName.split('_')[0]
a =list(res.find({"essentialInfo.resCode":tableName}))
meteData = []
for i in a:
? ? for s in i['dataInfos']:
? ? ? ? meteData.append(s['isPrimaryKey'])
? ? ? ? meteData.append(s['dataName'])
? ? ? ? meteData.append(s['dataType'])
print dt
newDate = dt.datetime.utcnow().strftime("%Y-%m-%d")
meteNameCollection = []
meteTypeCollection = []
meteIsprikeyCollection = []
#從mongo里拿出元數(shù)據的名字
for meteNameIndex in range(len(meteData)):
? if (meteNameIndex+2)%3 ==0:
? ? meteName = meteData[meteNameIndex]
? ? meteNameCollection.append(meteName)
? ? meteType = meteData[meteNameIndex+1]
? ? meteTypeCollection.append(meteType)
? ? meteIspri = meteData[meteNameIndex -1]
? ? meteIsprikeyCollection.append(meteIspri)
dataNameCollection = []
for recordIndex in range(len(records)):
? try:
? ? # Create a string field to store the current date with the specified format
? ? #record.value["3"] = meteData[8]
? ? #從數(shù)據流里取出第一列
? ? if recordIndex == 0:
? ? ? #從第一列里拿出每個名字
? ? ? for dataNameIndex in range(len(records[0].value)):
? ? ? ? dataNameCollection.append(records[0].value['{0}'.format(dataNameIndex)])
? ? else:
? ? ? #利用這個數(shù)據匹配元數(shù)據球涛,并對其他的數(shù)據類型進行校驗? 為什么不拆成兩層循環(huán),因為record記錄會覆蓋
? ? ? for dataNameIndex2 in range(len(dataNameCollection)):
? ? ? ? for meteNameIndex in range(len(meteNameCollection)):
? ? ? ? ? if dataNameCollection[dataNameIndex2] == meteNameCollection[meteNameIndex]:
? ? ? ? ? ? if meteIsprikeyCollection[meteNameIndex] == 1:
? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 時間校镐。測試完成
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "timestamp":
? ? ? ? ? ? ? ? matchRule = r'\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}'
? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? if re.match(matchRule,matchData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 字母數(shù)字混合數(shù)據亿扁。測試完成
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "varchar":
? ? ? ? ? ? ? ? #字母數(shù)字混合數(shù)據
? ? ? ? ? ? ? ? mixedData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? mixRule = '^(?=.*\d)(?=.*[a-zA-Z])(?=.*[\u4E00-\u9FA5])[\u4E00-\u9FA5A-Za-z0-9]*$'
? ? ? ? ? ? ? ? rg = re.compile(mixRule,re.IGNORECASE|re.DOTALL)
? ? ? ? ? ? ? ? mixJudge = rg.search(mixedData)
? ? ? ? ? ? ? ? if mixJudge :
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)]= "true"
? ? ? ? ? ? ? ? #英文
? ? ? ? ? ? ? ? elif re.match('^[A-Za-z]+$',mixedData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? #中文
? ? ? ? ? ? ? ? elif re.match(u"[\u4e00-\u9fa5]+",mixedData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? #空值
? ? ? ? ? ? ? ? elif records[recordIndex].value['{0}'.format(dataNameIndex2)] == "":
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "zhujianweikong"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? ? #整數(shù)
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "integer" or meteTypeCollection[meteNameIndex] == "bigint":
? ? ? ? ? ? ? ? matchRule = '^-?\\d+$'
? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? if re.match(matchRule,matchData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? ? #浮點數(shù)
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "float" or meteTypeCollection[meteNameIndex] == "double":
? ? ? ? ? ? ? ? matchRule = '^(-?\\d+)(\\.\\d+)?$'
? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? if re.match(matchRule,matchData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? else:
? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 時間。測試完成
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "timestamp":
? ? ? ? ? ? ? ? matchRule = r'\d{4}(\-|\/|.)\d{1,2}\1\d{1,2}'
? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? if re.match(matchRule,matchData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? ? #讀取對應的元數(shù)據類型. 字母數(shù)字混合數(shù)據灭翔。測試完成
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "varchar":
? ? ? ? ? ? ? ? #字母數(shù)字混合數(shù)據
? ? ? ? ? ? ? ? mixedData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? mixRule = '^(?=.*\d)(?=.*[a-zA-Z])(?=.*[\u4E00-\u9FA5])[\u4E00-\u9FA5A-Za-z0-9]*$'
? ? ? ? ? ? ? ? rg = re.compile(mixRule,re.IGNORECASE|re.DOTALL)
? ? ? ? ? ? ? ? mixJudge = rg.search(mixedData)
? ? ? ? ? ? ? ? if mixJudge :
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)]= "true"
? ? ? ? ? ? ? ? #英文
? ? ? ? ? ? ? ? elif re.match('^[A-Za-z]+$',mixedData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? #中文
? ? ? ? ? ? ? ? elif re.match(u"[\u4e00-\u9fa5]+",mixedData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? #空值
? ? ? ? ? ? ? ? elif records[recordIndex].value['{0}'.format(dataNameIndex2)] == "":
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? ? #整數(shù)
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "integer" or meteTypeCollection[meteNameIndex] == "bigint":
? ? ? ? ? ? ? ? matchRule = '^-?\\d+$'
? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? if re.match(matchRule,matchData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? ? ? ? ? ? #浮點數(shù)
? ? ? ? ? ? ? if meteTypeCollection[meteNameIndex] == "float" or meteTypeCollection[meteNameIndex] == "double":
? ? ? ? ? ? ? ? matchRule = '^(-?\\d+)(\\.\\d+)?$'
? ? ? ? ? ? ? ? matchData = records[recordIndex].value['{0}'.format(dataNameIndex2)]
? ? ? ? ? ? ? ? if re.match(matchRule,matchData):
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "true"
? ? ? ? ? ? ? ? else:
? ? ? ? ? ? ? ? ? records[recordIndex].value['{0}'.format(dataNameIndex2)] = "false"
? ? # Write record to processor output
? ? output.write(records[recordIndex])
? ? conn.close()
? except Exception as e:
? ? # Send record to error
? ? error.write(records[recordIndex], str(e))