Source code for azureenergylabelercli.azureenergylabelercli

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: azureenergylabelercli.py
#
# Copyright 2022 Sayantan Khanra
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for azureenergylabelercli.

.. _Google Python Style Guide:
   https://google.github.io/styleguide/pyguide.html

"""

import logging
import logging.config
import json
import argparse
import os
import coloredlogs

from yaspin import yaspin
from azureenergylabelerlib import (AzureEnergyLabeler,
                                   ALL_TENANT_EXPORT_TYPES,
                                   ALL_SUBSCRIPTION_EXPORT_DATA,
                                   SUBSCRIPTION_METRIC_EXPORT_TYPES,
                                   TENANT_THRESHOLDS,
                                   SUBSCRIPTION_THRESHOLDS,
                                   RESOURCE_GROUP_THRESHOLDS,
                                   TENANT_METRIC_EXPORT_TYPES)

from .validators import (ValidatePath,
                         azure_subscription_id,
                         get_mutually_exclusive_args)


__author__ = '''Sayantan Khanra <skhanra@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''04-05-2022'''
__copyright__ = '''Copyright 2022, Sayantan Khanra'''
__credits__ = ["Sayantan Khanra"]
__license__ = '''MIT'''
__maintainer__ = '''Sayantan Khanra'''
__email__ = '''<skhanra@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".


# This is the main prefix used for logging
LOGGER_BASENAME = '''azureenergylabelercli'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs]def get_arguments(): """ Gets us the cli arguments. Returns the args as parsed from the argsparser. """ # https://docs.python.org/3/library/argparse.html parser = argparse.ArgumentParser(description='''A cli to help generate energy label for Azure tenant, subscriptions and resource groups. ''') parser.add_argument('--log-config', '-l', action='store', dest='logger_config', help='The location of the logging config json file', default=os.environ.get('AZURE_LABELER_LOG_CONFIG', '')) parser.add_argument('--log-level', '-L', help='Provide the log level. Defaults to info.', dest='log_level', action='store', default=os.environ.get('AZURE_LABELER_LOG_LEVEL', 'info'), choices=['debug', 'info', 'warning', 'error', 'critical']) parser.add_argument('--tenant-id', '-tid', dest='tenant_id', type=str, default=os.environ.get('AZURE_LABELER_TENANT_ID'), help='The ID of the Tenant to labeled') single_subscription_action = parser.add_argument('--single-subscription-id', '-s', required=False, dest='single_subscription_id', action='store', type=azure_subscription_id, default=os.environ.get('AZURE_LABELER_SINGLE_SUBSCRIPTION_ID'), help='Run the labeler on a single subscription.') parser.add_argument('--frameworks', '-f', default=os.environ.get('AZURE_LABELER_FRAMEWORKS', ['Microsoft cloud security benchmark']), type=comma_delimited_list, help='The comma delimited list of applicable frameworks: \ ["Microsoft cloud security benchmark", "Azure CIS 1.1.0"], ' 'default=["Microsoft cloud security benchmark"]\n' 'example="Microsoft cloud security benchmark,Azure CIS 1.1.0"') subscription_list = parser.add_mutually_exclusive_group() subscription_list._group_actions.append(single_subscription_action) # pylint: disable=protected-access subscription_list.add_argument('--allowed-subscription-ids', '-a', required=False, default=os.environ.get('AZURE_LABELER_ALLOWED_SUBSCRIPTION_IDS'), type=comma_delimited_list, help=('A comma delimited list of Azure Subscription IDs' ' for which an energy label will be produced. ' 'Mutually exclusive with ' '--denied-subscription-ids and --single-subscription-id arguments.\n' 'example=' '"00000000-0000-0000-0000-000000000000,00000000-0000-0000-0000-000000000001"')) subscription_list.add_argument('--denied-subscription-ids', '-d', required=False, default=os.environ.get('AZURE_LABELER_DENIED_SUBSCRIPTION_IDS'), type=comma_delimited_list, help=('A comma delimited list of Azure Subscription IDs that will ' 'be excluded from producing the energy label. ' 'Mutually exclusive with ' '--allowed-subscription-ids and --single-subscription-id arguments.\n' 'example=' '"00000000-0000-0000-0000-000000000000,00000000-0000-0000-0000-000000000001"')) subscription_list.add_argument('--denied-resource-group-names', '-e', required=False, default=os.environ.get('AZURE_LABELER_DENIED_RESOURCE_GROUP_NAMES'), type=comma_delimited_list, help=('A comma delimited list of Azure resource group names that will ' 'be excluded from producing the energy label.\n' 'example=' '"SBPP-WEU-AARC-01-RSG, SBPA-WEU-AARC-01-RSG"')) parser.add_argument('--export-path', '-p', action=ValidatePath, required=False, default=os.environ.get('AZURE_LABELER_EXPORT_PATH'), help='Exports a snapshot of chosen data in ' 'JSON formatted files to the specified directory or Storage Account Container location.') export_options = parser.add_mutually_exclusive_group() export_options.add_argument('--export-metrics', '-em', action='store_const', dest='export_all', const=False, default=os.environ.get('AZURE_LABELER_EXPORT_METRICS'), help='Exports metrics/statistics along with findings data in ' 'JSON formatted files to the specified directory or ' 'Storage Account Container location.') export_options.add_argument('--export-all', '-ea', action='store_const', dest='export_all', const=True, default=os.environ.get('AZURE_LABELER_EXPORT_ALL', True), help='Exports metrics/statistics without sensitive findings data in ' 'JSON formatted files to the specified directory or ' 'Storage Account Container location.') parser.add_argument('--to-json', '-j', dest='to_json', action='store_true', required=False, default=os.environ.get('AZURE_LABELER_TO_JSON', False), help='Return the report in json format.') parser.add_argument('--disable-spinner', '-ds', action='store_true', default=os.environ.get('AZURE_LABELER_DISABLE_SPINNER', False), help='If set spinner will be disabled on the CLI.') parser.add_argument('--disable-banner', '-db', action='store_true', default=os.environ.get('AZURE_LABELER_DISABLE_BANNER', False), help='If set banner will be disabled on the CLI.') parser.set_defaults(export_all=True) args = parser.parse_args() args.allowed_subscription_ids, args.denied_subscription_ids = get_mutually_exclusive_args( args.allowed_subscription_ids, args.denied_subscription_ids, msg="conflicting arguments: --denied-subscription-ids, --allowed-subscription-ids") args.tenant_id, _ = get_mutually_exclusive_args( args.tenant_id, None, required=True, msg="the following arguments are required: --tenant-id/-tid") return args
[docs]def comma_delimited_list(argument, sep=','): """Takes a str, splits based on character and returns a list.""" return argument.split(sep)
[docs]def setup_logging(level, config_file=None): """ Sets up the logging. Needs the args to get the log level supplied Args: level: At which level do we log config_file: Configuration to use """ # This will configure the logging, if the user has set a config file. # If there's no config file, logging will default to stdout. if config_file: # Get the config for the logger. Of course this needs exception # catching in case the file is not there and everything. Proper IO # handling is not shown here. try: with open(config_file, encoding='utf-8') as conf_file: configuration = json.loads(conf_file.read()) # Configure the logger logging.config.dictConfig(configuration) except ValueError: print(f'File "{config_file}" is not valid json, cannot continue.') raise SystemExit(1) from None else: coloredlogs.install(level=level.upper())
[docs]def wait_for_findings(method_name, method_argument, log_level, disable_spinner=False): """If log level is not debug shows a spinner while the callable provided gets security hub findings. Args: method_name: The method to execute while waiting. method_argument: The argument to pass to the method. log_level: The log level as set by the user. disable_spinner: The spinner will be disabled while retrieving the findings. Returns: findings: A list of defender for cloud findings as retrieved by the callable. """ try: if all([log_level != 'debug', not disable_spinner]): with yaspin(text="Please wait while retrieving Defender For Cloud findings...", color="yellow") as spinner: findings = method_name(method_argument) spinner.ok("✅") else: findings = method_name(method_argument) except Exception as msg: LOGGER.error(msg) raise SystemExit(1) from None return findings
[docs]def get_tenant_reporting_data(tenant_id, # pylint: disable=too-many-arguments allowed_subscription_ids, denied_subscription_ids, denied_resource_group_names, export_all_data_flag, frameworks, log_level, disable_spinner): """Gets the reporting data for a landing zone. Args: tenant_id: Tenant Id of the tenant allowed_subscription_ids: The allowed subscription ids for tenant inclusion if any. denied_subscription_ids: The denied subscription ids for tenant zone exclusion if any. denied_resource_group_names: List of resource groups to exclude if any. export_all_data_flag: If set all data is going to be exported, else only basic reporting. frameworks: The frameworks to include in scoring. log_level: The log level set. disable_spinner: The spinner will be disabled while retrieving the findings. Returns: report_data, exporter_arguments """ labeler = AzureEnergyLabeler(tenant_id=tenant_id, tenant_thresholds=TENANT_THRESHOLDS, resource_group_thresholds=RESOURCE_GROUP_THRESHOLDS, subscription_thresholds=SUBSCRIPTION_THRESHOLDS, frameworks=frameworks, allowed_subscription_ids=allowed_subscription_ids, denied_subscription_ids=denied_subscription_ids, denied_resource_group_names=denied_resource_group_names) wait_for_findings(AzureEnergyLabeler.filtered_defender_for_cloud_findings.fget, labeler, log_level, disable_spinner=disable_spinner) report_data = [['Tenant ID:', tenant_id], ['Tenant Security Score:', labeler.tenant_energy_label.label], ['Tenant Percentage Coverage:', labeler.tenant_energy_label.coverage], ['Labeled Subscriptions Measured:', labeler.labeled_subscriptions_energy_label.subscriptions_measured]] if labeler.tenant_energy_label.best_label != labeler.tenant_energy_label.worst_label: report_data.extend([['Best Subscription Security Score:', labeler.tenant_energy_label.best_label], ['Worst Subscription Security Score:', labeler.tenant_energy_label.worst_label]]) export_types = ALL_TENANT_EXPORT_TYPES if export_all_data_flag else TENANT_METRIC_EXPORT_TYPES exporter_arguments = {'export_types': export_types, 'id': tenant_id, 'energy_label': labeler.tenant_energy_label.label, 'defender_for_cloud_findings': labeler.filtered_defender_for_cloud_findings, 'labeled_subscriptions': labeler.tenant_labeled_subscriptions, 'credentials': labeler.tenant_credentials} return report_data, exporter_arguments
[docs]def get_subscription_reporting_data( # pylint: disable=too-many-arguments,too-many-locals tenant_id, subscription_id, export_all_data_flag, frameworks, log_level, disable_spinner): """Gets the reporting data for a single account. Args: tenant_id: Tenant Id of the tenant subscription_id: The ID of the subscription to get reporting on. export_all_data_flag: If set all data is going to be exported, else only basic reporting. frameworks: The frameworks to include in scoring. log_level: The log level set. disable_spinner: The spinner will be disabled while retrieving the findings. Returns: report_data, exporter_arguments """ _allowed_subscription_ids = [] _allowed_subscription_ids.append(subscription_id) labeler = AzureEnergyLabeler(tenant_id=tenant_id, tenant_thresholds=TENANT_THRESHOLDS, resource_group_thresholds=RESOURCE_GROUP_THRESHOLDS, subscription_thresholds=SUBSCRIPTION_THRESHOLDS, frameworks=frameworks, allowed_subscription_ids=_allowed_subscription_ids) tenant = labeler.tenant defender_for_cloud_findings = wait_for_findings(AzureEnergyLabeler.filtered_defender_for_cloud_findings.fget, labeler, log_level, disable_spinner=disable_spinner) filtered_findings = [finding for finding in defender_for_cloud_findings if finding.subscription_id == subscription_id] subscription = next( subscription for subscription in tenant.subscriptions if subscription.subscription_id == subscription_id) energy_label = subscription.get_energy_label(defender_for_cloud_findings) report_data = [['Subscription ID:', subscription.subscription_id], ['Subscription Security Score:', energy_label.label], ['Number Of High Findings:', energy_label.number_of_high_findings], ['Number Of Medium Findings:', energy_label.number_of_medium_findings], ['Number Of Low Findings:', energy_label.number_of_low_findings], ['Max Days Open:', energy_label.max_days_open]] if subscription.display_name: report_data.insert(0, ['Subscription Display Name:', subscription.display_name]) export_types = ALL_SUBSCRIPTION_EXPORT_DATA if export_all_data_flag else SUBSCRIPTION_METRIC_EXPORT_TYPES exporter_arguments = {'export_types': export_types, 'id': subscription.subscription_id, 'energy_label': energy_label.label, 'defender_for_cloud_findings': filtered_findings, 'labeled_subscriptions': [subscription], 'credentials': labeler.tenant_credentials} return report_data, exporter_arguments