From e54e4f293859bab5c37bd03d85aa6a811b632d39 Mon Sep 17 00:00:00 2001
From: wea_ondara
Date: Sat, 28 Dec 2019 13:14:49 +0100
Subject: [PATCH] wip

---
 loader.py     | 33 ++++++++++++-----------
 mt.py         | 74 ++++++++++++++++++++++++++++++++++++++-------------
 sentiments.py |  5 ++--
 3 files changed, 75 insertions(+), 37 deletions(-)

diff --git a/loader.py b/loader.py
index c5621ee..c1f81c3 100644
--- a/loader.py
+++ b/loader.py
@@ -17,7 +17,10 @@
 printnoln = lambda text: print(text, end='', flush=True)
 rprint = lambda text: print('\r' + text)

-def dmt(data, progressinterval=1000): return mt(multiprocessing.cpu_count(), data, False, progressinterval)
+def dmt(data, progressinterval=5000): return mt(1, data, False, progressinterval)
+
+
+def dmp(data, progressinterval=5000): return mt(multiprocessing.cpu_count(), data, False, progressinterval, True)


 def cms(): return int(round(time.time() * 1000))
@@ -39,16 +42,16 @@ def load(folder):
 def computesumcontrib(posts):
     x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
     x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
-    x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()
+    # x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()
     sumcontrib = defaultdict(int)
     for id in x1:
         sumcontrib[id] += 1
     for y in x2:
         for id in y:
             sumcontrib[id] += 1
-    for y in x3:
-        for id in y:
-            sumcontrib[id] += 1
+    # for y in x3:
+    #     for id in y:
+    #         sumcontrib[id] += 1
     return sumcontrib


@@ -137,13 +140,13 @@ def readPosts(file):
     answers = readAnswers(items)
     answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults())

-    comments = readComments(items)
+    # comments = readComments(items)

     # filter answers
     answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()

     # filter comments
-    comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()
+    # comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()

     # create question answer mapping
     printnoln(prefix + "create qamapping ...")
@@ -153,14 +156,14 @@ def readPosts(file):
     rprint(prefix + "create qamapping ... done")
     questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()

-    # create comment question comment answer mapping
-    printnoln(prefix + "create qacmapping ...")
-    qacmapping = {id: [] for id in questionids.union(answerids)}
-    for c in comments:
-        qacmapping[c['ParentId']].append(c)
-    rprint(prefix + "create qacmapping ... done")
-    answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
-    questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()
+    # # create comment question comment answer mapping
+    # printnoln(prefix + "create qacmapping ...")
+    # qacmapping = {id: [] for id in questionids.union(answerids)}
+    # for c in comments:
+    #     qacmapping[c['ParentId']].append(c)
+    # rprint(prefix + "create qacmapping ... done")
+    # answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
+    # questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()

     # safety check
     countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \
diff --git a/mt.py b/mt.py
index 053c6a6..763bf1c 100644
--- a/mt.py
+++ b/mt.py
@@ -1,14 +1,16 @@
+import multiprocessing as mp
+import threading as th
 import time
 from math import ceil
-from threading import Thread, Lock


 class mt():
-    def __init__(self, threads, data, verbose=False, progressinterval=1000):
+    def __init__(self, threads, data, verbose=False, progressinterval=1000, useprocesses=False):
         self.__running = False
         self.__closed = False
         self.__data = data
         self.__verbose = verbose
+        self.__useprocesses = useprocesses
         # dummy
         self.__final = None
         self.__comment = None
@@ -18,9 +20,16 @@ class mt():
         # thread things
         self.__threadcount = threads
         self.__threads = []
-        self.__lock = Lock()
-        self.__results = []
-        self.__progress = 0
+        if useprocesses:
+            mp.set_start_method('fork', True)
+            manager = mp.Manager()
+            self.__results = manager.list()
+            self.__progress = mp.Value("i", 0)
+            self.__lock = mp.Lock()
+        else:
+            self.__results = []
+            self.__progress = 0
+            self.__lock = th.Lock()
         self.__progressinterval = progressinterval
         for i in range(self.__threadcount):
             self.__results.append([])
@@ -32,7 +41,7 @@ class mt():
         if self.__running:
             self.join()
             self.__data = self.getresults()
-        self.__progress = 0
+        self.__reset_progress()
         self.__running = True
         self.__final = self.__getresultsmapfilter
         self.__type = "filter"
@@ -43,7 +52,10 @@ class mt():
         self.__endtime = None
         for i in range(self.__threadcount):
             part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
-            self.__threads[i] = Thread(target=self.__dofilter, args=(i, part, cond))
+            if self.__useprocesses:
+                self.__threads[i] = mp.Process(target=self.__dofilter, args=(i, part, cond))
+            else:
+                self.__threads[i] = th.Thread(target=self.__dofilter, args=(i, part, cond))
             self.__threads[i].start()
         return self
@@ -55,9 +67,9 @@ class mt():
             part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
             results += [l for l in part if cond(l)]
             with self.__lock:
-                self.__progress += len(part)
+                self.__inc_progress(len(part))
                 if self.__comment is not None:
-                    print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
+                    print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
         with self.__lock:
             self.__results[i] = results
@@ -74,7 +86,7 @@ class mt():
         if self.__running:
             self.join()
             self.__data = self.getresults()
-        self.__progress = 0
+        self.__reset_progress()
         self.__running = True
         self.__final = self.__getresultsmapfilter
         self.__type = "map"
@@ -85,7 +97,10 @@ class mt():
         self.__endtime = None
         for i in range(self.__threadcount):
             part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
-            self.__threads[i] = Thread(target=self.__domap, args=(i, part, func))
+            if self.__useprocesses:
+                self.__threads[i] = mp.Process(target=self.__domap, args=(i, part, func))
+            else:
+                self.__threads[i] = th.Thread(target=self.__domap, args=(i, part, func))
             self.__threads[i].start()
         return self
@@ -96,9 +111,9 @@ class mt():
             part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
             results += [func(l) for l in part]
             with self.__lock:
-                self.__progress += len(part)
+                self.__inc_progress(len(part))
                 if self.__comment is not None:
-                    print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
+                    print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
         with self.__lock:
             self.__results[i] = results
@@ -115,7 +130,7 @@ class mt():
         if self.__running:
             self.join()
             self.__data = self.getresults()
-        self.__progress = 0
+        self.__reset_progress()
         self.__running = True
         self.__final = lambda: self.__getresultsreduce(aggregator, initval)
         self.__type = "reduce"
@@ -126,7 +141,10 @@ class mt():
         self.__endtime = None
         for i in range(self.__threadcount):
             part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
-            self.__threads[i] = Thread(target=self.__doreduce, args=(i, part, reducer, initval))
+            if self.__useprocesses:
+                self.__threads[i] = mp.Process(target=self.__doreduce, args=(i, part, reducer, initval))
+            else:
+                self.__threads[i] = th.Thread(target=self.__doreduce, args=(i, part, reducer, initval))
             self.__threads[i].start()
         return self
@@ -139,9 +157,9 @@ class mt():
             for k in range(len(part)):
                 val = reducer(val, part[k])
             with self.__lock:
-                self.__progress += len(part)
+                self.__inc_progress(len(part))
                 if self.__comment is not None:
-                    print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
+                    print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
         with self.__lock:
             self.__results[i] = val
@@ -180,10 +198,10 @@ class mt():
         if self.__comment is not None:
             dur = self.__endtime - self.__starttime
             if self.__verbose:
-                print(self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + (
+                print(self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + (
                     " -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
             else:
-                print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + (
+                print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + (
                     " -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
         return self
@@ -195,3 +213,21 @@ class mt():

     def __cms(self):
         return int(round(time.time() * 1000))
+
+    def __reset_progress(self):
+        if self.__useprocesses:
+            self.__progress.value = 0
+        else:
+            self.__progress = 0
+
+    def __inc_progress(self, val):
+        if self.__useprocesses:
+            self.__progress.value += val
+        else:
+            self.__progress += val
+
+    def __get_progress(self):
+        if self.__useprocesses:
+            return self.__progress.value
+        else:
+            return self.__progress
diff --git a/sentiments.py b/sentiments.py
index f24f41d..628fd64 100644
--- a/sentiments.py
+++ b/sentiments.py
@@ -3,8 +3,7 @@
 import sys

 from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

-from loader import load, dmt
-from common import imprt
+from loader import load, dmp

 analyser = SentimentIntensityAnalyzer()
@@ -17,7 +16,7 @@ def main(folder):
     outfilename = outfolder + "sentiments"

     # compute toxic levels
-    toxlevels = dmt(posts, 100).map(lambda p: (p['Id'], {a['Id']: computeToxLevel(a['Body']) for a in p['Answers']}), "calculating sentiments").getresults()
+    toxlevels = dmp(posts, 100).map(lambda p: (p['Id'], {a['Id']: computeToxLevel(a['Body']) for a in p['Answers']}), "calculating sentiments").getresults()
     toxlevels = {id: p for (id, p) in toxlevels}

     dumptoxlevels(toxlevels, outfilename + ".py")
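
Notes on the change, with small sketches. The sketches below are illustrations written against the API visible in this patch; the sample data and lambdas are made up and are not part of the diff.

loader.py now distinguishes two helpers: dmt() runs the mt pipeline with a single worker thread, while the new dmp() fans the work out over multiprocessing.cpu_count() processes (the trailing True switches mt into process mode). A minimal usage sketch, assuming loader.py and mt.py as patched:

from loader import dmt, dmp

posts = [{'Id': i, 'Score': i % 7} for i in range(100000)]  # toy data, not the real dump

# Thread path (now a single worker): fine for cheap per-item lambdas,
# where process startup and data transfer would dominate.
ids = dmt(posts).map(lambda p: p['Id'], "collect ids").getresults()

# Process path: worthwhile for CPU-bound per-item work. Because mt forces the
# 'fork' start method, the lambdas and chunks reach the workers by forking
# rather than pickling, so plain lambdas are fine here (Unix only).
high = dmp(posts).filter(lambda p: p['Score'] > 3, "filter by score").getresults()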
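In mt.py the progress counter and lock are now built in two variants because a plain Python int incremented inside a child process is never seen by the parent; the process path therefore shares the counter through multiprocessing.Value (and collects results through a Manager list). A standalone sketch of that counter pattern, independent of the mt class (the worker and chunk names are invented for the example), assuming a Unix host where the 'fork' start method is available:

import multiprocessing as mp


def worker(progress, lock, chunk):
    # Stand-in for per-chunk work; only the progress update matters here.
    with lock:
        progress.value += len(chunk)  # shared C int, visible to the parent


if __name__ == '__main__':
    mp.set_start_method('fork', True)  # same start method the patch forces
    progress = mp.Value('i', 0)        # 'i' -> C int in shared memory
    lock = mp.Lock()
    data = list(range(10000))
    chunks = [data[i::4] for i in range(4)]
    procs = [mp.Process(target=worker, args=(progress, lock, c)) for c in chunks]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(progress.value)              # 10000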
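sentiments.py switches the sentiment pass from dmt to dmp, which fits: VADER scoring is pure CPU work per answer body, so it benefits from real parallelism instead of GIL-bound threads. computeToxLevel itself is not shown in this diff; the following is a hypothetical stand-in built on the SentimentIntensityAnalyzer the file already imports, purely to illustrate the per-item cost being parallelised:

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyser = SentimentIntensityAnalyzer()


def computeToxLevel(text):
    # Hypothetical body: VADER's compound score lies in [-1, 1];
    # lower (more negative) roughly means harsher wording.
    return analyser.polarity_scores(text)['compound']


print(computeToxLevel("This answer is completely useless."))  # expected to print a negative compound score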