wip
This commit is contained in:
123
analyze.py
Normal file
123
analyze.py
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from datetime import timedelta
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
|
||||||
|
import numpy as np
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from collections import defaultdict
|
||||||
|
from loader import load, dmt
|
||||||
|
|
||||||
|
def printnoln(text):
    """Print *text* without a trailing newline, flushing so progress lines appear immediately."""
    print(text, end='', flush=True)


def rprint(text):
    """Print *text* after a carriage return (overwrites the current progress line)."""
    print('\r' + text)


# A user's post counts as "new user" activity for this many days after their first contribution.
DAYS_NEW_USER = 7
# NOTE(review): OLD_USER_YEAR appears unused in this module -- confirm before removing.
OLD_USER_YEAR = 3

# Shared VADER sentiment analyzer instance.
analyser = SentimentIntensityAnalyzer()
|
||||||
|
|
||||||
|
|
||||||
|
def main(folder, option_date_from, option_date_to, option_posts):
    """Histogram the sentiment of answers received by newly-registered users.

    Users created in [option_date_from, option_date_to) are selected; at most
    *option_posts* posts per user are considered, restricted to posts made
    within DAYS_NEW_USER days of the user's first contribution.  A 2x2
    histogram (neg/neu/pos/compound) is saved under output/analyze/.
    """
    users, posts, firstcontrib, sumcontrib = load(folder)

    # Filter users by option_date_from <= creation date < option_date_to.
    newusers = dmt(users).filter(lambda u: option_date_from <= u['CreationDate'] < option_date_to, "filtering users by creation").getresults()
    newuserids = set(dmt(newusers).map(lambda u: u['Id'], "get user id list").getresults())

    # Keep only questions owned by the selected users.
    newposts = dmt(posts).filter(lambda p: p['OwnerUserId'] in newuserids, "filter posts by first contrib").getresults()

    # Compute toxicity levels of the answers those users received.
    print("computing toxic levels")
    toxlevels = defaultdict(list)
    searchedposts = defaultdict(int)
    for i, post in enumerate(newposts):
        # Progress output: every 100 posts, plus a final line.
        if (i + 1) % 100 == 0:
            printnoln("\rpost #" + str(i + 1) + "/" + str(len(newposts)))
        if (i + 1) == len(newposts):
            rprint("post #" + str(i + 1) + "/" + str(len(newposts)))
        userid = post['OwnerUserId']

        # Skip posts made after the user's "new user" window.
        if firstcontrib[userid] + timedelta(days=DAYS_NEW_USER) < post['CreationDate']:
            continue

        # No more than option_posts posts from one user.
        searchedposts[userid] += 1
        if searchedposts[userid] > option_posts:
            continue

        for a in post['Answers']:
            toxlevels[userid].append(computeToxLevel(a['Body']))

    # Flatten once, then split out the four VADER score components
    # (was: four separate flatmap passes over the same data).
    allscores = flatmap(toxlevels.values())
    neglevelsflat = [item['neg'] for item in allscores]
    neulevelsflat = [item['neu'] for item in allscores]
    poslevelsflat = [item['pos'] for item in allscores]
    comlevelsflat = [item['compound'] for item in allscores]

    fig, axs = plt.subplots(2, 2, figsize=(16, 12))
    bins = np.linspace(-1, 1, 2 * 100)
    axs[0, 0].set_title('Neg')
    axs[0, 0].hist(neglevelsflat, bins)
    axs[1, 0].set_title('Neu')
    axs[1, 0].hist(neulevelsflat, bins)
    axs[0, 1].set_title('Pos')
    axs[0, 1].hist(poslevelsflat, bins)
    axs[1, 1].set_title('Compound')
    axs[1, 1].hist(comlevelsflat, bins)

    # os.makedirs is portable and avoids spawning a shell (was: os.system("mkdir -p ...")).
    os.makedirs("output/analyze/", exist_ok=True)
    pltfile = "output/analyze/" + folder.split("/")[-1] + "_" + option_date_from.strftime("%d-%m-%Y") + "_" + option_date_to.strftime("%d-%m-%Y") + "_" + str(option_posts) + ".png"
    plt.savefig(pltfile)
    plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
def computeToxLevel(text):
    """Return the VADER polarity scores ('neg'/'neu'/'pos'/'compound') for *text*."""
    scores = analyser.polarity_scores(text)
    return scores
|
||||||
|
|
||||||
|
|
||||||
|
def flatmap(arr):
    """Flatten one level: concatenate the sub-lists of *arr* into a single list."""
    flat = []
    for sub in arr:
        flat.extend(sub)
    return flat
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Execute only if run as a script.
    usage = sys.argv[0] + " <folder> [--from <%d-%m-%Y>] [--to <%d-%m-%Y>] [--posts <#posts e.g. 2>]"
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
    folder = sys.argv[1]
    if not os.path.isdir(folder):
        print(folder + " is not a folder")
        sys.exit(1)

    # Defaults: roughly the last 3 months, at most 2 posts per user.
    consider_date_from = datetime.today() - timedelta(days=3 * 30)
    consider_date_to = datetime.today()
    consider_posts = 2

    def _parse_date(arg):
        """Parse a %d-%m-%Y date string or exit with the usage message."""
        try:
            return datetime.strptime(arg, "%d-%m-%Y")
        except ValueError:
            print(arg + " is not a valid date")
            print(usage)
            sys.exit(1)

    i = 2
    while i < len(sys.argv):
        flag = sys.argv[i]
        # BUG FIX: a flag given as the last argument (no value following it)
        # used to be silently ignored; now it is reported.
        if flag in ("--from", "--to", "--posts") and i == len(sys.argv) - 1:
            print(flag + " requires a value")
            print(usage)
            sys.exit(1)
        if flag == "--from":
            i += 1
            consider_date_from = _parse_date(sys.argv[i])
        elif flag == "--to":
            i += 1
            consider_date_to = _parse_date(sys.argv[i])
        elif flag == "--posts":
            i += 1
            if not sys.argv[i].isdigit():
                print(sys.argv[i] + " is not a number")
                print(usage)
                sys.exit(1)
            consider_posts = int(sys.argv[i])
        i += 1

    main(folder, consider_date_from, consider_date_to, consider_posts)
|
||||||
201
analyze_batch.py
Normal file
201
analyze_batch.py
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from datetime import timedelta
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
|
||||||
|
import numpy as np
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from collections import defaultdict
|
||||||
|
from loader import load, dmt, cms
|
||||||
|
import math
|
||||||
|
|
||||||
|
def printnoln(text):
    """Print *text* without a trailing newline, flushing so progress lines appear immediately."""
    print(text, end='', flush=True)


def rprint(text):
    """Print *text* after a carriage return (overwrites the current progress line)."""
    print('\r' + text)


# A user's post counts as "new user" activity for this many days after their first contribution.
DAYS_NEW_USER = 7
# NOTE(review): OLD_USER_YEAR appears unused in this module -- confirm before removing.
OLD_USER_YEAR = 3

# Shared VADER sentiment analyzer instance.
analyser = SentimentIntensityAnalyzer()
# One color per post-count series in the combined histograms.
colors = ['red', 'green', 'blue', 'orange', 'deeppink']
|
||||||
|
|
||||||
|
|
||||||
|
def main(folder):
    """For every calendar-quarter interval, histogram answer sentiment toward new users.

    For each interval and for each cap of 1..5 posts per user, dumps the raw
    VADER scores to output/batch/<name>/ and saves both a per-run and a
    combined histogram figure.
    """
    users, posts, firstcontrib, sumcontrib = load(folder)

    intervals = calc_intervals(posts)

    postcounts = range(1, 5 + 1)
    for (option_date_from, option_date_to) in intervals:
        # Users created inside this interval.
        newusers = dmt(users).filter(lambda u: option_date_from <= u['CreationDate'] < option_date_to, "filtering users by creation").getresults()
        newuserids = set(dmt(newusers).map(lambda u: u['Id'], "get user id list").getresults())

        # Questions those users asked.
        newposts = dmt(posts).filter(lambda p: p['OwnerUserId'] in newuserids, "filter posts by selected users").getresults()
        if len(newposts) == 0:
            continue
        print("computing toxic levels: " + option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y"))

        # Combined figure collecting all post-count settings for this interval.
        gfig, gaxs = plt.subplots(2, 2, figsize=(16, 12))
        gaxs[0, 0].set_title('Neg')
        gaxs[1, 0].set_title('Neu')
        gaxs[0, 1].set_title('Pos')
        gaxs[1, 1].set_title('Compound')

        gneg = []
        gneu = []
        gpos = []
        gcom = []

        outfolder = "output/batch/" + folder.split("/")[-1] + "/"
        goutfilename = outfolder + "batch_" + folder.split("/")[-1] + "_" + option_date_from.strftime("%d-%m-%Y") + "_" + option_date_to.strftime("%d-%m-%Y")

        for option_posts in postcounts:
            # Compute toxicity levels for this per-user post cap.
            start = cms()
            printnoln("computing toxic levels: filtering")
            toxlevels = defaultdict(list)
            searchedposts = defaultdict(int)
            filteredposts = []
            for post in newposts:
                userid = post['OwnerUserId']
                # Only posts within the user's "new user" window.
                if firstcontrib[userid] + timedelta(days=DAYS_NEW_USER) < post['CreationDate']:
                    continue
                # No more than option_posts posts from one user.
                searchedposts[userid] += 1
                if searchedposts[userid] > option_posts:
                    continue
                filteredposts.append(post)

            for (i, post) in enumerate(filteredposts):
                if (i + 1) % 100 == 0:
                    printnoln("\rcomputing toxic levels: post #" + str(i + 1) + "/" + str(len(filteredposts)))
                # BUG FIX: this end-of-loop progress check compared against
                # len(newposts) instead of len(filteredposts), so it almost
                # never fired.
                if (i + 1) == len(filteredposts):
                    printnoln("\rcomputing toxic levels: post #" + str(i + 1) + "/" + str(len(filteredposts)))
                userid = post['OwnerUserId']
                for a in post['Answers']:
                    toxlevels[userid].append(computeToxLevel(a['Body']))
            rprint("computing toxic levels: post #" + str(len(filteredposts)) + "/" + str(len(filteredposts)) + " ... took " + str(cms() - start) + "ms")

            outfilename = goutfilename + "_" + str(option_posts)
            # Portable and shell-safe (was: os.system("mkdir -p " + outfolder),
            # which passed the unquoted folder name through a shell).
            os.makedirs(outfolder, exist_ok=True)
            dumptoxlevels(toxlevels, outfilename + ".py")

            # Flatten once, then split per score component.
            allscores = flatmap(toxlevels.values())
            neglevelsflat = [item['neg'] for item in allscores]
            neulevelsflat = [item['neu'] for item in allscores]
            poslevelsflat = [item['pos'] for item in allscores]
            comlevelsflat = [item['compound'] for item in allscores]

            gneg.append(neglevelsflat)
            gneu.append(neulevelsflat)
            gpos.append(poslevelsflat)
            gcom.append(comlevelsflat)

            bins = np.linspace(-1, 1, 2 * 100)
            fig, axs = plt.subplots(2, 2, figsize=(16, 12))
            axs[0, 0].set_title('Neg')
            axs[0, 0].hist(neglevelsflat, bins)
            axs[1, 0].set_title('Neu')
            axs[1, 0].hist(neulevelsflat, bins)
            axs[0, 1].set_title('Pos')
            axs[0, 1].hist(poslevelsflat, bins)
            axs[1, 1].set_title('Compound')
            axs[1, 1].hist(comlevelsflat, bins)

            fig.suptitle("Sentiment of answers to the first " + str(option_posts) + " (max) posts\nUsers registered between "
                         + option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y"))
            fig.savefig(outfilename + ".png", bbox_inches='tight')
            plt.close(fig)

        # Combined histograms: one colored series per post-count setting.
        bins = np.linspace(-1, 1, 2 * 100)
        serieslabels = [str(option_posts) + " posts" for option_posts in postcounts]
        gaxs[0, 0].hist(gneg, bins, color=colors[:len(postcounts)], label=serieslabels)
        gaxs[1, 0].hist(gneu, bins, color=colors[:len(postcounts)], label=serieslabels)
        gaxs[0, 1].hist(gpos, bins, color=colors[:len(postcounts)], label=serieslabels)
        gaxs[1, 1].hist(gcom, bins, color=colors[:len(postcounts)], label=serieslabels)
        for ax in (gaxs[0, 0], gaxs[1, 0], gaxs[0, 1], gaxs[1, 1]):
            ax.legend(loc="upper right")
        gfig.suptitle("Sentiment of answers to the first X (max) posts\nUsers registered between " + option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y"))
        gfig.savefig(goutfilename + ".png", bbox_inches='tight')
        plt.close(gfig)
|
||||||
|
|
||||||
|
|
||||||
|
def computeToxLevel(text):
    """Return the VADER polarity scores ('neg'/'neu'/'pos'/'compound') for *text*."""
    scores = analyser.polarity_scores(text)
    return scores
|
||||||
|
|
||||||
|
|
||||||
|
def flatmap(arr):
    """Flatten one level: concatenate the sub-lists of *arr* into a single list."""
    flat = []
    for sub in arr:
        flat.extend(sub)
    return flat
|
||||||
|
|
||||||
|
|
||||||
|
def dumptoxlevels(lvls, filename):
    """Dump *lvls* to *filename* as an importable Python module defining `toxlevels`.

    The defaultdict repr's "<class 'list'>" is rewritten to the bare name
    "list" so the emitted file is valid Python.
    """
    body = "toxlevels = " + str(lvls).replace("<class 'list'>", "list", 1) + "\n"
    with open(filename, "w") as out:
        out.write("from collections import defaultdict\n\n")
        out.write(body)
|
||||||
|
|
||||||
|
|
||||||
|
def calc_intervals(posts):
    """Return (start, end) datetime pairs of consecutive calendar quarters covering all posts."""
    firstpost = dmt(posts).reduce(lambda acc, e: acc if acc < e['CreationDate'] else e['CreationDate'], lambda acc, e: acc if acc < e else e, lambda: posts[0]['CreationDate'], "firstpost").getresults()
    lastpost = dmt(posts).reduce(lambda acc, e: acc if acc > e['CreationDate'] else e['CreationDate'], lambda acc, e: acc if acc > e else e, lambda: posts[0]['CreationDate'], "lastpost").getresults()

    def _quarter_start(d):
        # Snap to 00:00 on the first day of the quarter containing d
        # (month 1, 4, 7 or 10); replaces the old 12-entry lookup dict.
        return d.replace(month=d.month - (d.month - 1) % 3, day=1, hour=0, minute=0, second=0, microsecond=0)

    def _add_quarter(d):
        # BUG FIX: the old `(month + 3) % 12` maps month 9 -> 0, an invalid
        # month; it only worked because inputs were always quarter-aligned.
        if d.month > 9:
            return d.replace(year=d.year + 1, month=d.month + 3 - 12)
        return d.replace(month=d.month + 3)

    firstpost = _quarter_start(firstpost)
    # End one quarter past the quarter of the last post so it is included.
    lastpost = _add_quarter(_quarter_start(lastpost))

    cdate = firstpost
    intervals = []
    while cdate < lastpost:
        nextquarter = _add_quarter(cdate)
        print("adding interval: " + cdate.strftime("%d-%m-%Y") + " - " + nextquarter.strftime("%d-%m-%Y"))
        intervals.append((cdate, nextquarter))
        cdate = nextquarter
    return intervals
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Execute only if run as a script.
    usage = sys.argv[0] + " <folder>"

    # Require exactly one argument: an existing dump folder.
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
    folder = sys.argv[1]
    if not os.path.isdir(folder):
        print(folder + " is not a folder")
        sys.exit(1)

    main(folder)
|
||||||
188
calctoxdiff.py
Normal file
188
calctoxdiff.py
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
import importlib
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from os import listdir
|
||||||
|
from os.path import isfile, join
|
||||||
|
from scipy.stats import ks_2samp
|
||||||
|
from collections import defaultdict
|
||||||
|
from datetime import datetime
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
|
||||||
|
def main(folder):
    """Run the KS-difference plots over the toxlevel dump files in *folder*.

    NOTE(review): plotbypost/plotbydate read the module-global `folder` set in
    __main__, not this local (trimmed) copy -- confirm that is intended.
    """
    if folder.endswith("/"):
        folder = folder[:-1]
    # Collect the .py toxlevel dumps directly inside the folder.
    dumps = sorted(f for f in listdir(folder)
                   if isfile(join(folder, f)) and f.endswith(".py"))
    plotbypost(dumps)
    plotbydate(dumps)
|
||||||
|
|
||||||
|
|
||||||
|
def plotbypost(onlyfiles):
    """KS-compare consecutive date-interval dumps that share the same post-count cap.

    Writes a .log and a .png per post count.  NOTE(review): reads the
    module-global `folder` (set in __main__), not a parameter -- confirm
    before calling this programmatically.
    """
    # Group dump file names by their post-count field (5th "_"-separated token).
    files = defaultdict(list)
    for fname in onlyfiles:
        parts = fname[:-3].split("_")
        files[int(parts[4])].append(fname)
    # Order each group chronologically by the interval start date.
    files = {p: sorted(group, key=lambda e: datetime.strptime(e.split("_")[2], "%d-%m-%Y"))
             for (p, group) in files.items()}

    changes_neg = defaultdict(list)
    changes_neu = defaultdict(list)
    changes_pos = defaultdict(list)
    changes_com = defaultdict(list)

    for (p, group) in files.items():
        if len(group) < 2:
            continue
        print(p)
        for i in range(len(group) - 1):
            tox1 = imprt(folder + "/" + group[i]).toxlevels
            tox2 = imprt(folder + "/" + group[i + 1]).toxlevels
            # Flatten each dump once (was: four flatmap passes per dump).
            flat1 = flatmap(tox1.values())
            flat2 = flatmap(tox2.values())
            changes_neg[p].append(ks_2samp([s['neg'] for s in flat1], [s['neg'] for s in flat2]))
            changes_neu[p].append(ks_2samp([s['neu'] for s in flat1], [s['neu'] for s in flat2]))
            changes_pos[p].append(ks_2samp([s['pos'] for s in flat1], [s['pos'] for s in flat2]))
            changes_com[p].append(ks_2samp([s['compound'] for s in flat1], [s['compound'] for s in flat2]))

    # One log file per post count listing all consecutive-pair KS results.
    for (p, group) in files.items():
        with open(folder + "/ks_" + str(p) + ".log", "w") as f:
            for i in range(len(group) - 1):
                f1 = group[i]
                f2 = group[i + 1]
                f.write(f1 + " -> " + f2 + ": ks neg = " + str(changes_neg[p][i]) + "; ks neu = " + str(changes_neu[p][i])
                        + "; ks pos = " + str(changes_pos[p][i]) + "; ks com = " + str(changes_com[p][i]) + "\n")

    # One figure per post count: statistic and p-value per component.
    for (p, group) in files.items():
        labels = [group[i].split("_")[2] + " -\n" + group[i + 1].split("_")[2] for i in range(len(group) - 1)]
        fig = plt.figure(figsize=(16, 12))
        # `kind` instead of `type`: avoid shadowing the builtin.
        for kind, changes in {"neg": changes_neg[p], "neu": changes_neu[p], "pos": changes_pos[p], "com": changes_com[p]}.items():
            plt.plot(labels, [c.statistic for c in changes], label=kind + ".stat")
            plt.plot(labels, [c.pvalue for c in changes], label=kind + ".pval")
        plt.title("KS 2-sided test with max " + str(p) + " posts")
        plt.xticks(rotation=90)
        plt.legend(loc="upper right")
        plt.savefig(folder + "/ks_" + str(p) + ".png", bbox_inches='tight')
        plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
def plotbydate(onlyfiles):
    """KS-compare dumps that share the same date interval across post-count caps.

    Writes a .log and a .png per (date_from, date_to) pair.  NOTE(review):
    reads the module-global `folder` (set in __main__), not a parameter --
    confirm before calling this programmatically.
    """
    # Group dump file names by their (date_from, date_to) fields.
    files = defaultdict(list)
    for fname in onlyfiles:
        parts = fname[:-3].split("_")
        files[(parts[2], parts[3])].append(fname)
    # Order each group by the post-count token (lexicographic, as before;
    # fine while post counts stay single-digit).
    files = {d: sorted(group, key=lambda e: e.split("_")[4]) for (d, group) in files.items()}

    changes_neg = defaultdict(list)
    changes_neu = defaultdict(list)
    changes_pos = defaultdict(list)
    changes_com = defaultdict(list)

    for (d, group) in files.items():
        if len(group) < 2:
            continue
        print(d)
        for i in range(len(group) - 1):
            tox1 = imprt(folder + "/" + group[i]).toxlevels
            tox2 = imprt(folder + "/" + group[i + 1]).toxlevels
            # Flatten each dump once (was: four flatmap passes per dump).
            flat1 = flatmap(tox1.values())
            flat2 = flatmap(tox2.values())
            changes_neg[d].append(ks_2samp([s['neg'] for s in flat1], [s['neg'] for s in flat2]))
            changes_neu[d].append(ks_2samp([s['neu'] for s in flat1], [s['neu'] for s in flat2]))
            changes_pos[d].append(ks_2samp([s['pos'] for s in flat1], [s['pos'] for s in flat2]))
            changes_com[d].append(ks_2samp([s['compound'] for s in flat1], [s['compound'] for s in flat2]))

    # One log file per interval listing all consecutive-pair KS results.
    for (d, group) in files.items():
        with open(folder + "/ks_" + d[0] + "_" + d[1] + ".log", "w") as f:
            for i in range(len(group) - 1):
                f1 = group[i]
                f2 = group[i + 1]
                f.write(f1 + " -> " + f2 + ": ks neg = " + str(changes_neg[d][i]) + "; ks neu = " + str(changes_neu[d][i])
                        + "; ks pos = " + str(changes_pos[d][i]) + "; ks com = " + str(changes_com[d][i]) + "\n")

    # One figure per interval: statistic and p-value per component.
    for (d, group) in files.items():
        labels = [group[i].split("_")[4][:-3] + "-" + group[i + 1].split("_")[4][:-3] for i in range(len(group) - 1)]
        fig = plt.figure(figsize=(16, 12))
        # `kind` instead of `type`: avoid shadowing the builtin.
        for kind, changes in {"neg": changes_neg[d], "neu": changes_neu[d], "pos": changes_pos[d], "com": changes_com[d]}.items():
            plt.plot(labels, [c.statistic for c in changes], label=kind + ".stat")
            plt.plot(labels, [c.pvalue for c in changes], label=kind + ".pval")
        plt.title("KS 2-sided test with between " + d[0] + " and " + d[1])
        plt.xticks(rotation=90)
        plt.legend(loc="upper right")
        plt.savefig(folder + "/ks_" + d[0] + "_" + d[1] + ".png", bbox_inches='tight')
        plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
def imprt(file):
    """Load *file* as a throwaway Python module and return the module object."""
    spec = importlib.util.spec_from_file_location("module.name", file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
||||||
|
|
||||||
|
|
||||||
|
def flatmap(arr):
    """Flatten one level: concatenate the sub-lists of *arr* into a single list."""
    flat = []
    for sub in arr:
        flat.extend(sub)
    return flat
|
||||||
|
|
||||||
|
|
||||||
|
def filecmp(file1, file2):
    """Old-style comparator: order dump file names by their interval start date.

    Returns -1/0/1.  NOTE(review): this function shadows the stdlib `filecmp`
    module name; renaming would change the public interface, so it is only
    flagged here.
    """
    if file1 == file2:
        return 0
    d1 = datetime.strptime(file1.split("_")[2], "%d-%m-%Y")
    d2 = datetime.strptime(file2.split("_")[2], "%d-%m-%Y")
    # (d1 > d2) - (d1 < d2) is the canonical Python 3 replacement for cmp().
    return (d1 > d2) - (d1 < d2)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Execute only if run as a script.
    usage = sys.argv[0] + " <folder>"

    # Require exactly one argument: an existing dump folder.
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
    folder = sys.argv[1]
    if not os.path.isdir(folder):
        print(folder + " is not a folder")
        sys.exit(1)

    main(folder)
|
||||||
203
loader.py
Normal file
203
loader.py
Normal file
@@ -0,0 +1,203 @@
|
|||||||
|
from xml.dom import minidom
|
||||||
|
from datetime import datetime
|
||||||
|
from collections import defaultdict
|
||||||
|
import time
|
||||||
|
import multiprocessing
|
||||||
|
import operator
|
||||||
|
from mt import mt
|
||||||
|
import xml.etree.cElementTree as et
|
||||||
|
|
||||||
|
def printnoln(text):
    """Print *text* without a trailing newline, flushing so progress lines appear immediately."""
    print(text, end='', flush=True)


def rprint(text):
    """Print *text* after a carriage return (overwrites the current progress line)."""
    print('\r' + text)
|
||||||
|
|
||||||
|
|
||||||
|
def dmt(data):
    """Wrap *data* in a non-verbose mt pipeline using one worker per CPU core."""
    return mt(multiprocessing.cpu_count(), data, False)
|
||||||
|
|
||||||
|
|
||||||
|
def cms():
    """Return the current wall-clock time in whole milliseconds."""
    return int(round(time.time() * 1000))
|
||||||
|
|
||||||
|
|
||||||
|
def load(folder):
    """Load users and question trees from *folder* and derive per-user stats.

    Returns (users, posts, firstcontrib, sumcontrib), where firstcontrib maps
    each user to the date of their first contribution and sumcontrib to their
    total contribution count.
    """
    users = readUsers(folder + "/Users.xml")
    posts = readPosts(folder + "/Posts.xml")

    # Per-user derived statistics.
    firstcontrib = computefirstcontrib(posts)
    sumcontrib = computesumcontrib(posts)

    return users, posts, firstcontrib, sumcontrib
|
||||||
|
|
||||||
|
|
||||||
|
def computesumcontrib(posts):
    """Count contributions (questions, answers, comments) per OwnerUserId."""
    qowners = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
    aowners = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
    cowners = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()

    sumcontrib = defaultdict(int)
    # `uid` rather than `id`: do not shadow the builtin.
    for uid in qowners:
        sumcontrib[uid] += 1
    for batch in aowners:
        for uid in batch:
            sumcontrib[uid] += 1
    for batch in cowners:
        for uid in batch:
            sumcontrib[uid] += 1
    return sumcontrib
|
||||||
|
|
||||||
|
|
||||||
|
def computefirstcontrib(posts):
    """Return {OwnerUserId: datetime of the user's earliest question or answer}."""
    qdates = dmt(posts).map(lambda q: (q['OwnerUserId'], q['CreationDate']), "calc first contrib q").getresults()
    adates = dmt(posts).map(lambda q: [(a['OwnerUserId'], a['CreationDate']) for a in q['Answers']], "calc first contrib a").getresults()

    # Track the running minimum directly instead of accumulating every date
    # per user and min()-ing at the end (saves memory on large dumps).
    # `uid` rather than `id`: do not shadow the builtin.
    firstcontrib = {}
    for uid, date in qdates:
        if uid not in firstcontrib or date < firstcontrib[uid]:
            firstcontrib[uid] = date
    for batch in adates:
        for uid, date in batch:
            if uid not in firstcontrib or date < firstcontrib[uid]:
                firstcontrib[uid] = date
    return firstcontrib
|
||||||
|
|
||||||
|
|
||||||
|
def mapuser(item):
    """Map an XML user row to a dict with Id and parsed CreationDate."""
    tags = ['Id', 'CreationDate']
    datetags = ['CreationDate']
    user = {tag: getTag(item, tag) for tag in tags}
    for tag in datetags:
        if user[tag] is None:
            # Malformed row: keep the None and log it.
            print("map user: tag " + tag + " is None: " + str(user))
        else:
            user[tag] = datetime.fromisoformat(user[tag])
    return user
|
||||||
|
|
||||||
|
|
||||||
|
def mapQuestion(item):
    """Map an XML question row to a dict of the relevant attributes.

    NOTE(review): unlike mapuser, a missing CreationDate raises TypeError in
    fromisoformat here -- confirm dumps always carry CreationDate.
    """
    fields = ('Id', 'CreationDate', 'Body', 'Title', 'OwnerUserId', 'OwnerDisplayName')
    question = {tag: getTag(item, tag) for tag in fields}
    question['CreationDate'] = datetime.fromisoformat(question['CreationDate'])
    return question
|
||||||
|
|
||||||
|
|
||||||
|
def mapAnswer(item):
    """Map an XML answer row to a dict of the relevant attributes."""
    fields = ('Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId')
    answer = {tag: getTag(item, tag) for tag in fields}
    answer['CreationDate'] = datetime.fromisoformat(answer['CreationDate'])
    return answer
|
||||||
|
|
||||||
|
|
||||||
|
def mapComment(item):
    """Map an XML comment row to a dict of the relevant attributes."""
    fields = ('Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId')
    comment = {tag: getTag(item, tag) for tag in fields}
    comment['CreationDate'] = datetime.fromisoformat(comment['CreationDate'])
    return comment
|
||||||
|
|
||||||
|
|
||||||
|
def readUsers(file):
    """Parse Users.xml at *file* and return the list of mapped user dicts."""
    prefix = "readUsers: "
    printnoln(prefix + "reading xml file ...")

    started = cms()
    rows = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
    rprint(prefix + "reading xml file ... took " + str(cms() - started) + "ms")

    users = dmt(rows).map(mapuser, prefix + "mapping users").getresults()

    print(prefix + "done")
    return users
|
||||||
|
|
||||||
|
|
||||||
|
def readPosts(file):
    """Parse Posts.xml into question dicts with nested Answers and Comments.

    Questions/answers/comments without an owner are dropped; answers and
    comments whose parent was dropped are filtered out too.
    """
    prefix = "readPosts: "
    printnoln(prefix + "reading xml file ...")

    now = cms()
    items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
    rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")

    print(prefix + "#posts total: " + str(len(items)))

    questions = readQuestions(items)
    questionids = set(dmt(questions).map(lambda q: q['Id'], prefix + "get question ids").getresults())

    answers = readAnswers(items)
    answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults())

    comments = readComments(items)

    # Drop answers whose parent question was filtered out.
    answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()

    # Drop comments whose parent (question or answer) was filtered out.
    comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()

    # Attach answers to their questions.
    printnoln(prefix + "create qamapping ...")
    qamapping = {qid: [] for qid in questionids}
    for a in answers:
        qamapping[a['ParentId']].append(a)
    rprint(prefix + "create qamapping ... done")
    questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()

    # Attach comments to their questions and answers.
    printnoln(prefix + "create qacmapping ...")
    qacmapping = {pid: [] for pid in questionids.union(answerids)}
    for c in comments:
        qacmapping[c['ParentId']].append(c)
    rprint(prefix + "create qacmapping ... done")
    answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
    questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()

    # Safety check: every surviving answer must be attached exactly once.
    countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \
        .reduce(operator.add, operator.add, lambda: 0, prefix + "sum answer count").getresults()
    if countans != len(answers):
        # BUG FIX: the old message concatenated str with int, so the warning
        # itself raised TypeError whenever the check actually fired.
        print(prefix + "countans != #answer: " + str(countans) + " " + str(len(answers)))
    print(prefix + "done")
    return questions
|
||||||
|
|
||||||
|
|
||||||
|
def readQuestions(items):
    """Extract question records (PostTypeId == "1") from raw XML post items.

    Mapped questions with no OwnerUserId are dropped as broken records.
    Returns the list of question dicts.
    """
    prefix = "readQuestions: "
    pipeline = dmt(items)
    pipeline = pipeline.filter(lambda item: getTag(item, 'PostTypeId') == "1", prefix + "filter out questions")
    pipeline = pipeline.map(mapQuestion, prefix + "mapping questions")
    pipeline = pipeline.filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken questions")
    questions = pipeline.getresults()
    print(prefix + "questions read: " + str(len(questions)))
    return questions
|
||||||
|
|
||||||
|
|
||||||
|
def readAnswers(items):
    """Extract answer records (PostTypeId == "2") from raw XML post items.

    Mapped answers with no OwnerUserId are dropped as broken records.
    Returns the list of answer dicts.
    """
    prefix = "readAnswers: "
    pipeline = dmt(items)
    pipeline = pipeline.filter(lambda item: getTag(item, 'PostTypeId') == "2", prefix + "filter out answers")
    pipeline = pipeline.map(mapAnswer, prefix + "mapping answers")
    pipeline = pipeline.filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken answers")
    answers = pipeline.getresults()
    print(prefix + "answers read: " + str(len(answers)))
    return answers
|
||||||
|
|
||||||
|
|
||||||
|
def readComments(items):
    """Extract comment records (PostTypeId == "3") from raw XML post items.

    Mapped comments with no OwnerUserId are dropped as broken records.
    Returns the list of comment dicts.

    NOTE(review): PostTypeId "3" for comments is unusual for Stack Exchange
    dumps (comments normally live in Comments.xml) — confirm the input schema.
    """
    prefix = "readComments: "
    pipeline = dmt(items)
    pipeline = pipeline.filter(lambda item: getTag(item, 'PostTypeId') == "3", prefix + "filter out comments")
    pipeline = pipeline.map(mapComment, prefix + "mapping comments")
    pipeline = pipeline.filter(lambda c: c['OwnerUserId'] is not None, prefix + "filter out broken comments")
    comments = pipeline.getresults()
    print(prefix + "comments read: " + str(len(comments)))
    return comments
|
||||||
|
|
||||||
|
|
||||||
|
def getTag(item, tag):
    """Return the value of XML attribute *tag* on *item*, or None if absent.

    dict.get already returns None for a missing key, so the previous
    tagExists() pre-check was redundant (two lookups instead of one).
    """
    return item.attrib.get(tag)
|
||||||
|
|
||||||
|
|
||||||
|
def tagExists(item, tag):
    """Return True if XML attribute *tag* is present on *item*.

    `tag in item.attrib` is the idiomatic membership test; calling
    .keys() first built an extra view object for no benefit.
    """
    return tag in item.attrib
|
||||||
|
|
||||||
|
|
||||||
|
def setprop(dic, key, value):
    """Store *value* under *key* in *dic* and hand the same dict back.

    Returning the mutated dict makes this usable as a fluent helper
    inside dmt(...).map(...) pipelines.
    """
    dic.update({key: value})
    return dic
|
||||||
155
mt.py
Normal file
155
mt.py
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
from threading import Thread, Lock
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class mt:
    """Tiny fork/join helper: runs filter/map/reduce over a list by slicing
    it into one contiguous chunk per worker thread.

    Each of filter()/map()/reduce() starts the workers and returns self; the
    merged output is collected with getresults() (which joins the workers
    first).  Operations chain: starting a new one joins the previous and
    feeds its results in as the new input.

    NOTE: under CPython's GIL this only overlaps work that releases the GIL
    (I/O, C extensions) — pure-Python callables will not run in parallel.

    Fixes vs. the original version:
    - filter/map/reduce shared ~15 duplicated preamble/spawn lines, now
      factored into __begin/__spawn;
    - worker parameters shadowed the builtin ``list``;
    - join() printed a spurious "\\r: #N ... took Xms" line even when no
      comment was supplied (the op header correctly stayed silent, but
      __comment was coerced from None to "" so the completion check always
      fired) — completion output is now emitted only when a comment was
      given, matching the header behavior.
    """

    def __init__(self, threads, data, verbose=False):
        """threads: worker count; data: input list; verbose: per-thread timing logs."""
        self.__running = False   # an operation's threads have been started
        self.__closed = False    # close() was called; instance is unusable after
        self.__data = data
        self.__verbose = verbose
        # per-operation state, (re)populated by __begin()
        self.__final = None      # zero-arg callable that merges per-thread results
        self.__comment = None    # progress-line prefix; None means "silent op"
        self.__starttime = None
        self.__endtime = None
        self.__type = None       # "filter" | "map" | "reduce"
        # thread bookkeeping: one result slot and one thread slot per worker
        self.__threadcount = threads
        self.__threads = []
        self.__lock = Lock()
        self.__results = []
        for i in range(self.__threadcount):
            self.__results.append([])
            self.__threads.append(None)

    def __begin(self, optype, final, comment):
        # Shared preamble for filter/map/reduce: finish any running operation
        # (its results become the new input), record the per-operation state
        # and print the progress header when a comment was supplied.
        if self.__closed:
            raise RuntimeError("Already closed")
        if self.__running:
            self.join()
            self.__data = self.getresults()
        self.__running = True
        self.__final = final
        self.__type = optype
        self.__comment = comment  # kept as None for silent ops (see class doc)
        if comment is not None:
            print(comment + ": #" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
        self.__starttime = self.__cms()
        self.__endtime = None

    def __spawn(self, work):
        # Slice the data into __threadcount contiguous chunks and start one
        # worker per chunk; work(i, chunk) must store into self.__results[i].
        n = len(self.__data)
        for i in range(self.__threadcount):
            part = self.__data[i * n // self.__threadcount:(i + 1) * n // self.__threadcount]
            self.__threads[i] = Thread(target=work, args=(i, part))
            self.__threads[i].start()

    def __report(self, i, op, dur):
        # Per-thread timing line (only called when verbose).
        print((self.__comment or "") + ": Thread " + str(i) + ": " + op + " took " + str(dur) + "ms")

    def filter(self, cond, comment=None):
        """Start a threaded filter keeping items for which cond(item) is true."""
        self.__begin("filter", self.__getresultsmapfilter, comment)
        self.__spawn(lambda i, part: self.__dofilter(i, part, cond))
        return self

    def __dofilter(self, i, items, cond):
        # Worker: filter one chunk into results slot i.
        now = self.__cms()
        kept = [item for item in items if cond(item)]
        with self.__lock:
            self.__results[i] = kept
        if self.__verbose:
            self.__report(i, "filter", self.__cms() - now)

    def map(self, func, comment=None):
        """Start a threaded map applying func to every item."""
        self.__begin("map", self.__getresultsmapfilter, comment)
        self.__spawn(lambda i, part: self.__domap(i, part, func))
        return self

    def __domap(self, i, items, func):
        # Worker: map one chunk into results slot i.
        now = self.__cms()
        mapped = [func(item) for item in items]
        with self.__lock:
            self.__results[i] = mapped
        if self.__verbose:
            self.__report(i, "map", self.__cms() - now)

    def reduce(self, reducer, aggregator, initval, comment=None):
        """Start a threaded reduce.

        reducer(acc, item) folds one chunk; aggregator(acc, thread_result)
        merges the per-thread values; initval is a zero-argument factory for
        the starting accumulator (used once per thread and once when merging).
        """
        self.__begin("reduce", lambda: self.__getresultsreduce(aggregator, initval), comment)
        self.__spawn(lambda i, part: self.__doreduce(i, part, reducer, initval))
        return self

    def __doreduce(self, i, items, reducer, initval):
        # Worker: fold one chunk into a single value in results slot i.
        now = self.__cms()
        acc = initval()
        for item in items:
            acc = reducer(acc, item)
        with self.__lock:
            self.__results[i] = acc
        if self.__verbose:
            self.__report(i, "reduce", self.__cms() - now)

    def getresults(self):
        """Join the workers and return the merged result of the last operation."""
        self.join()
        return self.__final()

    def __getresultsmapfilter(self):
        # Concatenate the per-thread lists in thread (= input) order.
        merged = []
        for i in range(self.__threadcount):
            merged += self.__results[i]
        return merged

    def __getresultsreduce(self, aggregator, initval):
        # Fold the per-thread accumulators together in thread order.
        acc = initval()
        for i in range(self.__threadcount):
            acc = aggregator(acc, self.__results[i])
        return acc

    def join(self):
        """Wait for all workers of the current operation; print the completion
        line once (only if the operation was given a comment).  Returns self.

        Raises RuntimeError if close() was already called.
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        for i in range(self.__threadcount):
            if self.__threads[i] is not None:
                self.__threads[i].join()
                self.__threads[i] = None
        if self.__endtime is None:
            self.__endtime = self.__cms()
            # Silent ops (comment=None) produce no completion line, matching
            # the (absent) start header.
            if self.__comment is not None:
                dur = self.__endtime - self.__starttime
                shrink = " -> #" + str(sum([len(r) for r in self.__results])) if self.__type == "filter" else ""
                line = self.__comment + ": #" + str(len(self.__data)) + shrink + " ... took " + str(dur) + "ms"
                if self.__verbose:
                    print(line)
                else:
                    print("\r" + line)
        return self

    def close(self):
        """Join outstanding workers and mark the instance unusable."""
        if self.__closed:
            raise RuntimeError("Already closed")
        self.join()
        self.__closed = True

    def __cms(self):
        # Current wall-clock time in whole milliseconds.
        return int(round(time.time() * 1000))
|
||||||
Reference in New Issue
Block a user