#!/usr/bin/env python3
"""
This is designed to be run to manage SSH keys in AWS accounts used for EC2
instances. It will handle key generation, AWS registering (uploading), and
rotating of the keys used on EC2 instances.

The rotation process would on the creation of a new key identify all
instances using the older key(s) and remove the public key values from the
~ec2-user/.ssh/authorized_keys file leaving only the public key value for the
new key.
"""

import argparse
import logging
import os
import time

import boto3
from cryptography.hazmat.backends import default_backend as crypto_default_backend
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# setting up logging for this script
_LEVEL = logging.INFO
_FORMAT = "%(asctime)-15s [%(levelname)-8s] : %(lineno)d : %(name)s.%(funcName)s : %(message)s"
logging.basicConfig(format=_FORMAT, level=_LEVEL)
log = logging.getLogger()

# set the boto logging levels to WARNING
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)

# Action groups. Their concatenation (in this order) is the list of valid
# values for the positional ``action`` argument.
_ADD_ACTIONS = ('generate', 'add-only')
_ROTATE_ACTIONS = ('highlander', 'lockdown', 'rotate')
_REMOVE_ACTIONS = ('order-66', 'remove-only', 'delete-only')


# create a custom exception
class RotateKeyError(Exception):
    """Raised when a key management action cannot be completed."""


def parse_args(argv=None):
    """
    Parse the arguments passed

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads the arguments from sys.argv.

    Returns:
        The argparse namespace with the parsed options.
    """
    argp = argparse.ArgumentParser()
    argp.add_argument(
        'action',
        choices=[*_ADD_ACTIONS, *_ROTATE_ACTIONS, *_REMOVE_ACTIONS],
        help="Action to perform: generate, add-only will create a new key and "
             "register with AWS. highlander, lockdown, rotate all will create "
             "a new key, register with AWS, and remove the old key from AWS "
             "and all the instances so that the key is rendered unusable. "
             "remove-only, delete-only, order-66 will remove the key specified "
             "by the --removal-key-name without generating a new key."
    )
    argp.add_argument(
        '--debug', action='store_true',
        help="Run in debug mode"
    )
    account_access_group = argp.add_mutually_exclusive_group(required=True)
    account_access_group.add_argument(
        '-p', '--profile',
        help="AWS profile name"
    )
    account_access_group.add_argument(
        '-r', '--role-arn',
        help="Role ARN with key access to the AWS account"
    )
    argp.add_argument(
        '-s', '--key-size', type=int, default=2048,
        help="Key size in bits (default: 2048)"
    )
    argp.add_argument(
        '-n', '--key-name-prefix', default="",
        help="String to use for the new key name and searching for existing keys"
    )
    argp.add_argument(
        '--removal-key-name', default="",
        help="Specific full name of the key to remove (must match exactly). "
             "This is ignored when action is not remove-only, delete-only, "
             "or order-66."
    )
    return argp.parse_args(argv)


def get_session(profile_name=None, role_arn=None, region_name='us-east-1'):
    """
    Using either an AWS profile name or a role ARN, this will return a valid
    session object. If neither are provided, it will default to built-in
    behavior to find credentials for establishing a valid session. Region can
    be chosen as well.

    Args:
        profile_name: Name of an AWS profile in a local ~/.aws/credentials file
        role_arn: Full ARN of the role to establish the session
        region_name: Override for choosing a different region

    Returns:
        A session object to access AWS client and resource objects.
    """
    if profile_name:
        return boto3.session.Session(profile_name=profile_name, region_name=region_name)

    if role_arn:
        sts = boto3.client('sts')
        # NOTE(review): the session name looks inherited from another tool;
        # confirm before changing, since it shows up in CloudTrail records.
        role_creds = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName='params-patching'
        )['Credentials']
        return boto3.session.Session(
            aws_access_key_id=role_creds['AccessKeyId'],
            aws_secret_access_key=role_creds['SecretAccessKey'],
            aws_session_token=role_creds['SessionToken'],
            region_name=region_name
        )

    return boto3.session.Session(region_name=region_name)


def generate_ssh_keypair(key_size=2048, public_exponent=65537):
    """
    This will generate RSA SSH keys based on the key size passed to the
    function defaulting to 2k sizes. The public exponent should remain as the
    default of 65537 based on the first answer to the Stack Exchange question
    in the link below and the understanding of Fermat primes (in the second
    link). A smaller value of the public exponent could be used but is not
    recommended. Do not adjust without understanding the effects on the key's
    security.

    - https://security.stackexchange.com/questions/2335/should-rsa-public-exponent-be-only-in-3-5-17-257-or-65537-due-to-security-c
    - https://en.wikipedia.org/wiki/Fermat_number#Primality_of_Fermat_numbers

    Args:
        key_size: Integer size of the key in bits
        public_exponent: Integer public exponent used to create the key

    Returns:
        The returning value is a tuple of the public and private keys in byte
        form, respectively. The public key is in OpenSSH format and the
        private key is an unencrypted PKCS8 PEM.
    """
    log.info("Generating a new key pair")
    log.debug(f"key_size: {key_size}, public_exponent: {public_exponent}")
    key = rsa.generate_private_key(
        backend=crypto_default_backend(),
        public_exponent=public_exponent,
        key_size=key_size
    )
    log.debug(f"key = {key}")

    # The private key material is deliberately never logged, even at debug
    # level, so it cannot leak into log files or terminals.
    private_key = key.private_bytes(
        crypto_serialization.Encoding.PEM,
        crypto_serialization.PrivateFormat.PKCS8,
        crypto_serialization.NoEncryption()
    )
    public_key = key.public_key().public_bytes(
        crypto_serialization.Encoding.OpenSSH,
        crypto_serialization.PublicFormat.OpenSSH
    )
    log.debug(f"public_key = {public_key}")
    return (public_key, private_key)


def get_existing_keypairs(session, prefix=""):
    """
    Get the existing keypairs with the optional filter for a specific prefix
    of the key name

    Args:
        session: An AWS session object to establish access to the AWS account
        prefix: String value as prefix to the name of the key

    Returns:
        List of dicts of keypairs matching the prefix (every keypair in the
        account when the prefix is empty)
    """
    log.debug("Getting session client for EC2")
    ec2_client = session.client('ec2')
    log.info("Getting existing keypairs")
    existing_keypairs = ec2_client.describe_key_pairs()['KeyPairs']
    if not prefix:
        return existing_keypairs

    log.info("Filtering out relevant keypairs")
    return [
        keypair for keypair in existing_keypairs
        if keypair['KeyName'].startswith(prefix)
    ]


def upload_key(session, key_name, public_key):
    """
    Will upload the public key to the AWS account with the provided name.

    Args:
        session: An AWS session object to establish access to the AWS account
        key_name: Name for the keypair
        public_key: Public key of the new keypair in bytes

    Returns:
        Key fingerprint if successful, otherwise an empty string (the failure
        is logged rather than raised)
    """
    fingerprint = ""
    log.debug("Getting session client for EC2")
    ec2_client = session.client('ec2')
    try:
        log.info("Uploading the key")
        response = ec2_client.import_key_pair(KeyName=key_name, PublicKeyMaterial=public_key)
        fingerprint = response['KeyFingerprint']
        log.info(f"Key fingerprint: {fingerprint}")
    except Exception as error:
        # Best-effort by contract: callers check the returned fingerprint.
        log.error(f"Failed to upload key: {error}")
    return fingerprint


def remove_key(session, key_name):
    """
    Will remove the key from the AWS account with the provided name.

    Args:
        session: An AWS session object to establish access to the AWS account
        key_name: Name for the keypair

    Returns:
        True if successful, False if unsuccessful (the failure is logged
        rather than raised)
    """
    log.debug("Getting session client for EC2")
    ec2_client = session.client('ec2')
    try:
        log.info("Removing the key")
        ec2_client.delete_key_pair(KeyName=key_name)
    except Exception as error:
        # Best-effort by contract: callers check the returned boolean.
        log.error(f"Failed to remove key: {error}")
        return False
    log.info(f"Key {key_name} successfully removed")
    return True


def _export_keypair(key_name, public_key, private_key):
    """Write the public key to '<key_name>.pub' and the private key to '<key_name>'."""
    log.info(f"Exporting the public key to {key_name}.pub")
    with open(f"{key_name}.pub", 'w') as fp:
        fp.write(public_key.decode('utf-8'))

    log.info(f"Exporting the private key to file {key_name}")
    # Create the file with owner-only permissions from the start so the
    # private key is never readable by other users, even momentarily.
    fd = os.open(key_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as fp:
        fp.write(private_key.decode('utf-8'))
    # The creation mode is ignored when the file already existed, so enforce
    # the permissions explicitly as well.
    log.debug("Setting permissions on private key file")
    os.chmod(key_name, 0o600)


def main():
    """
    Entry point: parse the command line, open the AWS session, and run the
    requested action.

    Raises:
        RotateKeyError: when a removal action is requested without
            --removal-key-name, or when the key removal or key upload fails.
    """
    args = parse_args()
    if args.debug:
        log.setLevel(logging.DEBUG)
        # let's keep the boto logging level sane
        logging.getLogger('botocore').setLevel(logging.WARNING)
        logging.getLogger('boto3').setLevel(logging.WARNING)

    # get the session object to be used for all AWS access
    session = get_session(profile_name=args.profile, role_arn=args.role_arn)

    if args.action in _REMOVE_ACTIONS:
        # handle specific key removal
        if not args.removal_key_name:
            raise RotateKeyError(f"--removal-key-name must be provided with {args.action}")
        if not remove_key(session, args.removal_key_name):
            raise RotateKeyError(f"Unable to remove key {args.removal_key_name}")
    else:
        # handle the new key creation
        log.info("Beginnging to generate new SSH key")

        # create the new key pair in memory
        public_key, private_key = generate_ssh_keypair(args.key_size)

        # get epoch of UTC time for the extension to make the name unique;
        # time.time() is already UTC based and, unlike strftime("%s"), is
        # portable across platforms
        epoch_time = int(time.time())
        key_name = f"{args.key_name_prefix}-{epoch_time}"
        log.debug(f"key_name = {key_name}")

        # write the key values to files
        _export_keypair(key_name, public_key, private_key)

        # this list is for rotating the older keys out of circulation; it is
        # collected before the upload so it cannot contain the new key
        existing_keypairs = get_existing_keypairs(session, args.key_name_prefix)

        # upload the new keypair to AWS account
        fingerprint = upload_key(session, key_name, public_key)
        if not fingerprint:
            # never continue to the clean up of older keys when the new key
            # was not registered
            raise RotateKeyError(f"Unable to upload key {key_name}")

        # handle the clean up of the other keys
        if args.action in _ROTATE_ACTIONS:
            log.info("Beginning key clean up phase")
            # TODO: the removal of existing_keypairs from AWS and from the
            # instances is not implemented here yet. Note that an empty
            # --key-name-prefix makes existing_keypairs hold every key in
            # the account.

    log.info("Complete")


if __name__ == "__main__":
    main()