# Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import logging
import os
import tempfile
import zipfile
import contextlib
import uuid
import shutil
from awscli.compat import six
from awscli.compat import urlparse
from botocore.utils import set_value_from_jmespath
from contextlib import contextmanager
from awscli.customizations.cloudformation import exceptions
from awscli.customizations.cloudformation.yamlhelper import yaml_dump, \
    yaml_parse
import jmespath


LOG = logging.getLogger(__name__)


def is_path_value_valid(path):
    return isinstance(path, six.string_types)


def make_abs_path(directory, path):
    if is_path_value_valid(path) and not os.path.isabs(path):
        return os.path.normpath(os.path.join(directory, path))
    else:
        return path


def is_s3_url(url):
    try:
        parse_s3_url(url)
        return True
    except ValueError:
        return False


def is_local_folder(path):
    return is_path_value_valid(path) and os.path.isdir(path)


def is_local_file(path):
    return is_path_value_valid(path) and os.path.isfile(path)


def is_zip_file(path):
    return (
        is_path_value_valid(path) and
        zipfile.is_zipfile(path))


def parse_s3_url(url,
                 bucket_name_property="Bucket",
                 object_key_property="Key",
                 version_property=None):

    if isinstance(url, six.string_types) \
            and url.startswith("s3://"):

        # Python < 2.7.10 doesn't parse query parameters from URIs with a
        # custom scheme such as s3://bucket/key. As a workaround, strip the
        # scheme so the parser sees "s3://foo/bar?v=1" as "//foo/bar?v=1"
        parsed = urlparse.urlparse(url[3:])
        query = urlparse.parse_qs(parsed.query)

        if parsed.netloc and parsed.path:
            result = dict()
            result[bucket_name_property] = parsed.netloc
            result[object_key_property] = parsed.path.lstrip('/')

            # If there is a query string that has a single versionId field,
            # set the object version and return
            if version_property is not None \
                    and 'versionId' in query \
                    and len(query['versionId']) == 1:
                result[version_property] = query['versionId'][0]

            return result

    raise ValueError("URL given to the parse method is not a valid S3 URL: "
                     "{0}".format(url))


def upload_local_artifacts(resource_id, resource_dict, property_name,
                           parent_dir, uploader):
    """
    Upload local artifacts referenced by the property at the given resource
    and return the S3 URL of the uploaded object. It is the responsibility
    of callers to ensure the property value is a valid string.

    If path refers to a file, this method will upload the file. If path refers
    to a folder, this method will zip the folder and upload the zip to S3.
    If path is omitted, this method will zip the current working folder and
    upload.

    If path already points to an S3 object, this method does nothing.

    :param resource_id:     Id of the CloudFormation resource
    :param resource_dict:   Dictionary containing resource definition
    :param property_name:   Property name of CloudFormation resource where this
                            local path is present
    :param parent_dir:      Resolve all relative paths with respect to this
                            directory
    :param uploader:        Method to upload files to S3

    :return:                S3 URL of the uploaded object
    :raise:                 InvalidLocalPathError if the path is neither an
                            S3 URL nor a valid local path
    """

    local_path = jmespath.search(property_name, resource_dict)

    if local_path is None:
        # Build the root directory and upload to S3
        local_path = parent_dir

    if is_s3_url(local_path):
        # A valid CloudFormation template will specify artifacts as S3 URLs.
        # This check supports the case where the resource does not refer to
        # local artifacts: nothing to do if the property value is already an
        # S3 URL.
        LOG.debug("Property {0} of {1} is already an S3 URL"
                  .format(property_name, resource_id))
        return local_path

    local_path = make_abs_path(parent_dir, local_path)

    # The path may point to a folder. Zip the folder and upload it
    if is_local_folder(local_path):
        return zip_and_upload(local_path, uploader)

    # Path could be pointing to a file. Upload the file
    elif is_local_file(local_path):
        return uploader.upload_with_dedup(local_path)

    raise exceptions.InvalidLocalPathError(
            resource_id=resource_id,
            property_name=property_name,
            local_path=local_path)
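
# Example (illustrative sketch, hypothetical values): for a resource dict
# such as {"CodeUri": "./src"}, a call like
#
#   upload_local_artifacts("MyFunc", {"CodeUri": "./src"}, "CodeUri",
#                          "/proj", uploader)
#
# resolves "./src" to "/proj/src", zips it if it is a directory, and returns
# whatever S3 URL uploader.upload_with_dedup reports.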


def zip_and_upload(local_path, uploader):
    with zip_folder(local_path) as zip_file_path:
        return uploader.upload_with_dedup(zip_file_path)


@contextmanager
def zip_folder(folder_path):
    """
    Zip the entire folder and yield the path to the zip file. Use this
    inside a "with" statement to clean up the zip file after it is used.

    :param folder_path: Path of the folder to zip
    :return: Name of the zip file
    """

    filename = os.path.join(
        tempfile.gettempdir(), "data-" + uuid.uuid4().hex)

    zipfile_name = make_zip(filename, folder_path)
    try:
        yield zipfile_name
    finally:
        if os.path.exists(zipfile_name):
            os.remove(zipfile_name)


def make_zip(filename, source_root):
    zipfile_name = "{0}.zip".format(filename)
    source_root = os.path.abspath(source_root)
    with open(zipfile_name, 'wb') as f:
        zip_file = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED)
        with contextlib.closing(zip_file) as zf:
            for root, dirs, files in os.walk(source_root, followlinks=True):
                for fname in files:
                    full_path = os.path.join(root, fname)
                    relative_path = os.path.relpath(
                        full_path, source_root)
                    zf.write(full_path, relative_path)

    return zipfile_name
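
# Example (illustrative sketch, hypothetical paths):
#
#   make_zip("/tmp/data-1234", "./my-code")
#   # writes /tmp/data-1234.zip containing the files under ./my-code with
#   # archive paths relative to the folder root, then returns
#   # "/tmp/data-1234.zip"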


@contextmanager
def mktempfile():
    directory = tempfile.gettempdir()
    filename = os.path.join(directory, uuid.uuid4().hex)

    try:
        with open(filename, "w+") as handle:
            yield handle
    finally:
        if os.path.exists(filename):
            os.remove(filename)


def copy_to_temp_dir(filepath):
    tmp_dir = tempfile.mkdtemp()
    dst = os.path.join(tmp_dir, os.path.basename(filepath))
    shutil.copyfile(filepath, dst)
    return tmp_dir


class Resource(object):
    """
    Base class representing a CloudFormation resource that can be exported
    """

    RESOURCE_TYPE = None
    PROPERTY_NAME = None
    PACKAGE_NULL_PROPERTY = True
    # Set this property to True in a subclass if you want the exporter to
    # zip up the file before uploading. This is useful for Lambda functions.
    FORCE_ZIP = False

    def __init__(self, uploader):
        self.uploader = uploader

    def export(self, resource_id, resource_dict, parent_dir):
        if resource_dict is None:
            return

        property_value = jmespath.search(self.PROPERTY_NAME, resource_dict)

        if not property_value and not self.PACKAGE_NULL_PROPERTY:
            return

        if isinstance(property_value, dict):
            LOG.debug("Property {0} of {1} resource is not a URL"
                      .format(self.PROPERTY_NAME, resource_id))
            return

        # If property is a file but not a zip file, place file in temp
        # folder and send the temp folder to be zipped
        temp_dir = None
        if is_local_file(property_value) and not \
                is_zip_file(property_value) and self.FORCE_ZIP:
            temp_dir = copy_to_temp_dir(property_value)
            set_value_from_jmespath(resource_dict, self.PROPERTY_NAME, temp_dir)

        try:
            self.do_export(resource_id, resource_dict, parent_dir)

        except Exception as ex:
            LOG.debug("Unable to export", exc_info=ex)
            raise exceptions.ExportFailedError(
                    resource_id=resource_id,
                    property_name=self.PROPERTY_NAME,
                    property_value=property_value,
                    ex=ex)
        finally:
            if temp_dir:
                shutil.rmtree(temp_dir)

    def do_export(self, resource_id, resource_dict, parent_dir):
        """
        Default export action is to upload artifacts and set the property to
        S3 URL of the uploaded object
        """
        uploaded_url = upload_local_artifacts(resource_id, resource_dict,
                                              self.PROPERTY_NAME,
                                              parent_dir, self.uploader)
        set_value_from_jmespath(resource_dict, self.PROPERTY_NAME, uploaded_url)
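
# Example (illustrative sketch, hypothetical resource type and property): a
# concrete exporter only declares the resource type and property it handles;
# the base class does the rest:
#
#   class ExampleResource(Resource):
#       RESOURCE_TYPE = "AWS::Example::Thing"
#       PROPERTY_NAME = "SourcePath"
#
#   # ExampleResource(uploader).export("Thing1", {"SourcePath": "./dir"},
#   #                                  "/proj") uploads ./dir and rewrites
#   # "SourcePath" to the returned S3 URL.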


class ResourceWithS3UrlDict(Resource):
    """
    Represents CloudFormation resources that need the S3 URL to be specified
    as a dict like {Bucket: "", Key: "", Version: ""}
    """

    BUCKET_NAME_PROPERTY = None
    OBJECT_KEY_PROPERTY = None
    VERSION_PROPERTY = None

    def __init__(self, uploader):
        super(ResourceWithS3UrlDict, self).__init__(uploader)

    def do_export(self, resource_id, resource_dict, parent_dir):
        """
        Upload to S3 and set the property to a dict representing the S3 URL
        of the uploaded object
        """

        artifact_s3_url = \
            upload_local_artifacts(resource_id, resource_dict,
                                   self.PROPERTY_NAME,
                                   parent_dir, self.uploader)

        parsed_url = parse_s3_url(
                artifact_s3_url,
                bucket_name_property=self.BUCKET_NAME_PROPERTY,
                object_key_property=self.OBJECT_KEY_PROPERTY,
                version_property=self.VERSION_PROPERTY)
        set_value_from_jmespath(resource_dict, self.PROPERTY_NAME, parsed_url)
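
# Example (illustrative sketch): with BUCKET_NAME_PROPERTY = "S3Bucket" and
# OBJECT_KEY_PROPERTY = "S3Key" (as on LambdaFunctionResource below), an
# artifact uploaded to "s3://my-bucket/deadbeef" rewrites the property to:
#
#   {"S3Bucket": "my-bucket", "S3Key": "deadbeef"}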


class ServerlessFunctionResource(Resource):
    RESOURCE_TYPE = "AWS::Serverless::Function"
    PROPERTY_NAME = "CodeUri"
    FORCE_ZIP = True


class ServerlessApiResource(Resource):
    RESOURCE_TYPE = "AWS::Serverless::Api"
    PROPERTY_NAME = "DefinitionUri"
    # Don't package the directory if DefinitionUri is omitted.
    # Necessary to support DefinitionBody
    PACKAGE_NULL_PROPERTY = False


class GraphQLSchemaResource(Resource):
    RESOURCE_TYPE = "AWS::AppSync::GraphQLSchema"
    PROPERTY_NAME = "DefinitionS3Location"
    # Don't package the directory if DefinitionS3Location is omitted.
    # Necessary to support Definition
    PACKAGE_NULL_PROPERTY = False


class AppSyncResolverRequestTemplateResource(Resource):
    RESOURCE_TYPE = "AWS::AppSync::Resolver"
    PROPERTY_NAME = "RequestMappingTemplateS3Location"
    # Don't package the directory if RequestMappingTemplateS3Location is omitted.
    # Necessary to support RequestMappingTemplate
    PACKAGE_NULL_PROPERTY = False


class AppSyncResolverResponseTemplateResource(Resource):
    RESOURCE_TYPE = "AWS::AppSync::Resolver"
    PROPERTY_NAME = "ResponseMappingTemplateS3Location"
    # Don't package the directory if ResponseMappingTemplateS3Location is omitted.
    # Necessary to support ResponseMappingTemplate
    PACKAGE_NULL_PROPERTY = False


class AppSyncFunctionConfigurationRequestTemplateResource(Resource):
    RESOURCE_TYPE = "AWS::AppSync::FunctionConfiguration"
    PROPERTY_NAME = "RequestMappingTemplateS3Location"
    # Don't package the directory if RequestMappingTemplateS3Location is omitted.
    # Necessary to support RequestMappingTemplate
    PACKAGE_NULL_PROPERTY = False


class AppSyncFunctionConfigurationResponseTemplateResource(Resource):
    RESOURCE_TYPE = "AWS::AppSync::FunctionConfiguration"
    PROPERTY_NAME = "ResponseMappingTemplateS3Location"
    # Don't package the directory if ResponseMappingTemplateS3Location is omitted.
    # Necessary to support ResponseMappingTemplate
    PACKAGE_NULL_PROPERTY = False


class LambdaFunctionResource(ResourceWithS3UrlDict):
    RESOURCE_TYPE = "AWS::Lambda::Function"
    PROPERTY_NAME = "Code"
    BUCKET_NAME_PROPERTY = "S3Bucket"
    OBJECT_KEY_PROPERTY = "S3Key"
    VERSION_PROPERTY = "S3ObjectVersion"
    FORCE_ZIP = True


class ApiGatewayRestApiResource(ResourceWithS3UrlDict):
    RESOURCE_TYPE = "AWS::ApiGateway::RestApi"
    PROPERTY_NAME = "BodyS3Location"
    PACKAGE_NULL_PROPERTY = False
    BUCKET_NAME_PROPERTY = "Bucket"
    OBJECT_KEY_PROPERTY = "Key"
    VERSION_PROPERTY = "Version"


class ElasticBeanstalkApplicationVersion(ResourceWithS3UrlDict):
    RESOURCE_TYPE = "AWS::ElasticBeanstalk::ApplicationVersion"
    PROPERTY_NAME = "SourceBundle"
    BUCKET_NAME_PROPERTY = "S3Bucket"
    OBJECT_KEY_PROPERTY = "S3Key"
    VERSION_PROPERTY = None


class LambdaLayerVersionResource(ResourceWithS3UrlDict):
    RESOURCE_TYPE = "AWS::Lambda::LayerVersion"
    PROPERTY_NAME = "Content"
    BUCKET_NAME_PROPERTY = "S3Bucket"
    OBJECT_KEY_PROPERTY = "S3Key"
    VERSION_PROPERTY = "S3ObjectVersion"
    FORCE_ZIP = True


class ServerlessLayerVersionResource(Resource):
    RESOURCE_TYPE = "AWS::Serverless::LayerVersion"
    PROPERTY_NAME = "ContentUri"
    FORCE_ZIP = True


class ServerlessRepoApplicationReadme(Resource):
    RESOURCE_TYPE = "AWS::ServerlessRepo::Application"
    PROPERTY_NAME = "ReadmeUrl"
    PACKAGE_NULL_PROPERTY = False


class ServerlessRepoApplicationLicense(Resource):
    RESOURCE_TYPE = "AWS::ServerlessRepo::Application"
    PROPERTY_NAME = "LicenseUrl"
    PACKAGE_NULL_PROPERTY = False


class CloudFormationStackResource(Resource):
    """
    Represents a CloudFormation::Stack resource that can refer to a nested
    stack template via the TemplateURL property.
    """
    RESOURCE_TYPE = "AWS::CloudFormation::Stack"
    PROPERTY_NAME = "TemplateURL"

    def __init__(self, uploader):
        super(CloudFormationStackResource, self).__init__(uploader)

    def do_export(self, resource_id, resource_dict, parent_dir):
        """
        If the nested stack template is valid, this method will run export
        on the nested template, upload the exported template to S3, and set
        the property to the URL of the uploaded S3 template
        """

        template_path = resource_dict.get(self.PROPERTY_NAME, None)

        if template_path is None or is_s3_url(template_path) or \
                template_path.startswith("http://") or \
                template_path.startswith("https://"):
            # Nothing to do
            return

        abs_template_path = make_abs_path(parent_dir, template_path)
        if not is_local_file(abs_template_path):
            raise exceptions.InvalidTemplateUrlParameterError(
                    property_name=self.PROPERTY_NAME,
                    resource_id=resource_id,
                    template_path=abs_template_path)

        exported_template_dict = \
            Template(template_path, parent_dir, self.uploader).export()

        exported_template_str = yaml_dump(exported_template_dict)

        with mktempfile() as temporary_file:
            temporary_file.write(exported_template_str)
            temporary_file.flush()

            url = self.uploader.upload_with_dedup(
                    temporary_file.name, "template")

            # The TemplateURL property requires the S3 URL to be in
            # path-style format
            parts = parse_s3_url(url, version_property="Version")
            s3_path_url = self.uploader.to_path_style_s3_url(
                    parts["Key"], parts.get("Version", None))
            set_value_from_jmespath(resource_dict, self.PROPERTY_NAME, s3_path_url)


class ServerlessApplicationResource(CloudFormationStackResource):
    """
    Represents a Serverless::Application resource that can refer to a nested
    app template via the Location property.
    """
    RESOURCE_TYPE = "AWS::Serverless::Application"
    PROPERTY_NAME = "Location"


class GlueJobCommandScriptLocationResource(Resource):
    """
    Represents a Glue::Job resource.
    """
    RESOURCE_TYPE = "AWS::Glue::Job"
    # Note the PROPERTY_NAME includes a '.' implying it's nested.
    PROPERTY_NAME = "Command.ScriptLocation"


RESOURCES_EXPORT_LIST = [
    ServerlessFunctionResource,
    ServerlessApiResource,
    GraphQLSchemaResource,
    AppSyncResolverRequestTemplateResource,
    AppSyncResolverResponseTemplateResource,
    AppSyncFunctionConfigurationRequestTemplateResource,
    AppSyncFunctionConfigurationResponseTemplateResource,
    ApiGatewayRestApiResource,
    LambdaFunctionResource,
    ElasticBeanstalkApplicationVersion,
    CloudFormationStackResource,
    ServerlessApplicationResource,
    ServerlessLayerVersionResource,
    LambdaLayerVersionResource,
    GlueJobCommandScriptLocationResource,
]

METADATA_EXPORT_LIST = [
    ServerlessRepoApplicationReadme,
    ServerlessRepoApplicationLicense
]


def include_transform_export_handler(template_dict, uploader, parent_dir):
    if template_dict.get("Name", None) != "AWS::Include":
        return template_dict

    include_location = template_dict.get("Parameters", {}).get("Location", None)
    if not include_location \
            or not is_path_value_valid(include_location) \
            or is_s3_url(include_location):
        # `include_location` is either empty, or not a string, or an S3 URI
        return template_dict

    # We are confident at this point that `include_location` is a string containing the local path
    abs_include_location = os.path.join(parent_dir, include_location)
    if is_local_file(abs_include_location):
        template_dict["Parameters"]["Location"] = uploader.upload_with_dedup(abs_include_location)
    else:
        raise exceptions.InvalidLocalPathError(
            resource_id="AWS::Include",
            property_name="Location",
            local_path=abs_include_location)

    return template_dict
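
# Example (illustrative sketch, hypothetical local path): the handler
# rewrites a transform such as
#
#   {"Name": "AWS::Include",
#    "Parameters": {"Location": "./snippet.yaml"}}
#
# so that "Location" becomes the S3 URL of the uploaded snippet; any other
# transform value is returned unchanged.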


GLOBAL_EXPORT_DICT = {
    "Fn::Transform": include_transform_export_handler
}


class Template(object):
    """
    Class to export a CloudFormation template
    """

    def __init__(self, template_path, parent_dir, uploader,
                 resources_to_export=RESOURCES_EXPORT_LIST,
                 metadata_to_export=METADATA_EXPORT_LIST):
        """
        Reads the template and makes it ready for export
        """

        if not (is_local_folder(parent_dir) and os.path.isabs(parent_dir)):
            raise ValueError("parent_dir parameter must be "
                             "an absolute path to a folder {0}"
                             .format(parent_dir))

        abs_template_path = make_abs_path(parent_dir, template_path)
        template_dir = os.path.dirname(abs_template_path)

        with open(abs_template_path, "r") as handle:
            template_str = handle.read()

        self.template_dict = yaml_parse(template_str)
        self.template_dir = template_dir
        self.resources_to_export = resources_to_export
        self.metadata_to_export = metadata_to_export
        self.uploader = uploader

    def export_global_artifacts(self, template_dict):
        """
        Template params such as AWS::Include transforms are not specific to
        any resource type but contain artifacts that should be exported.
        Here we iterate through the template dict and export params that
        have a handler defined in GLOBAL_EXPORT_DICT
        """
        for key, val in template_dict.items():
            if key in GLOBAL_EXPORT_DICT:
                template_dict[key] = GLOBAL_EXPORT_DICT[key](val, self.uploader, self.template_dir)
            elif isinstance(val, dict):
                self.export_global_artifacts(val)
            elif isinstance(val, list):
                for item in val:
                    if isinstance(item, dict):
                        self.export_global_artifacts(item)
        return template_dict

    def export_metadata(self, template_dict):
        """
        Exports the local artifacts referenced by the metadata section in
        the given template to an S3 bucket.

        :return: The template with references to artifacts that have been
        exported to S3.
        """
        if "Metadata" not in template_dict:
            return template_dict

        for metadata_type, metadata_dict in template_dict["Metadata"].items():
            for exporter_class in self.metadata_to_export:
                if exporter_class.RESOURCE_TYPE != metadata_type:
                    continue

                exporter = exporter_class(self.uploader)
                exporter.export(metadata_type, metadata_dict, self.template_dir)

        return template_dict

    def export(self):
        """
        Exports the local artifacts referenced by the given template to an
        S3 bucket.

        :return: The template with references to artifacts that have been
        exported to S3.
        """
        self.template_dict = self.export_metadata(self.template_dict)

        if "Resources" not in self.template_dict:
            return self.template_dict

        self.template_dict = self.export_global_artifacts(self.template_dict)

        for resource_id, resource in self.template_dict["Resources"].items():

            resource_type = resource.get("Type", None)
            resource_dict = resource.get("Properties", None)

            for exporter_class in self.resources_to_export:
                if exporter_class.RESOURCE_TYPE != resource_type:
                    continue

                # Export code resources
                exporter = exporter_class(self.uploader)
                exporter.export(resource_id, resource_dict, self.template_dir)

        return self.template_dict
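
# Example (illustrative usage sketch; "s3_uploader" is assumed to be an
# S3Uploader instance from awscli.customizations.s3uploader, as used by
# "aws cloudformation package"):
#
#   template = Template("template.yaml", os.path.abspath("."), s3_uploader)
#   exported = template.export()
#   print(yaml_dump(exported))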