I. Project Overview
Adventure Works Cycles is a large multinational manufacturer that produces and sells bicycles and related products, with operations worldwide. The Asia-Pacific market is a key growth region, and the China market is its core area.
The company has four main product lines:
● Bicycles manufactured by Adventure Works Cycles;
● Bicycle components, such as wheels, pedals, or brake assemblies;
● Bicycle apparel purchased from suppliers for resale to Adventure Works Cycles customers;
● Bicycle accessories purchased from suppliers for resale to Adventure Works Cycles customers.
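II. Analysis Goals
As the business grows, the company needs stronger data-driven management so that business colleagues can run their own analyses and judge the market quickly. This means working with the data department to pin down the business metrics required, building a visual dashboard, and keeping it updated automatically.
1. Use Python to process the data and analyze metrics such as sales volume, sales amount, and average order value from the overall, regional, and time perspectives, then build a visual dashboard.
2. Deploy the code on a Linux server so that the data is refreshed automatically every day.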
III. Project Process
● Data observation
● Metric design
● Code processing
● Automatic data updates
● Building the visual dashboard
1. Data Observation
The MySQL database contains many tables, but we only need the following three:
(1) ods_sales_orders: order detail table
(2) ods_customers: customer information table
(3) dim_date_df: date dimension table
2. Metric Design
Analyze sales amount, sales volume, year-over-year change, and related figures across the date dimensions today, yesterday, current month, current quarter, and current year.
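These date dimensions correspond to the flag columns that dim_date_df exposes (is_today, is_yesterday, is_current_month, is_current_quarter, is_current_year, is_last_year). The write-up does not show how that table is built, so the following is only a sketch of how such flags might be derived. The helper name `date_flags` and the use of calendar quarters are assumptions made for illustration.

```python
import datetime


def date_flags(d, today):
    """Return 0/1 flags describing how date d relates to today (hypothetical helper)."""
    same_year = d.year == today.year
    return {
        "is_today": int(d == today),
        "is_yesterday": int(d == today - datetime.timedelta(days=1)),
        "is_current_month": int(same_year and d.month == today.month),
        # Assumption: quarters are calendar quarters (Jan-Mar, Apr-Jun, ...)
        "is_current_quarter": int(same_year and (d.month - 1) // 3 == (today.month - 1) // 3),
        "is_current_year": int(same_year),
        "is_last_year": int(d.year == today.year - 1),
    }


today = datetime.date(2024, 5, 15)
flags = date_flags(datetime.date(2024, 4, 30), today)
print(flags)
```

With flags like these stored per date, each metric for a period reduces to filtering on one column and summing.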
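3. Code Processing
(1) dw_order_by_day: daily aggregate table
Read the sales order table from the MySQL database, aggregate it by date, and then compute the sales amount, order count, day-over-day change, and related metrics. Part of the Python code follows: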
import random

import pandas as pd

# logger, MY_NAME, and the database connections are created elsewhere in the full script.


# Aggregate the order table by day
def sum_amount_order(adventure_conn_read):
    """sum_amount_order: aggregated sales/order table.
    Reads ods_sales_orders (order detail table), groups by create_date,
    and computes total sales amount, order count, and average order value."""
    try:
        # Read the ods_sales_orders table
        sum_amount_order = pd.read_sql_query("select * from ods_sales_orders ",
                                             con=adventure_conn_read)
        # Group by create_date: total unit price and number of distinct customers
        # (the distinct-customer count is used as the order count)
        sum_amount_order = sum_amount_order.groupby(by='create_date').agg(
            {'unit_price': 'sum', 'customer_key': 'nunique'}).reset_index()
        # Rename the columns
        sum_amount_order.rename(columns={'unit_price': 'sum_amount',
                                         'customer_key': 'sum_order'},
                                inplace=True)
        # Compute the average order value
        sum_amount_order['amount_div_order'] = \
            sum_amount_order['sum_amount'] / sum_amount_order['sum_order']
        return sum_amount_order
    # Log the error if anything fails
    except Exception as e:
        logger.info("sum_amount_order failed, error: {}".format(e))
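To make the aggregation step concrete, here is a small self-contained sketch on made-up rows. It uses pandas named aggregation, which produces the final column names directly instead of aggregating and then renaming; that is a substitution for illustration, not the author's code, and the toy values are invented.

```python
import pandas as pd

# Toy stand-in for ods_sales_orders; the values are made up for illustration.
orders = pd.DataFrame({
    "create_date": ["2024-01-01", "2024-01-01", "2024-01-01", "2024-01-02"],
    "customer_key": [1, 1, 2, 3],
    "unit_price": [100.0, 50.0, 30.0, 80.0],
})

daily = (
    orders.groupby("create_date")
    .agg(sum_amount=("unit_price", "sum"),        # total sales amount per day
         sum_order=("customer_key", "nunique"))   # distinct customers per day
    .reset_index()
)
# Average order value: amount divided by the number of distinct customers
daily["amount_div_order"] = daily["sum_amount"] / daily["sum_order"]
print(daily)
```

On the first day two distinct customers spend 180.0 in total, so the average order value comes out at 90.0.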
# Generate the target sales amount and target order count
def add_order_goal(sum_amount_order):
    """add_order_goal: generate the target amount and target order count.
    Builds random target values in a loop and joins them to the aggregated table
    to form sum_amount_order_goal (aggregated sales/order table with targets)."""
    try:
        # Two lists: daily sales-amount targets and daily order-count targets
        sum_amount_goal_list = []
        sum_order_goal_list = []
        # Dates present in sum_amount_order
        create_date_list = list(sum_amount_order['create_date'])
        for i in create_date_list:
            # Draw a random multiplier in [0.85, 1.1]
            a = random.uniform(0.85, 1.1)
            b = random.uniform(0.85, 1.1)
            # Daily targets: that day's sum_amount * a and sum_order * b
            amount_goal = list(sum_amount_order[sum_amount_order['create_date'] == i]
                               ['sum_amount'])[0] * a
            order_goal = list(sum_amount_order[sum_amount_order['create_date'] == i]
                              ['sum_order'])[0] * b
            sum_amount_goal_list.append(amount_goal)  # collect the generated targets
            sum_order_goal_list.append(order_goal)
        # Combine the daily aggregate with both target columns
        sum_amount_order_goal = pd.concat([sum_amount_order, pd.DataFrame(
            {'sum_amount_goal': sum_amount_goal_list, 'sum_order_goal':
                sum_order_goal_list})], axis=1)
        # Compute the completion rate
        sum_amount_order_goal["compliance_rate"] = sum_amount_order_goal["sum_amount"] / sum_amount_order_goal[
            "sum_amount_goal"]
        return sum_amount_order_goal
    except Exception as e:
        logger.info("add_order_goal failed, error: {}".format(e))
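The loop in add_order_goal filters the whole frame once per date, even though each row already holds that day's totals. As a possible alternative, the targets could be drawn for all rows at once. The sketch below assumes NumPy's random generator in place of `random.uniform`, and the seed and toy values are there only to make the example repeatable.

```python
import numpy as np
import pandas as pd

# Toy daily aggregate; the values are made up for illustration.
daily = pd.DataFrame({
    "create_date": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "sum_amount": [180.0, 80.0, 120.0],
    "sum_order": [2, 1, 3],
})

rng = np.random.default_rng(seed=0)  # seeded only so the sketch is repeatable
goals = daily.copy()
# One multiplier per row, drawn uniformly from [0.85, 1.1)
goals["sum_amount_goal"] = goals["sum_amount"] * rng.uniform(0.85, 1.1, size=len(goals))
goals["sum_order_goal"] = goals["sum_order"] * rng.uniform(0.85, 1.1, size=len(goals))
goals["compliance_rate"] = goals["sum_amount"] / goals["sum_amount_goal"]
print(goals)
```

Because the multipliers stay between 0.85 and 1.1, the completion rate always lands roughly between 0.91 and 1.18.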
# Read the date dimension table
def date_data(adventure_conn_tosql):
    """Read the dim_date_df date dimension table."""
    try:
        date_sql = """
        select create_date,
               is_current_year,
               is_last_year,
               is_yesterday,
               is_today,
               is_21_day,
               is_current_month,
               is_current_quarter
        from dim_date_df"""
        date_info = pd.read_sql_query(date_sql, con=adventure_conn_tosql)
        return date_info
    except Exception as e:
        logger.info("date_data failed, error: {}".format(e))
# Merge the aggregated target table with the date dimension table
def merge_data(sum_amount_order_goal, date_info):
    """Parameters: sum_amount_order_goal, the aggregated sales/order table with targets;
    date_info, the date dimension table (from date_data).
    Returns: amount_order_by_day, the aggregated table joined with the date dimension.
    """
    try:
        # Format create_date as a 'YYYY-MM-DD' string
        sum_amount_order_goal['create_date'] = sum_amount_order_goal['create_date']. \
            apply(lambda x: x.strftime('%Y-%m-%d'))
        # Join the date dimension on the create_date key
        amount_order_by_day = pd.merge(sum_amount_order_goal, date_info,
                                       on='create_date', how='inner')
        return amount_order_by_day
    except Exception as e:
        logger.info("merge_data failed, error: {}".format(e))
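merge_data formats create_date as a string so that both sides of the join share the same key type. Another way to reach the same goal would be to convert both keys to datetimes and let pandas check that each date appears only once on each side. The sketch below assumes toy data and uses the `validate` argument of `pd.merge` as that guard.

```python
import pandas as pd

daily = pd.DataFrame({
    "create_date": pd.to_datetime(["2024-01-01", "2024-01-02"]),
    "sum_amount": [180.0, 80.0],
})
date_info = pd.DataFrame({
    "create_date": ["2024-01-01", "2024-01-02", "2024-01-03"],  # strings, as if read from SQL
    "is_today": [0, 0, 1],
})

# Bring both keys to the same dtype before joining
date_info["create_date"] = pd.to_datetime(date_info["create_date"])

merged = pd.merge(
    daily, date_info,
    on="create_date",
    how="inner",
    validate="one_to_one",  # raises MergeError if either side has duplicate dates
)
print(merged)
```

The inner join keeps only the two dates present in both frames, so the unmatched 2024-01-03 row of the date table is dropped.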
# Save the new table to the database
def save_to_mysql(amount_order_by_day, adventure_conn_tosql):
    """Write amount_order_by_day to the dw_order_by_day table (daily table with day-over-day change)."""
    try:
        # pct_change() is the relative change from the previous row: for a previous
        # value x and a current value y, result = (y - x) / x
        amount_order_by_day['amount_diff'] = amount_order_by_day['sum_amount'].pct_change().fillna(0)
        # if_exists='replace' rebuilds dw_order_by_day on every run
        amount_order_by_day.to_sql('dw_order_by_day_{}'.format(MY_NAME), con=adventure_conn_tosql,
                                   if_exists='replace', index=False)
    except Exception as e:
        logger.info("save_to_mysql failed, error: {}".format(e))
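The day-over-day column is just the formula (y - x) / x applied to consecutive rows, with the first row set to 0. A plain-Python sketch of that arithmetic is shown below; `day_over_day` is a hypothetical helper that mirrors `pct_change().fillna(0)` only for a series with no gaps and no zero values.

```python
def day_over_day(values):
    """Relative change from the previous element; the first element gets 0.0.

    Hypothetical helper: assumes no missing values and no zeros in the input.
    """
    if not values:
        return []
    changes = [0.0]
    for previous, current in zip(values, values[1:]):
        changes.append((current - previous) / previous)
    return changes


amounts = [100.0, 110.0, 99.0]
print([round(c, 4) for c in day_over_day(amounts)])  # [0.0, 0.1, -0.1]
```

Going from 100 to 110 is a 10% increase, and dropping from 110 to 99 is a 10% decrease, which is what the stored amount_diff values represent.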
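(2) dw_amount_diff: year-over-year comparison table
Read the dw_order_by_day data, aggregate it by the date dimensions today, yesterday, current month, current quarter, this year, and last year, and compare each with the same period of last year. Part of the Python code follows: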
import datetime


def diff(stage, indictor):
    """stage: date-dimension flag to filter on, e.g. is_today (values 0 or 1)
    indictor: column to sum, e.g. sum_amount (total amount) or sum_order (total orders)
    Returns: the total for the current period and the total for the same period last year"""
    try:
        # Sum of indictor over the rows where the stage flag is set
        current_stage_indictor = dw_order_by_day[dw_order_by_day
                                                 [stage] == 1][indictor].sum()
        # Matching dates one year earlier, using a fixed 366-day offset
        before_stage_list = list(dw_order_by_day[dw_order_by_day[stage] == 1]
                                 ['create_date'] - datetime.timedelta(days=366))
        # Sum of indictor over those prior-year dates
        before_stage_indictor = dw_order_by_day[dw_order_by_day['create_date']
                                                .isin(before_stage_list)][indictor].sum()
        return current_stage_indictor, before_stage_indictor
    except Exception as e:
        logger.info("diff failed, error: {}".format(e))
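A fixed 366-day shift lands on the same calendar date only when the preceding twelve months contain a February 29; otherwise it lands one day earlier. If an exact calendar match is wanted, one option is to change the year field directly. The helper below is a hypothetical sketch of that idea, not the approach used in diff.

```python
import datetime


def same_day_last_year(d):
    """Return the same calendar date one year earlier (hypothetical helper).

    Feb 29 has no counterpart in a non-leap year, so it falls back to Feb 28.
    """
    try:
        return d.replace(year=d.year - 1)
    except ValueError:
        return d.replace(year=d.year - 1, day=28)


d = datetime.date(2023, 6, 15)
print(same_day_last_year(d))               # 2022-06-15
print(d - datetime.timedelta(days=366))    # 2022-06-14
```

Which of the two conventions is right depends on how the business defines "same period last year", so this is a design choice rather than a correction.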
def delete_table():
    try:
        pd.read_sql_query("Truncate table dw_amount_diff_{}".format(MY_NAME), con=adventure_conn_tosql)
    except Exception:
        # TRUNCATE returns no result set, so read_sql_query raises ResourceClosedError
        # and would stop the program; catching it lets the run continue.
        print('continuing')
        logger.info("continuing")
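delete_table works by letting `read_sql_query` fail on a statement that returns no rows. A statement like that can usually be sent through the connection's own execute method instead, with nothing read back. The sketch below uses the standard-library sqlite3 module as a stand-in for MySQL; SQLite has no TRUNCATE, so `DELETE FROM` is used, and the table name `dw_amount_diff_demo` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL connection
conn.execute("CREATE TABLE dw_amount_diff_demo (flag TEXT, today_diff REAL)")
conn.executemany(
    "INSERT INTO dw_amount_diff_demo VALUES (?, ?)",
    [("amount", 0.12), ("order", -0.03)],
)

# Statements without a result set go through execute(); nothing is read back.
conn.execute("DELETE FROM dw_amount_diff_demo")
conn.commit()

remaining = conn.execute("SELECT COUNT(*) FROM dw_amount_diff_demo").fetchone()[0]
print(remaining)  # 0
conn.close()
```

This keeps the exception handler free for real failures instead of using it as the normal path.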
if __name__ == "__main__":
    # Goal: build dw_amount_diff, the year-over-year table (today / yesterday / month / quarter / year)
    # Sales amount for each period
    today_amount, before_year_today_amount = diff('is_today', 'sum_amount')
    yesterday_amount, before_year_yesterday_amount = diff('is_yesterday', 'sum_amount')
    month_amount, before_year_month_amount = diff('is_current_month', 'sum_amount')
    quarter_amount, before_year_quarter_amount = diff('is_current_quarter', 'sum_amount')
    year_amount, before_year_year_amount = diff('is_current_year', 'sum_amount')
    # Order count for each period
    today_order, before_year_today_order = diff('is_today', 'sum_order')
    yesterday_order, before_year_yesterday_order = diff('is_yesterday', 'sum_order')
    month_order, before_year_month_order = diff('is_current_month', 'sum_order')
    quarter_order, before_year_quarter_order = diff('is_current_quarter', 'sum_order')
    year_order, before_year_year_order = diff('is_current_year', 'sum_order')
    # Year-over-year change against last year for total amount / order count / average order value,
    # per period: today / yesterday / month / quarter / year
    try:
        amount_dic = {'today_diff': [today_amount / before_year_today_amount - 1,
                                     today_order / before_year_today_order - 1,
                                     (today_amount / today_order) / (before_year_today_amount /
                                                                     before_year_today_order) - 1],
                      'yesterday_diff': [yesterday_amount / before_year_yesterday_amount - 1,
                                         yesterday_order / before_year_yesterday_order - 1,
                                         (yesterday_amount / yesterday_order) / (before_year_yesterday_amount /
                                                                                 before_year_yesterday_order) - 1],
                      'month_diff': [month_amount / before_year_month_amount - 1,
                                     month_order / before_year_month_order - 1,
                                     (month_amount / month_order) / (before_year_month_amount /
                                                                     before_year_month_order) - 1],
                      'quarter_diff': [quarter_amount / before_year_quarter_amount - 1,
                                       quarter_order / before_year_quarter_order - 1,
                                       (quarter_amount / quarter_order) / (before_year_quarter_amount /
                                                                           before_year_quarter_order) - 1],
                      'year_diff': [year_amount / before_year_year_amount - 1,
                                    year_order / before_year_year_order - 1,
                                    (year_amount / year_order) / (before_year_year_amount /
                                                                  before_year_year_order) - 1],
                      'flag': ['amount', 'order', 'avg']}  # short row labels, handy for picking rows out later
        amount_diff = pd.DataFrame(amount_dic)
        amount_diff["create_date"] = today_date.strftime("%Y-%m-%d")
        logger.info("Data about to be stored:")
        logger.info(amount_diff.head())
        logger.info("Deleting old data")
        delete_table()
        logger.info("Storing data")
        # if_exists='replace' overwrites the existing table on every run
        amount_diff.to_sql('dw_amount_diff_{}'.format(MY_NAME), con=adventure_conn_tosql,
                           if_exists='replace', index=False)  # store as the current-day table
        logger.info('dw_amount_diff table generated successfully')
    except ZeroDivisionError as e:
        logger.info("Check whether dw_order_by_day contains data, error: {}".format(e))
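Each entry above is the same calculation, current / previous - 1, repeated for three metrics and five periods. The sums returned by pandas are typically NumPy scalars, and dividing those by zero tends to produce inf or nan with a warning rather than raising ZeroDivisionError, so an explicit check on the denominator may be more dependable. The sketch below shows one way that could look; `yoy`, the period names, and the totals are made up for illustration.

```python
def yoy(current, previous):
    """Year-over-year change, or None when there is no prior-year base (hypothetical helper)."""
    if not previous:
        return None
    return current / previous - 1


# Made-up (current, prior-year) totals per period, in the shape diff() returns.
amounts = {"today": (120.0, 100.0), "month": (3000.0, 2500.0), "year": (0.0, 0.0)}
orders = {"today": (12, 10), "month": (200, 250), "year": (0, 0)}

amount_dic = {"flag": ["amount", "order", "avg"]}
for period in amounts:
    cur_amt, prev_amt = amounts[period]
    cur_ord, prev_ord = orders[period]
    # Average order value for each side, guarded against empty periods
    cur_avg = cur_amt / cur_ord if cur_ord else 0.0
    prev_avg = prev_amt / prev_ord if prev_ord else 0.0
    amount_dic[period + "_diff"] = [
        yoy(cur_amt, prev_amt),
        yoy(cur_ord, prev_ord),
        yoy(cur_avg, prev_avg),
    ]

print(amount_dic)
```

A period with no prior-year data then shows up as None in the table instead of aborting the whole run.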
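(3) dw_orders_info: sales information table
Read the order detail table, customer information table, and date dimension table from the MySQL source. Merge the order detail table with the customer information table on the "customer_key" field, then merge the result with the date dimension table. Part of the Python script follows: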
# Source tables
def read_date(adventure_conn_read):
    try:
        # Order detail table
        sql_1 = "SELECT * from ods_sales_orders"
        ods_sales_orders = pd.read_sql(sql_1, adventure_conn_read)
        # Customer information table
        sql_2 = "SELECT customer_key,gender,chinese_territory,chinese_province,chinese_city from ods_customer"
        ods_customer = pd.read_sql(sql_2, adventure_conn_read).drop_duplicates("customer_key")
        # Date dimension table
        sql_3 = "SELECT create_date, is_today, is_yesterday, is_21_day, is_current_month, is_current_quarter, is_current_year, is_last_year from dim_date_df"
        dim_date_df = pd.read_sql(sql_3, adventure_conn_read)
        return ods_sales_orders, ods_customer, dim_date_df
    except Exception as e:
        logger.info("read_date error: {}".format(e))


# Merge the order detail table with the customer information table
def merge1(ods_sales_orders, ods_customer):
    try:
        # Cast both keys to int so the join compares like with like
        ods_sales_orders['customer_key'] = ods_sales_orders['customer_key'].apply(int)
        ods_customer['customer_key'] = ods_customer['customer_key'].apply(int)
        df1 = pd.merge(ods_sales_orders, ods_customer, on="customer_key")
        return df1
    except Exception as e:
        logger.info("merge1 error: {}".format(e))


# Merge df1 with the date dimension table
def merge2(df1, dim_date_df):
    try:
        # Convert both date keys to datetime before joining
        df1["create_date"] = pd.to_datetime(df1["create_date"])
        dim_date_df["create_date"] = pd.to_datetime(dim_date_df["create_date"])
        dw_orders_info = pd.merge(df1, dim_date_df, on="create_date")
        # dw_orders_info["create_date"] = dw_orders_info["create_date"].apply(lambda x:x.strftime("%Y-%m-%d"))
        return dw_orders_info
    except Exception as e:
        logger.info("merge2 error: {}".format(e))


# Save to the database
def save_tosql(dw_orders_info, adventure_conn_tosql):
    try:
        dw_orders_info.to_sql("dw_orders_info_{}".format(MY_NAME), con=adventure_conn_tosql, if_exists='replace', index=False)
    except Exception as e:
        logger.info("save_tosql error: {}".format(e))
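The two merges above amount to a three-table inner join, so the same result could in principle be produced by the database in a single query. The sketch below illustrates that with the standard-library sqlite3 module standing in for MySQL; the rows are invented, and only a few of the real columns are included.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL source
conn.executescript("""
CREATE TABLE ods_sales_orders (create_date TEXT, customer_key INTEGER, unit_price REAL);
CREATE TABLE ods_customer (customer_key INTEGER, chinese_city TEXT);
CREATE TABLE dim_date_df (create_date TEXT, is_today INTEGER);
INSERT INTO ods_sales_orders VALUES ('2024-01-01', 1, 100.0), ('2024-01-02', 2, 80.0);
INSERT INTO ods_customer VALUES (1, 'Beijing'), (2, 'Shanghai');
INSERT INTO dim_date_df VALUES ('2024-01-01', 0), ('2024-01-02', 1);
""")

rows = conn.execute("""
    SELECT o.create_date, o.customer_key, o.unit_price, c.chinese_city, d.is_today
    FROM ods_sales_orders AS o
    JOIN ods_customer AS c ON c.customer_key = o.customer_key
    JOIN dim_date_df AS d ON d.create_date = o.create_date
    ORDER BY o.create_date
""").fetchall()
print(rows)
conn.close()
```

Joining in SQL avoids pulling three full tables into memory, while the pandas version keeps every step visible in Python; either can be reasonable depending on data volume.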
4. Automatic Data Updates
To keep the data current, automatic updates are implemented with Python's schedule module, the os module, and Linux background execution.
(1) Create the script
Use the schedule module to run jobs on a timer:
The three tables are updated at 7:00, 7:20, and 7:40 every morning, respectively.
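# Schedule the jobs (requires `import schedule` and `import time` at the top of the script)
if __name__ == '__main__':
    schedule.every().day.at('07:00').do(job1)
    schedule.every().day.at('07:20').do(job2)
    schedule.every().day.at('07:40').do(job3)
    # Keep the process alive and run each job when it falls due
    while True:
        schedule.run_pending()
        time.sleep(1)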
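Conceptually, a daily job at a fixed time means working out how long it is until the next occurrence of that wall-clock time. The schedule library takes care of this bookkeeping; the standard-library sketch below only illustrates the idea, and `seconds_until` is a hypothetical helper rather than part of any library.

```python
import datetime


def seconds_until(hhmm, now):
    """Seconds from now until the next occurrence of hhmm, e.g. '07:20' (hypothetical helper)."""
    hour, minute = (int(part) for part in hhmm.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += datetime.timedelta(days=1)  # already passed today, so run tomorrow
    return (target - now).total_seconds()


now = datetime.datetime(2024, 5, 15, 7, 10)
for at in ("07:00", "07:20", "07:40"):
    print(at, seconds_until(at, now))
```

At 07:10 the 07:00 job has already run, so its next occurrence is tomorrow, while the other two are 10 and 30 minutes away.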
Use the os module to run commands in the shell:
os.system(
"/home/anaconda3/bin/python3 /home/frog005/adventure_chen/dw_order_by_day.py >> /home/frog005/adventure_chen/chen_logs/dw_order_by_day_schedule.log 2>&1 &")
(2) Deploy the script on Linux
On the Linux server, starting the script with nohup keeps it running after the terminal is closed, and the trailing & sends it to the background.
nohup python schedule_job_chen.py > schedule_chen.log 2>&1 &
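Both the os.system call in the scheduler script and the nohup command redirect stdout and stderr into a log file with shell syntax. If that redirection were done from Python instead, it might look like the sketch below, which uses `subprocess.run` from the standard library. The helper `run_and_log`, the log file name, and the trivial child command are assumptions for illustration.

```python
import os
import subprocess
import sys
import tempfile


def run_and_log(command, log_path):
    """Run command, appending its stdout and stderr to log_path; return the exit code."""
    with open(log_path, "a") as log:
        completed = subprocess.run(command, stdout=log, stderr=subprocess.STDOUT)
    return completed.returncode


log_path = os.path.join(tempfile.mkdtemp(), "demo_schedule.log")
# A trivial child process stands in for the real dw_order_by_day.py script.
exit_code = run_and_log([sys.executable, "-c", "print('updated')"], log_path)
with open(log_path) as log:
    log_text = log.read()
print(exit_code, log_text.strip())
```

Unlike a shell command ending in &, `subprocess.run` waits for the child to finish, which makes the exit code available for logging but also means a long job blocks the scheduler loop until it completes.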
5. Building the Visual Dashboard
After completing the steps above, connect Power BI to the MySQL database and build the dashboard. It has three pages: detailed sales data, sales trends, and overall sales.
(1) Detailed sales data page
It is divided into five parts: today, yesterday, this month, this quarter, and this year.
(2) Sales trend page
(3) Overall sales page