Files
master/loader.py
wea_ondara c2695e0d49 wip
2020-01-27 12:17:42 +01:00

265 lines
9.0 KiB
Python

import html
import multiprocessing
import operator
import re
import time
import xml.etree.cElementTree as et
from collections import defaultdict
from datetime import datetime
from mt import mt
import gc
# Matches any single HTML tag, e.g. "<p>", "</a>", '<img src="x">'.
TAG_RE = re.compile(r'<[^>]+>')
# Matches one <code ...>...</code> element together with its contents.
# Non-greedy so the text between two code snippets is kept, and DOTALL so
# snippets spanning several lines (e.g. <pre><code> blocks) are matched too.
TAG_CODE = re.compile(r'<code\b.*?</code>', re.DOTALL)
def printnoln(text):
    """Print *text* with no trailing newline and flush stdout right away."""
    print(text, end='', flush=True)


def rprint(text):
    """Print *text* preceded by a carriage return.

    Used to finish a line started with printnoln: on terminals that honour
    '\\r' the new text overwrites that line.
    """
    print('\r' + text)
def dmt(data, progressinterval=5000):
    """Wrap *data* in an ``mt`` pipeline with a first argument of 1.

    Same as ``mt(1, data, False, progressinterval)``. The first argument is
    presumably the worker count and the ``False`` flag is defined by the
    project's ``mt`` module -- TODO confirm there.
    """
    workers = 1
    return mt(workers, data, False, progressinterval)


def dmp(data, progressinterval=5000):
    """Like dmt, but pass ``multiprocessing.cpu_count()`` as the first argument
    and an extra trailing ``True`` to ``mt``.

    NOTE(review): the trailing flag presumably selects process-based execution;
    confirm in the ``mt`` module.
    """
    workers = multiprocessing.cpu_count()
    return mt(workers, data, False, progressinterval, True)
def cms():
    """Return the current wall-clock time (``time.time()``) in whole milliseconds."""
    # round() with no ndigits already yields an int
    return round(time.time() * 1000)
class Cache:
    """Class-level store for the values most recently produced by ``load``.

    Every field is a class attribute and starts out as None.
    """

    # Folder the cached values belong to; None means nothing is cached.
    CACHED_FOLDER = None

    # The four values load() returns for CACHED_FOLDER, in return order.
    CACHED_USERS = CACHED_POSTS = None
    CACHED_FIRSTCONTRIB = CACHED_SUMCONTRIB = None
def load(folder):
    """Return ``(users, posts, firstcontrib, sumcontrib)`` for *folder*, memoised in Cache.

    On a cache miss this reads ``folder + "/Users.xml"`` with readUsers and
    ``folder + "/Posts.xml"`` with readPosts. It then derives the per-user
    earliest contribution (computefirstcontrib) and the per-user post count
    (computesumcontrib) from the posts. A repeated call with the same folder
    returns the cached objects without re-reading anything.
    """
    if folder == Cache.CACHED_FOLDER:
        return Cache.CACHED_USERS, Cache.CACHED_POSTS, Cache.CACHED_FIRSTCONTRIB, Cache.CACHED_SUMCONTRIB
    # Invalidate the cache first, so that if loading raises the cache is left
    # empty instead of pairing a folder with the wrong data. Also drop the
    # previously cached dataset. Otherwise it stays referenced while the new
    # one is parsed, keeping two datasets alive at once and defeating the
    # gc.collect() calls below.
    Cache.CACHED_FOLDER = None
    Cache.CACHED_USERS = None
    Cache.CACHED_POSTS = None
    Cache.CACHED_FIRSTCONTRIB = None
    Cache.CACHED_SUMCONTRIB = None
    users = readUsers(folder + "/Users.xml")
    gc.collect()
    posts = readPosts(folder + "/Posts.xml")
    gc.collect()
    # earliest contribution date per user, and number of posts per user
    firstcontrib = computefirstcontrib(posts)
    sumcontrib = computesumcontrib(posts)
    Cache.CACHED_USERS = users
    Cache.CACHED_POSTS = posts
    Cache.CACHED_FIRSTCONTRIB = firstcontrib
    Cache.CACHED_SUMCONTRIB = sumcontrib
    Cache.CACHED_FOLDER = folder
    return users, posts, firstcontrib, sumcontrib
def readVotes(folder):
    """Parse ``folder + "/Votes.xml"`` and return its ``<row>`` elements mapped through mapvote."""
    path = folder + "/Votes.xml"
    prefix = "readVotes: "
    printnoln(prefix + "reading xml file ...")
    started = cms()
    rows = []
    for _event, node in et.iterparse(path):
        if node.tag == "row":
            rows.append(node)
    rprint(prefix + "reading xml file ... took " + str(cms() - started) + "ms")
    pipeline = dmt(rows).map(mapvote, prefix + "mapping votes")
    votes = pipeline.getresults()
    print(prefix + "done")
    return votes
VOTE_TAGS = ['PostId', 'VoteTypeId', 'CreationDate']


def mapvote(item):
    """Turn one vote ``<row>`` element into a dict with the VOTE_TAGS attributes.

    Attributes that are absent map to None. 'CreationDate' is parsed with
    datetime.fromisoformat; when it is absent, a message is printed and the
    value stays None.
    """
    vote = {}
    for name in VOTE_TAGS:
        vote[name] = getTag(item, name)
    for name in ['CreationDate']:
        raw = vote[name]
        if raw is None:
            print("map vote: tag " + name + " is None: " + str(vote))
            continue
        vote[name] = datetime.fromisoformat(raw)
    return vote
def computesumcontrib(posts):
    """Count how many posts each user authored.

    *posts* are question dicts, each carrying an 'Answers' list (as built by
    readPosts). Every question and every attached answer adds one to the
    count of its 'OwnerUserId'. Comments are not counted.

    Returns:
        defaultdict(int) mapping OwnerUserId -> number of questions + answers.
    """
    askers = dmt(posts).map(lambda q: q['OwnerUserId'], "calc sum contrib q").getresults()
    answerers = dmt(posts).map(lambda q: [a['OwnerUserId'] for a in q['Answers']], "calc sum contrib a").getresults()
    # flatten: question owners first, then answer owners question by question
    owners = list(askers)
    for per_question in answerers:
        owners.extend(per_question)
    sumcontrib = defaultdict(int)
    for owner in owners:
        sumcontrib[owner] += 1
    return sumcontrib
def computefirstcontrib(posts):
    """Find each user's earliest post date.

    Considers every question in *posts* and every answer in its 'Answers'
    list. Posts are keyed by 'OwnerUserId' and compared by 'CreationDate'.

    Returns:
        dict mapping OwnerUserId -> earliest CreationDate.
    """
    asked = dmt(posts).map(lambda q: (q['OwnerUserId'], q['CreationDate']), "calc first contrib q").getresults()
    answered = dmt(posts).map(lambda q: [(a['OwnerUserId'], a['CreationDate']) for a in q['Answers']], "calc first contrib a").getresults()
    earliest = {}

    def consider(owner, when):
        # strict '<' keeps the first-seen value on ties, exactly as min() does
        if owner not in earliest or when < earliest[owner]:
            earliest[owner] = when

    for owner, when in asked:
        consider(owner, when)
    for pairs in answered:
        for owner, when in pairs:
            consider(owner, when)
    return earliest
USER_TAGS = ['Id', 'CreationDate']
USER_DTAGS = ['CreationDate']


def mapuser(item):
    """Turn one user ``<row>`` element into a dict with the USER_TAGS attributes.

    Attributes that are absent map to None. Each USER_DTAGS value is parsed
    with datetime.fromisoformat; when it is absent, a message is printed and
    the value stays None.
    """
    user = dict((name, getTag(item, name)) for name in USER_TAGS)
    for name in USER_DTAGS:
        raw = user[name]
        if raw is None:
            print("map user: tag " + name + " is None: " + str(user))
        else:
            user[name] = datetime.fromisoformat(raw)
    return user
Q_TAGS = ['Id', 'CreationDate', 'Body', 'Title', 'OwnerUserId', 'OwnerDisplayName', 'Score']
Q_DTAGS = ['CreationDate']
Q_BODY = 'Body'


def mapQuestion(item):
    """Turn one question ``<row>`` element into a dict with the Q_TAGS attributes.

    Absent attributes map to None. Each Q_DTAGS value is parsed with
    datetime.fromisoformat. The Q_BODY text is HTML-unescaped and then passed
    through removetags. Neither step checks for None, so a row lacking
    CreationDate or Body raises TypeError (unlike mapuser, which tolerates a
    missing date).
    """
    question = dict((name, getTag(item, name)) for name in Q_TAGS)
    for name in Q_DTAGS:
        question[name] = datetime.fromisoformat(question[name])
    unescaped = html.unescape(question[Q_BODY])
    question[Q_BODY] = removetags(unescaped)
    return question
A_TAGS = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId', 'Score']
A_DTAGS = ['CreationDate']


def mapAnswer(item):
    """Turn one answer ``<row>`` element into a dict with the A_TAGS attributes.

    Absent attributes map to None. Each A_DTAGS value is parsed with
    datetime.fromisoformat. 'Body' is HTML-unescaped and then passed through
    removetags. Neither step checks for None, so a row lacking CreationDate or
    Body raises TypeError.
    """
    answer = {}
    for name in A_TAGS:
        answer[name] = getTag(item, name)
    for name in A_DTAGS:
        answer[name] = datetime.fromisoformat(answer[name])
    unescaped = html.unescape(answer['Body'])
    answer['Body'] = removetags(unescaped)
    return answer
# def mapComment(item):
# tags = ['Id', 'ParentId', 'CreationDate', 'Body', 'OwnerUserId']
# datetags = ['CreationDate']
# comment = {tag: getTag(item, tag) for tag in tags}
# for tag in datetags:
# comment[tag] = datetime.fromisoformat(comment[tag])
# comment['Body'] = removetags(html.unescape(comment['Body']))
# return comment
def readUsers(file):
    """Parse the users XML file at *file* and return its ``<row>`` elements mapped through mapuser."""
    prefix = "readUsers: "
    printnoln(prefix + "reading xml file ...")
    t0 = cms()
    rows = [node for _ev, node in et.iterparse(file) if node.tag == "row"]
    elapsed = cms() - t0
    rprint(prefix + "reading xml file ... took " + str(elapsed) + "ms")
    pipeline = dmt(rows).map(mapuser, prefix + "mapping users")
    users = pipeline.getresults()
    print(prefix + "done")
    return users
def readPosts(file):
    """Read a posts XML file and return its questions with their answers attached.

    readQuestions and readAnswers split the rows into questions and answers.
    Answers whose 'ParentId' is not among the kept question ids are dropped.
    Each remaining answer is appended to its question's 'Answers' list, which
    setprop stores on the question dict in place.

    Args:
        file: path to the posts XML file.

    Returns:
        The list of question dicts from readQuestions, each extended with an
        'Answers' key.
    """
    prefix = "readPosts: "
    printnoln(prefix + "reading xml file ...")
    now = cms()
    items = [elem for _, elem in et.iterparse(file) if elem.tag == "row"]
    rprint(prefix + "reading xml file ... took " + str(cms() - now) + "ms")
    print(prefix + "#posts total: " + str(len(items)))
    questions = readQuestions(items)
    questionids = set(dmt(questions).map(lambda q: q['Id'], prefix + "get question ids").getresults())
    answers = readAnswers(items)
    # The raw XML rows are no longer needed; release them before building the
    # question/answer structures to lower peak memory.
    del items
    # filter answers: keep only those whose parent question survived readQuestions
    answers = dmt(answers).filter(lambda a: a['ParentId'] in questionids, prefix + "filter answers by a.id in q.id").getresults()
    # NOTE: comment handling (readComments / mapComment) is commented out in
    # this module, so questions and answers get no 'Comments' key here.
    # create question -> answers mapping
    printnoln(prefix + "create qamapping ...")
    qamapping = {qid: [] for qid in questionids}
    for a in answers:
        qamapping[a['ParentId']].append(a)
    rprint(prefix + "create qamapping ... done")
    questions = dmt(questions).map(lambda q: setprop(q, 'Answers', qamapping[q['Id']]), prefix + "assign answers to questions").getresults()
    # safety check: the number of attached answers should equal the number kept
    countans = sum(len(q['Answers']) for q in questions)
    if countans != len(answers):
        # str() is required: str + int raises TypeError
        print(prefix + "countans != #answer: " + str(countans) + " " + str(len(answers)))
    print(prefix + "done")
    return questions
def readQuestions(items):
    """Select question rows (PostTypeId "1") from *items* and map them with mapQuestion.

    Questions whose 'OwnerUserId' is None are dropped. Returns the remaining
    question dicts.
    """
    prefix = "readQuestions: "
    pipeline = dmt(items)
    pipeline = pipeline.filter(lambda row: getTag(row, 'PostTypeId') == "1", prefix + "filter out questions")
    pipeline = pipeline.map(mapQuestion, prefix + "mapping questions")
    pipeline = pipeline.filter(lambda question: question['OwnerUserId'] is not None, prefix + "filter out broken questions")
    questions = pipeline.getresults()
    print(prefix + "questions read: " + str(len(questions)))
    return questions
def readAnswers(items):
    """Select answer rows (PostTypeId "2") from *items* and map them with mapAnswer.

    Answers whose 'OwnerUserId' is None are dropped. Returns the remaining
    answer dicts.
    """
    prefix = "readAnswers: "
    pipeline = dmt(items)
    pipeline = pipeline.filter(lambda row: getTag(row, 'PostTypeId') == "2", prefix + "filter out answers")
    pipeline = pipeline.map(mapAnswer, prefix + "mapping answers")
    pipeline = pipeline.filter(lambda answer: answer['OwnerUserId'] is not None, prefix + "filter out broken answers")
    answers = pipeline.getresults()
    print(prefix + "answers read: " + str(len(answers)))
    return answers
# def readComments(items):
# prefix = "readComments: "
# comments = dmt(items).filter(lambda item: getTag(item, 'PostTypeId') == "3", prefix + "filter out comments") \
# .map(mapComment, prefix + "mapping comments") \
# .filter(lambda c: c['OwnerUserId'] is not None, prefix + "filter out broken comments").getresults()
#
# print(prefix + "comments read: " + str(len(comments)))
# return comments
def getTag(item, tag):
    """Return the value of attribute *tag* on XML element *item*, or None if it is absent."""
    # Element.attrib is a dict, and dict.get already returns None for a
    # missing key, so no separate existence check (and second lookup) is needed.
    return item.attrib.get(tag)
def tagExists(item, tag):
    """Return True if XML element *item* has an attribute named *tag*."""
    # membership on the attrib dict itself; no need to build a keys() view
    return tag in item.attrib
def setprop(dic, key, value):
    """Store ``dic[key] = value`` in place and hand back the same *dic*.

    Returning the dict lets the assignment be used inside an expression, such
    as the map lambda in readPosts.
    """
    dic[key] = value
    return dic
def removetags(text):
    """Strip markup from *text*.

    First deletes every match of TAG_CODE (code snippets, contents included),
    then deletes each remaining tag matched by TAG_RE. Text outside tags is
    kept.
    """
    without_code = TAG_CODE.sub('', text)
    return TAG_RE.sub('', without_code)