Source code for deckhand.db.sqlalchemy.api

# Copyright 2017 AT&T Intellectual Property.  All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines interface for DB access."""

import copy
import functools
import hashlib
import threading

from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db import options
from oslo_db.sqlalchemy import session
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
import sqlalchemy.orm as sa_orm
from sqlalchemy import text

from deckhand.common import utils
from deckhand.db.sqlalchemy import models
from deckhand.engine import utils as eng_utils
from deckhand import errors
from deckhand import types

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

options.set_defaults(CONF)

_FACADE = None
_LOCK = threading.Lock()


def _create_facade_lazily():
    global _LOCK, _FACADE
    if _FACADE is None:
        with _LOCK:
            if _FACADE is None:
                _FACADE = session.EngineFacade.from_config(
                    CONF, sqlite_fk=True)
    return _FACADE


def get_engine():
    facade = _create_facade_lazily()
    return facade.get_engine()


def get_session(autocommit=True, expire_on_commit=False):
    facade = _create_facade_lazily()
    return facade.get_session(autocommit=autocommit,
                              expire_on_commit=expire_on_commit)


def drop_db():
    models.unregister_models(get_engine())


def setup_db(connection_string, create_tables=False):
    models.register_models(get_engine(), connection_string)
    if create_tables:
        models.create_tables(get_engine())


def raw_query(query, **kwargs):
    """Execute a raw query against the database."""

    # Cast all the strings that represent integers to integers because type
    # matters when using ``bindparams``.
    for key, val in kwargs.items():
        if key.endswith('_id'):
            try:
                val = int(val)
                kwargs[key] = val
            except ValueError:
                pass

    stmt = text(query)
    stmt = stmt.bindparams(**kwargs)

    return get_engine().execute(stmt)


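# Example (illustrative sketch, not part of the module): because ``raw_query``
# uses SQLAlchemy ``bindparams``, values are passed as keyword arguments
# rather than interpolated into the SQL string. The string '5' below is cast
# to an integer because its key ends in ``_id``:
#
#     rows = raw_query(
#         "SELECT * FROM documents WHERE revision_id=:revision_id;",
#         revision_id='5').fetchall()

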
def require_unique_document_schema(schema=None):
    """Decorator to enforce that only one singleton document exists in the
    system.

    An example of a singleton document is a ``LayeringPolicy`` document.

    Only one singleton document can exist within the system at any time. It
    is an error to attempt to insert a new document with the same ``schema``
    if it has a different ``metadata.name`` than the existing document.

    A singleton document that already exists can be updated, if the document
    that is passed in has the same name/schema as the existing one.

    The existing singleton document can be replaced by first deleting it
    and only then creating a new one.

    :raises SingletonDocumentConflict: if a singleton document in the system
        already exists and any of the documents to be created has the same
        ``schema`` but has a ``metadata.name`` that differs from the one
        already registered.
    """
    def decorator(f):
        if schema not in types.DOCUMENT_SCHEMA_TYPES:
            raise errors.DeckhandException(
                'Unrecognized document schema %s.' % schema)

        @functools.wraps(f)
        def wrapper(bucket_name, documents, *args, **kwargs):
            existing_documents = revision_documents_get(
                schema=schema, deleted=False, include_history=False)
            existing_document_names = [
                eng_utils.meta(x) for x in existing_documents
            ]
            conflicting_names = [
                eng_utils.meta(x) for x in documents
                if eng_utils.meta(x) not in existing_document_names and
                x['schema'].startswith(schema)
            ]
            if existing_document_names and conflicting_names:
                raise errors.SingletonDocumentConflict(
                    schema=existing_document_names[0][0],
                    layer=existing_document_names[0][1],
                    name=existing_document_names[0][2],
                    conflict=', '.join(["[%s, %s] %s" % (x[0], x[1], x[2])
                                        for x in conflicting_names]))
            return f(bucket_name, documents, *args, **kwargs)
        return wrapper
    return decorator


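# Note (illustrative, inferred from the usage above): ``eng_utils.meta``
# evidently returns a ``(schema, layer, name)`` tuple for a document, which
# is why the conflict handling above indexes it as ``x[0]``/``x[1]``/``x[2]``.
# A hypothetical sketch of that helper, for orientation only:
#
#     def meta(document):
#         return (document['schema'],
#                 document['metadata'].get('layeringDefinition',
#                                          {}).get('layer'),
#                 document['metadata']['name'])

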
@require_unique_document_schema(types.LAYERING_POLICY_SCHEMA)
def documents_create(bucket_name, documents, session=None):
    """Create a set of documents and associated bucket.

    If no changes are detected, a new revision will not be created. This
    allows services to periodically re-register their schemas without
    creating unnecessary revisions.

    :param bucket_name: The name of the bucket with which to associate
        created documents.
    :param documents: List of documents to be created.
    :param session: Database session object.
    :returns: List of created documents in dictionary format.
    :raises DocumentExists: If the document already exists in the DB for any
        bucket.
    """
    session = session or get_session()

    resp = []

    with session.begin():
        documents_to_create = _documents_create(bucket_name, documents,
                                                session=session)

        # The documents to be deleted are computed by comparing the
        # documents for the previous revision (if it exists) that belong to
        # `bucket_name` with `documents`: the difference between the former
        # and the latter.
        document_history = [
            d for d in revision_documents_get(bucket_name=bucket_name,
                                              session=session)
        ]
        documents_to_delete = [
            h for h in document_history if eng_utils.meta(h) not in [
                eng_utils.meta(d) for d in documents]
        ]

        # Only create a revision if any docs have been created, changed or
        # deleted.
        if any([documents_to_create, documents_to_delete]):
            revision = revision_create(session=session)
            bucket = bucket_get_or_create(bucket_name, session=session)

        if documents_to_delete:
            LOG.debug('Deleting documents: %s.',
                      [eng_utils.meta(d) for d in documents_to_delete])
            deleted_documents = []

            for d in documents_to_delete:
                doc = document_delete(d, revision['id'], bucket,
                                      session=session)
                deleted_documents.append(doc)
                resp.append(doc)

        if documents_to_create:
            LOG.debug(
                'Creating documents: %s.',
                [(d['schema'], d['layer'], d['name'])
                 for d in documents_to_create]
            )
            for doc in documents_to_create:
                doc['bucket_id'] = bucket['id']
                doc['revision_id'] = revision['id']
                if not doc.get('orig_revision_id'):
                    doc['orig_revision_id'] = doc['revision_id']

                try:
                    doc.save(session=session)
                except db_exception.DBDuplicateEntry:
                    raise errors.DuplicateDocumentExists(
                        schema=doc['schema'], layer=doc['layer'],
                        name=doc['name'], bucket=bucket['name'])

                resp.append(doc.to_dict())

    # NOTE(fmontei): The orig_revision_id is not copied into the revision_id
    # for each created document, because the revision_id here should
    # reference the just-created revision. In case the user needs the
    # original revision_id, that is returned as well.
    return resp


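# Example (illustrative sketch): registering documents under a bucket. The
# payload below is hypothetical; re-submitting an identical payload detects
# no changes and therefore creates no new revision.
#
#     created = documents_create('mop', [{
#         'schema': 'example/Kind/v1',
#         'metadata': {'name': 'a', 'schema': 'metadata/Document/v1'},
#         'data': {'foo': 'bar'},
#     }])

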
def document_delete(document, revision_id, bucket, session=None):
    """Delete a document.

    Creates a new document with the bare minimum information about the
    document that is to be deleted, and then sets the appropriate deleted
    fields.

    :param document: Document object/dict to be deleted.
    :param revision_id: ID of the revision in which the document is to be
        deleted.
    :param bucket: Bucket object/dict from which the document will be
        deleted.
    :param session: Database session object.
    :returns: Dictionary representation of the deleted document.
    """
    session = session or get_session()

    doc = models.Document()
    # Store bare minimum information about the document.
    doc['schema'] = document['schema']
    doc['name'] = document['name']
    doc['layer'] = document['layer']
    doc['data'] = {}
    doc['meta'] = document['metadata']
    doc['data_hash'] = _make_hash({})
    doc['metadata_hash'] = _make_hash({})
    doc['bucket_id'] = bucket['id']
    doc['revision_id'] = revision_id

    # Save and mark the document as `deleted` in the database.
    try:
        doc.save(session=session)
    except db_exception.DBDuplicateEntry:
        raise errors.DuplicateDocumentExists(
            schema=doc['schema'], layer=doc['layer'],
            name=doc['name'], bucket=bucket['name'])
    doc.safe_delete(session=session)

    return doc.to_dict()


def documents_delete_from_buckets_list(bucket_names, session=None):
    """Delete all documents in the provided list of buckets.

    :param bucket_names: List of bucket names for which the associated
        buckets and their documents need to be deleted.
    :param session: Database session object.
    :returns: A new models.Revision object after all the documents have
        been deleted.
    """
    session = session or get_session()

    with session.begin():
        # Create a new revision.
        revision = models.Revision()
        revision.save(session=session)

        for bucket_name in bucket_names:
            documents_to_delete = [
                d for d in revision_documents_get(bucket_name=bucket_name,
                                                  session=session)
                if "deleted" not in d or not d['deleted']
            ]
            bucket = bucket_get_or_create(bucket_name, session=session)

            if documents_to_delete:
                LOG.debug('Deleting documents: %s.',
                          [eng_utils.meta(d) for d in documents_to_delete])

                for document in documents_to_delete:
                    document_delete(document, revision['id'], bucket,
                                    session=session)

    return revision


def _documents_create(bucket_name, documents, session=None):
    documents = copy.deepcopy(documents)
    session = session or get_session()
    filters = ('name', 'schema', 'layer')
    changed_documents = []

    def _document_create(document):
        model = models.Document()
        model.update(document)
        return model

    for document in documents:
        document.setdefault('data', {})
        document = _fill_in_metadata_defaults(document)

        # Hash the document's metadata and data to later efficiently check
        # whether those data have changed.
        document['data_hash'] = _make_hash(document['data'])
        document['metadata_hash'] = _make_hash(document['meta'])

        try:
            existing_document = document_get(
                raw_dict=True, deleted=False, revision_id='latest',
                **{x: document[x] for x in filters})
        except errors.DocumentNotFound:
            # Ignore bad data at this point. Allow creation to bubble up the
            # error related to bad data.
            existing_document = None

        if existing_document:
            # If the document already exists in another bucket, raise an
            # error.
            if existing_document['bucket_name'] != bucket_name:
                raise errors.DuplicateDocumentExists(
                    schema=existing_document['schema'],
                    name=existing_document['name'],
                    layer=existing_document['layer'],
                    bucket=existing_document['bucket_name'])

            # By this point we know existing_document and document have the
            # same name, schema and layer due to the filters passed to the
            # DB query. But we still want to check whether the document is
            # precisely the same one by comparing metadata/data hashes.
            if (existing_document['data_hash'] == document['data_hash'] and
                    existing_document['metadata_hash'] == document[
                        'metadata_hash']):
                # Since the document has not changed, reference the original
                # revision in which it was created. This is necessary so
                # that the correct revision history is maintained.
                if existing_document['orig_revision_id']:
                    document['orig_revision_id'] = existing_document[
                        'orig_revision_id']
                else:
                    document['orig_revision_id'] = existing_document[
                        'revision_id']

    # Create all documents, even unchanged ones, for the current revision.
    # This makes the generation of the revision diff a lot easier.
    for document in documents:
        doc = _document_create(document)
        changed_documents.append(doc)

    return changed_documents


def _fill_in_metadata_defaults(document):
    document['meta'] = document.pop('metadata')
    document['name'] = document['meta']['name']

    if not document['meta'].get('storagePolicy', None):
        document['meta']['storagePolicy'] = 'cleartext'

    document['meta'].setdefault('layeringDefinition', {})
    document['layer'] = document['meta']['layeringDefinition'].get('layer')

    if 'abstract' not in document['meta']['layeringDefinition']:
        document['meta']['layeringDefinition']['abstract'] = False

    if 'replacement' not in document['meta']:
        document['meta']['replacement'] = False

    return document


def _make_hash(data):
    return hashlib.sha256(
        json.dumps(data, sort_keys=True).encode('utf-8')).hexdigest()


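# Note (illustrative): ``_make_hash`` serializes with ``sort_keys=True`` so
# that logically equal dicts hash identically regardless of key order, which
# is what makes the hash-based change detection above reliable:
#
#     _make_hash({'a': 1, 'b': 2}) == _make_hash({'b': 2, 'a': 1})  # True

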
def document_get(session=None, raw_dict=False, revision_id=None, **filters):
    """Retrieve the first document for ``revision_id`` that matches
    ``filters``.

    :param session: Database session object.
    :param raw_dict: Whether to retrieve the exact way the data is stored in
        DB if ``True``, else the way users expect the data.
    :param revision_id: The ID corresponding to the ``Revision`` object. If
        it is "latest", then retrieve the latest revision, if one exists.
    :param filters: Dictionary attributes (including nested) used to filter
        out revision documents.
    :returns: Dictionary representation of the retrieved document.
    :raises DocumentNotFound: if the document wasn't found.
    """
    session = session or get_session()

    if revision_id == 'latest':
        revision = session.query(models.Revision)\
            .order_by(models.Revision.created_at.desc())\
            .first()
        if revision:
            filters['revision_id'] = revision.id
    elif revision_id:
        filters['revision_id'] = revision_id

    # TODO(fmontei): Currently Deckhand doesn't support filtering by nested
    # JSON fields via sqlalchemy. For now, filter the documents using all
    # "regular" filters via sqlalchemy and all nested filters via Python.
    nested_filters = {}
    for f in filters.copy():
        if any([x in f for x in ('.', 'schema')]):
            nested_filters.setdefault(f, filters.pop(f))

    # Documents with the same metadata.name and schema can exist across
    # different revisions, so it is necessary to order documents by creation
    # date, then return the first document that matches all desired filters.
    documents = session.query(models.Document)\
        .filter_by(**filters)\
        .order_by(models.Document.created_at.desc())\
        .all()

    for doc in documents:
        d = doc.to_dict(raw_dict=raw_dict)
        if utils.deepfilter(d, **nested_filters):
            return d

    filters.update(nested_filters)
    raise errors.DocumentNotFound(filters=filters)


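# Example (illustrative sketch): flat column filters are applied via
# sqlalchemy, while dotted (nested) filters are applied in Python by
# ``utils.deepfilter``. The filter values here are hypothetical:
#
#     doc = document_get(
#         revision_id='latest', name='site-doc',
#         **{'metadata.layeringDefinition.layer': 'site'})

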
def document_get_all(session=None, raw_dict=False, revision_id=None,
                     **filters):
    """Retrieve all documents for ``revision_id`` that match ``filters``.

    :param session: Database session object.
    :param raw_dict: Whether to retrieve the exact way the data is stored in
        DB if ``True``, else the way users expect the data.
    :param revision_id: The ID corresponding to the ``Revision`` object. If
        it is "latest", then retrieve the latest revision, if one exists.
    :param filters: Dictionary attributes (including nested) used to filter
        out revision documents.
    :returns: Dictionary representation of each retrieved document.
    """
    session = session or get_session()

    if revision_id == 'latest':
        revision = session.query(models.Revision)\
            .order_by(models.Revision.created_at.desc())\
            .first()
        if revision:
            filters['revision_id'] = revision.id
    elif revision_id:
        filters['revision_id'] = revision_id

    # TODO(fmontei): Currently Deckhand doesn't support filtering by nested
    # JSON fields via sqlalchemy. For now, filter the documents using all
    # "regular" filters via sqlalchemy and all nested filters via Python.
    nested_filters = {}
    for f in filters.copy():
        if any([x in f for x in ('.', 'schema')]):
            nested_filters.setdefault(f, filters.pop(f))

    # Retrieve the most recently created documents for the revision, because
    # documents with the same metadata.name and schema can exist across
    # different revisions.
    documents = session.query(models.Document)\
        .filter_by(**filters)\
        .order_by(models.Document.created_at.desc())\
        .all()

    final_documents = []
    for doc in documents:
        d = doc.to_dict(raw_dict=raw_dict)
        if utils.deepfilter(d, **nested_filters):
            final_documents.append(d)

    return final_documents


####################
def bucket_get_or_create(bucket_name, session=None):
    """Retrieve or create bucket.

    Retrieve the ``Bucket`` DB object by ``bucket_name`` if it exists
    or else create a new ``Bucket`` DB object by ``bucket_name``.

    :param bucket_name: Unique identifier used for creating or retrieving
        a bucket.
    :param session: Database session object.
    :returns: Dictionary representation of created/retrieved bucket.
    """
    session = session or get_session()

    try:
        bucket = session.query(models.Bucket)\
            .filter_by(name=bucket_name)\
            .one()
    except sa_orm.exc.NoResultFound:
        bucket = models.Bucket()
        bucket.update({'name': bucket_name})
        bucket.save(session=session)

    return bucket.to_dict()


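# Example (illustrative): the get-or-create semantics make the call
# idempotent, so callers never need to check for existence first:
#
#     bucket = bucket_get_or_create('mop')
#     same_bucket = bucket_get_or_create('mop')  # retrieved, not re-created

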
####################
def bucket_get_all(session=None, **filters):
    """Return list of all buckets.

    :param session: Database session object.
    :param filters: Dictionary attributes used to filter out buckets.
    :returns: List of retrieved buckets.
    """
    session = session or get_session()
    buckets = session.query(models.Bucket)\
        .all()

    result = []
    for bucket in buckets:
        revision_dict = bucket.to_dict()
        if utils.deepfilter(revision_dict, **filters):
            result.append(bucket)

    return result


def revision_create(session=None):
    """Create a revision.

    :param session: Database session object.
    :returns: Dictionary representation of created revision.
    """
    session = session or get_session()

    revision = models.Revision()
    revision.save(session=session)

    return revision.to_dict()


def revision_get(revision_id=None, session=None):
    """Return the revision specified by ``revision_id``.

    :param revision_id: The ID corresponding to the ``Revision`` object.
    :param session: Database session object.
    :returns: Dictionary representation of the retrieved revision.
    :raises RevisionNotFound: if the revision was not found.
    """
    session = session or get_session()

    try:
        revision = session.query(models.Revision)\
            .filter_by(id=revision_id)\
            .one()\
            .to_dict()
    except sa_orm.exc.NoResultFound:
        raise errors.RevisionNotFound(revision_id=revision_id)

    revision['documents'] = _update_revision_history(revision['documents'])

    return revision


def revision_get_latest(session=None):
    """Return the latest revision.

    :param session: Database session object.
    :returns: Dictionary representation of the latest revision.
    """
    session = session or get_session()

    latest_revision = session.query(models.Revision)\
        .order_by(models.Revision.created_at.desc())\
        .first()
    if latest_revision:
        latest_revision = latest_revision.to_dict()

        latest_revision['documents'] = _update_revision_history(
            latest_revision['documents'])
    else:
        # If the latest revision doesn't exist, assume an empty revision
        # history and return a dummy revision instead for the purposes of
        # revision rollback.
        latest_revision = {'documents': [], 'id': 0}

    return latest_revision


def require_revision_exists(f):
    """Decorator to require the specified revision to exist.

    Requires the wrapped function to use ``revision_id`` as the first
    argument. If ``revision_id`` is not provided, then the check is not
    performed.
    """
    @functools.wraps(f)
    def wrapper(revision_id=None, *args, **kwargs):
        if revision_id:
            revision_get(revision_id)
        return f(revision_id, *args, **kwargs)
    return wrapper


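# Example (illustrative): any DB API whose first argument is ``revision_id``
# can be guarded so that a missing revision raises ``RevisionNotFound``
# before the body runs. The decorated function below is hypothetical.
#
#     @require_revision_exists
#     def revision_report(revision_id, session=None):
#         ...  # the revision is guaranteed to exist here

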
def _update_revision_history(documents):
    # Since documents that are unchanged across revisions need to be saved
    # for each revision, we need to ensure that the original revision is
    # shown for the document's `revision_id` to maintain the correct
    # revision history.
    for doc in documents:
        if doc['orig_revision_id']:
            doc['revision_id'] = doc['orig_revision_id']
    return documents


def revision_get_all(session=None, **filters):
    """Return list of all revisions.

    :param session: Database session object.
    :returns: List of dictionary representations of retrieved revisions.
    """
    session = session or get_session()
    revisions = session.query(models.Revision)\
        .all()

    result = []
    for revision in revisions:
        revision_dict = revision.to_dict()
        if utils.deepfilter(revision_dict, **filters):
            revision_dict['documents'] = _update_revision_history(
                revision_dict['documents'])
            result.append(revision_dict)

    return result


def revision_delete_all():
    """Delete all revisions and reset the primary key index back to 1 for
    each table in the database.

    .. warning::

        Effectively purges all data from the database.

    :returns: None
    """
    engine = get_engine()

    if engine.name == 'postgresql':
        # NOTE(fmontei): While cascade should delete all data from all
        # tables, we also need to reset the index to 1 for each table.
        for table in ['buckets', 'revisions', 'revision_tags', 'documents',
                      'validations']:
            engine.execute(
                text("TRUNCATE TABLE %s RESTART IDENTITY CASCADE;" % table)
                .execution_options(autocommit=True))
    else:
        raw_query("DELETE FROM revisions;")


@require_revision_exists
def revision_documents_get(revision_id=None, include_history=True,
                           unique_only=True, session=None, **filters):
    """Return the documents that match ``filters`` for the specified
    ``revision_id``.

    :param revision_id: The ID corresponding to the ``Revision`` object.
        If the ID is ``None``, then retrieve the latest revision, if one
        exists.
    :param include_history: Return all documents for revision history prior
        and up to current revision, if ``True``. Default is ``True``.
    :param unique_only: Return only unique documents if ``True``. Default
        is ``True``.
    :param session: Database session object.
    :param filters: Key-value pairs used for filtering out revision
        documents.
    :returns: All revision documents for ``revision_id`` that match the
        ``filters``, including document revision history if applicable.
    :raises RevisionNotFound: if the revision was not found.
    """
    session = session or get_session()
    revision_documents = []

    try:
        if revision_id:
            revision = session.query(models.Revision)\
                .filter_by(id=revision_id)\
                .one()
        else:
            # If no revision_id is specified, grab the latest one.
            revision = session.query(models.Revision)\
                .order_by(models.Revision.created_at.desc())\
                .first()

        if revision:
            revision_documents = revision.to_dict()['documents']
            if include_history:
                relevant_revisions = session.query(models.Revision)\
                    .filter(models.Revision.created_at < revision.created_at)\
                    .order_by(models.Revision.created_at)\
                    .all()
                # Include documents from older revisions in response body.
                for relevant_revision in relevant_revisions:
                    revision_documents.extend(
                        relevant_revision.to_dict()['documents'])
    except sa_orm.exc.NoResultFound:
        raise errors.RevisionNotFound(revision_id=revision_id)

    revision_documents = _update_revision_history(revision_documents)

    filtered_documents = eng_utils.filter_revision_documents(
        revision_documents, unique_only, **filters)

    return filtered_documents


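# Example (illustrative): retrieving the effective document set for a
# revision, including documents carried over unchanged from earlier
# revisions. The filter values are hypothetical:
#
#     docs = revision_documents_get(revision_id=3, schema='example/Kind/v1')

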
####################
@require_revision_exists
def revision_tag_create(revision_id, tag, data=None, session=None):
    """Create a revision tag.

    If a tag named ``tag`` already exists, its associated ``data`` is
    updated instead.

    :param revision_id: ID corresponding to ``Revision`` DB object.
    :param tag: Name of the revision tag.
    :param data: Dictionary of data to be associated with the tag.
    :param session: Database session object.
    :returns: The tag that was created if not already present in the
        database, else the updated tag.
    :raises RevisionTagBadFormat: If data is neither None nor dictionary.
    """
    session = session or get_session()
    tag_model = models.RevisionTag()

    if data is None:
        data = {}

    if data and not isinstance(data, dict):
        raise errors.RevisionTagBadFormat(data=data)

    try:
        with session.begin():
            tag_model.update(
                {'tag': tag, 'data': data, 'revision_id': revision_id})
            tag_model.save(session=session)
        resp = tag_model.to_dict()
    except db_exception.DBDuplicateEntry:
        # Update the revision tag if it already exists.
        LOG.debug('Tag %s already exists for revision_id %s. Attempting to '
                  'update the entry.', tag, revision_id)
        try:
            tag_to_update = session.query(models.RevisionTag)\
                .filter_by(tag=tag, revision_id=revision_id)\
                .one()
        except sa_orm.exc.NoResultFound:
            raise errors.RevisionTagNotFound(tag=tag, revision=revision_id)
        tag_to_update.update({'data': data})
        tag_to_update.save(session=session)
        resp = tag_to_update.to_dict()

    return resp


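# Example (illustrative): creating a tag and then re-posting it with new
# data, which updates the existing entry rather than failing. The tag name
# and data values are hypothetical:
#
#     revision_tag_create(1, 'deployed', data={'by': 'ci'})
#     revision_tag_create(1, 'deployed', data={'by': 'operator'})  # update

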
@require_revision_exists
def revision_tag_get(revision_id, tag, session=None):
    """Retrieve tag details.

    :param revision_id: ID corresponding to ``Revision`` DB object.
    :param tag: Name of the revision tag.
    :param session: Database session object.
    :returns: Dictionary representation of the retrieved tag.
    :raises RevisionTagNotFound: If ``tag`` for ``revision_id`` was not
        found.
    """
    session = session or get_session()

    try:
        tag = session.query(models.RevisionTag)\
            .filter_by(tag=tag, revision_id=revision_id)\
            .one()
    except sa_orm.exc.NoResultFound:
        raise errors.RevisionTagNotFound(tag=tag, revision=revision_id)

    return tag.to_dict()


@require_revision_exists
def revision_tag_get_all(revision_id, session=None):
    """Return list of tags for a revision.

    :param revision_id: ID corresponding to ``Revision`` DB object.
    :param session: Database session object.
    :returns: List of tags for ``revision_id``, ordered by the tag name by
        default.
    """
    session = session or get_session()
    tags = session.query(models.RevisionTag)\
        .filter_by(revision_id=revision_id)\
        .order_by(models.RevisionTag.tag)\
        .all()
    return [t.to_dict() for t in tags]


@require_revision_exists
def revision_tag_delete(revision_id, tag, session=None):
    """Delete a specific tag for a revision.

    :param revision_id: ID corresponding to ``Revision`` DB object.
    :param tag: Name of the revision tag.
    :param session: Database session object.
    :returns: None
    :raises RevisionTagNotFound: If ``tag`` for ``revision_id`` was not
        found.
    """
    query = raw_query(
        """DELETE FROM revision_tags WHERE tag=:tag AND
            revision_id=:revision_id;""", tag=tag, revision_id=revision_id)
    if query.rowcount == 0:
        raise errors.RevisionTagNotFound(tag=tag, revision=revision_id)


@require_revision_exists
def revision_tag_delete_all(revision_id, session=None):
    """Delete all tags for a revision.

    :param revision_id: ID corresponding to ``Revision`` DB object.
    :param session: Database session object.
    :returns: None
    """
    session = session or get_session()
    session.query(models.RevisionTag)\
        .filter_by(revision_id=revision_id)\
        .delete(synchronize_session=False)


####################
def revision_rollback(revision_id, latest_revision, session=None):
    """Rollback the latest revision to the revision specified by
    ``revision_id``.

    Rolls back the latest revision to the revision specified by
    ``revision_id``, thereby creating a new, carbon-copy revision.

    :param revision_id: Revision ID to which to rollback.
    :param latest_revision: Dictionary representation of the latest revision
        in the system.
    :returns: The newly created revision.
    """
    session = session or get_session()
    latest_revision_docs = revision_documents_get(latest_revision['id'],
                                                  session=session)
    latest_revision_hashes = [
        (d['data_hash'], d['metadata_hash']) for d in latest_revision_docs
    ]

    if latest_revision['id'] == revision_id:
        LOG.debug('The revision being rolled back to is the current '
                  'revision. Expect no meaningful changes.')

    if revision_id == 0:
        # Delete all existing documents in all buckets.
        all_buckets = bucket_get_all(deleted=False)
        bucket_names = [str(b['name']) for b in all_buckets]
        revision = documents_delete_from_buckets_list(bucket_names,
                                                      session=session)
        return revision.to_dict()
    else:
        # Sort the documents so that the documents in the new revision are
        # in the same order as in the previous revision, to support stable
        # testing.
        orig_revision_docs = sorted(revision_documents_get(revision_id,
                                                           session=session),
                                    key=lambda d: d['id'])

    # A mechanism for determining whether a particular document has changed
    # between revisions. Keyed with the document_id, the value is True if
    # it has changed, else False.
    doc_diff = {}
    # List of unique buckets that exist in this revision.
    unique_buckets = []
    for orig_doc in orig_revision_docs:
        if ((orig_doc['data_hash'], orig_doc['metadata_hash'])
                not in latest_revision_hashes):
            doc_diff[orig_doc['id']] = True
        else:
            doc_diff[orig_doc['id']] = False
        if orig_doc['bucket_id'] not in unique_buckets:
            unique_buckets.append(orig_doc['bucket_id'])

    # We need to find which buckets did not exist at this revision.
    buckets_to_delete = []
    all_buckets = bucket_get_all(deleted=False)
    for bucket in all_buckets:
        if bucket['id'] not in unique_buckets:
            buckets_to_delete.append(str(bucket['name']))

    # Create the new revision.
    if len(buckets_to_delete) > 0:
        new_revision = documents_delete_from_buckets_list(buckets_to_delete,
                                                          session=session)
    else:
        new_revision = models.Revision()
        with session.begin():
            new_revision.save(session=session)

    # No changes have been made between the target revision to rollback to
    # and the latest revision.
    if set(doc_diff.values()) == set([False]):
        LOG.debug('The revision being rolled back to has the same documents '
                  'as that of the current revision. Expect no meaningful '
                  'changes.')

    # Create the documents for the revision.
    for orig_document in orig_revision_docs:
        orig_document['revision_id'] = new_revision['id']
        orig_document['meta'] = orig_document.pop('metadata')

        new_document = models.Document()
        new_document.update({x: orig_document[x] for x in (
            'name', 'meta', 'layer', 'data', 'data_hash', 'metadata_hash',
            'schema', 'bucket_id')})
        new_document['revision_id'] = new_revision['id']

        # If the document has changed, then use the revision_id of the new
        # revision, otherwise use the original revision_id to preserve the
        # revision history.
        if doc_diff[orig_document['id']]:
            new_document['orig_revision_id'] = new_revision['id']
        else:
            new_document['orig_revision_id'] = revision_id

        with session.begin():
            new_document.save(session=session)

    new_revision = new_revision.to_dict()
    new_revision['documents'] = _update_revision_history(
        new_revision['documents'])

    return new_revision


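# Example (illustrative): rolling back to revision 2 creates a new revision
# whose documents are carbon copies of revision 2's, with
# ``orig_revision_id`` preserved for documents that match the latest
# revision's hashes. The revision number is hypothetical:
#
#     latest = revision_get_latest()
#     new_revision = revision_rollback(2, latest)

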
####################


def _get_validation_policies_for_revision(revision_id, session=None):
    session = session or get_session()

    # Check if a ValidationPolicy for the revision exists.
    validation_policies = document_get_all(
        session, revision_id=revision_id, deleted=False,
        schema=types.VALIDATION_POLICY_SCHEMA)
    if not validation_policies:
        # Otherwise return early.
        LOG.debug('Failed to find a ValidationPolicy for revision ID %s. '
                  'Only the "%s" results will be included in the response.',
                  revision_id, types.DECKHAND_SCHEMA_VALIDATION)
        validation_policies = []

    return validation_policies


@require_revision_exists
def validation_create(revision_id, val_name, val_data, session=None):
    session = session or get_session()

    validation_kwargs = {
        'revision_id': revision_id,
        'name': val_name,
        'status': val_data.get('status', None),
        'validator': val_data.get('validator', None),
        'errors': val_data.get('errors', []),
    }

    validation = models.Validation()

    with session.begin():
        validation.update(validation_kwargs)
        validation.save(session=session)

    return validation.to_dict()


@require_revision_exists
def validation_get_all(revision_id, session=None):
    # The query selects only unique combinations of (name, status) from the
    # `Validations` table and prioritizes the 'failure' result over the
    # 'success' result via alphabetical ordering of the status column. Each
    # document has its own validation, but for this query we want to return
    # the result of the overall validation for the revision. If even one
    # document failed validation, we regard the validation for the whole
    # revision as a 'failure'.
    session = session or get_session()
    query = raw_query("""
        SELECT DISTINCT name, status FROM validations as v1
            WHERE revision_id=:revision_id AND status = (
                SELECT status FROM validations as v2
                    WHERE v2.name = v1.name
                    ORDER BY status
                    LIMIT 1
            )
            GROUP BY name, status
            ORDER BY name, status;
    """, revision_id=revision_id)

    result = {v[0]: v for v in query.fetchall()}
    actual_validations = set(v[0] for v in result.values())

    validation_policies = _get_validation_policies_for_revision(revision_id)
    if not validation_policies:
        return result.values()

    # TODO(fmontei): Raise error for expiresAfter conflicts for duplicate
    # validations across ValidationPolicy documents.
    expected_validations = set()
    for vp in validation_policies:
        expected_validations = expected_validations.union(
            list(v['name'] for v in vp['data'].get('validations', [])))

    missing_validations = expected_validations - actual_validations
    extra_validations = actual_validations - expected_validations

    # If an entry in the ValidationPolicy was never POSTed, set its status
    # to failure.
    for missing_validation in missing_validations:
        result[missing_validation] = (missing_validation, 'failure')

    # If an entry is not in the ValidationPolicy but was externally
    # registered, then override its status to "ignored [{original_status}]".
    for extra_validation in extra_validations:
        result[extra_validation] = (
            extra_validation,
            'ignored [%s]' % result[extra_validation][1])

    return result.values()


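# Example (illustrative): the raw query collapses per-document entries into a
# single overall (name, status) pair per validation name, preferring
# 'failure' because it sorts alphabetically before 'success'. A hypothetical
# return value:
#
#     [('deckhand-schema-validation', 'success'),
#      ('promenade-site-validation', 'failure')]

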
def _check_validation_entries_against_validation_policies(
        revision_id, entries, val_name=None, session=None):
    session = session or get_session()
    result = [e.to_dict() for e in entries]
    result_map = {}
    for r in result:
        result_map.setdefault(r['name'], [])
        result_map[r['name']].append(r)
    actual_validations = set(v['name'] for v in result)

    validation_policies = _get_validation_policies_for_revision(revision_id)
    if not validation_policies:
        return result

    # TODO(fmontei): Raise error for expiresAfter conflicts for duplicate
    # validations across ValidationPolicy documents.
    expected_validations = set()
    for vp in validation_policies:
        expected_validations |= set(
            v['name'] for v in vp['data'].get('validations', []))

    missing_validations = expected_validations - actual_validations
    extra_validations = actual_validations - expected_validations

    # If an entry in the ValidationPolicy was never POSTed, set its status
    # to failure.
    for missing_name in missing_validations:
        if val_name is None or missing_name == val_name:
            result.append({
                'id': len(result),
                'name': val_name,
                'status': 'failure',
                'errors': [{
                    'message': 'The result for this validation was never '
                               'externally registered so its status '
                               'defaulted to "failure".'
                }]
            })
            break

    # If an entry is not in the ValidationPolicy but was externally
    # registered, then override its status to "ignored [{original_status}]".
    for extra_name in extra_validations:
        for entry in result_map[extra_name]:
            original_status = entry['status']
            entry['status'] = 'ignored [%s]' % original_status
            entry.setdefault('errors', [])

            for vp in validation_policies:
                msg_args = eng_utils.meta(vp) + (
                    ', '.join(v['name'] for v in vp['data'].get(
                        'validations', [])),
                )
                entry['errors'].append({
                    'message': (
                        'The result for this validation was externally '
                        'registered but has been ignored because it is not '
                        'found in the validations for ValidationPolicy '
                        '[%s, %s] %s: %s.' % msg_args
                    )
                })

    return result


@require_revision_exists
def validation_get_all_entries(revision_id, val_name=None, session=None):
    session = session or get_session()

    entries = session.query(models.Validation)\
        .filter_by(revision_id=revision_id)
    if val_name:
        entries = entries.filter_by(name=val_name)
    entries = entries.order_by(models.Validation.created_at.asc())\
        .all()

    return _check_validation_entries_against_validation_policies(
        revision_id, entries, val_name=val_name, session=session)


@require_revision_exists
def validation_get_entry(revision_id, val_name, entry_id, session=None):
    session = session or get_session()

    entries = validation_get_all_entries(
        revision_id, val_name, session=session)
    try:
        return entries[entry_id]
    except IndexError:
        raise errors.ValidationNotFound(
            revision_id=revision_id, validation_name=val_name,
            entry_id=entry_id)