Files
master/calctoxdiff.py
wea_ondara 117651d944 wip
2019-07-22 22:04:23 +02:00

234 lines
9.7 KiB
Python

import importlib
import os
import sys
from collections import defaultdict
from datetime import datetime
from os import listdir
from os.path import isfile, join
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import ks_2samp
# Line colour used for each sentiment series when plotting; looked up by the
# series label ('neg', 'neu', 'pos', 'com').
colors = dict(neg='red', neu='green', pos='blue', com='orange')
def main(folder):
    """Run both KS comparisons over the batch files found below ``folder``.

    Reads every ``*.py`` file in ``<folder>/output/batch/`` and writes logs and
    plots to ``<folder>/output/ksbatch/``, which is created if it is missing.

    :param folder: base data folder containing ``output/batch/``.
    """
    outputdir = folder + "/output/ksbatch/"
    # os.makedirs instead of shelling out to "mkdir -p": works with paths that
    # contain spaces or shell metacharacters and does not depend on a POSIX shell.
    os.makedirs(outputdir, exist_ok=True)

    batchdir = folder + "/output/batch/"
    onlyfiles = sorted(
        join(batchdir, name)
        for name in listdir(batchdir)
        if name.endswith(".py") and isfile(join(batchdir, name))
    )

    plotbypost(onlyfiles, outputdir)
    plotbydate(onlyfiles, outputdir)
# (series label, key in a score dict) pairs. The label selects the plot colour
# and appears in legends and logs; the key is read from each toxlevels entry.
# The order here fixes the plotting and legend order.
_SENTIMENT_SERIES = (("neg", "neg"), ("neu", "neu"), ("pos", "pos"), ("com", "compound"))


def _name_fields(path):
    """Return the '_'-separated fields of a batch file's name.

    Only the base name is parsed (directory and extension are dropped), so
    underscores in the folder path cannot shift the field positions.
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    return stem.split("_")


def _sentiment_samples(path):
    """Load one batch file and return ``{series label: [scores]}``.

    The file must define ``toxlevels``, a mapping whose values are iterables
    of dicts carrying the keys 'neg', 'neu', 'pos' and 'compound'.
    """
    entries = flatmap(imprt(path).toxlevels.values())
    return {label: [entry[key] for entry in entries] for label, key in _SENTIMENT_SERIES}


def _ks_between_neighbours(paths):
    """Run ``ks_2samp`` on every pair of consecutive files in ``paths``.

    Returns ``{series label: [result for pair 0, result for pair 1, ...]}``.
    Each file is loaded once; only two files' samples are held at a time.
    """
    results = {label: [] for label, _ in _SENTIMENT_SERIES}
    previous = _sentiment_samples(paths[0])
    for path in paths[1:]:
        current = _sentiment_samples(path)
        for label, _ in _SENTIMENT_SERIES:
            results[label].append(ks_2samp(previous[label], current[label]))
        previous = current
    return results


def _write_ks_log(logpath, paths, results):
    """Write one line per consecutive file pair with the four KS results."""
    with open(logpath, "w") as log:
        for i, (first, second) in enumerate(zip(paths, paths[1:])):
            log.write(first + " -> " + second
                      + ": ks neg = " + str(results["neg"][i])
                      + "; ks neu = " + str(results["neu"][i])
                      + "; ks pos = " + str(results["pos"][i])
                      + "; ks com = " + str(results["com"][i]) + "\n")


def _plot_ks(labels, results, attribute, suffix, title, imgpath):
    """Plot one attribute ('pvalue' or 'statistic') of the KS results.

    For every series this draws the values, a dashed line at their mean, and
    round markers on values at least one standard deviation from the mean.
    The figure is saved to ``imgpath`` and closed.
    """
    fig = plt.figure(figsize=(16, 12))
    for label, _ in _SENTIMENT_SERIES:
        colour = colors[label]
        values = [getattr(result, attribute) for result in results[label]]
        plt.plot(labels, values, label=label + "." + suffix, color=colour)
        mean = np.mean(values)
        std = np.std(values)
        outliers = [(tick, value) for tick, value in zip(labels, values)
                    if value <= mean - std or value >= mean + std]
        plt.plot(labels, [mean] * len(values), color=colour, ls='dashed')
        plt.plot([tick for tick, _ in outliers], [value for _, value in outliers],
                 color=colour, ls='None', marker='o')
    plt.title(title)
    plt.xticks(rotation=90)
    plt.legend(loc="upper right")
    plt.savefig(imgpath, bbox_inches='tight')
    plt.close(fig)


def plotbypost(onlyfiles, outputdir):
    """Compare consecutive dates for each post-count group.

    Files are grouped by the integer in field 4 of their name (presumably the
    maximum post count, going by the plot title) and ordered by the
    ``%d-%m-%Y`` date in field 2. For each group a log, a p-value plot and a
    statistic plot are written to ``outputdir``. Groups with fewer than two
    files have nothing to compare and produce no output.

    :param onlyfiles: paths of the batch ``.py`` files.
    :param outputdir: directory receiving ``ks_post_*`` logs and plots.
    """
    groups = defaultdict(list)
    for path in onlyfiles:
        groups[int(_name_fields(path)[4])].append(path)

    for posts, members in groups.items():
        if len(members) < 2:
            continue
        paths = sorted(members, key=lambda e: datetime.strptime(_name_fields(e)[2], "%d-%m-%Y"))
        print(posts)
        results = _ks_between_neighbours(paths)

        _write_ks_log(os.path.join(outputdir, "ks_post_" + str(posts) + ".log"), paths, results)

        dates = [_name_fields(path)[2] for path in paths]
        labels = [first + " -\n" + second for first, second in zip(dates, dates[1:])]
        title = "KS 2-sided test with max " + str(posts) + " posts"
        _plot_ks(labels, results, "pvalue", "pval", title,
                 os.path.join(outputdir, "ks_post_pval_" + str(posts) + ".png"))
        _plot_ks(labels, results, "statistic", "stat", title,
                 os.path.join(outputdir, "ks_post_stat_" + str(posts) + ".png"))


def plotbydate(onlyfiles, outputdir):
    """Compare consecutive post counts for each date-range group.

    Files are grouped by fields 2 and 3 of their name (presumably the start
    and end date, going by the plot title) and ordered numerically by the
    integer in field 4. For each group a log, a p-value plot and a statistic
    plot are written to ``outputdir``. Groups with fewer than two files have
    nothing to compare and produce no output.

    :param onlyfiles: paths of the batch ``.py`` files.
    :param outputdir: directory receiving ``ks_date_*`` logs and plots.
    """
    groups = defaultdict(list)
    for path in onlyfiles:
        fields = _name_fields(path)
        groups[(fields[2], fields[3])].append(path)

    for dates, members in groups.items():
        if len(members) < 2:
            continue
        # Numeric sort: a plain string sort would put "100" before "20".
        paths = sorted(members, key=lambda e: int(_name_fields(e)[4]))
        print(dates)
        results = _ks_between_neighbours(paths)

        tag = dates[0] + "_" + dates[1]
        _write_ks_log(os.path.join(outputdir, "ks_date_" + tag + ".log"), paths, results)

        counts = [_name_fields(path)[4] for path in paths]
        labels = [first + "-" + second for first, second in zip(counts, counts[1:])]
        title = "KS 2-sided test with between " + dates[0] + " and " + dates[1]
        _plot_ks(labels, results, "pvalue", "pval", title,
                 os.path.join(outputdir, "ks_date_pval_" + tag + ".png"))
        _plot_ks(labels, results, "statistic", "stat", title,
                 os.path.join(outputdir, "ks_date_stat_" + tag + ".png"))
def imprt(file):
    """Execute the Python source file at ``file`` and return it as a module.

    The module is built from a file-location spec and is not added to
    ``sys.modules``, so each call yields a fresh, independent module object.

    NOTE: this runs the file's code. Only use it on trusted files.

    :param file: path to a Python source file.
    :returns: the executed module object.
    :raises ImportError: if no loader can be determined for ``file``
        (for example, an unrecognised file suffix).
    """
    # Import the submodule explicitly: a bare ``import importlib`` does not
    # guarantee that ``importlib.util`` has been loaded.
    import importlib.util

    spec = importlib.util.spec_from_file_location("module.name", file)
    if spec is None or spec.loader is None:
        raise ImportError("cannot load " + str(file) + " as a Python module")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
def flatmap(arr):
    """Concatenate the items of every iterable in ``arr`` into one new list."""
    flattened = []
    for inner in arr:
        flattened.extend(inner)
    return flattened
def filecmp(file1, file2):
    """Three-way compare two file names by the date in their third '_' field.

    Identical names compare equal without being parsed. Otherwise field 2 of
    each name is parsed as ``%d-%m-%Y``.

    :returns: -1 if ``file1`` is dated earlier, 1 if later, 0 if the dates match.
    """
    if file1 == file2:
        return 0
    date_format = "%d-%m-%Y"
    first, second = (
        datetime.strptime(name.split("_")[2], date_format)
        for name in (file1, file2)
    )
    # bool arithmetic yields the usual -1 / 0 / 1 comparison result.
    return (first > second) - (first < second)
if __name__ == "__main__":
    # Script entry point: the first command-line argument is the data folder.
    usage = sys.argv[0] + " <folder>"
    if not sys.argv[1:]:
        # No folder given: show the usage line and exit with an error status.
        print(usage)
        sys.exit(1)

    folder = sys.argv[1]
    if os.path.isdir(folder):
        main(folder)
    else:
        print(folder + " is not a folder")
        sys.exit(1)