Skip to content

Commit

Permalink
Merge pull request getredash#2379 from getredash/fix_empty_states
Browse files Browse the repository at this point in the history
Change: close metadata database connection early in the execute query Celery task
  • Loading branch information
arikfr authored Mar 8, 2018
2 parents a824bd5 + c054731 commit 1a75d49
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
2 changes: 1 addition & 1 deletion redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def run_query_sync(data_source, parameter_values, query_text, max_age=0):
return None

run_time = time.time() - started_at
-query_result, updated_query_ids = models.QueryResult.store_result(data_source.org, data_source,
+query_result, updated_query_ids = models.QueryResult.store_result(data_source.org_id, data_source,
query_hash, query_text, data,
run_time, utils.utcnow())

Expand Down
2 changes: 1 addition & 1 deletion redash/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ def get_latest(cls, data_source, query, max_age=0):

@classmethod
def store_result(cls, org, data_source, query_hash, query, data, run_time, retrieved_at):
-query_result = cls(org=org,
+query_result = cls(org_id=org,
query_hash=query_hash,
query_text=query,
runtime=run_time,
Expand Down
9 changes: 6 additions & 3 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ def __init__(self, task, query, data_source_id, user_id, metadata,
self.user = models.User.query.get(user_id)
else:
self.user = None
+# Close DB connection to prevent holding a connection for a long time while the query is executing.
+models.db.session.close()
self.query_hash = gen_query_hash(self.query)
self.scheduled_query = scheduled_query
# Load existing tracker or create a new one if the job was created before code update:
Expand Down Expand Up @@ -460,16 +462,17 @@ def run(self):
if error:
self.tracker.update(state='failed')
result = QueryExecutionError(error)
+self.scheduled_query = models.db.session.merge(self.scheduled_query, load=False)
if self.scheduled_query:
self.scheduled_query.schedule_failures += 1
models.db.session.add(self.scheduled_query)
else:
-if (self.scheduled_query and
-        self.scheduled_query.schedule_failures > 0):
+if (self.scheduled_query and self.scheduled_query.schedule_failures > 0):
+    self.scheduled_query = models.db.session.merge(self.scheduled_query, load=False)
self.scheduled_query.schedule_failures = 0
models.db.session.add(self.scheduled_query)
query_result, updated_query_ids = models.QueryResult.store_result(
-self.data_source.org, self.data_source,
+self.data_source.org_id, self.data_source,
self.query_hash, self.query, data,
run_time, utils.utcnow())
self._log_progress('checking_alerts')
Expand Down
1 change: 1 addition & 0 deletions tests/tasks/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def test_failure_scheduled(self):
with cm, mock.patch.object(PostgreSQL, "run_query") as qr:
qr.exception = ValueError("broken")
execute_query("SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id)
+q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 1)
execute_query("SELECT 1, 2", self.factory.data_source.id, {}, scheduled_query_id=q.id)
q = models.Query.get_by_id(q.id)
Expand Down
10 changes: 5 additions & 5 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def test_archived_query_doesnt_return_in_all(self):
query = self.factory.create_query(schedule="1")
yesterday = utcnow() - datetime.timedelta(days=1)
query_result, _ = models.QueryResult.store_result(
-query.org, query.data_source, query.query_hash, query.query_text,
+query.org_id, query.data_source, query.query_hash, query.query_text,
"1", 123, yesterday)

query.latest_query_data = query_result
Expand Down Expand Up @@ -390,7 +390,7 @@ def setUp(self):

def test_stores_the_result(self):
query_result, _ = models.QueryResult.store_result(
-self.data_source.org, self.data_source, self.query_hash,
+self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query_result.data, self.data)
Expand All @@ -406,7 +406,7 @@ def test_updates_existing_queries(self):
query3 = self.factory.create_query(query_text=self.query)

query_result, _ = models.QueryResult.store_result(
-self.data_source.org, self.data_source, self.query_hash,
+self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query1.latest_query_data, query_result)
Expand All @@ -419,7 +419,7 @@ def test_doesnt_update_queries_with_different_hash(self):
query3 = self.factory.create_query(query_text=self.query + "123")

query_result, _ = models.QueryResult.store_result(
-self.data_source.org, self.data_source, self.query_hash,
+self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query1.latest_query_data, query_result)
Expand All @@ -432,7 +432,7 @@ def test_doesnt_update_queries_with_different_data_source(self):
query3 = self.factory.create_query(query_text=self.query, data_source=self.factory.create_data_source())

query_result, _ = models.QueryResult.store_result(
-self.data_source.org, self.data_source, self.query_hash,
+self.data_source.org_id, self.data_source, self.query_hash,
self.query, self.data, self.runtime, self.utcnow)

self.assertEqual(query1.latest_query_data, query_result)
Expand Down

0 comments on commit 1a75d49

Please sign in to comment.