Skip to content

Commit

Permalink
[AIRFLOW-356][AIRFLOW-355][AIRFLOW-354] Replace nobr, enable DAG only…
Browse files Browse the repository at this point in the history
… exists locally message, change edit DAG icon

Addresses the following issues:
- [https://issues.apache.org/jira/browse/AIRFLOW-356](https://issues.apache.org/jira/browse/AIRFLOW-356)
- [https://issues.apache.org/jira/browse/AIRFLOW-355](https://issues.apache.org/jira/browse/AIRFLOW-355)
- [https://issues.apache.org/jira/browse/AIRFLOW-354](https://issues.apache.org/jira/browse/AIRFLOW-354)

- Replace `<nobr>` with `flexbox`
- "This DAG seems to be existing only locally" now shows up
- Change edit DAG icon from info to edit
- Rename `dttm` variable to `file_last_changed_on_disk`
- Rename `dags` variable to `webserver_dags`
- Adds a comment clarifying what `self.file_last_changed` is
- Clarifies what the `dag.last_expired` column represents
- Refactors some previously very nested logic in `views.py` and adds comments
- Properly indents `dags.html` and adds comments to it

- Edit DAG icon changed
- Home page now sort of responsive, no longer fixed width
- User will occasionally see "This DAG seems to be existing only locally" message

- Verify that edit dag button is now an edit icon and click on it
- Resized home page, check that last column does not wrap

![image](https://cloud.githubusercontent.com/assets/130362/17126889/2e7adb12-52b6-11e6-9a18-b31e424e4be8.png)

Clean up html, replace nobr with flexbox

Refactor HomeView

Rename variables and update comments

Closes apache#1678 from zodiac/xuanji_refactor
  • Loading branch information
ldct authored and aoen committed Jul 26, 2016
1 parent 2c931ab commit a89d101
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 85 deletions.
20 changes: 11 additions & 9 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def __init__(
self.dag_folder = dag_folder
self.dags = {}
self.sync_to_db = sync_to_db
# the file's last modified timestamp when we last read it
self.file_last_changed = {}
self.executor = executor
self.import_errors = {}
Expand Down Expand Up @@ -192,7 +193,7 @@ def get_dag(self, dag_id):
if dag.is_subdag:
root_dag_id = dag.parent_dag.dag_id

# If the root_dag_id is absent or expired
# If the dag corresponding to root_dag_id is absent or expired
orm_dag = DagModel.get_current(root_dag_id)
if orm_dag and (
root_dag_id not in self.dags or
Expand All @@ -201,10 +202,11 @@ def get_dag(self, dag_id):
dag.last_loaded < orm_dag.last_expired
)
):
# Reprocessing source file
# Reprocess source file
found_dags = self.process_file(
filepath=orm_dag.fileloc, only_if_updated=False)

# If the source file no longer exports `dag_id`, delete it from self.dags
if found_dags and dag_id in [dag.dag_id for dag in found_dags]:
return self.dags[dag_id]
elif dag_id in self.dags:
Expand All @@ -225,10 +227,10 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
try:
# This failed before in what may have been a git sync
# race condition
dttm = datetime.fromtimestamp(os.path.getmtime(filepath))
file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
if only_if_updated \
and filepath in self.file_last_changed \
and dttm == self.file_last_changed[filepath]:
and file_last_changed_on_disk == self.file_last_changed[filepath]:
return found_dags

except Exception as e:
Expand Down Expand Up @@ -257,7 +259,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
except Exception as e:
self.logger.exception("Failed to import: " + filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = dttm
self.file_last_changed[filepath] = file_last_changed_on_disk

else:
zip_file = zipfile.ZipFile(filepath)
Expand Down Expand Up @@ -288,7 +290,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
except Exception as e:
self.logger.exception("Failed to import: " + filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = dttm
self.file_last_changed[filepath] = file_last_changed_on_disk

for m in mods:
for dag in list(m.__dict__.values()):
Expand All @@ -301,7 +303,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
found_dags.append(dag)
found_dags += dag.subdags

self.file_last_changed[filepath] = dttm
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags

@provide_session
Expand Down Expand Up @@ -2441,8 +2443,8 @@ class DagModel(Base):
last_scheduler_run = Column(DateTime)
# Last time this DAG was pickled
last_pickled = Column(DateTime)
# When the DAG received a refreshed signal last, used to know when
# we need to force refresh
# Time when the DAG last received a refresh signal
# (e.g. the DAG's "refresh" button was clicked in the web UI)
last_expired = Column(DateTime)
# Whether (one of) the scheduler is scheduling this DAG at the moment
scheduler_lock = Column(Boolean)
Expand Down
109 changes: 71 additions & 38 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{#
{#
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -46,70 +46,103 @@ <h2>DAGs</h2>
</thead>
<tbody>
{% for dag_id in all_dag_ids %}
{% set dag = dags[dag_id] if dag_id in dags else None %}
{% set dag = webserver_dags[dag_id] if dag_id in webserver_dags else None %}
<tr>
<!-- Column 1: Edit dag -->
<td class="text-center" style="width:10px;">
{% if dag_id in orm_dags %}
<a href="/admin/dagmodel/edit/?id={{ dag_id }}" title="Info">
<span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span>
<span class="glyphicon glyphicon-edit" aria-hidden="true"></span>
</a>
{% endif %}
</td>

<!-- Column 2: Turn dag on/off -->
<td>
{% if dag_id in orm_dags %}
<input id="toggle-{{ dag_id }}" dag_id="{{ dag_id }}" type="checkbox" {{ "checked" if not orm_dags[dag_id].is_paused else "" }} data-toggle="toggle" data-size="mini">
{% endif %}
</td>

<!-- Column 3: Name -->
<td>
{% if dag %}
{% if dag_id in webserver_dags %}
<a href="{{ url_for('airflow.tree', dag_id=dag.dag_id) }}">
{{ dag_id }}
</a>
{% else %}
{{ dag_id }}
<span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG isn't available in the web server's DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database."></span>
{% endif %}
{% if dag_id not in orm_dags and False %}
{% if dag_id not in orm_dags %}
<span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence."></span>
{% endif %}
</td>

<!-- Column 4: Dag Schedule -->
<td>
{% if dag_id in webserver_dags %}
<a class="label label-default schedule {{ dag.dag_id }}" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}">
{{ dag.schedule_interval }}
</a>
{% endif %}
</td>

<!-- Column 5: Dag Owners -->
<td>
<a class="label label-default schedule {{ dag.dag_id }}" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}">
{{ dag.schedule_interval }}</a></td>
<td>{{ dag.owner if dag else orm_dags[dag_id].owners }}</td>
{{ dag.owner if dag else orm_dags[dag_id].owners }}
</td>

<!-- Column 6: Recent Statuses -->
<td style="padding:0px; width:200px; height:10px;">
<svg height="10" width="10" id='dag-{{ dag.safe_dag_id }}' style="display: block;"></svg>
</td>
<td class="text-center" style="width:160px;"><nobr>

<!-- Column 7: Links -->
<td class="text-center" style="display:flex; flex-direction:row; justify-content:space-around;">
{% if dag %}
<a href="{{ url_for("airflow.tree", dag_id=dag.dag_id, num_runs=25) }}" title="Tree View">
<span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span>
</a>
<a href="{{ url_for("airflow.graph", dag_id=dag.dag_id) }}" title="Graph View">
<span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
</a>
<a href="{{ url_for("airflow.duration", dag_id=dag.dag_id) }}" title="Tasks Duration">
<span class="glyphicon glyphicon-stats" aria-hidden="true"></span>
</a>
<a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id) }}" title="Landing Times">
<span class="glyphicon glyphicon-plane" aria-hidden="true"></span>
</a>
<a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id) }}" title="Gantt View">
<span class="glyphicon glyphicon-align-left" aria-hidden="true"></span>
<i class="icon-align-left"></i>
</a>
<a href="{{ url_for("airflow.code", dag_id=dag.dag_id) }}" title="Code View">
<span class="glyphicon glyphicon-flash" aria-hidden="true"></span>
</a>
<a href="/admin/log/?sort=1&desc=1&flt1_dag_id_equals={{ dag.dag_id }}" title="Logs">
<i class="icon-list"></i>
<span class="glyphicon glyphicon-align-justify" aria-hidden="true"></span>
</a>

<!-- Tree -->
<a href="{{ url_for('airflow.tree', dag_id=dag.dag_id, num_runs=25) }}" title="Tree View">
<span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span>
</a>

<!-- Graph -->
<a href="{{ url_for('airflow.graph', dag_id=dag.dag_id) }}" title="Graph View">
<span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
</a>

<!-- Duration -->
<a href="{{ url_for('airflow.duration', dag_id=dag.dag_id) }}" title="Tasks Duration">
<span class="glyphicon glyphicon-stats" aria-hidden="true"></span>
</a>

<!-- Landing Times -->
<a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id) }}" title="Landing Times">
<span class="glyphicon glyphicon-plane" aria-hidden="true"></span>
</a>

<!-- Gantt -->
<a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id) }}" title="Gantt View">
<span class="glyphicon glyphicon-align-left" aria-hidden="true"></span>
</a>

<!-- Code -->
<a href="{{ url_for("airflow.code", dag_id=dag.dag_id) }}" title="Code View">
<span class="glyphicon glyphicon-flash" aria-hidden="true"></span>
</a>

<!-- Logs -->
<a href="/admin/log/?sort=1&amp;desc=1&amp;flt1_dag_id_equals={{ dag.dag_id }}" title="Logs">
<span class="glyphicon glyphicon-align-justify" aria-hidden="true"></span>
</a>
{% endif %}
<a href="{{ url_for("airflow.refresh", dag_id=dag_id) }}" title="Refresh">
<span class="glyphicon glyphicon-refresh" aria-hidden="true"></span>
</a>
</nobr>

<!-- Refresh -->
<a href="{{ url_for("airflow.refresh", dag_id=dag_id) }}" title="Refresh">
<span class="glyphicon glyphicon-refresh" aria-hidden="true"></span>
</a>

</td>
</tr>
{% endfor %}
Expand Down
82 changes: 44 additions & 38 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,32 +1620,33 @@ def index(self):
session = Session()
DM = models.DagModel
qry = None
# filter the dags if filter_by_owner and current user is not superuser

# restrict the dags shown if filter_by_owner and current user is not superuser
do_filter = FILTER_BY_OWNER and (not current_user.is_superuser())
owner_mode = conf.get('webserver', 'OWNER_MODE').strip().lower()

# read orm_dags from the db

qry = session.query(DM)
qry_fltr = []

if do_filter:
if owner_mode == 'ldapgroup':
qry_fltr = (
qry.filter(
~DM.is_subdag, DM.is_active,
DM.owners.in_(current_user.ldap_groups))
.all()
)
elif owner_mode == 'user':
qry_fltr = (
qry.filter(
~DM.is_subdag, DM.is_active,
DM.owners == current_user.user.username)
.all()
)
if do_filter and owner_mode == 'ldapgroup':
qry_fltr = qry.filter(
~DM.is_subdag, DM.is_active,
DM.owners.in_(current_user.ldap_groups)
).all()
elif do_filter and owner_mode == 'user':
qry_fltr = qry.filter(
~DM.is_subdag, DM.is_active,
DM.owners == current_user.user.username
).all()
else:
qry_fltr = qry.filter(~DM.is_subdag, DM.is_active).all()
qry_fltr = qry.filter(
~DM.is_subdag, DM.is_active
).all()

orm_dags = {dag.dag_id: dag for dag in qry_fltr}

import_errors = session.query(models.ImportError).all()
for ie in import_errors:
flash(
Expand All @@ -1654,30 +1655,35 @@ def index(self):
session.expunge_all()
session.commit()
session.close()
dags = dagbag.dags.values()
if do_filter:
if owner_mode == 'ldapgroup':
dags = {
dag.dag_id: dag
for dag in dags
if (
dag.owner in current_user.ldap_groups and (not dag.parent_dag)
)
}
elif owner_mode == 'user':
dags = {
dag.dag_id: dag
for dag in dags
if (
dag.owner == current_user.user.username and (not dag.parent_dag)
)
}

# get a list of all non-subdag dags visible to everyone
unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if not dag.parent_dag]

# optionally filter to get only dags that the user should see
if do_filter and owner_mode == 'ldapgroup':
# only show dags owned by someone in @current_user.ldap_groups
webserver_dags = {
dag.dag_id: dag
for dag in unfiltered_webserver_dags
if dag.owner in current_user.ldap_groups
}
elif do_filter and owner_mode == 'user':
# only show dags owned by @current_user.user.username
webserver_dags = {
dag.dag_id: dag
for dag in unfiltered_webserver_dags
if dag.owner == current_user.user.username
}
else:
dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag}
all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys()))
webserver_dags = {
dag.dag_id: dag
for dag in unfiltered_webserver_dags
}

all_dag_ids = sorted(set(orm_dags.keys()) | set(webserver_dags.keys()))
return self.render(
'airflow/dags.html',
dags=dags,
webserver_dags=webserver_dags,
orm_dags=orm_dags,
all_dag_ids=all_dag_ids)

Expand Down

0 comments on commit a89d101

Please sign in to comment.