Skip to content

Commit

Permalink
[AIRFLOW-4419] Restore used_slots and queued_slots Pool methods (apac…
Browse files Browse the repository at this point in the history
…he#5210)

These are still used in the UI and are helpful for observability of
Ariflow
  • Loading branch information
ashb authored Apr 30, 2019
1 parent fafcaa7 commit 8ffe08d
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
33 changes: 33 additions & 0 deletions airflow/models/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,37 @@ def to_json(self):
'description': self.description,
}

@provide_session
def used_slots(self, session):
"""
Returns the number of slots used at the moment
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import

running = (
session
.query(TaskInstance)
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.RUNNING)
.count()
)
return running

@provide_session
def queued_slots(self, session):
"""
Returns the number of slots used at the moment
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import

return (
session
.query(TaskInstance)
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.QUEUED)
.count()
)

@provide_session
def open_slots(self, session):
"""
Expand All @@ -64,6 +95,8 @@ def open_slots(self, session):
from airflow.models.taskinstance import \
TaskInstance as TI # Avoid circular import

# Issue a single query instead of using the used_slots/queued_slots to
# avoid load on DB
used_slots = session.query(func.count()).filter(TI.pool == self.pool).filter(
TI.state.in_([State.RUNNING, State.QUEUED])).scalar()
return self.slots - used_slots
17 changes: 16 additions & 1 deletion tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import mock
import jinja2
from flask import url_for
from flask import Markup, url_for
from parameterized import parameterized
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse
Expand Down Expand Up @@ -265,6 +265,21 @@ def test_odd_name(self):
self.check_content_in_response('test-pool<script>', resp)
self.check_content_not_in_response('test-pool<script>', resp)

def test_list(self):
self.pool['pool'] = 'test-pool'
self.session.add(models.Pool(**self.pool))
self.session.commit()
resp = self.client.get('/pool/list/')
# We should see this link
with self.app.test_request_context():
url = url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='running')
used_tag = Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)

url = url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='queued')
queued_tag = Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)
self.check_content_in_response(used_tag, resp)
self.check_content_in_response(queued_tag, resp)


class TestMountPoint(unittest.TestCase):
@classmethod
Expand Down

0 comments on commit 8ffe08d

Please sign in to comment.