Straight to the code. The script below takes a sample-pairing table plus a set of feature tables, computes the Jensen-Shannon divergence between the two samples of each pair for every feature table, saves a density plot per pair under ./Js_plot/, and writes a summary table at the end:
import pandas as pd
import numpy as np
import os,sys
from sklearn.preprocessing import StandardScaler
from scipy.stats import shapiro, ttest_ind, wilcoxon,levene,kruskal,mannwhitneyu
from statistics import mean, median
from scipy.stats import normaltest
import itertools
import random
from scipy.spatial.distance import jensenshannon
from scipy.stats import gaussian_kde
import matplotlib.pyplot as plt
import pickle
os.makedirs('./Js_plot', exist_ok=True)
scaler = StandardScaler()
work="./"
tsv_files = [f for f in os.listdir(work) if f.endswith('LE5X.csv')]  # feature tables (CSV files, despite the variable name)
tsv_files = [i for i in tsv_files if i.find('Frag2023_FragArm2023') == -1]  # skip the Frag2023_FragArm2023 table
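# js_divergence_scipy: estimate each sample's feature-value distribution with a Gaussian KDE
# on a shared grid, take the Jensen-Shannon distance between the two density estimates
# (scipy's jensenshannon, natural-log base), save a comparison plot and the value under
# ./Js_plot/, and return the distance.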
def js_divergence_scipy(p, q, p_SampleID, q_SampleID, name):
    data1 = np.array(np.nan_to_num(p), dtype=np.float64)
    data2 = np.array(np.nan_to_num(q), dtype=np.float64)
    # density estimate for each sample on a common grid
    kde1 = gaussian_kde(data1)
    kde2 = gaussian_kde(data2)
    x_range = np.linspace(min(data1.min(), data2.min()), max(data1.max(), data2.max()), 1000)
    prob_dist1 = kde1.evaluate(x_range)
    prob_dist2 = kde2.evaluate(x_range)
    out = jensenshannon(prob_dist1, prob_dist2)
    # plot the two density estimates for visual comparison
    plt.figure(figsize=(10, 6))
    plt.plot(x_range, prob_dist1, 'r', label=p_SampleID)
    plt.plot(x_range, prob_dist2, 'b', label=q_SampleID)
    plt.xlabel('Feature Value')
    plt.ylabel('Probability Density')
    plt.title(name.split('.')[0] + ' Probability Distribution')
    plt.legend()
    F = p_SampleID + "_" + q_SampleID + "_" + name.split('.')[0]
    plt.savefig('./Js_plot/JSD_' + F + '.png')
    plt.close()  # release the figure so figures do not pile up across calls
    if out >= 0:  # a NaN result from degenerate input fails this check and is skipped
        with open('./Js_plot/' + F, 'w') as f:
            f.write(str(out) + "\n")
        return out
    return None
def paired_delta(df1, df2):
    # Note: this helper (and def_na below) is not called in the main flow of this script.
    allsum = 0
    # drop positions that are NaN in either list
    list1, list2 = def_na(df1, df2)
    m = len(list1)
    n = len(list2)
    if m == 0 or n == 0 or m != n:
        return []
    for i in range(m):
        # `computer` is not defined in this script; it is assumed to return the
        # per-pair difference, e.g. list1[i] - list2[i]
        allsum = computer(list1[i], list2[i]) + allsum
    d = allsum / m
    return abs(d)
def def_na(list1, list2):
    nan_idx1 = [i for i, v in enumerate(list1) if np.isnan(v)]
    nan_idx2 = [i for i, v in enumerate(list2) if np.isnan(v)]
    # merge the NaN indices of both lists
    nan_idx = set(nan_idx1 + nan_idx2)
    # drop those positions from both lists
    list1 = [v for i, v in enumerate(list1) if i not in nan_idx]
    list2 = [v for i, v in enumerate(list2) if i not in nan_idx]
    return list1, list2
def tmpdffun(tmpdf):
    # z-score every feature column, leaving SampleID and group untouched
    scaler = StandardScaler()
    grouptmp = tmpdf[['SampleID', 'group']].copy()
    features = tmpdf.drop(['SampleID', 'group'], axis=1)
    scaled_features = scaler.fit_transform(features)
    # keep the original row index so the concat stays row-aligned
    scaled_df = pd.concat([grouptmp, pd.DataFrame(scaled_features, columns=features.columns, index=features.index)], axis=1)
    return scaled_df
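# ---- main flow ----
# sys.argv[1]: pairing table (tab-separated) with at least SampleID and the group column;
# sys.argv[2]: name of the group column (e.g. 'TypeOfCollection').
# Samples sharing a LabID form one pair, one sample from each group level.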
groupname_list=sys.argv[1]
groupFlag=sys.argv[2] #'TypeOfCollection'
#group1=pd.read_table(work+"group1.TypeOfCollection.info.list")
group1=pd.read_table(work+groupname_list)
if 'LabID' not in group1.columns.to_list():
    group1['LabID'] = group1['SampleID'].map(lambda x: x.split("-")[0])
group1 = group1[['SampleID', groupFlag, 'LabID']]  ### LabID identifies which two samples form a pair
print(set(group1[groupFlag]))
group1name=groupFlag
group1.columns=['SampleID','group','LabID']
mygroup_name=group1
mygroup_name_list = list(set(mygroup_name['group']))[0]  # one of the group levels, used to split each pair
mygroup_name_dict_a=[]
mygroup_name_dict_b=[]
def paired_dict(tmpdf):
    # within one LabID, record the sample from the reference group and its paired sample
    a = tmpdf.query("group==@mygroup_name_list")['SampleID'].to_list()[0]
    b = tmpdf.query("group!=@mygroup_name_list")['SampleID'].to_list()[0]
    mygroup_name_dict_a.append(a)
    mygroup_name_dict_b.append(b)
mygroup_name.groupby("LabID").apply(paired_dict)
mygroup_name=mygroup_name[['SampleID','group']]
mydict={}
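# For each feature table: merge with the pairing info, z-score the feature columns,
# align the two groups in pairing order, then compute one JSD value per sample pair.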
for name in tsv_files:
    print(name)
    mydict[name] = []
    datafile = work + name
    feature = pd.read_csv(datafile, header=0)
    df = pd.merge(mygroup_name, feature, on=['SampleID'])
    df = df.dropna(axis=1)
    #df = df.dropna(axis=1, thresh=round(df.shape[0] / 10))  ## cap per-column NA at 10%
    df1 = tmpdffun(df)  ## scale
    # df1 = df
    groupA = df1.query("SampleID ==@mygroup_name_dict_a").copy()
    groupA['SampleID'] = pd.Categorical(groupA['SampleID'], categories=mygroup_name_dict_a, ordered=True)
    groupA = groupA.sort_values('SampleID')
    groupB = df1.query("SampleID ==@mygroup_name_dict_b").copy()
    groupB['SampleID'] = pd.Categorical(groupB['SampleID'], categories=mygroup_name_dict_b, ordered=True)
    groupB = groupB.sort_values('SampleID')
    # after sorting, row i of groupA and row i of groupB belong to the same LabID pair
    for lengroup in range(groupB.shape[0]):
        data_a = np.array(groupA.iloc[lengroup, 2:])
        a_SampleID = groupA.iloc[lengroup, 0]
        data_b = np.array(groupB.iloc[lengroup, 2:])
        b_SampleID = groupB.iloc[lengroup, 0]
        JSdis = js_divergence_scipy(data_a, data_b, a_SampleID, b_SampleID, name)
        if JSdis is None:
            pass  # mydict[name].append(0)
        else:
            mydict[name].append(JSdis)
name = []
value = []
for k, v in mydict.items():
    for j in v:
        name.append(k.replace(".LE5X.csv", "").split(".")[-1])
        value.append(j)
newdf = pd.DataFrame({'Categories': name, 'Values': value})
newdf['Categories'] = newdf['Categories'].map(lambda x: x.replace("854TF_OCF_Tcell", "griffin_ocf"))
newdf['group'] = groupFlag
newdf.to_csv(work + "JS_" + groupFlag, index=False, sep="\t")
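That's it. To run it, pass the pairing table and the group column name on the command line, e.g. python <this_script>.py group1.TypeOfCollection.info.list TypeOfCollection. As a quick sanity check of the KDE + Jensen-Shannon step (a minimal sketch with synthetic data, not part of the pipeline above): two samples drawn from clearly separated normal distributions should give a distance well above zero, while two samples from the same distribution should land near zero.

import numpy as np
from scipy.stats import gaussian_kde
from scipy.spatial.distance import jensenshannon
rng = np.random.default_rng(0)
a = rng.normal(0, 1, 500)   # sample A
b = rng.normal(3, 1, 500)   # sample B, mean shifted by 3
# same estimate as js_divergence_scipy above, without the plotting and file output
grid = np.linspace(min(a.min(), b.min()), max(a.max(), b.max()), 1000)
p = gaussian_kde(a).evaluate(grid)
q = gaussian_kde(b).evaluate(grid)
print(jensenshannon(p, q))  # noticeably greater than 0 for shifted distributions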