需求:
1.從hive 表中獲取數(shù)據(jù)屑墨。
2.計算各個指標(biāo)與主分析指標(biāo)間的相關(guān)系數(shù)迹冤。
3.將計算出來的相關(guān)系數(shù)猪半,放入csv 文件砸彬,待使用坡倔。
首先只洒,相關(guān)系數(shù)我們選擇了皮爾遜相關(guān)系數(shù)达箍,python的實(shí)現(xiàn)也是從網(wǎng)上直接找到的铣卡。
然后確定獲取hive 數(shù)據(jù)的方式锉桑。公司環(huán)境沒有pyspark排霉,排除了用spark 操作hive 的方法。就想套用值之前腳本使用的民轴,用impala 連接hive 的獲取方法攻柠。結(jié)果基本開發(fā)完成,發(fā)現(xiàn)后裸,一個是由于sql 數(shù)據(jù)量比較大瑰钮,耗時比較長,導(dǎo)致總是自動斷開連接微驶,以至于跑不完程序浪谴。還有一個,由于其中的一個指標(biāo)計算需要用到udf 函數(shù)因苹,添加jar 包時苟耻,使用impala 的方式總是無法識別路徑。后來認(rèn)為應(yīng)該是這種方式只能支持查詢扶檐,無法支持這種添加臨時函數(shù)的操作凶杖,沒辦法放棄了這一條路。
最后呢是選擇了直接打開hive 的簡單粗暴的模式:
os.popen("""hive -S -e '{}' """.format(sql))
ok 下面貼上我的代碼蘸秘,由于我是剛開始在工作中使用python官卡,正在學(xué)習(xí),肯定有很多不足的地方醋虏,如果能得到各位指點(diǎn)一二寻咒,那我真是非常感謝,不管是思路上的颈嚼,還是代碼上的毛秘,都希望大家不吝賜教。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import os
import sys
import pandas as pd
from math import sqrt
from operator import itemgetter, attrgetter
#執(zhí)行hivesql,os.popen() 方法用于從一個命令打開一個管道叫挟,對于我來說正好是需要sql 的一行結(jié)果作為一個整體艰匙,剛好放到一個list 中。
#輸出到指定日志文件
logging.basicConfig(level=logging.INFO,
filename='....../cor_coe.log',
filemode='a',
format='%(asctime)s %(filename)s %(levelname)s %(message)s',
datefmt='[%Y-%m-%d %H:%M:%S]'
)
def getHiveResult(sql):
output = os.popen("""hive -S -e '{}' """.format(sql))
result = output.readlines()
#形式:['字段名稱\t字段名稱','[第一行結(jié)果\t分隔]','[第二行結(jié)果\t分隔]'....]
return result
#這里是因為上面的result 這個list抹恳,每個元素的最后都加上了一個\n员凝,我要把它去掉。
def trans_list(str):
return str.replace('\n','')
# 將'[...]' 這個字符串奋献,轉(zhuǎn)換成一個list.
def str2list(str):
list1 = str.split(',')
list1[0] = list1[0].replace('[','')
list1[-1] = list1[-1].replace(']','')
return [float(x) for x in list1]
#皮爾遜相關(guān)系數(shù)公式:x,y 兩個變量的 協(xié)方差 / 標(biāo)準(zhǔn)差的乘積
# 乘積之和函數(shù)
def multipl(a,b):
sumofab=0.0
for i in range(len(a)):
temp=a[i]*b[i]
sumofab+=temp
return sumofab
# 皮爾遜相關(guān)系數(shù)函數(shù)
def corrcoef(x,y):
n=len(x)
#求和
sum1=sum(x)
sum2=sum(y)
#求乘積之和
sumofxy=multipl(x,y)
#求平方和
sumofx2 = sum([pow(i,2) for i in x])
sumofy2 = sum([pow(j,2) for j in y])
# 協(xié)方差 乘積之和 - 和的乘積
num=sumofxy-(float(sum1)*float(sum2)/n)
#標(biāo)準(zhǔn)差 * 標(biāo)準(zhǔn)差
den=sqrt((sumofx2-float(sum1**2)/n)*(sumofy2-float(sum2**2)/n))
return num/den
sql 就忽略了健霹。sql 最終的結(jié)果形式是:
南京 [11,13,10.3,12.5..........] 這是一行記錄,只有兩個字段瓶蚂。
if __name__ == "__main__":
process_start = datetime.datetime.now()
logging.info("程序開始時間:" + str(process_start))
# 獲取腳本外的日期參數(shù),并添加連接符
ymd=sys.argv[1]
y_m_d="-".join((ymd[0:4],ymd[4:6],ymd[6:8]))
sql1 = sql1.format(DT=y_m_d)
try:
result = getHiveResult(sql1)
except Exception as e:
logging.info("調(diào)用getHiveResult()函數(shù)報錯:" + traceback.format_exc())
list_result_name = ['','name1','name2','name3','name4']
list_result1 = result[1:]
list_result2 = []
list_result3 = []
for el1 in list_result1:
list_result2.append(trans_list(el1))
for el2 in list_result2:
list_tmp = []
list_tmp = el2.split('\t')
for el3 in list_tmp:
num = list_tmp.index(el3)
if num >=2:
try:
list_result3.append(( list_tmp[0],list_result_name[1]+"_"+list_result_name[num], round(corrcoef(str2list(list_tmp[1]),str2list(el3)),4) ))
except Exception as e:
logging.info("計算皮爾遜相關(guān)系數(shù)報錯:" + traceback.format_exc())
#突然覺得python 的排序功能還挺強(qiáng)大的糖埋。在對我list 中的元祖排序
list_result3 = sorted(list_result3,key=itemgetter(0,2),reverse=True)
try:
df = pd.DataFrame(list_result3)
except Exception as e:
logging.info("list_result3轉(zhuǎn)化成 df 時報錯:" + traceback.format_exc())
outputpath = 'path/cor_coe_'+ymd+'.csv'
try:
df.to_csv(outputpath,index=False,sep=',',header=['城市','相關(guān)因子','相關(guān)系數(shù)'])
except Exception as e:
logging.info("輸出到 csv 文件報錯:" + traceback.format_exc())
print ('process_end:',datetime.datetime.now())
現(xiàn)在我還有一個瓶頸,就是sql 跑的太慢了窃这,有十個sql瞳别,每個sql 涉及到的表都在千萬到億條數(shù)據(jù)左右,總共的時間需要22min左右杭攻,腳本的總時間是25min左右祟敛。已經(jīng)嘗試設(shè)置了很多hive 所謂的優(yōu)化參數(shù),除了合并小文件朴上,提升了將近4分鐘的速度之外垒棋,其他的調(diào)整都沒有什么效果。而且從執(zhí)行過程來看痪宰,并沒有明顯的數(shù)據(jù)傾斜叼架,就是map 和 reduce 的過程就比較慢。如果后期找到方法之后我還會追加進(jìn)來衣撬。也希望看到這篇文章的朋友能提一些寶貴的意見乖订。