forked from ckan/ckan
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathrelated.py
238 lines (193 loc) · 8.53 KB
/
related.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import urllib
import ckan.model as model
import ckan.logic as logic
import ckan.lib.base as base
import ckan.lib.helpers as h
import ckan.lib.navl.dictization_functions as df
from ckan.common import _, c
abort = base.abort
_get_action = logic.get_action
class RelatedController(base.BaseController):
def new(self, id):
return self._edit_or_new(id, None, False)
def edit(self, id, related_id):
return self._edit_or_new(id, related_id, True)
def dashboard(self):
""" List all related items regardless of dataset """
context = {'model': model, 'session': model.Session,
'user': c.user or c.author, 'auth_user_obj': c.userobj,
'for_view': True}
data_dict = {
'type_filter': base.request.params.get('type', ''),
'sort': base.request.params.get('sort', ''),
'featured': base.request.params.get('featured', '')
}
params_nopage = [(k, v) for k, v in base.request.params.items()
if k != 'page']
page = self._get_page_number(base.request.params)
# Update ordering in the context
related_list = logic.get_action('related_list')(context, data_dict)
def search_url(params):
url = h.url_for(controller='related', action='dashboard')
params = [(k, v.encode('utf-8')
if isinstance(v, basestring) else str(v))
for k, v in params]
return url + u'?' + urllib.urlencode(params)
def pager_url(q=None, page=None):
params = list(params_nopage)
params.append(('page', page))
return search_url(params)
c.page = h.Page(
collection=related_list,
page=page,
url=pager_url,
item_count=len(related_list),
items_per_page=9
)
c.filters = dict(params_nopage)
c.type_options = self._type_options()
c.sort_options = (
{'value': '', 'text': _('Most viewed')},
{'value': 'view_count_desc', 'text': _('Most Viewed')},
{'value': 'view_count_asc', 'text': _('Least Viewed')},
{'value': 'created_desc', 'text': _('Newest')},
{'value': 'created_asc', 'text': _('Oldest')}
)
return base.render("related/dashboard.html")
def read(self, id):
context = {'model': model, 'session': model.Session,
'user': c.user or c.author,
'auth_user_obj': c.userobj,
'for_view': True}
data_dict = {'id': id}
try:
logic.check_access('related_show', context, data_dict)
except logic.NotAuthorized:
base.abort(401, _('Not authorized to see this page'))
related = model.Session.query(model.Related) \
.filter(model.Related.id == id).first()
if not related:
base.abort(404, _('The requested related item was not found'))
related.view_count = model.Related.view_count + 1
model.Session.add(related)
model.Session.commit()
base.redirect(related.url)
def list(self, id):
""" List all related items for a specific dataset """
context = {'model': model, 'session': model.Session,
'user': c.user or c.author,
'auth_user_obj': c.userobj,
'for_view': True}
data_dict = {'id': id}
try:
logic.check_access('package_show', context, data_dict)
except logic.NotFound:
base.abort(404, base._('Dataset not found'))
except logic.NotAuthorized:
base.abort(401, base._('Not authorized to see this page'))
try:
c.pkg_dict = logic.get_action('package_show')(context, data_dict)
c.related_list = logic.get_action('related_list')(context,
data_dict)
c.pkg = context['package']
c.resources_json = h.json.dumps(c.pkg_dict.get('resources', []))
except logic.NotFound:
base.abort(404, base._('Dataset not found'))
except logic.NotAuthorized:
base.abort(401, base._('Unauthorized to read package %s') % id)
return base.render("package/related_list.html")
def _edit_or_new(self, id, related_id, is_edit):
"""
Edit and New were too similar and so I've put the code together
and try and do as much up front as possible.
"""
context = {'model': model, 'session': model.Session,
'user': c.user or c.author, 'auth_user_obj': c.userobj,
'for_view': True}
data_dict = {}
if is_edit:
tpl = 'related/edit.html'
auth_name = 'related_update'
auth_dict = {'id': related_id}
action_name = 'related_update'
try:
related = logic.get_action('related_show')(
context, {'id': related_id})
except logic.NotFound:
base.abort(404, _('Related item not found'))
else:
tpl = 'related/new.html'
auth_name = 'related_create'
auth_dict = {}
action_name = 'related_create'
try:
logic.check_access(auth_name, context, auth_dict)
except logic.NotAuthorized:
base.abort(401, base._('Not authorized'))
try:
c.pkg_dict = logic.get_action('package_show')(context, {'id': id})
except logic.NotFound:
base.abort(404, _('Package not found'))
data, errors, error_summary = {}, {}, {}
if base.request.method == "POST":
try:
data = logic.clean_dict(
df.unflatten(
logic.tuplize_dict(
logic.parse_params(base.request.params))))
if is_edit:
data['id'] = related_id
else:
data['dataset_id'] = id
data['owner_id'] = c.userobj.id
related = logic.get_action(action_name)(context, data)
if not is_edit:
h.flash_success(_("Related item was successfully created"))
else:
h.flash_success(_("Related item was successfully updated"))
h.redirect_to(
controller='related', action='list', id=c.pkg_dict['name'])
except df.DataError:
base.abort(400, _(u'Integrity Error'))
except logic.ValidationError, e:
errors = e.error_dict
error_summary = e.error_summary
else:
if is_edit:
data = related
c.types = self._type_options()
c.pkg_id = id
vars = {'data': data, 'errors': errors, 'error_summary': error_summary}
c.form = base.render("related/edit_form.html", extra_vars=vars)
return base.render(tpl)
def delete(self, id, related_id):
if 'cancel' in base.request.params:
h.redirect_to(controller='related', action='edit',
id=id, related_id=related_id)
context = {'model': model, 'session': model.Session,
'user': c.user or c.author, 'auth_user_obj': c.userobj}
try:
if base.request.method == 'POST':
logic.get_action('related_delete')(context, {'id': related_id})
h.flash_notice(_('Related item has been deleted.'))
h.redirect_to(controller='package', action='read', id=id)
c.related_dict = logic.get_action('related_show')(
context, {'id': related_id})
c.pkg_id = id
except logic.NotAuthorized:
base.abort(401, _('Unauthorized to delete related item %s') % '')
except logic.NotFound:
base.abort(404, _('Related item not found'))
return base.render('related/confirm_delete.html')
def _type_options(self):
'''
A tuple of options for the different related types for use in
the form.select() template macro.
'''
return ({"text": _("API"), "value": "api"},
{"text": _("Application"), "value": "application"},
{"text": _("Idea"), "value": "idea"},
{"text": _("News Article"), "value": "news_article"},
{"text": _("Paper"), "value": "paper"},
{"text": _("Post"), "value": "post"},
{"text": _("Visualization"), "value": "visualization"})