This commit is contained in:
wea_ondara
2019-12-28 13:14:49 +01:00
parent 88de1e9c7b
commit e54e4f2938
3 changed files with 75 additions and 37 deletions

View File

@@ -17,7 +17,10 @@ printnoln = lambda text: print(text, end='', flush=True)
rprint = lambda text: print('\r' + text) rprint = lambda text: print('\r' + text)
def dmt(data, progressinterval=1000): return mt(multiprocessing.cpu_count(), data, False, progressinterval) def dmt(data, progressinterval=5000): return mt(1, data, False, progressinterval)
def dmp(data, progressinterval=5000): return mt(multiprocessing.cpu_count(), data, False, progressinterval, True)
def cms(): return int(round(time.time() * 1000)) def cms(): return int(round(time.time() * 1000))
@@ -39,16 +42,16 @@ def load(folder):
def computesumcontrib(posts): def computesumcontrib(posts):
x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults() x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults() x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults() # x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()
sumcontrib = defaultdict(int) sumcontrib = defaultdict(int)
for id in x1: for id in x1:
sumcontrib[id] += 1 sumcontrib[id] += 1
for y in x2: for y in x2:
for id in y: for id in y:
sumcontrib[id] += 1 sumcontrib[id] += 1
for y in x3: # for y in x3:
for id in y: # for id in y:
sumcontrib[id] += 1 # sumcontrib[id] += 1
return sumcontrib return sumcontrib
@@ -137,13 +140,13 @@ def readPosts(file):
answers = readAnswers(items) answers = readAnswers(items)
answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults()) answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults())
comments = readComments(items) # comments = readComments(items)
# filter answers # filter answers
answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults() answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()
# filter comments # filter comments
comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults() # comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()
# create question answer mapping # create question answer mapping
printnoln(prefix + "create qamapping ...") printnoln(prefix + "create qamapping ...")
@@ -153,14 +156,14 @@ def readPosts(file):
rprint(prefix + "create qamapping ... done") rprint(prefix + "create qamapping ... done")
questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults() questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()
# create comment question comment answer mapping # # create comment question comment answer mapping
printnoln(prefix + "create qacmapping ...") # printnoln(prefix + "create qacmapping ...")
qacmapping = {id: [] for id in questionids.union(answerids)} # qacmapping = {id: [] for id in questionids.union(answerids)}
for c in comments: # for c in comments:
qacmapping[c['ParentId']].append(c) # qacmapping[c['ParentId']].append(c)
rprint(prefix + "create qacmapping ... done") # rprint(prefix + "create qacmapping ... done")
answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults() # answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults() # questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()
# safety check # safety check
countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \ countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \

74
mt.py
View File

@@ -1,14 +1,16 @@
import multiprocessing as mp
import threading as th
import time import time
from math import ceil from math import ceil
from threading import Thread, Lock
class mt(): class mt():
def __init__(self, threads, data, verbose=False, progressinterval=1000): def __init__(self, threads, data, verbose=False, progressinterval=1000, useprocesses=False):
self.__running = False self.__running = False
self.__closed = False self.__closed = False
self.__data = data self.__data = data
self.__verbose = verbose self.__verbose = verbose
self.__useprocesses = useprocesses
# dummy # dummy
self.__final = None self.__final = None
self.__comment = None self.__comment = None
@@ -18,9 +20,16 @@ class mt():
# thread things # thread things
self.__threadcount = threads self.__threadcount = threads
self.__threads = [] self.__threads = []
self.__lock = Lock() if useprocesses:
self.__results = [] mp.set_start_method('fork', True)
self.__progress = 0 manager = mp.Manager()
self.__results = manager.list()
self.__progress = mp.Value("i", 0)
self.__lock = mp.Lock()
else:
self.__results = []
self.__progress = 0
self.__lock = th.Lock()
self.__progressinterval = progressinterval self.__progressinterval = progressinterval
for i in range(self.__threadcount): for i in range(self.__threadcount):
self.__results.append([]) self.__results.append([])
@@ -32,7 +41,7 @@ class mt():
if self.__running: if self.__running:
self.join() self.join()
self.__data = self.getresults() self.__data = self.getresults()
self.__progress = 0 self.__reset_progress()
self.__running = True self.__running = True
self.__final = self.__getresultsmapfilter self.__final = self.__getresultsmapfilter
self.__type = "filter" self.__type = "filter"
@@ -43,7 +52,10 @@ class mt():
self.__endtime = None self.__endtime = None
for i in range(self.__threadcount): for i in range(self.__threadcount):
part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount] part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
self.__threads[i] = Thread(target=self.__dofilter, args=(i, part, cond)) if self.__useprocesses:
self.__threads[i] = mp.Process(target=self.__dofilter, args=(i, part, cond))
else:
self.__threads[i] = th.Thread(target=self.__dofilter, args=(i, part, cond))
self.__threads[i].start() self.__threads[i].start()
return self return self
@@ -55,9 +67,9 @@ class mt():
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))] part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
results += [l for l in part if cond(l)] results += [l for l in part if cond(l)]
with self.__lock: with self.__lock:
self.__progress += len(part) self.__inc_progress(len(part))
if self.__comment is not None: if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True) print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
with self.__lock: with self.__lock:
self.__results[i] = results self.__results[i] = results
@@ -74,7 +86,7 @@ class mt():
if self.__running: if self.__running:
self.join() self.join()
self.__data = self.getresults() self.__data = self.getresults()
self.__progress = 0 self.__reset_progress()
self.__running = True self.__running = True
self.__final = self.__getresultsmapfilter self.__final = self.__getresultsmapfilter
self.__type = "map" self.__type = "map"
@@ -85,7 +97,10 @@ class mt():
self.__endtime = None self.__endtime = None
for i in range(self.__threadcount): for i in range(self.__threadcount):
part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount] part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
self.__threads[i] = Thread(target=self.__domap, args=(i, part, func)) if self.__useprocesses:
self.__threads[i] = mp.Process(target=self.__domap, args=(i, part, func))
else:
self.__threads[i] = th.Thread(target=self.__domap, args=(i, part, func))
self.__threads[i].start() self.__threads[i].start()
return self return self
@@ -96,9 +111,9 @@ class mt():
part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))] part = list[j * self.__progressinterval: min((j + 1) * self.__progressinterval, len(list))]
results += [func(l) for l in part] results += [func(l) for l in part]
with self.__lock: with self.__lock:
self.__progress += len(part) self.__inc_progress(len(part))
if self.__comment is not None: if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True) print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
with self.__lock: with self.__lock:
self.__results[i] = results self.__results[i] = results
@@ -115,7 +130,7 @@ class mt():
if self.__running: if self.__running:
self.join() self.join()
self.__data = self.getresults() self.__data = self.getresults()
self.__progress = 0 self.__reset_progress()
self.__running = True self.__running = True
self.__final = lambda: self.__getresultsreduce(aggregator, initval) self.__final = lambda: self.__getresultsreduce(aggregator, initval)
self.__type = "reduce" self.__type = "reduce"
@@ -126,7 +141,10 @@ class mt():
self.__endtime = None self.__endtime = None
for i in range(self.__threadcount): for i in range(self.__threadcount):
part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount] part = self.__data[i * len(self.__data) // self.__threadcount:(i + 1) * len(self.__data) // self.__threadcount]
self.__threads[i] = Thread(target=self.__doreduce, args=(i, part, reducer, initval)) if self.__useprocesses:
self.__threads[i] = mp.Process(target=self.__doreduce, args=(i, part, reducer, initval))
else:
self.__threads[i] = th.Thread(target=self.__doreduce, args=(i, part, reducer, initval))
self.__threads[i].start() self.__threads[i].start()
return self return self
@@ -139,9 +157,9 @@ class mt():
for k in range(len(part)): for k in range(len(part)):
val = reducer(val, part[k]) val = reducer(val, part[k])
with self.__lock: with self.__lock:
self.__progress += len(part) self.__inc_progress(len(part))
if self.__comment is not None: if self.__comment is not None:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + " ...", end='', flush=True) print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + " ...", end='', flush=True)
with self.__lock: with self.__lock:
self.__results[i] = val self.__results[i] = val
@@ -180,10 +198,10 @@ class mt():
if self.__comment is not None: if self.__comment is not None:
dur = self.__endtime - self.__starttime dur = self.__endtime - self.__starttime
if self.__verbose: if self.__verbose:
print(self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + ( print(self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + (
" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms") " -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
else: else:
print("\r" + self.__comment + ": " + str(self.__progress) + "/" + str(len(self.__data)) + ( print("\r" + self.__comment + ": " + str(self.__get_progress()) + "/" + str(len(self.__data)) + (
" -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms") " -> #" + str(sum([len(l) for l in self.__results])) if self.__type == "filter" else "") + " ... took " + str(dur) + "ms")
return self return self
@@ -195,3 +213,21 @@ class mt():
def __cms(self): def __cms(self):
return int(round(time.time() * 1000)) return int(round(time.time() * 1000))
def __reset_progress(self):
if self.__useprocesses:
self.__progress.value = 0
else:
self.__progress = 0
def __inc_progress(self, val):
if self.__useprocesses:
self.__progress.value += val
else:
self.__progress += val
def __get_progress(self):
if self.__useprocesses:
return self.__progress.value
else:
return self.__progress

View File

@@ -3,8 +3,7 @@ import sys
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from loader import load, dmt from loader import load, dmp
from common import imprt
analyser = SentimentIntensityAnalyzer() analyser = SentimentIntensityAnalyzer()
@@ -17,7 +16,7 @@ def main(folder):
outfilename = outfolder + "sentiments" outfilename = outfolder + "sentiments"
# compute toxic levels # compute toxic levels
toxlevels = dmt(posts, 100).map(lambda p: (p['Id'], {a['Id']: computeToxLevel(a['Body']) for a in p['Answers']}), "calculating sentiments").getresults() toxlevels = dmp(posts, 100).map(lambda p: (p['Id'], {a['Id']: computeToxLevel(a['Body']) for a in p['Answers']}), "calculating sentiments").getresults()
toxlevels = {id: p for (id, p) in toxlevels} toxlevels = {id: p for (id, p) in toxlevels}
dumptoxlevels(toxlevels, outfilename + ".py") dumptoxlevels(toxlevels, outfilename + ".py")