From ac4e713436a434f95c2faa8fedac09060ebe5e5f Mon Sep 17 00:00:00 2001 From: Xinyu Wu Date: Wed, 10 Jun 2015 14:25:45 +0800 Subject: [PATCH] Add job for computing exploration recommendations --- core/domain/recommendations_jobs.py | 102 ++++++++++++++ core/domain/recommendations_jobs_test.py | 133 +++++++++++++++++++ core/domain/recommendations_services.py | 30 +++++ core/domain/recommendations_services_test.py | 14 +- core/jobs_registry.py | 4 +- core/storage/recommendations/gae_models.py | 2 +- index.yaml | 5 + scripts/backend_tests.py | 2 +- 8 files changed, 286 insertions(+), 6 deletions(-) create mode 100644 core/domain/recommendations_jobs.py create mode 100644 core/domain/recommendations_jobs_test.py diff --git a/core/domain/recommendations_jobs.py b/core/domain/recommendations_jobs.py new file mode 100644 index 000000000000..38035b20c5ff --- /dev/null +++ b/core/domain/recommendations_jobs.py @@ -0,0 +1,102 @@ +# coding: utf-8 +# +# Copyright 2015 The Oppia Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS-IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""Jobs for recommendations."""
+
+__author__ = 'Xinyu Wu'
+
+import ast
+
+from core import jobs
+from core.platform import models
+(exp_models, recommendations_models,) = models.Registry.import_models([
+    models.NAMES.exploration, models.NAMES.recommendations])
+
+
+class ExplorationRecommendationsRealtimeModel(
+        jobs.BaseRealtimeDatastoreClassForContinuousComputations):
+    pass
+
+
+class ExplorationRecommendationsAggregator(
+        jobs.BaseContinuousComputationManager):
+    """A continuous-computation job that computes recommendations for each
+    exploration.
+
+    This job does not have a realtime component. There will be a delay in
+    propagating new updates to explorations; the length of the delay will be
+    approximately the time it takes a batch job to run."""
+    @classmethod
+    def get_event_types_listened_to(cls):
+        return []
+
+    @classmethod
+    def _get_realtime_datastore_class(cls):
+        return ExplorationRecommendationsRealtimeModel
+
+    @classmethod
+    def _get_batch_job_manager_class(cls):
+        return ExplorationRecommendationsMRJobManager
+
+    @classmethod
+    def _handle_incoming_event(cls, active_realtime_layer, event_type, *args):
+        pass
+
+
+class ExplorationRecommendationsMRJobManager(
+        jobs.BaseMapReduceJobManagerForContinuousComputations):
+    """Manager for a MapReduce job that computes a list of recommended
+    explorations to play after completing some exploration."""
+
+    @classmethod
+    def _get_continuous_computation_class(cls):
+        return ExplorationRecommendationsAggregator
+
+    @classmethod
+    def entity_classes_to_map_over(cls):
+        return [exp_models.ExpSummaryModel]
+
+    @staticmethod
+    def map(item):
+        """Yields (exp_id, (similarity_score, other_exp_id)) for every other
+        exploration summary whose similarity to `item` meets the threshold."""
+        from core.domain import recommendations_services
+        from core.domain import exp_services
+
+        SIMILARITY_SCORE_THRESHOLD = 4.0
+
+        exp_summary_id = item.id
+        # NOTE(review): this runs once per mapped item; consider caching.
+        for compared_exp_id in exp_services.get_all_exploration_summaries():
+            if compared_exp_id == exp_summary_id:
+                continue
+            similarity_score = recommendations_services.get_item_similarity(
+                exp_summary_id, compared_exp_id)
+            if similarity_score >= SIMILARITY_SCORE_THRESHOLD:
+                yield (exp_summary_id, (similarity_score, compared_exp_id))
+
+    @staticmethod
+    def reduce(key, stringified_values):
+        """Stores the ids of the (at most) 10 highest-ranked values as the
+        recommendations for the exploration with id `key`."""
+        from core.domain import recommendations_services
+
+        # Each value is a stringified (similarity_score, exp_id) tuple, so a
+        # descending sort ranks the most similar explorations first.
+        values = sorted(
+            [ast.literal_eval(v) for v in stringified_values], reverse=True)
+        recommendations_services.set_recommendations(
+            key, [exp_id for _, exp_id in values[:10]])
diff --git a/core/domain/recommendations_jobs_test.py b/core/domain/recommendations_jobs_test.py
new file mode 100644
index 000000000000..29189a141d8b
--- /dev/null
+++ b/core/domain/recommendations_jobs_test.py
@@ -0,0 +1,133 @@
+# coding: utf-8
+#
+# Copyright 2015 The Oppia Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS-IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for recommendations jobs."""
+
+__author__ = 'Xinyu Wu'
+
+from core import jobs_registry
+from core.domain import recommendations_jobs
+from core.domain import recommendations_services
+from core.domain import recommendations_services_test
+from core.domain import rights_manager
+from core.platform import models
+(recommendations_models,) = models.Registry.import_models([
+    models.NAMES.recommendations])
+taskqueue_services = models.Registry.import_taskqueue_services()
+
+
+class ModifiedExplorationRecommendationsAggregator(
+        recommendations_jobs.ExplorationRecommendationsAggregator):
+    """A modified ExplorationRecommendationsAggregator that does not start a new
+    batch job when the previous one has finished."""
+
+    @classmethod
+    def _get_batch_job_manager_class(cls):
+        return ModifiedExplorationRecommendationsMRJobManager
+
+    @classmethod
+    def _kickoff_batch_job_after_previous_one_ends(cls):
+        pass
+
+
+class ModifiedExplorationRecommendationsMRJobManager(
+        recommendations_jobs.ExplorationRecommendationsMRJobManager):
+    """A modified MR job manager that points at the modified aggregator."""
+    @classmethod
+    def _get_continuous_computation_class(cls):
+        return ModifiedExplorationRecommendationsAggregator
+
+
+class ExplorationRecommendationsAggregatorUnitTests(
+        recommendations_services_test.RecommendationsServicesUnitTests):
+    """Tests for the exploration recommendations aggregator job."""
+
+    ALL_CONTINUOUS_COMPUTATION_MANAGERS_FOR_TESTS = [
+        ModifiedExplorationRecommendationsAggregator]
+
+    def test_basic_computation(self):
+        recommendations_services.update_topic_similarities(
+            'Art,Biology,Chemistry\n'
+            '1.0,0.2,0.1\n'
+            '0.2,1.0,0.8\n'
+            '0.1,0.8,1.0')
+        for ind, owner_id in self.exp_ind_to_owner_id.iteritems():
+            rights_manager.publish_exploration(
+                owner_id, self.EXP_DATA[ind]['id'])
+
+        with self.swap(
+                jobs_registry, 'ALL_CONTINUOUS_COMPUTATION_MANAGERS',
+                self.ALL_CONTINUOUS_COMPUTATION_MANAGERS_FOR_TESTS):
+            ModifiedExplorationRecommendationsAggregator.start_computation()
+            self.assertEqual(
+                self.count_jobs_in_taskqueue(
+                    queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
+                1)
+            self.process_and_flush_pending_tasks()
+
+            recommendations = (
+                recommendations_services.get_exploration_recommendations(
+                    self.EXP_DATA[0]['id']))
+            self.assertEqual(recommendations,
+                [self.EXP_DATA[3]['id'], self.EXP_DATA[1]['id']])
+            recommendations = (
+                recommendations_services.get_exploration_recommendations(
+                    self.EXP_DATA[3]['id']))
+            self.assertEqual(recommendations,
+                [self.EXP_DATA[0]['id'], self.EXP_DATA[1]['id']])
+
+    def test_recommendations_after_changes_in_rights(self):
+        with self.swap(
+                jobs_registry, 'ALL_CONTINUOUS_COMPUTATION_MANAGERS',
+                self.ALL_CONTINUOUS_COMPUTATION_MANAGERS_FOR_TESTS):
+            ModifiedExplorationRecommendationsAggregator.start_computation()
+            self.assertEqual(
+                self.count_jobs_in_taskqueue(
+                    queue_name=taskqueue_services.QUEUE_NAME_DEFAULT),
+                1)
+            self.process_and_flush_pending_tasks()
+
+            recommendations = (
+                recommendations_services.get_exploration_recommendations(
+                    self.EXP_DATA[0]['id']))
+            self.assertEqual(recommendations, [])
+
+            for ind, owner_id in self.exp_ind_to_owner_id.iteritems():
+                rights_manager.publish_exploration(
+                    owner_id, self.EXP_DATA[ind]['id'])
+            ModifiedExplorationRecommendationsAggregator.stop_computation(
+                self.ADMIN_ID)
+            ModifiedExplorationRecommendationsAggregator.start_computation()
+            self.process_and_flush_pending_tasks()
+            recommendations = (
+                recommendations_services.get_exploration_recommendations(
+                    self.EXP_DATA[0]['id']))
+            self.assertEqual(recommendations, [
+                self.EXP_DATA[3]['id'],
+                self.EXP_DATA[1]['id'],
+                self.EXP_DATA[2]['id']])
+
+            rights_manager.unpublish_exploration(
+                self.ADMIN_ID, self.EXP_DATA[3]['id'])
+            ModifiedExplorationRecommendationsAggregator.stop_computation(
+                self.ADMIN_ID)
+            ModifiedExplorationRecommendationsAggregator.start_computation()
+            self.process_and_flush_pending_tasks()
+            recommendations = (
+                recommendations_services.get_exploration_recommendations(
+                    self.EXP_DATA[0]['id']))
+            self.assertEqual(recommendations,
+                [self.EXP_DATA[1]['id'], self.EXP_DATA[2]['id']])
diff --git a/core/domain/recommendations_services.py b/core/domain/recommendations_services.py
index 5bd6b4498fbe..6c9cff22a180 100644
--- a/core/domain/recommendations_services.py
+++ b/core/domain/recommendations_services.py
@@ -230,3 +230,33 @@ def get_item_similarity(reference_exp_id, compared_exp_id):
         similarity_score += 1
 
     return similarity_score
+
+
+def set_recommendations(exp_id, new_recommendations):
+    """Stores a list of exploration ids of recommended explorations to play
+    after completing the exploration keyed by exp_id."""
+
+    recommendations_model = (
+        recommendations_models.ExplorationRecommendationsModel.get(
+            exp_id, strict=False))
+    if recommendations_model is None:
+        recommendations_model = (
+            recommendations_models.ExplorationRecommendationsModel(
+                id=exp_id,
+                recommended_exploration_ids=new_recommendations))
+    else:
+        recommendations_model.recommended_exploration_ids = new_recommendations
+    recommendations_model.put()
+
+
+def get_exploration_recommendations(exp_id):
+    """Gets a list of ids of at most 10 recommended explorations to play
+    after completing the exploration keyed by exp_id."""
+
+    recommendations_model = (
+        recommendations_models.ExplorationRecommendationsModel.get(
+            exp_id, strict=False))
+    # No recommendations model is stored under this exploration id.
+    if recommendations_model is None:
+        return []
+    return recommendations_model.recommended_exploration_ids
diff --git a/core/domain/recommendations_services_test.py b/core/domain/recommendations_services_test.py
index f317c70d40de..ae94343452cf 100644
--- a/core/domain/recommendations_services_test.py
+++ b/core/domain/recommendations_services_test.py
@@ -241,11 +241,10 @@ def setUp(self):
         self.set_admins([self.ADMIN_EMAIL])
 
 
-class ItemSimilarityUnitTests(RecommendationsServicesUnitTests):
-    """Test recommendations services relating to item comparison."""
+class ExplorationRecommendationsUnitTests(RecommendationsServicesUnitTests):
+    """Test recommendations services relating to exploration comparison."""
 
     def test_get_item_similarity(self):
-
         with self.assertRaisesRegexp(
             Exception, 'Invalid reference_exp_id fake_exp_id'):
             recommendations_services.get_item_similarity(
@@ -267,3 +266,12 @@ def test_get_item_similarity(self):
             self.ADMIN_ID, self.EXP_DATA[3]['id'])
         self.assertEqual(recommendations_services.get_item_similarity(
             self.EXP_DATA[3]['id'], self.EXP_DATA[3]['id']), 10.0)
+
+    def test_get_and_set_exploration_recommendations(self):
+        recommended_exp_ids = [self.EXP_DATA[1]['id'], self.EXP_DATA[2]['id']]
+        recommendations_services.set_recommendations(
+            self.EXP_DATA[0]['id'], recommended_exp_ids)
+        saved_recommendation_ids = (
+            recommendations_services.get_exploration_recommendations(
+                self.EXP_DATA[0]['id']))
+        self.assertEqual(recommended_exp_ids, saved_recommendation_ids)
diff --git a/core/jobs_registry.py b/core/jobs_registry.py
index de91c75c75ca..ddfe0b249ba4 100644
--- a/core/jobs_registry.py
+++ b/core/jobs_registry.py
@@ -22,6 +22,7 @@
 from core.domain import stats_jobs
 from core.domain import user_jobs
 from core.domain import feedback_jobs
+from core.domain import recommendations_jobs
 
 # List of all manager classes for one-off batch jobs for which to show controls
 # on the admin dashboard.
@@ -40,7 +41,8 @@ exp_jobs.SearchRanker, stats_jobs.StatisticsAggregator, user_jobs.DashboardRecentUpdatesAggregator, - feedback_jobs.FeedbackAnalyticsAggregator] + feedback_jobs.FeedbackAnalyticsAggregator, + recommendations_jobs.ExplorationRecommendationsAggregator] class ContinuousComputationEventDispatcher(object): diff --git a/core/storage/recommendations/gae_models.py b/core/storage/recommendations/gae_models.py index 461ffffe4bf4..29434dfda1f8 100644 --- a/core/storage/recommendations/gae_models.py +++ b/core/storage/recommendations/gae_models.py @@ -27,7 +27,7 @@ TOPIC_SIMILARITIES_ID = 'topics' -class ExplorationRecommendationModel(base_models.BaseModel): +class ExplorationRecommendationsModel(base_models.BaseModel): """A list of recommended explorations similar to an exploration. Instances of this class are keyed by exploration id.""" diff --git a/index.yaml b/index.yaml index 5d3ae58738f0..7ab0ff795989 100644 --- a/index.yaml +++ b/index.yaml @@ -53,6 +53,11 @@ indexes: - name: last_updated direction: desc +- kind: ExplorationRecommendationsRealtimeModel + properties: + - name: realtime_layer + - name: created_on + - kind: ExplorationRightsModel properties: - name: deleted diff --git a/scripts/backend_tests.py b/scripts/backend_tests.py index e8f17e0a6579..37aece3ee591 100644 --- a/scripts/backend_tests.py +++ b/scripts/backend_tests.py @@ -30,7 +30,7 @@ # DEVELOPERS: Please change this number accordingly when new tests are added # or removed. -EXPECTED_TEST_COUNT = 445 +EXPECTED_TEST_COUNT = 452 COVERAGE_PATH = os.path.join( os.getcwd(), '..', 'oppia_tools', 'coverage-3.6', 'coverage')