import pyramid_jsonapi.workflow as wf
import sqlalchemy
from functools import (
partial
)
from itertools import (
islice,
)
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPForbidden,
HTTPNotFound,
)
from sqlalchemy.orm.interfaces import (
ONETOMANY,
MANYTOMANY,
MANYTOONE
)
from pyramid_jsonapi.permissions import (
PermissionTarget,
Targets,
)
stages = (
'alter_query',
'alter_related_query',
'alter_result',
'before_write_item',
)
[docs]def get_one_altered_result_object(view, stages, query):
res_obj = wf.execute_stage(
view, stages, 'alter_result',
wf.ResultObject(view, view.get_one(query, view.not_found_message))
)
if res_obj.tuple_identifier in view.pj_shared.rejected.rejected['objects']:
raise HTTPForbidden(view.not_found_message)
return res_obj
[docs]def altered_objects_iterator(view, stages, stage_name, objects_iterable):
"""
Return an iterator of objects from objects_iterable filtered and altered by
the stage_name stage.
"""
return filter(
lambda o: o.tuple_identifier not in view.pj_shared.rejected.rejected['objects'],
map(
partial(wf.execute_stage, view, stages, stage_name),
(wf.ResultObject(view, o) for o in objects_iterable)
)
)
[docs]def shp_get_item_alter_result(obj, view, stage, view_method):
reason = "Permission denied."
item_pf = view.permission_filter('get', Targets.item, stage)
# pred = view.permission_to_dict(predicate(obj))
pred = item_pf(obj, PermissionTarget(Targets.item))
if not pred.id:
view.pj_shared.rejected.reject_object(obj.tuple_identifier, reason)
reject_atts = obj.attribute_mask - pred.attributes
obj.attribute_mask &= pred.attributes
# record rejected atts
view.pj_shared.rejected.reject_attributes(
obj.tuple_identifier,
reject_atts,
reason,
)
rel_pf = view.permission_filter('get', Targets.relationship, stage)
reject_rels = {
rel for rel in obj.rel_mask
if not rel_pf(obj, PermissionTarget(Targets.relationship, rel))
}
obj.rel_mask -= reject_rels
# record rejected rels
view.pj_shared.rejected.reject_relationships(
obj.tuple_identifier,
reject_rels,
reason,
)
return obj
[docs]def permission_handler(endpoint_name, stage_name):
handlers = {
'item_get': {
'alter_result': shp_get_item_alter_result,
},
'collection_get': {
'alter_result': shp_get_item_alter_result,
},
'related_get': {
'alter_result': shp_get_item_alter_result,
},
'relationships_get': {
'alter_result': shp_get_item_alter_result,
},
}
# for ep in ('collection_get', 'related_get', 'relationships_get'):
# handlers[ep] = handlers['item_get']
return handlers[endpoint_name][stage_name]