def main(folder, option_date_from, option_date_to, option_posts):
    """Score the sentiment of answers that newly registered users receive.

    Keeps users created in [option_date_from, option_date_to), takes each
    such user's posts from the first DAYS_NEW_USER days after the user's
    first contribution (at most *option_posts* posts per user), scores every
    answer to those posts with VADER and saves a 2x2 histogram
    (neg/neu/pos/compound) under output/analyze/.
    """
    users, posts, firstcontrib, sumcontrib = load(folder)

    # filter users by option_date_from <= creation date < option_date_to
    newusers = dmt(users).filter(lambda u: option_date_from <= u['CreationDate'] < option_date_to, "filtering users by creation").getresults()
    newuserids = set(dmt(newusers).map(lambda u: u['Id'], "get user id list").getresults())

    # questions asked by the selected users
    newposts = dmt(posts).filter(lambda p: p['OwnerUserId'] in newuserids, "filter posts by first contrib").getresults()

    # compute toxic levels
    print("computing toxic levels")
    toxlevels = defaultdict(list)
    searchedposts = defaultdict(int)
    for (i, post) in enumerate(newposts):
        if (i + 1) % 100 == 0:
            printnoln("\rpost #" + str(i + 1) + "/" + str(len(newposts)))
        if (i + 1) == len(newposts):
            rprint("post #" + str(i + 1) + "/" + str(len(newposts)))
        userid = post['OwnerUserId']

        # only posts inside the "new user" grace period count
        if firstcontrib[userid] + timedelta(days=DAYS_NEW_USER) < post['CreationDate']:
            continue

        # no more than option_posts posts from one user
        searchedposts[userid] += 1
        if searchedposts[userid] > option_posts:
            continue

        for a in post['Answers']:
            toxlevels[userid].append(computeToxLevel(a['Body']))

    scores = flatmap(toxlevels.values())
    neglevelsflat = [item['neg'] for item in scores]
    neulevelsflat = [item['neu'] for item in scores]
    poslevelsflat = [item['pos'] for item in scores]
    comlevelsflat = [item['compound'] for item in scores]

    fig, axs = plt.subplots(2, 2, figsize=(16, 12))
    axs[0, 0].set_title('Neg')
    axs[0, 0].hist(neglevelsflat, np.linspace(-1, 1, 2 * 100))
    axs[1, 0].set_title('Neu')
    axs[1, 0].hist(neulevelsflat, np.linspace(-1, 1, 2 * 100))
    axs[0, 1].set_title('Pos')
    axs[0, 1].hist(poslevelsflat, np.linspace(-1, 1, 2 * 100))
    axs[1, 1].set_title('Compound')
    axs[1, 1].hist(comlevelsflat, np.linspace(-1, 1, 2 * 100))
    # BUGFIX: use os.makedirs instead of shelling out to "mkdir -p"
    # (portable, no shell involved, safe for arbitrary folder names)
    os.makedirs("output/analyze/", exist_ok=True)
    pltfile = "output/analyze/" + folder.split("/")[-1] + "_" + option_date_from.strftime("%d-%m-%Y") + "_" + option_date_to.strftime("%d-%m-%Y") + "_" + str(option_posts) + ".png"
    plt.savefig(pltfile)
    plt.close(fig)


def computeToxLevel(text):
    """Return the VADER polarity scores of *text*: a dict with the keys
    'neg', 'neu', 'pos' and 'compound'."""
    return analyser.polarity_scores(text)


def flatmap(arr):
    """Flatten one nesting level: [[a, b], [c]] -> [a, b, c]."""
    return [item for sublist in arr for item in sublist]


if __name__ == "__main__":
    # execute only if run as a script
    # NOTE(review): "<folder>" appears to have been stripped from the
    # original usage string (the script requires it as sys.argv[1])
    usage = sys.argv[0] + " <folder> [--from <%d-%m-%Y>] [--to <%d-%m-%Y>] [--posts <#posts e.g. 2>]"
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
    folder = sys.argv[1]
    if not os.path.isdir(folder):
        print(folder + " is not a folder")
        sys.exit(1)
    # defaults: roughly the last 3 months, at most 2 posts per user
    consider_date_from = datetime.today() - timedelta(days=3 * 30)
    consider_date_to = datetime.today()
    consider_posts = 2
    i = 2
    # manual flag parsing; the bound "len(sys.argv) - 1" guarantees that
    # every recognized flag is followed by a value
    while i < len(sys.argv) - 1:
        if sys.argv[i] == "--from":
            i += 1
            try:
                consider_date_from = datetime.strptime(sys.argv[i], "%d-%m-%Y")
            except ValueError:
                print(sys.argv[i] + " is not a valid date")
                print(usage)
                sys.exit(1)
        elif sys.argv[i] == "--to":
            i += 1
            try:
                consider_date_to = datetime.strptime(sys.argv[i], "%d-%m-%Y")
            except ValueError:
                print(sys.argv[i] + " is not a valid date")
                print(usage)
                sys.exit(1)
        elif sys.argv[i] == "--posts":
            i += 1
            if not sys.argv[i].isdigit():
                print(sys.argv[i] + " is not a number")
                print(usage)
                sys.exit(1)
            consider_posts = int(sys.argv[i])
        i += 1

    main(folder, consider_date_from, consider_date_to, consider_posts)
dmt(users).filter(lambda u: option_date_from <= u['CreationDate'] < option_date_to, "filtering users by creation").getresults() + newuserids = set(dmt(newusers).map(lambda u: u['Id'], "get user id list").getresults()) + + # get questions for filtered users + newposts = dmt(posts).filter(lambda p: p['OwnerUserId'] in newuserids, "filter posts by selected users").getresults() + if len(newposts) == 0: + continue + print("computing toxic levels: " + option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y")) + gfig, gaxs = plt.subplots(2, 2, figsize=(16, 12)) + gaxs[0, 0].set_title('Neg') + gaxs[1, 0].set_title('Neu') + gaxs[0, 1].set_title('Pos') + gaxs[1, 1].set_title('Compound') + + gneg = [] + gneu = [] + gpos = [] + gcom = [] + + outfolder = "output/batch/" + folder.split("/")[-1] + "/" + goutfilename = outfolder + "batch_" + folder.split("/")[-1] + "_" + option_date_from.strftime("%d-%m-%Y") + "_" + option_date_to.strftime("%d-%m-%Y") + + for option_posts in postcounts: + # print(option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y") + " - #posts: " + str(option_posts)) + + # computer toxic levels + start = cms() + printnoln("computing toxic levels: filtering") + toxlevels = defaultdict(list) + searchedposts = defaultdict(int) + filteredposts = [] + for (i, post) in enumerate(newposts): + userid = post['OwnerUserId'] + + # check first contribution + if firstcontrib[userid] + timedelta(days=DAYS_NEW_USER) < post['CreationDate']: + continue + + # no more than option_posts posts from one user + searchedposts[userid] += 1 + if searchedposts[userid] > option_posts: + continue + + filteredposts.append(post) + + for (i, post) in enumerate(filteredposts): + if (i + 1) % 100 == 0: + printnoln("\rcomputing toxic levels: post #" + str(i + 1) + "/" + str(len(filteredposts))) + if (i + 1) == len(newposts): + printnoln("\rcomputing toxic levels: post #" + str(i + 1) + "/" + str(len(filteredposts))) + userid = 
post['OwnerUserId'] + for a in post['Answers']: + toxlevel = computeToxLevel(a['Body']) + toxlevels[userid].append(toxlevel) + rprint("computing toxic levels: post #" + str(len(filteredposts)) + "/" + str(len(filteredposts)) + " ... took " + str(cms() - start) + "ms") + + outfilename = goutfilename + "_" + str(option_posts) + os.system("mkdir -p " + outfolder) + dumptoxlevels(toxlevels, outfilename + ".py") + + neglevelsflat = [item['neg'] for item in flatmap(toxlevels.values())] + neulevelsflat = [item['neu'] for item in flatmap(toxlevels.values())] + poslevelsflat = [item['pos'] for item in flatmap(toxlevels.values())] + comlevelsflat = [item['compound'] for item in flatmap(toxlevels.values())] + + gneg.append(neglevelsflat) + gneu.append(neulevelsflat) + gpos.append(poslevelsflat) + gcom.append(comlevelsflat) + + fig, axs = plt.subplots(2, 2, figsize=(16, 12)) + axs[0, 0].set_title('Neg') + axs[0, 0].hist(neglevelsflat, np.linspace(-1, 1, 2 * 100)) + axs[1, 0].set_title('Neu') + axs[1, 0].hist(neulevelsflat, np.linspace(-1, 1, 2 * 100)) + axs[0, 1].set_title('Pos') + axs[0, 1].hist(poslevelsflat, np.linspace(-1, 1, 2 * 100)) + axs[1, 1].set_title('Compound') + axs[1, 1].hist(comlevelsflat, np.linspace(-1, 1, 2 * 100)) + + # global + # gaxs[0, 0].hist(neglevelsflat, np.linspace(-1, 1, 2 * 100), label=str(option_posts) + " posts") + # gaxs[1, 0].hist(neulevelsflat, np.linspace(-1, 1, 2 * 100), label=str(option_posts) + " posts") + # gaxs[0, 1].hist(poslevelsflat, np.linspace(-1, 1, 2 * 100), label=str(option_posts) + " posts") + # gaxs[1, 1].hist(comlevelsflat, np.linspace(-1, 1, 2 * 100), label=str(option_posts) + " posts") + # gaxs[0, 0].hist(neglevelsflat, np.linspace(-1, 1, 2 * 100), alpha=1. / len(postcounts), label=str(option_posts) + " posts") + # gaxs[1, 0].hist(neulevelsflat, np.linspace(-1, 1, 2 * 100), alpha=1. / len(postcounts), label=str(option_posts) + " posts") + # gaxs[0, 1].hist(poslevelsflat, np.linspace(-1, 1, 2 * 100), alpha=1. 
/ len(postcounts), label=str(option_posts) + " posts") + # gaxs[1, 1].hist(comlevelsflat, np.linspace(-1, 1, 2 * 100), alpha=1. / len(postcounts), label=str(option_posts) + " posts") + + # plt.show() + fig.suptitle("Sentiment of answers to the first " + str(option_posts) + " (max) posts\nUsers registered between " + + option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y")) + fig.savefig(outfilename + ".png", bbox_inches='tight') + plt.close(fig) + + # global + gaxs[0, 0].hist(gneg, np.linspace(-1, 1, 2 * 100), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + gaxs[1, 0].hist(gneu, np.linspace(-1, 1, 2 * 100), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + gaxs[0, 1].hist(gpos, np.linspace(-1, 1, 2 * 100), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + gaxs[1, 1].hist(gcom, np.linspace(-1, 1, 2 * 100), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[0, 0].hist(gneg, np.linspace(-1, 1, 2 * 100), alpha=1. / len(postcounts), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[1, 0].hist(gneu, np.linspace(-1, 1, 2 * 100), alpha=1. / len(postcounts), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[0, 1].hist(gpos, np.linspace(-1, 1, 2 * 100), alpha=1. / len(postcounts), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[1, 1].hist(gcom, np.linspace(-1, 1, 2 * 100), alpha=1. 
/ len(postcounts), color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[0, 0].hist(gneg, np.linspace(-1, 1, 2 * 100), stacked=True, color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[1, 0].hist(gneu, np.linspace(-1, 1, 2 * 100), stacked=True, color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[0, 1].hist(gpos, np.linspace(-1, 1, 2 * 100), stacked=True, color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + # gaxs[1, 1].hist(gcom, np.linspace(-1, 1, 2 * 100), stacked=True, color=colors[:len(postcounts)], label=[str(option_posts) + " posts" for option_posts in postcounts]) + gaxs[0, 0].legend(loc="upper right") + gaxs[1, 0].legend(loc="upper right") + gaxs[0, 1].legend(loc="upper right") + gaxs[1, 1].legend(loc="upper right") + gfig.suptitle("Sentiment of answers to the first X (max) posts\nUsers registered between " + option_date_from.strftime("%d-%m-%Y") + " to " + option_date_to.strftime("%d-%m-%Y")) + gfig.savefig(goutfilename + ".png", bbox_inches='tight') + plt.close(gfig) + + +def computeToxLevel(text): + return analyser.polarity_scores(text) + + +def flatmap(arr): + return [item for sublist in arr for item in sublist] + + +def dumptoxlevels(lvls, filename): + with open(filename, "w") as file: + file.write("from collections import defaultdict\n\n") + file.write("toxlevels = " + str(lvls).replace("", "list", 1) + "\n") + + +def calc_intervals(posts): + firstpost = dmt(posts).reduce(lambda acc, e: acc if acc < e['CreationDate'] else e['CreationDate'], lambda acc, e: acc if acc < e else e, lambda: posts[0]['CreationDate'], "firstpost").getresults() + lastpost = dmt(posts).reduce(lambda acc, e: acc if acc > e['CreationDate'] else e['CreationDate'], lambda acc, e: acc if acc > e else e, lambda: posts[0]['CreationDate'], "lastpost").getresults() + 
+ # calc quarter beginning + firstpost = firstpost.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + if firstpost.month not in (1, 4, 7, 10): + firstpost = firstpost.replace(month={1: 1, 2: 1, 3: 1, 4: 4, 5: 4, 6: 4, 7: 7, 8: 7, 9: 7, 10: 10, 11: 10, 12: 10}[firstpost.month]) + lastpost = lastpost.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + if lastpost.month not in (1, 4, 7, 10): + lastpost = lastpost.replace(month={1: 1, 2: 1, 3: 1, 4: 4, 5: 4, 6: 4, 7: 7, 8: 7, 9: 7, 10: 10, 11: 10, 12: 10}[lastpost.month]) + # add 3 months to last post + if lastpost.month == 10: + lastpost = lastpost.replace(month=1, year=lastpost.year + 1) + else: + lastpost = lastpost.replace(month=lastpost.month + 3) + + cdate = firstpost + intervals = [] + while cdate < lastpost: + nextquarter = cdate.replace(month=(cdate.month + 3) % 12, year=cdate.year + (0 if cdate.month + 3 < 12 else 1)) + print("adding interval: " + cdate.strftime("%d-%m-%Y") + " - " + nextquarter.strftime("%d-%m-%Y")) + intervals.append((cdate, nextquarter)) + cdate = nextquarter + # sys.exit(0) + return intervals + + +if __name__ == "__main__": + # execute only if run as a script + usage = sys.argv[0] + " " + if len(sys.argv) < 2: + print(usage) + sys.exit(1) + folder = sys.argv[1] + if not os.path.isdir(folder): + print(folder + " is not a folder") + sys.exit(1) + + main(folder) diff --git a/calctoxdiff.py b/calctoxdiff.py new file mode 100644 index 0000000..7daca81 --- /dev/null +++ b/calctoxdiff.py @@ -0,0 +1,188 @@ +import importlib +import sys +import os +from os import listdir +from os.path import isfile, join +from scipy.stats import ks_2samp +from collections import defaultdict +from datetime import datetime +import matplotlib.pyplot as plt + + +def main(folder): + if folder.endswith("/"): + folder = folder[:-1] + onlyfiles = [f for f in listdir(folder)] + onlyfiles = [f for f in onlyfiles if isfile(join(folder, f))] + onlyfiles = [f for f in onlyfiles if f.endswith(".py")] + # 
def main(folder):
    """Compare consecutive sentiment dumps with two-sample KS tests.

    Collects every *.py dump in *folder* (as written by analyze_batch.py) and
    plots the KS statistic/p-value between consecutive dumps, grouped once by
    post count and once by date interval.
    """
    if folder.endswith("/"):
        folder = folder[:-1]
    onlyfiles = [f for f in listdir(folder) if isfile(join(folder, f)) and f.endswith(".py")]
    onlyfiles = sorted(onlyfiles)
    plotbypost(onlyfiles)
    plotbydate(onlyfiles)


def _ks_between(file1, file2):
    """Run two-sample KS tests between two dump files.

    Returns the four KS results in the order (neg, neu, pos, compound).
    Files are resolved against the module-level *folder* set in __main__ --
    NOTE(review): main() strips a trailing "/" only from its local copy, so
    pass the folder without a trailing slash.
    """
    tox1 = imprt(folder + "/" + file1).toxlevels
    tox2 = imprt(folder + "/" + file2).toxlevels
    results = []
    for key in ('neg', 'neu', 'pos', 'compound'):
        levels1 = [item[key] for item in flatmap(tox1.values())]
        levels2 = [item[key] for item in flatmap(tox2.values())]
        results.append(ks_2samp(levels1, levels2))
    return results


def plotbypost(onlyfiles):
    """Group dumps by post count and plot KS changes over the date intervals.

    Writes one ks_<posts>.log and one ks_<posts>.png per post count into the
    module-level *folder*.
    """
    files = defaultdict(list)
    for f in onlyfiles:
        s = f[:-3].split("_")
        files[int(s[4])].append(f)
    files = {p: sorted(l, key=lambda e: datetime.strptime(e.split("_")[2], "%d-%m-%Y")) for (p, l) in files.items()}

    changes_neg = defaultdict(list)
    changes_neu = defaultdict(list)
    changes_pos = defaultdict(list)
    changes_com = defaultdict(list)

    for (p, l) in files.items():
        if len(l) < 2:
            continue
        print(p)
        for i in range(len(l) - 1):
            ksneg, ksneu, kspos, kscom = _ks_between(l[i], l[i + 1])
            changes_neg[p].append(ksneg)
            changes_neu[p].append(ksneu)
            changes_pos[p].append(kspos)
            changes_com[p].append(kscom)

    for (p, l) in files.items():
        with open(folder + "/ks_" + str(p) + ".log", "w") as f:
            for i in range(len(l) - 1):
                f1 = l[i]
                f2 = l[i + 1]
                f.write(f1 + " -> " + f2 + ": ks neg = " + str(changes_neg[p][i]) + "; ks neu = " + str(changes_neu[p][i]) +
                        "; ks pos = " + str(changes_pos[p][i]) + "; ks com = " + str(changes_com[p][i]) + "\n")

    for (p, l) in files.items():
        xlabels = [l[i].split("_")[2] + " -\n" + l[i + 1].split("_")[2] for i in range(len(l) - 1)]
        fig = plt.figure(figsize=(16, 12))
        # "metric" rather than "type": avoid shadowing the builtin
        for metric, changes in {"neg": changes_neg[p], "neu": changes_neu[p], "pos": changes_pos[p], "com": changes_com[p]}.items():
            plt.plot(xlabels, [c.statistic for c in changes], label=metric + ".stat")
            plt.plot(xlabels, [c.pvalue for c in changes], label=metric + ".pval")
        plt.title("KS 2-sided test with max " + str(p) + " posts")
        plt.xticks(rotation=90)
        plt.legend(loc="upper right")
        plt.savefig(folder + "/ks_" + str(p) + ".png", bbox_inches='tight')
        plt.close(fig)


def plotbydate(onlyfiles):
    """Group dumps by (from, to) interval and plot KS changes across post counts.

    Writes one ks_<from>_<to>.log and .png per interval into the module-level
    *folder*.
    """
    files = defaultdict(list)
    for f in onlyfiles:
        s = f[:-3].split("_")
        files[(s[2], s[3])].append(f)
    # NOTE(review): sorts the post-count field lexicographically; fine for
    # the single-digit counts 1..5 written by analyze_batch.py
    files = {d: sorted(l, key=lambda e: e.split("_")[4]) for (d, l) in files.items()}

    changes_neg = defaultdict(list)
    changes_neu = defaultdict(list)
    changes_pos = defaultdict(list)
    changes_com = defaultdict(list)

    for (d, l) in files.items():
        if len(l) < 2:
            continue
        print(d)
        for i in range(len(l) - 1):
            ksneg, ksneu, kspos, kscom = _ks_between(l[i], l[i + 1])
            changes_neg[d].append(ksneg)
            changes_neu[d].append(ksneu)
            changes_pos[d].append(kspos)
            changes_com[d].append(kscom)

    for (d, l) in files.items():
        with open(folder + "/ks_" + d[0] + "_" + d[1] + ".log", "w") as f:
            for i in range(len(l) - 1):
                f1 = l[i]
                f2 = l[i + 1]
                f.write(f1 + " -> " + f2 + ": ks neg = " + str(changes_neg[d][i]) + "; ks neu = " + str(changes_neu[d][i]) +
                        "; ks pos = " + str(changes_pos[d][i]) + "; ks com = " + str(changes_com[d][i]) + "\n")

    for (d, l) in files.items():
        xlabels = [l[i].split("_")[4][:-3] + "-" + l[i + 1].split("_")[4][:-3] for i in range(len(l) - 1)]
        fig = plt.figure(figsize=(16, 12))
        for metric, changes in {"neg": changes_neg[d], "neu": changes_neu[d], "pos": changes_pos[d], "com": changes_com[d]}.items():
            plt.plot(xlabels, [c.statistic for c in changes], label=metric + ".stat")
            plt.plot(xlabels, [c.pvalue for c in changes], label=metric + ".pval")
        plt.title("KS 2-sided test with between " + d[0] + " and " + d[1])
        plt.xticks(rotation=90)
        plt.legend(loc="upper right")
        plt.savefig(folder + "/ks_" + d[0] + "_" + d[1] + ".png", bbox_inches='tight')
        plt.close(fig)


def imprt(file):
    """Import the Python module at the explicit path *file* and return it."""
    # BUGFIX: importlib.util is a submodule that is not guaranteed to be
    # reachable after a plain "import importlib"; import it explicitly.
    import importlib.util
    spec = importlib.util.spec_from_file_location("module.name", file)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod


def flatmap(arr):
    """Flatten one nesting level: [[a, b], [c]] -> [a, b, c]."""
    return [item for sublist in arr for item in sublist]


def filecmp(file1, file2):
    """Old-style comparator on the date field (index 2) of dump file names.

    NOTE(review): shadows the stdlib "filecmp" module name and appears to be
    unused in this file; kept for backwards compatibility.
    """
    if file1 == file2:
        return 0
    s1 = file1.split("_")
    s2 = file2.split("_")
    d1 = datetime.strptime(s1[2], "%d-%m-%Y")
    d2 = datetime.strptime(s2[2], "%d-%m-%Y")
    if d1 < d2:
        return -1
    elif d1 > d2:
        return 1
    return 0


if __name__ == "__main__":
    # execute only if run as a script
    # NOTE(review): "<folder>" appears to have been stripped from the
    # original usage string (the script requires it as sys.argv[1])
    usage = sys.argv[0] + " <folder>"
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)
    folder = sys.argv[1]
    if not os.path.isdir(folder):
        print(folder + " is not a folder")
        sys.exit(1)

    main(folder)
def printnoln(text):
    """print() without a trailing newline, for in-place progress output."""
    print(text, end='', flush=True)


def rprint(text):
    """Print *text* at the start of the current line (pairs with printnoln)."""
    print('\r' + text)


def dmt(data):
    """Wrap *data* in an mt pipeline with one worker thread per CPU."""
    return mt(multiprocessing.cpu_count(), data, False)


def cms():
    """Current wall-clock time in integer milliseconds."""
    return int(round(time.time() * 1000))


def load(folder):
    """Load users and posts (with answers/comments attached) from *folder*.

    Returns (users, posts, firstcontrib, sumcontrib): firstcontrib maps
    user id -> datetime of the user's first question/answer; sumcontrib maps
    user id -> total number of questions + answers + comments.
    """
    users = readUsers(folder + "/Users.xml")
    posts = readPosts(folder + "/Posts.xml")

    # get first contribution to page:
    firstcontrib = computefirstcontrib(posts)
    sumcontrib = computesumcontrib(posts)

    return users, posts, firstcontrib, sumcontrib


def computesumcontrib(posts):
    """Count questions + answers + comments per user id."""
    x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
    x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
    x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()
    sumcontrib = defaultdict(int)
    # "uid" rather than "id": avoid shadowing the builtin
    for uid in x1:
        sumcontrib[uid] += 1
    for sub in x2:
        for uid in sub:
            sumcontrib[uid] += 1
    for sub in x3:
        for uid in sub:
            sumcontrib[uid] += 1
    return sumcontrib


def computefirstcontrib(posts):
    """Map user id -> datetime of the user's earliest question or answer."""
    x1 = dmt(posts).map(lambda q: (q['OwnerUserId'], q['CreationDate']), "calc first contrib q").getresults()
    x2 = dmt(posts).map(lambda q: [(a['OwnerUserId'], a['CreationDate']) for a in q['Answers']], "calc first contrib a").getresults()
    firstcontrib = defaultdict(list)
    for (uid, date) in x1:
        firstcontrib[uid].append(date)
    for sub in x2:
        for (uid, date) in sub:
            firstcontrib[uid].append(date)
    return {uid: min(dates) for (uid, dates) in firstcontrib.items()}


def mapuser(item):
    """Map a Users.xml row element to a user dict (CreationDate parsed)."""
    tags = ['Id', 'CreationDate']
    datetags = ['CreationDate']
    user = {tag: getTag(item, tag) for tag in tags}
    for tag in datetags:
        if user[tag] is not None:
            user[tag] = datetime.fromisoformat(user[tag])
        else:
            print("map user: tag " + tag + " is None: " + str(user))
    return user


def mapQuestion(item):
    """Map a Posts.xml question row to a dict.

    NOTE(review): assumes CreationDate is always present -- fromisoformat
    raises on None; confirm against the dump.
    """
    tags = ['Id', 'CreationDate', 'Body', 'Title', 'OwnerUserId', 'OwnerDisplayName']
    datetags = ['CreationDate']
    question = {tag: getTag(item, tag) for tag in tags}
    for tag in datetags:
        question[tag] = datetime.fromisoformat(question[tag])
    return question


def mapAnswer(item):
    """Map a Posts.xml answer row to a dict (CreationDate parsed)."""
    tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId']
    datetags = ['CreationDate']
    answer = {tag: getTag(item, tag) for tag in tags}
    for tag in datetags:
        answer[tag] = datetime.fromisoformat(answer[tag])
    return answer


def mapComment(item):
    """Map a Posts.xml comment row to a dict (CreationDate parsed)."""
    tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId']
    datetags = ['CreationDate']
    comment = {tag: getTag(item, tag) for tag in tags}
    for tag in datetags:
        comment[tag] = datetime.fromisoformat(comment[tag])
    return comment


def readUsers(file):
    """Parse Users.xml and return the list of user dicts."""
    # BUGFIX: xml.etree.cElementTree was removed in Python 3.9; the plain
    # ElementTree module uses the C accelerator automatically since 3.3.
    import xml.etree.ElementTree as et
    prefix = "readUsers: "
    printnoln(prefix + "reading xml file ...")

    now = cms()
    items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
    rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")

    users = dmt(items).map(mapuser, prefix + "mapping users").getresults()

    print(prefix + "done")
    return users


def readPosts(file):
    """Parse Posts.xml into question dicts with nested answers and comments."""
    # BUGFIX: see readUsers -- cElementTree is gone in Python 3.9+
    import xml.etree.ElementTree as et
    prefix = "readPosts: "
    printnoln(prefix + "reading xml file ...")

    now = cms()
    items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
    rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")

    print(prefix + "#posts total: " + str(len(items)))

    questions = readQuestions(items)
    questionids = set(dmt(questions).map(lambda q: q['Id'], prefix + "get question ids").getresults())

    answers = readAnswers(items)
    answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults())

    comments = readComments(items)

    # drop answers whose parent question is missing
    answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()

    # drop comments whose parent question/answer is missing
    comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()

    # attach answers to their questions
    printnoln(prefix + "create qamapping ...")
    qamapping = {qid: [] for qid in questionids}
    for a in answers:
        qamapping[a['ParentId']].append(a)
    rprint(prefix + "create qamapping ... done")
    questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()

    # attach comments to their questions and answers
    printnoln(prefix + "create qacmapping ...")
    qacmapping = {pid: [] for pid in questionids.union(answerids)}
    for c in comments:
        qacmapping[c['ParentId']].append(c)
    rprint(prefix + "create qacmapping ... done")
    answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
    questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()

    # safety check
    countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \
        .reduce(operator.add, operator.add, lambda: 0, prefix + "sum answer count").getresults()
    if countans != len(answers):
        # BUGFIX: the original concatenated str with int (TypeError when hit)
        print(prefix + "countans != #answer: " + str(countans) + " " + str(len(answers)))
    print(prefix + "done")
    return questions


def readQuestions(items):
    """Map rows with PostTypeId 1 to question dicts (owner must be known)."""
    prefix = "readQuestions: "
    questions = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "1", prefix + "filter out questions") \
        .map(mapQuestion, prefix + "mapping questions") \
        .filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken questions").getresults()

    print(prefix + "questions read: " + str(len(questions)))
    return questions


def readAnswers(items):
    """Map rows with PostTypeId 2 to answer dicts (owner must be known)."""
    prefix = "readAnswers: "
    answers = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "2", prefix + "filter out answers") \
        .map(mapAnswer, prefix + "mapping answers") \
        .filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken answers").getresults()

    print(prefix + "answers read: " + str(len(answers)))
    return answers


def readComments(items):
    """Map rows with PostTypeId 3 to comment dicts (owner must be known).

    NOTE(review): Stack-Exchange-style dumps usually keep comments in a
    separate Comments.xml; confirm that this dataset really stores them as
    PostTypeId 3 rows of Posts.xml.
    """
    prefix = "readComments: "
    comments = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "3", prefix + "filter out comments") \
        .map(mapComment, prefix + "mapping comments") \
        .filter(lambda c: c['OwnerUserId'] is not None, prefix + "filter out broken comments").getresults()

    print(prefix + "comments read: " + str(len(comments)))
    return comments


def getTag(item, tag):
    """Return attribute *tag* of XML element *item*, or None if absent."""
    # dict.get already returns None for missing keys; no pre-check needed
    return item.attrib.get(tag)


def tagExists(item, tag):
    """Return True if XML element *item* carries attribute *tag*."""
    return tag in item.attrib
def setprop(dic, key, value):
    """Set dic[key] = value and return dic (helper for mt.map pipelines)."""
    dic[key] = value
    return dic


class mt():
    """Small thread-pool map/filter/reduce pipeline.

    Splits *data* into one contiguous slice per worker thread, runs the
    operation on every slice concurrently and recombines the per-thread
    results in slice order, so the result order is deterministic.
    Operations chain fluently; starting a new operation first joins the
    previous one and uses its results as input.

    NOTE(review): workers are threading.Thread, so CPU-bound callables do
    not run in parallel under the GIL -- confirm whether a wall-clock
    speedup is actually expected here.
    """

    def __init__(self, threads, data, verbose=False):
        self.__running = False
        self.__closed = False
        self.__data = data
        self.__verbose = verbose
        # per-operation bookkeeping, populated by __begin
        self.__final = None
        self.__comment = None
        self.__starttime = None
        self.__endtime = None
        self.__type = None
        # one thread slot and one result slot per worker
        self.__threadcount = threads
        self.__threads = [None] * threads
        self.__lock = Lock()
        self.__results = [[] for _ in range(threads)]

    def __begin(self, optype, final, comment, worker, *workerargs):
        """Shared launch logic: join any pending op, then start the workers.

        *final* is the callable that getresults() will use to combine the
        per-thread results; *worker* receives (index, slice, *workerargs).
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        if self.__running:
            self.join()
            self.__data = self.getresults()
        self.__running = True
        self.__final = final
        self.__type = optype
        self.__comment = comment if comment is not None else ""
        if comment is not None:
            print(self.__comment + ": #" + str(len(self.__data)) + " ...", end='\n' if self.__verbose else '', flush=True)
        self.__starttime = self.__cms()
        self.__endtime = None
        n = self.__threadcount
        for i in range(n):
            # contiguous slices; integer division spreads any remainder
            part = self.__data[i * len(self.__data) // n:(i + 1) * len(self.__data) // n]
            self.__threads[i] = Thread(target=worker, args=(i, part) + workerargs)
            self.__threads[i].start()
        return self

    def filter(self, cond, comment=None):
        """Concurrently keep the items for which cond(item) is truthy."""
        return self.__begin("filter", self.__getresultsmapfilter, comment, self.__dofilter, cond)

    def __dofilter(self, i, items, cond):
        # "items" rather than "list": avoid shadowing the builtin
        now = self.__cms()
        kept = [item for item in items if cond(item)]
        with self.__lock:
            self.__results[i] = kept
        if self.__verbose:
            print(self.__comment + ": Thread " + str(i) + ": filter took " + str(self.__cms() - now) + "ms")

    def map(self, func, comment=None):
        """Concurrently apply func to every item, preserving order."""
        return self.__begin("map", self.__getresultsmapfilter, comment, self.__domap, func)

    def __domap(self, i, items, func):
        now = self.__cms()
        mapped = [func(item) for item in items]
        with self.__lock:
            self.__results[i] = mapped
        if self.__verbose:
            print(self.__comment + ": Thread " + str(i) + ": map took " + str(self.__cms() - now) + "ms")

    def reduce(self, reducer, aggregator, initval, comment=None):
        """Concurrently fold the data.

        reducer(acc, item) folds the items of one slice, starting from
        initval() (a factory, so every thread gets a fresh accumulator);
        aggregator(acc, slice_result) then folds the per-slice results,
        again starting from initval().
        """
        return self.__begin("reduce", lambda: self.__getresultsreduce(aggregator, initval), comment, self.__doreduce, reducer, initval)

    def __doreduce(self, i, items, reducer, initval):
        now = self.__cms()
        acc = initval()
        for item in items:
            acc = reducer(acc, item)
        with self.__lock:
            self.__results[i] = acc
        if self.__verbose:
            print(self.__comment + ": Thread " + str(i) + ": reduce took " + str(self.__cms() - now) + "ms")

    def getresults(self):
        """Join the running operation and return its combined result."""
        self.join()
        return self.__final()

    def __getresultsmapfilter(self):
        """Concatenate the per-thread result slices in slice order."""
        combined = []
        for part in self.__results:
            combined += part
        return combined

    def __getresultsreduce(self, aggregator, initval):
        """Fold the per-thread partial results with *aggregator*."""
        acc = initval()
        for partial in self.__results:
            acc = aggregator(acc, partial)
        return acc

    def join(self):
        """Wait for all workers; print the operation summary exactly once."""
        if self.__closed:
            raise RuntimeError("Already closed")
        for i in range(self.__threadcount):
            if self.__threads[i] is not None:
                self.__threads[i].join()
                self.__threads[i] = None
        if self.__endtime is None:
            self.__endtime = self.__cms()
            if self.__comment is not None:
                dur = self.__endtime - self.__starttime
                # filter additionally reports how many items survived
                summary = self.__comment + ": #" + str(len(self.__data)) + (" -> #" + str(sum(len(part) for part in self.__results)) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms"
                if self.__verbose:
                    print(summary)
                else:
                    print("\r" + summary)
        return self

    def close(self):
        """Join any pending operation and refuse all further use."""
        if self.__closed:
            raise RuntimeError("Already closed")
        self.join()
        self.__closed = True

    def __cms(self):
        """Current wall-clock time in integer milliseconds."""
        return int(round(time.time() * 1000))