Skip to content

Commit

Permalink
Update SubscribedSubredditsByAccount backfill.
Browse files Browse the repository at this point in the history
There's no index on SRMember.c.rel_id so instead sort the query
by SRMember.c.thing2_id (the user's id). Also the timestamp for
writing to C* is updated to an integer timestamp corresponding to
when the dual write was deployed.
  • Loading branch information
bsimpson63 committed Jun 30, 2015
1 parent c0448ee commit d846f06
Showing 1 changed file with 53 additions and 13 deletions.
66 changes: 53 additions & 13 deletions scripts/migrate/backfill/srmember_to_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,70 @@
# Inc. All Rights Reserved.
###############################################################################

from datetime import datetime
from collections import defaultdict

from pylons import g
import time

from r2.lib.db.operators import desc
from r2.lib.utils import fetch_things2, to36
from r2.models.subreddit import SRMember, SubscribedSubredditsByAccount


def migrate_srmember_subscribers():
DUAL_WRITE_START = datetime(2015, 5, 20, 0 ,0, tzinfo=g.tz)

def get_query(after_user_id):
q = SRMember._query(
SRMember.c._name == "subscriber",
SRMember.c._date < DUAL_WRITE_START,
sort=desc("_date"),
SRMember.c._thing2_id < after_user_id,
sort=desc("_thing2_id"),
)
return q


def get_srmembers(after_user_id):
previous_user_id = None

while True:
# there isn't a good index on rel_id so we need to get a new query
# for each batch rather than relying solely on fetch_things2
q = get_query(after_user_id)
users_seen = 0

with SubscribedSubredditsByAccount._cf.batch() as b:
for rel in fetch_things2(q):
sr_id = rel._thing1_id
user_id = rel._thing2_id
action_date = rel._date

rowkey = to36(user_id)
column = {to36(sr_id): action_date}
b.insert(rowkey, column, timestamp=DUAL_WRITE_START)
if user_id != previous_user_id:
if users_seen >= 20:
# set after_user_id to the previous id so we will pick up
# the query at this same point
after_user_id = previous_user_id
break

users_seen += 1
previous_user_id = user_id

yield rel


def migrate_srmember_subscribers(after_user_id=39566712):
columns = {}
rowkey = None
proc_time = time.time()

for i, rel in enumerate(get_srmembers(after_user_id)):
sr_id = rel._thing1_id
user_id = rel._thing2_id
action_date = rel._date
new_rowkey = to36(user_id)

if new_rowkey != rowkey and columns:
SubscribedSubredditsByAccount._cf.insert(
rowkey, columns, timestamp=1434403336829573)
columns = {}

columns[to36(sr_id)] = action_date
rowkey = new_rowkey

if i % 1000 == 0:
new_proc_time = time.time()
duration = new_proc_time - proc_time
print "%s (%.3f): %s - %s" % (i, duration, user_id, action_date)
proc_time = new_proc_time

0 comments on commit d846f06

Please sign in to comment.