Added snippet tags for documentation
davidcavazos authored and aaltay committed Sep 8, 2017
1 parent c7ff46d commit 71e323c
Showing 4 changed files with 83 additions and 66 deletions.
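The # [START tag] / # [END tag] markers added throughout this commit let a documentation build pull named regions out of these example files by tag name. As a rough illustration of how such tags can be consumed, here is a minimal extraction sketch; the extract_snippet helper and the usage below are hypothetical assumptions for illustration, not Beam's actual docs tooling.

import re

def extract_snippet(source, tag):
    """Return the code between '# [START tag]' and '# [END tag]'."""
    pattern = re.compile(
        r'#\s*\[START\s+{0}\]\s*\n(.*?)#\s*\[END\s+{0}\]'.format(tag),
        re.DOTALL)
    match = pattern.search(source)
    if match is None:
        raise KeyError('snippet %r not found' % tag)
    return match.group(1)

# Hypothetical usage against one of the files changed in this commit:
with open('sdks/python/apache_beam/examples/complete/game/user_score.py') as f:
    print(extract_snippet(f.read(), 'extract_and_sum_score'))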
30 changes: 18 additions & 12 deletions sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -200,6 +200,7 @@ def expand(self, pcoll):
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))


# [START abuse_detect]
class CalculateSpammyUsers(beam.PTransform):
"""Filter out all but those users with a high clickrate, which we will
consider 'spammy' users.
@@ -232,6 +233,7 @@ def expand(self, user_scores):
score > global_mean * self.SCORE_WEIGHT,
global_mean_score))
return filtered
# [END abuse_detect]


class UserSessionActivity(beam.DoFn):
@@ -325,16 +327,11 @@ def run(argv=None):
| 'CreateSpammersView' >> beam.CombineGlobally(
beam.combiners.ToDictCombineFn()).as_singleton_view())

# [START filter_and_calc]
# Calculate the total score per team over fixed windows, and emit cumulative
# updates for late data. Uses the side input derived above (the set of
# suspected robots) to exclude those users' scores from the sum. Write the
# results to BigQuery.
teams_schema = {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
}
(raw_events # pylint: disable=expression-not-assigned
| 'WindowIntoFixedWindows' >> beam.WindowInto(
beam.window.FixedWindows(fixed_window_duration))
@@ -345,17 +342,21 @@ def run(argv=None):
spammers_view)
# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team')
# [END filter_and_calc]
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name + '_teams', args.dataset, teams_schema))

args.table_name + '_teams', args.dataset, {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
}))

# [START session_calc]
# Detect user sessions, that is, a burst of activity separated by a gap
# from further activity. Find and record the mean session lengths.
# This information could help the game designers track changing user
# engagement as their set of games changes.
sessions_schema = {
'mean_duration': 'FLOAT',
}
(user_events # pylint: disable=expression-not-assigned
| 'WindowIntoSessions' >> beam.WindowInto(
beam.window.Sessions(session_gap),
@@ -368,7 +369,9 @@

# Get the duration of the session
| 'UserSessionActivity' >> beam.ParDo(UserSessionActivity())
# [END session_calc]

# [START rewindow]
# Re-window to process groups of session sums according to when the
# sessions complete
| 'WindowToExtractSessionMean' >> beam.WindowInto(
@@ -379,7 +382,10 @@
| 'FormatAvgSessionLength' >> beam.Map(
lambda elem: {'mean_duration': float(elem)})
| 'WriteAvgSessionLength' >> WriteToBigQuery(
args.table_name + '_sessions', args.dataset, sessions_schema))
args.table_name + '_sessions', args.dataset, {
'mean_duration': 'FLOAT',
}))
# [END rewindow]


if __name__ == '__main__':
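A note on the pattern tagged [START filter_and_calc] above: it combines fixed event-time windows with a side input of suspected spammers. A stripped-down sketch of that pattern follows; the sample records, window size, and step names are invented for illustration and are not code from this commit.

import apache_beam as beam

with beam.Pipeline() as p:
  scores = p | 'CreateEvents' >> beam.Create([
      {'user': 'alice', 'team': 'red', 'score': 5, 'timestamp': 10.0},
      {'user': 'bot', 'team': 'red', 'score': 900, 'timestamp': 12.0},
      {'user': 'bob', 'team': 'blue', 'score': 3, 'timestamp': 15.0},
  ])
  # Side input: a dict of users considered spammy (hard-coded here; the
  # real pipeline derives it with CalculateSpammyUsers).
  spammers_view = beam.pvalue.AsDict(
      p | 'CreateSpammers' >> beam.Create([('bot', 900)]))
  (scores
   | 'AddTimestamps' >> beam.Map(
       lambda e: beam.window.TimestampedValue(e, e['timestamp']))
   | 'FixedWindows' >> beam.WindowInto(beam.window.FixedWindows(60))
   # Drop events whose user appears in the spammers side input.
   | 'FilterSpammy' >> beam.Filter(
       lambda e, spammers: e['user'] not in spammers, spammers_view)
   | 'TeamScore' >> beam.Map(lambda e: (e['team'], e['score']))
   | 'SumPerTeam' >> beam.CombinePerKey(sum)
   | 'Print' >> beam.Map(print))  # expect ('red', 5) and ('blue', 3)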
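The [START session_calc] and [START rewindow] tags mark a second pattern: per-user session windows, then a re-window so completed session lengths can be averaged together. A hedged sketch, under the same caveat that all data and names are invented:

import apache_beam as beam

class SessionLength(beam.DoFn):
  # Emit each session window's length in seconds once the session closes.
  def process(self, elem, window=beam.DoFn.WindowParam):
    yield (window.end.micros - window.start.micros) // 1000000

with beam.Pipeline() as p:
  (p
   # (user, score) pairs plus invented event times, in seconds.
   | beam.Create([
       ('alice', 5, 0.0), ('alice', 3, 60.0), ('bob', 2, 3600.0)])
   | 'AddTimestamps' >> beam.Map(
       lambda e: beam.window.TimestampedValue((e[0], e[1]), e[2]))
   # Close a user's session after 5 minutes without activity.
   | 'Sessions' >> beam.WindowInto(beam.window.Sessions(5 * 60))
   | 'SumPerUser' >> beam.CombinePerKey(sum)
   | 'SessionLength' >> beam.ParDo(SessionLength())
   # Re-window so lengths of sessions that completed near each other
   # can be grouped and averaged.
   | 'Rewindow' >> beam.WindowInto(beam.window.FixedWindows(60 * 60))
   | 'MeanLength' >> beam.CombineGlobally(
       beam.combiners.MeanCombineFn()).without_defaults()
   | beam.Map(print))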
89 changes: 47 additions & 42 deletions sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -138,42 +138,6 @@ def expand(self, pcoll):
| beam.CombinePerKey(sum))


class HourlyTeamScore(beam.PTransform):
def __init__(self, start_min, stop_min, window_duration):
super(HourlyTeamScore, self).__init__()
self.start_timestamp = str2timestamp(start_min)
self.stop_timestamp = str2timestamp(stop_min)
self.window_duration_in_seconds = window_duration * 60

def expand(self, pcoll):
return (
pcoll
| 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())

# Filter out data before and after the given times so that it is not
# included in the calculations. As we collect data in batches (say, by
# day), the batch for the day that we want to analyze could potentially
# include some late-arriving data from the previous day. If so, we want
# to weed it out. Similarly, if we include data from the following day
# (to scoop up late-arriving events from the day we're analyzing), we
# need to weed out events that fall after the time period we want to
# analyze.
| 'FilterStartTime' >> beam.Filter(
lambda elem: elem['timestamp'] > self.start_timestamp)
| 'FilterEndTime' >> beam.Filter(
lambda elem: elem['timestamp'] < self.stop_timestamp)

# Add an element timestamp based on the event log, and apply fixed
# windowing.
| 'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
| 'FixedWindowsTeam' >> beam.WindowInto(
beam.window.FixedWindows(self.window_duration_in_seconds))

# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team'))


class TeamScoresDict(beam.DoFn):
"""Formats the data into a dictionary of BigQuery columns with their values
@@ -229,6 +193,47 @@ def expand(self, pcoll):
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))


# [START main]
class HourlyTeamScore(beam.PTransform):
def __init__(self, start_min, stop_min, window_duration):
super(HourlyTeamScore, self).__init__()
self.start_timestamp = str2timestamp(start_min)
self.stop_timestamp = str2timestamp(stop_min)
self.window_duration_in_seconds = window_duration * 60

def expand(self, pcoll):
return (
pcoll
| 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())

# Filter out data before and after the given times so that it is not
# included in the calculations. As we collect data in batches (say, by
# day), the batch for the day that we want to analyze could potentially
# include some late-arriving data from the previous day. If so, we want
# to weed it out. Similarly, if we include data from the following day
# (to scoop up late-arriving events from the day we're analyzing), we
# need to weed out events that fall after the time period we want to
# analyze.
# [START filter_by_time_range]
| 'FilterStartTime' >> beam.Filter(
lambda elem: elem['timestamp'] > self.start_timestamp)
| 'FilterEndTime' >> beam.Filter(
lambda elem: elem['timestamp'] < self.stop_timestamp)
# [END filter_by_time_range]

# [START add_timestamp_and_window]
# Add an element timestamp based on the event log, and apply fixed
# windowing.
| 'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
| 'FixedWindowsTeam' >> beam.WindowInto(
beam.window.FixedWindows(self.window_duration_in_seconds))
# [END add_timestamp_and_window]

# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team'))


def run(argv=None):
"""Main entry point; defines and runs the hourly_team_score pipeline."""
parser = argparse.ArgumentParser()
@@ -282,19 +287,19 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
options.view_as(SetupOptions).save_main_session = True

schema = {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
}
with beam.Pipeline(options=options) as p:
(p # pylint: disable=expression-not-assigned
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
| 'HourlyTeamScore' >> HourlyTeamScore(
args.start_min, args.stop_min, args.window_duration)
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name, args.dataset, schema))
args.table_name, args.dataset, {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
}))
# [END main]


if __name__ == '__main__':
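The tags added to this file, [START filter_by_time_range] and [START add_timestamp_and_window], isolate the batch-boundary filtering and the event-time windowing steps. The sketch below exercises both in isolation with Beam's testing utilities; the boundary values, records, and expected output are invented for illustration.

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

start, stop = 100.0, 200.0  # assumed analysis time range, in seconds

with TestPipeline() as p:
  result = (
      p
      | beam.Create([
          {'team': 'red', 'score': 2, 'timestamp': 50.0},    # before start
          {'team': 'red', 'score': 4, 'timestamp': 150.0},   # in range
          {'team': 'blue', 'score': 1, 'timestamp': 250.0},  # after stop
      ])
      | 'FilterStartTime' >> beam.Filter(lambda e: e['timestamp'] > start)
      | 'FilterEndTime' >> beam.Filter(lambda e: e['timestamp'] < stop)
      | 'AddEventTimestamps' >> beam.Map(
          lambda e: beam.window.TimestampedValue(e, e['timestamp']))
      | 'FixedWindowsTeam' >> beam.WindowInto(beam.window.FixedWindows(60))
      | 'ExtractAndSumScore' >> beam.Map(lambda e: (e['team'], e['score']))
      | beam.CombinePerKey(sum))
  assert_that(result, equal_to([('red', 4)]))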
26 changes: 14 additions & 12 deletions sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -208,6 +208,7 @@ def expand(self, pcoll):
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))


# [START window_and_trigger]
class CalculateTeamScores(beam.PTransform):
"""Calculates scores for each team within the configured window duration.
@@ -234,8 +235,10 @@ def expand(self, pcoll):
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
# [END window_and_trigger]


# [START processing_time_trigger]
class CalculateUserScores(beam.PTransform):
"""Extract user/score pairs from the event stream using processing time, via
global windowing. Get periodic updates on all users' running scores.
@@ -257,6 +260,7 @@ def expand(self, pcoll):
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
# Extract and sum username/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('user'))
# [END processing_time_trigger]


def run(argv=None):
@@ -313,30 +317,28 @@ def run(argv=None):
lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))

# Get team scores and write the results to BigQuery
teams_schema = {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
}
(events # pylint: disable=expression-not-assigned
| 'CalculateTeamScores' >> CalculateTeamScores(
args.team_window_duration, args.allowed_lateness)
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name + '_teams', args.dataset, teams_schema))
args.table_name + '_teams', args.dataset, {
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
}))

# Get user scores and write the results to BigQuery
users_schema = {
'user': 'STRING',
'total_score': 'INTEGER',
}
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
| 'FormatUserScoreSums' >> beam.Map(
lambda (user, score): {'user': user, 'total_score': score})
| 'WriteUserScoreSums' >> WriteToBigQuery(
args.table_name + '_users', args.dataset, users_schema))
args.table_name + '_users', args.dataset, {
'user': 'STRING',
'total_score': 'INTEGER',
}))


if __name__ == '__main__':
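The [START window_and_trigger] and [START processing_time_trigger] tags mark the two triggering strategies in this file: fixed event-time windows with speculative early firings and late updates, and a global window emitting running totals on a processing-time cadence. A minimal sketch of both configurations; the window size, element counts, and ten-minute period are illustrative assumptions.

import apache_beam as beam
from apache_beam.transforms import trigger

# Event-time fixed windows: fire early every 10 elements before the
# watermark, fire on the watermark, then fire again per 20 late elements.
team_windowing = beam.WindowInto(
    beam.window.FixedWindows(60 * 60),
    trigger=trigger.AfterWatermark(
        early=trigger.AfterCount(10), late=trigger.AfterCount(20)),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)

# Global window: emit an updated running total every ten minutes of
# processing time, accumulating all panes.
user_windowing = beam.WindowInto(
    beam.window.GlobalWindows(),
    trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10 * 60)),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)

Either WindowInto can then be applied to a PCollection of timestamped events, e.g. events | 'Window' >> team_windowing.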
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -93,6 +93,7 @@ def process(self, elem):
logging.error('Parse error on "%s"', elem)


# [START extract_and_sum_score]
class ExtractAndSumScore(beam.PTransform):
"""A transform to extract key/score information and sum the scores.
The constructor argument `field` determines whether 'team' or 'user' info is
@@ -106,6 +107,7 @@ def expand(self, pcoll):
return (pcoll
| beam.Map(lambda elem: (elem[self.field], elem['score']))
| beam.CombinePerKey(sum))
# [END extract_and_sum_score]


class UserScore(beam.PTransform):
@@ -117,6 +119,7 @@ def expand(self, pcoll):
| 'ExtractAndSumScore' >> ExtractAndSumScore('user'))


# [START main]
def run(argv=None):
"""Main entry point; defines and runs the user_score pipeline."""
parser = argparse.ArgumentParser()
@@ -141,6 +144,7 @@ def run(argv=None):
| 'FormatUserScoreSums' >> beam.Map(
lambda (user, score): 'user: %s, total_score: %s' % (user, score))
| 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
# [END main]


if __name__ == '__main__':
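Finally, the [START extract_and_sum_score] tag isolates the Map-plus-CombinePerKey pair shared by these pipelines. A short usage sketch with invented in-memory records:

import apache_beam as beam

with beam.Pipeline() as p:
  (p
   | beam.Create([
       {'user': 'alice', 'team': 'red', 'score': 5},
       {'user': 'bob', 'team': 'red', 'score': 3},
       {'user': 'carol', 'team': 'blue', 'score': 7},
   ])
   # Key each event by the chosen field ('team' here) and sum per key.
   | beam.Map(lambda elem: (elem['team'], elem['score']))
   | beam.CombinePerKey(sum)
   | beam.Map(print))  # prints ('red', 8) and ('blue', 7)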
