Source code for aristotle_mdr.downloader

from typing import Any, List, Dict, Optional, Union, AnyStr, Set

from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.core.files import File
from import get_storage_class
from django.core.exceptions import PermissionDenied
from django.core.files.base import ContentFile
from django.core.mail.message import EmailMessage
from django.db.models.query import QuerySet
from django.http.request import QueryDict
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.module_loading import import_string
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _

from operator import attrgetter
from hashlib import sha256
from collections import defaultdict
import pickle
import pypandoc

from import ConceptHelp
from aristotle_mdr import models as MDR
from aristotle_mdr.contrib.custom_fields.models import CustomValue
from aristotle_mdr.utils import fetch_aristotle_settings, get_model_label, format_seconds, cloud_enabled
from aristotle_mdr.utils.utils import get_download_template_path_for_item
from aristotle_mdr.views.utils import get_lazy_viewable_ids

import logging
logger = logging.getLogger(__name__)

[docs]class Downloader: """ Base class used by all downloaders Subclasses must override the create_file method Required class properties: * description: a description of the downloader type * download_type: the extension or name of the download to support * icon_class: the font-awesome class * metadata_register: can be one of: * a dictionary with keys corresponding to django app labels and values as lists of models within that app the downloader supports * the string "__all__" indicating the downloader supports all metadata types * the string "__template__" indicating the downloader supports any metadata type with a matching download template """ metadata_register: Union[Dict[str, List], str] = '__all__' # Fontawesome icon to use icon_class: str = 'fa-file-text-o' description: str = '' filename: str = 'download' allow_wrapper_pages: bool = False # Whether to allow a front and back page # A unique identifier for the downloader (used in url and passed to task) download_type: str = '' # Used to discover templates template_type: str = '' file_extension: str = '' requires_pandoc: bool = False # Download label (displayed to user) label: str = '' # Mime type used when sending file in an email mime_type: str = '' default_options = { 'include_supporting': False, 'include_related': False, 'subclasses': None, 'front_page': None, 'back_page': None, 'email_copy': False, 'registration_status': None, 'registration_authority': None, } def __init__(self, item_ids: List[int], user_id: Optional[int], options: Dict[str, Any] = {}, override_bulk: bool = False): self.item_ids = item_ids self.error = False self.order_fields = ['name'] if user_id is not None: self.user = get_user_model().objects.prefetch_related('profile').get(id=user_id) else: self.user = AnonymousUser() self.items = MDR._concept.objects.filter(id__in=item_ids).visible(self.user).select_subclasses().order_by(*self.order_fields) # Do len here since we are going to evaluate it later anyways self.numitems = len(self.items) self.bulk = (self.numitems > 1) or override_bulk if self.numitems == 0: raise PermissionDenied('User does not have permission to view any items') # Shallow copy of options self.options = self.default_options.copy() self.options.update(options)
[docs] def create_file(self) -> File: """ Create the file object, should be overwritten by subclasses See below for examples """ raise NotImplementedError
[docs] def get_storage(self, media=False): """Gets a storage class object (use media to get default media class instead of dl class)""" if settings.DOWNLOADS_STORAGE is not None and not media: storage_class = import_string(settings.DOWNLOADS_STORAGE) else: storage_class = get_storage_class() return storage_class()
@property def has_wrap_pages(self): return (self.options['front_page'] is not None or self.options['back_page'] is not None) def get_wrap_pages(self) -> List: if not self.allow_wrapper_pages: return [None, None] storage = self.get_storage(media=True) pages = [] for page_name in ['front_page', 'back_page']: page_path = self.options[page_name] if page_path is not None: with as page_file: pages.append( else: pages.append(None) return pages def get_filepath(self): if self.user.is_authenticated: userpart = str( else: userpart = 'anon' arghash = sha256() arghash.update(pickle.dumps(self.item_ids)) arghash.update(pickle.dumps(self.options)) fname = '/'.join([userpart, arghash.hexdigest(), self.filename]) if self.file_extension: return '.'.join([fname, self.file_extension]) return fname
[docs] def retrieve_file(self, filename: str) -> Optional[str]: """Use default storage class to retrieve file if it exists""" storage = self.get_storage() if storage.exists(filename): file_modified = storage.get_modified_time(filename) for item in self.items: # If one of the items has been modified after the file if item.modified > file_modified: storage.delete(filename) return None # If the file was modified after the items return storage.url(filename) return None
[docs] def store_file(self, filename: str, content: File) -> str: """Use default storage class to store file""" storage = self.get_storage() # Filename can change if a file already exists # (wont happen unless caching is off) final_fname =, content) return storage.url(final_fname)
def email_file(self, f: File, size: int, url: str): template_name = 'aristotle_mdr/email/download.html' context: Dict[str, Union[str, bool]] = { 'item_names': ', '.join([ for i in self.items]), 'download_url': url } # Send email with a link to the file and link to regenerate storage = self.get_storage() if hasattr(storage, 'querystring_expire'): expire_seconds = storage.querystring_expire context['expire_time'] = format_seconds(expire_seconds) # Build url to regenerate download query = QueryDict(mutable=True) query.setlist('items', self.item_ids) regenerate_url = '{host}{url}?{qstring}'.format( host=self.options['CURRENT_HOST'], url=reverse('aristotle:download_options', args=[self.download_type]), qstring=query.urlencode() ) # Update context context.update({'attached': False, 'regenerate_url': regenerate_url}) email = EmailMessage( 'Aristotle Download', render_to_string(template_name, context), to=[], ) email.content_subtype = 'html' # Sets the mime type of the body to text/html email.send(fail_silently=True)
[docs] def download(self) -> str: """Get the url for this downloads file, creating it if necessary""" filepath = self.get_filepath() if settings.DOWNLOAD_CACHING: url = self.retrieve_file(filepath) if url is not None: return url fileobj = self.create_file() size = fileobj.size # Access size here while file is open url = self.store_file(filepath, fileobj) if self.options['email_copy']: self.email_file(fileobj, size, url) return url
[docs] @classmethod def get_class_info(cls) -> Dict[str, Any]: """Used as context instead of passing classes to templates""" return { 'icon_class': cls.icon_class, 'label': cls.label, 'download_type': cls.download_type, 'description': cls.description }
class ItemList: """Class for storing items of one type along with model information (used in sub_dict below)""" def __init__(self, model_class, items: List = []): # Private properties self._cache: Dict = {} self._model_class = model_class self._items = { i for i in items} # Public properties self.app_label = self._model_class._meta.app_label self.model_name = self._model_class._meta.model_name self.verbose_name = self._model_class.get_verbose_name() self.verbose_name_plural = self._model_class.get_verbose_name_plural() @property def model_pluralized(self): """Get model name as plural if more than one""" if len(self) > 1: return self.verbose_name_plural return self.verbose_name @property def ids(self) -> List[int]: """List of item ids in list""" return [ for i in self._items.values()] @property def items(self): """Iterator of items""" return self._items.values() @property def help(self) -> Optional[ConceptHelp]: """ Help object for the model (None if not found) This is cached so repeated calls are ok """ if 'help' in self._cache: return self._cache['help'] try: help_obj = ConceptHelp.objects.get( app_label=self.app_label, concept_type=self.model_name ) except ConceptHelp.DoesNotExist: help_obj = None self._cache['help'] = help_obj return help_obj def has_item(self, iid): """Whether the list contins item with this id""" return iid in self._items def get_item(self, iid): """Get item by id""" return self._items[iid] def __getitem__(self, key): """Support indexing the itemlist to get an item""" return self.get_item(key) def add_item(self, item): """Add item to list""" self._items[] = item def as_dict(self): """Return copy of internal dictionary""" return self._items.copy() def __len__(self): """Support len()""" return len(self._items) def sorted_items(self): """Items sorted by name""" items_list = list(self.items) return sorted(items_list, key=attrgetter('name')) class HTMLDownloader(Downloader): """ Generates a html download This is subclassed for other formats that are generated from html such as the pdf downloader. But can be enabled for testing purposes """ download_type = 'html' template_type = 'html' file_extension = 'html' label = 'HTML' mime_type = 'text/html' metadata_register = '__all__' description = 'Download as html (used for debugging)' bulk_download_template = 'aristotle_mdr/downloads/html/bulk_download.html' def get_base_download_context(self) -> Dict[str, Any]: aristotle_settings = fetch_aristotle_settings() context = { 'infobox_identifier_name': aristotle_settings.get('INFOBOX_IDENTIFIER_NAME', _("Item ID")), 'user': self.user, 'options': self.options, 'config': aristotle_settings, 'aristotle_cloud_is_active': cloud_enabled(), 'export_date': now(), 'viewable_ids': get_lazy_viewable_ids(self.user) } context['CURRENT_CLIENT_BASE'] = getattr(settings, 'CURRENT_CLIENT_BASE', None) return context def get_download_context(self) -> Dict[str, Any]: """ Return context for single item download """ context = self.get_base_download_context() # This will raise an exception if the list is empty, but that's ok item = self.items[0] if self.options['include_supporting']: sub_items = self.get_sub_items_dict() else: sub_items = {} # Add tree if dss if 'aristotle_dse' in settings.INSTALLED_APPS: from aristotle_dse.models import DataSetSpecification, DSSClusterInclusion, DSSDEInclusion inclusion_orderer = {DSSClusterInclusion: 0, DSSDEInclusion: 1} if isinstance(item, DataSetSpecification): kwargs = self.prelim.get(, None) if kwargs: kwargs['objects'] = {} # Reuse sub objects if already avaliable (saves a query) if sub_items: if 'aristotle_dse.datasetspecification' in sub_items: kwargs['objects'].update(sub_items['aristotle_dse.datasetspecification'].as_dict()) if 'aristotle_dse.dataelement' in sub_items: kwargs['objects'].update(sub_items['aristotle_mdr.dataelement'].as_dict()) dss_tree = item.get_cluster_tree(**kwargs) context['tree'] = dss_tree # A nice lambda to force ordering by inclusion type, then inclusion order child_sorter = lambda node: (inclusion_orderer.get(node.relation_data.__class__, 999), node.relation_data.order) context['tree_values'] = dss_tree.get_values(dss_tree.root, sort_by=child_sorter) context.update({ 'title':, 'item': item, 'subitems': sub_items, 'tableOfContents': len(sub_items) > 0, }) return context def _add_to_sub_items(self, items_dict, item): """Adds an item to the sub items dict""" item_class = type(item) label = get_model_label(item_class) # Create a new item list if label not in dict if label not in items_dict: items_dict[label] = ItemList(item_class) # Add item to itemlist items_dict[label].add_item(item) def get_sub_items_dict(self, include_root=False) -> Dict[str, ItemList]: """Function that populates the supporting items in the template""" items: Dict[str, ItemList] = {} glossary_enabled = False if 'aristotle_glossary' in settings.INSTALLED_APPS: from aristotle_glossary.models import GlossaryItem glossary_items = GlossaryItem.objects.none() glossary_enabled = True # Get all items using above method to create dict for item in self.items: # include_root includes the selected items in the dict if include_root: self._add_to_sub_items(items, item) registration_authority_id = self.options['registration_authority'] state = self.options['registration_status'] # Fetch prelim values, use with get_download item if avaliable prelim = self.prelim.get(, {}) all_download_items = item.get_download_items(**prelim) for download_items in all_download_items: if isinstance(download_items, QuerySet): # It's a queryset with multiple items if registration_authority_id is not None: download_items = download_items.filter(statuses__registrationAuthority=registration_authority_id) if state is not None: download_items = download_items.filter(statuses__state=state) sub_query = download_items.visible(self.user) # Prefetch all sub objects on a value domain if sub_query.model == MDR.ValueDomain: sub_query = sub_query.select_related( 'unit_of_measure', 'conceptual_domain', 'data_type' ).prefetch_related( 'permissiblevalue_set', 'supplementaryvalue_set' ) sub_list = list(sub_query) if glossary_enabled: glossary_items |= GlossaryItem.objects.filter(index__in=download_items) else: raise AssertionError("Must be a QuerySet") for sub_item in sub_list: # Can be none for components if sub_item is not None: self._add_to_sub_items(items, sub_item) if glossary_enabled: for sub_item in glossary_items.visible(self.user): # Can be none for components if sub_item is not None: self._add_to_sub_items(items, sub_item) return items def get_bulk_download_context(self) -> Dict[str, Any]: """ Return context for bulk download """ context = self.get_base_download_context() if self.options['include_supporting']: sub_items = self.get_sub_items_dict() else: sub_items = {} context.update({ 'tableOfContents': True, 'items': self.items, 'supporting_items': sub_items }) return context def get_preliminary_values(self) -> Dict: """Fetch prelim values for calculation of download items""" prelim = {} for item in self.items: if 'aristotle_dse' in settings.INSTALLED_APPS: from aristotle_dse.models import DataSetSpecification if isinstance(item, DataSetSpecification): cluster_relations = item.get_all_clusters() dss_ids = item.get_unique_ids(cluster_relations) de_relations = item.get_de_relations(dss_ids) prelim[] = { 'cluster_relations': cluster_relations, 'de_relations': de_relations } return prelim def qs_as_dict(self, qs, concept_field_name='concept') -> Dict[int, List]: """Get queryset as dict mapping ids to lists of objects""" concept_id_field_name = concept_field_name + '_id' object_dict: Dict = defaultdict(list) for o in qs: concept_id = getattr(o, concept_id_field_name) object_dict[concept_id].append(o) return object_dict def get_caches(self, context: Dict) -> Dict: """Fetch related objects for items in bulk to be used in template""" # Build set of all items & subitem id's all_ids: Set = { for i in self.items} # Fetch for subitems if they are avaliable if 'subitems' in context: subitems = context['subitems'] # Add to all_ids for label, itemlist in subitems.items(): for iid in itemlist.ids: all_ids.add(iid) # Bulk lookup current status status_objs = MDR.Status.objects.filter(concept__in=all_ids).current().all() # Bulk lookup custom values (with non empty content) custom_values = CustomValue.objects.filter( concept__in=all_ids ).with_content().visible(self.user).select_related('field') context.update({ 'current_statuses': self.qs_as_dict(status_objs), 'custom_values': self.qs_as_dict(custom_values) }) return context def get_context(self) -> Dict[str, Any]: """ Gets the template context Can be used by subclasses """ self.prelim = self.get_preliminary_values() if self.bulk: context = self.get_bulk_download_context() else: context = self.get_download_context() context = self.get_caches(context) context.update({"is_bulk_download": self.bulk}) return context def get_template(self) -> str: """ Gets the template context Can be used by subclasses """ if self.bulk: return self.bulk_download_template else: # This will raise an exception if the list is empty, but thats ok item = self.items[0] # Template folder should be renamed to html return get_download_template_path_for_item(item, self.template_type) def get_html(self) -> bytes: """ Gets the rendered html string Can be used by subclasses """ template = self.get_template() context = self.get_context() safestring = render_to_string(template, context=context) return str(safestring).encode() def create_file(self): html = self.get_html() return ContentFile(html) class PandocDownloader(HTMLDownloader): """ Used as base class for downloader converting html to other formats via pandoc. Does not work as a downloader itself """ requires_pandoc = True def convert_html(self, html) -> AnyStr: raise NotImplementedError def create_file(self): html = self.get_html() string = self.convert_html(html) return ContentFile(string) class DocxDownloader(PandocDownloader): download_type = 'docx' file_extension = 'docx' label = 'Word' # Yep, the proper mime type for docx really is that long mime_type = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' metadata_register = '__all__' icon_class = 'fa-file-word-o' description = 'Download as word document' def convert_html(self, html): return pypandoc.convert_text(html, 'docx', format='html', return_bytes=True) class ODTDownloader(PandocDownloader): download_type = 'odt' file_extension = 'odt' label = 'ODT' mime_type = 'application/vnd.oasis.opendocument.text' metadata_register = '__all__' icon_class = 'fa-file-word-o' description = 'Download as odt document' def convert_html(self, html): return pypandoc.convert_text(html, 'odt', format='html', return_bytes=True) class MarkdownDownloader(PandocDownloader): download_type = 'md' file_extension = 'md' label = 'Markdown' mime_type = 'text/markdown' metadata_register = '__all__' description = 'Download as markdown' def convert_html(self, html): return pypandoc.convert_text(html, 'md', format='html')