Files
master/loader.py
wea_ondara 1f699f6b56 wip
2019-07-22 22:06:32 +02:00

204 lines
7.1 KiB
Python

import multiprocessing
import operator
import time
import xml.etree.cElementTree as et
from collections import defaultdict
from datetime import datetime
from mt import mt
printnoln = lambda text: print(text, end='', flush=True)
rprint = lambda text: print('\r' + text)
def dmt(data): return mt(multiprocessing.cpu_count(), data, False)
def cms(): return int(round(time.time() * 1000))
def load(folder):
users = readUsers(folder + "/Users.xml")
posts = readPosts(folder + "/Posts.xml")
# get first contribution to page:
firstcontrib = computefirstcontrib(posts)
sumcontrib = computesumcontrib(posts)
return users, posts, firstcontrib, sumcontrib
def computesumcontrib(posts):
x1 = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
x2 = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
x3 = dmt(posts).map(lambda q: [c['OwnerUserId'] for a in q['Answers'] for c in a['Comments']] + [c['OwnerUserId'] for c in q['Comments']], "calc sum contrib c").getresults()
sumcontrib = defaultdict(int)
for id in x1:
sumcontrib[id] += 1
for y in x2:
for id in y:
sumcontrib[id] += 1
for y in x3:
for id in y:
sumcontrib[id] += 1
return sumcontrib
def computefirstcontrib(posts):
x1 = dmt(posts).map(lambda q: (q['OwnerUserId'], q['CreationDate']), "calc first contrib q").getresults()
x2 = dmt(posts).map(lambda q: [(a['OwnerUserId'], a['CreationDate']) for a in q['Answers']], "calc first contrib a").getresults()
firstcontrib = defaultdict(list)
for (id, date) in x1:
firstcontrib[id].append(date)
for y in x2:
for (id, date) in y:
firstcontrib[id].append(date)
firstcontrib = {id: min(ldate) for (id, ldate) in firstcontrib.items()}
return firstcontrib
def mapuser(item):
tags = ['Id', 'CreationDate']
datetags = ['CreationDate']
user = {tag: getTag(item, tag) for tag in tags}
for tag in datetags:
if user[tag] is not None:
user[tag] = datetime.fromisoformat(user[tag])
else:
print("map user: tag " + tag + " is None: " + str(user))
return user
def mapQuestion(item):
tags = ['Id', 'CreationDate', 'Body', 'Title', 'OwnerUserId', 'OwnerDisplayName']
datetags = ['CreationDate']
question = {tag: getTag(item, tag) for tag in tags}
for tag in datetags:
question[tag] = datetime.fromisoformat(question[tag])
return question
def mapAnswer(item):
tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId']
datetags = ['CreationDate']
answer = {tag: getTag(item, tag) for tag in tags}
for tag in datetags:
answer[tag] = datetime.fromisoformat(answer[tag])
return answer
def mapComment(item):
tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId']
datetags = ['CreationDate']
comment = {tag: getTag(item, tag) for tag in tags}
for tag in datetags:
comment[tag] = datetime.fromisoformat(comment[tag])
return comment
def readUsers(file):
prefix = "readUsers: "
printnoln(prefix + "reading xml file ...")
now = cms()
items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")
users = dmt(items).map(mapuser, prefix + "mapping users").getresults()
print(prefix + "done")
return users
def readPosts(file):
prefix = "readPosts: "
printnoln(prefix + "reading xml file ...")
now = cms()
items = [elem for event, elem in et.iterparse(file) if elem.tag == "row"]
rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")
print(prefix + "#posts total: " + str(len(items)))
questions = readQuestions(items)
questionids = set(dmt(questions).map(lambda q: q['Id'], prefix + "get question ids").getresults())
answers = readAnswers(items)
answerids = set(dmt(answers).map(lambda a: a['Id'], prefix + "get answer ids").getresults())
comments = readComments(items)
# filter answers
answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()
# filter comments
comments = dmt(comments).filter(lambda c: c['ParentId'] in questionids.union(answerids), prefix + "filter comments by c.id in q.id + a.id").getresults()
# create question answer mapping
printnoln(prefix + "create qamapping ...")
qamapping = {id: [] for id in questionids}
for a in answers:
qamapping[a['ParentId']].append(a)
rprint(prefix + "create qamapping ... done")
questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()
# create comment question comment answer mapping
printnoln(prefix + "create qacmapping ...")
qacmapping = {id: [] for id in questionids.union(answerids)}
for c in comments:
qacmapping[c['ParentId']].append(c)
rprint(prefix + "create qacmapping ... done")
answers = dmt(answers).map(lambda a: setprop(a, 'Comments', qacmapping[a['Id']]), prefix + "assign comments to answers").getresults()
questions = dmt(questions).map(lambda q: setprop(q, 'Comments', qacmapping[q['Id']]), prefix + "assign comments to questions").getresults()
# safety check
countans = dmt(questions).map(lambda q: len(q['Answers']), prefix + "sum answer count") \
.reduce(operator.add, operator.add, lambda: 0, prefix + "sum answer count").getresults()
if countans != len(answers):
print(prefix + "countans != #answer: " + countans + " " + len(answers))
print(prefix + "done")
return questions
def readQuestions(items):
prefix = "readQuestions: "
questions = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "1", prefix + "filter out questions") \
.map(mapQuestion, prefix + "mapping questions") \
.filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken questions").getresults()
print(prefix + "questions read: " + str(len(questions)))
return questions
def readAnswers(items):
prefix = "readAnswers: "
answers = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "2", prefix + "filter out answers") \
.map(mapAnswer, prefix + "mapping answers") \
.filter(lambda q: q['OwnerUserId'] is not None, prefix + "filter out broken answers").getresults()
print(prefix + "answers read: " + str(len(answers)))
return answers
def readComments(items):
prefix = "readComments: "
comments = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "3", prefix + "filter out comments") \
.map(mapComment, prefix + "mapping comments") \
.filter(lambda c: c['OwnerUserId'] is not None, prefix + "filter out broken comments").getresults()
print(prefix + "comments read: " + str(len(comments)))
return comments
def getTag(item, tag):
return item.attrib.get(tag) if tagExists(item, tag) else None
def tagExists(item, tag):
return tag in item.attrib.keys()
def setprop(dic, key, value):
dic[key] = value
return dic