aws-utils/rotate-ssh-keys

408 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
This is designed to be run to manage SSH keys in AWS accounts
used for EC2 instances. It will handle key generation, AWS
registering (uploading), and rotating of the keys used on
EC2 instances.
The rotation process would on the creation of a new key identify
all instances using the older key(s) and remove the public key
values from the ~ec2-user/.ssh/authorized_keys file leaving only
the public key value for the new key.
The epoch format of the time the keys are created make up the new
key name. This all works when considering the latest key is the
active key. Older keys will be removed when cleaning up keys.
"""
import argparse
import logging
import boto3
import time
import os
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
from paramiko import SSHClient, AutoAddPolicy
# setting up logging for this script
_LEVEL = logging.INFO
_FORMAT = "%(asctime)-15s [%(levelname)-8s] : %(lineno)d : %(name)s.%(funcName)s : %(message)s"
logging.basicConfig(format=_FORMAT, level=_LEVEL)
log = logging.getLogger()
# set the boto logging levels to WARNING
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
# create a custom exception
class RotateKeyError(Exception):
pass
def parse_args():
"""
Parse the arguments passed
"""
argp = argparse.ArgumentParser()
argp.add_argument(
'action',
choices=[
'generate',
'add-only',
'highlander',
'lockdown',
'rotate',
'order-66',
'remove-only',
'delete-only',
],
help="Action to perform: generate, add-only will create a new key and "
"register with AWS. highlander, lockdown, rotate all will create "
"a new key, register with AWS, and remove the old key from AWS "
"and all the instances so that the key is rendered unusable. "
"remove-only, delete-only, order-66 will remove the key specified "
"by the --removal-key-name without generating a new key."
)
argp.add_argument(
'--debug',
action='store_true',
help="Run in debug mode"
)
account_access_group = argp.add_mutually_exclusive_group(required=True)
account_access_group.add_argument(
'-p', '--profile',
help="AWS profile name"
)
account_access_group.add_argument(
'-r', '--role-arn',
help="Role ARN with key access to the AWS account"
)
argp.add_argument(
'-s', '--key-size',
type=int,
default=2048,
help="Key size in bytes (default: 2048)"
)
argp.add_argument(
'-n', '--key-name-prefix',
default="",
help="String to use for the new key name and searching for existing keys"
)
argp.add_argument(
'--removal-key-name',
default="",
help="Specific full name of the key to remove (must match exactly). "
"This is ignored when action is not remove-only, delete-only, "
"or order-66."
)
return argp.parse_args()
def get_session(profile_name=None, role_arn=None, region_name='us-east-1'):
"""
Using either an AWS profile name or a role ARN, this will return a valid session
object. If neither are provided, it will default to built-in behavoir to find
credentials for establishing a valid session. Region can be chosen as well.
Args:
profile_name: Name of an AWS profile in a local ~/.aws/credentials file
role_arn: Full ARN of the role to establish the session
region_name: Override for choosing a different region
Returns:
A session object to access AWS client and resource objects.
"""
if profile_name:
session = boto3.session.Session(profile_name=profile_name, region_name=region_name)
elif role_arn:
sts = boto3.client('sts')
role_creds = sts.assume_role(RoleArn=role_arn, RoleSessionName='params-patching')['Credentials']
session = boto3.session.Session(
aws_access_key_id=role_creds['AccessKeyId'],
aws_secret_access_key=role_creds['SecretAccessKey'],
aws_session_token=role_creds['SessionToken'],
region_name=region_name
)
else:
session = boto3.session.Session(region_name=region_name)
return session
def generate_ssh_keypair(key_size=2048, public_exponent=65537):
"""
This will generate RSA SSH keys based on the key size
passed to the function defaulting to 2k sizes.
The public exponent should remain as the default of
65537 based on the first answer to the Stack Exchange
first answer in the link below and the understanding
of Fermat primes (in the second link). A smaller value
of the public exponent could be used but is not
recommended. Do not adjust without understanding the
effects on the key's security.
- https://security.stackexchange.com/questions/2335/should-rsa-public-exponent-be-only-in-3-5-17-257-or-65537-due-to-security-c
- https://en.wikipedia.org/wiki/Fermat_number#Primality_of_Fermat_numbers
Args:
key_size: Integer in bytes for size of the key
public_exponent: Integer value to seed the creation of the key
Returns:
The returning value is a tuple of the public and private
keys in byte form, respectively.
"""
log.info("Generating a new key pair")
log.debug(f"key_size: {key_size}, public_exponent: {public_exponent}")
key = rsa.generate_private_key(
backend=crypto_default_backend(),
public_exponent=public_exponent,
key_size=key_size
)
log.debug(f"key = {key}")
private_key = key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.PKCS8,
crypto_serialization.NoEncryption()
)
log.debug(f"private_key = {private_key}")
public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.OpenSSH,
crypto_serialization.PublicFormat.OpenSSH
)
log.debug(f"public_key = {public_key}")
return (public_key, private_key)
def get_existing_keypairs(session, prefix=""):
"""
Get the existing keypairs with the optional filter
for a specific prefix of the key name
Args:
session: An AWS session object to establish access to the AWS account
prefix: String value as prefix to the name of the key
Returns:
List of dicts of keypairs matching the prefix
"""
log.debug("Getting session client for EC2")
ec2_client = session.client('ec2')
log.info("Getting existing keypairs")
existing_keypairs = ec2_client.describe_key_pairs()['KeyPairs']
if prefix:
log.info("Filtering out relevant keypairs")
matching_keypairs = []
for keypair in existing_keypairs:
if keypair['KeyName'].startswith(prefix):
matching_keypairs.append(keypair)
else:
matching_keypairs = existing_keypairs
return existing_keypairs
def upload_key(session, key_name, public_key):
"""
Will upload the public key to the AWS account with
the provided name.
Args:
session: An AWS session object to establish access to the AWS account
key_name: Name for the keypair
public_key: Public key of the new keypair in bytes
Returns:
Key fingerprint if successful
"""
fingerprint = ""
log.debug("Getting session client for EC2")
ec2_client = session.client('ec2')
try:
log.info("Uploading the key")
response = ec2_client.import_key_pair(KeyName=key_name, PublicKeyMaterial=public_key)
fingerprint = response['KeyFingerprint']
log.info(f"Key fingerprint: {fingerprint}")
except Exception as error:
log.error(f"Failed to upload key: {error}")
return fingerprint
def create_new_key(session, key_name_prefix, key_size):
"""
Creates a new public and private key files and registers (uploads)
the new public key to AWS.
Args:
session: An AWS session object to establish access to the AWS account
key_name_prefix: Name of the key before the epoch value is appended
key_size: Size of the key to be created in bits
Returns:
Tuple of the key name and AWS key fingerprint
"""
# handle the new key creation
log.info("Beginnging to generate new SSH key")
# create the new key pair in memory
public_key, private_key = generate_ssh_keypair(key_size)
# get epoch of UTC time for the extension to make the name unique
epoch_time = time.strftime("%s", time.gmtime())
key_name = f"{key_name_prefix}-{epoch_time}"
log.debug(f"key_name = {key_name}")
# write the key values to files
log.info(f"Exporting the public key to {key_name}.pub")
with open(f"{key_name}.pub", 'w') as fp:
fp.write(public_key.decode('utf-8'))
log.info(f"Exporting the private key to file {key_name}")
with open(key_name, 'w') as fp:
fp.write(private_key.decode('utf-8'))
log.debug("Setting permissions on private key file")
os.chmod(key_name, 0o600)
# upload the new keypair to AWS account
fingerprint = upload_key(session, key_name, public_key)
return (key_name, fingerprint)
def remove_key(session, key_name):
"""
Will remove the key to the AWS account with the provided name.
Args:
session: An AWS session object to establish access to the AWS account
key_name: Name for the keypair
Returns:
True if successful, False if unsuccessful
"""
return_value = False
log.debug("Getting session client for EC2")
ec2_client = session.client('ec2')
try:
log.info(f"Removing the key {key_name}")
response = ec2_client.delete_key_pair(KeyName=key_name)
log.info(f"Key {key_name} successfully removed")
return_value = True
except Exception as error:
log.error(f"Failed to upload key: {error}")
return return_value
def switch_keys_on_instances(session, new_key, old_key_name, remove_old_key_only=True):
"""
Filters out the EC2 instances based on a specified old key names, adds the new
public key to the ~ec2-user/.ssh/authorized_keys file, and will remove the old
key in that file. If the remove_old_key_only is set to False, the
~ec2-user/.ssh/authorized_keys file will be cleared with only the new present.
Args:
session: An AWS session object to establish access to the AWS account
new_key: String consisting of the public key with the key name appended
old_key_name: Key name (only) to filter out the EC2 instances to have the
new key added and all other keys removed
remove_old_key_only: Boolean to indicate whether the authorized_keys file
should only contain the new key or just remove the specified old
key (default is True)
Returns:
True if successful, False if unsuccessful
"""
ec2_client = session.client('ec2')
instance_filters = {
Filters=[
{
'Name': 'key-name',
'Values': [
old_key_name
]
}
]
}
ec2_instances = [
y
for x in ec2_client.describe_instances(**instance_filters)['Reservations']
for y in x['Instances']
]
def main():
args = parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
# let's keep the boto logging level sane
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
# get the session object to be used for all AWS access
session = get_session(profile_name=args.profile, role_arn=args.role_arn)
if ['delete-only', 'remove-only', 'order-66'] in args.action:
# these actions are for key removal only
if args.removal_key_name):
remove_key(session, args.removal_key_name)
else:
raise RotateKeyException(f"The --removal_key_name argument must be provided with {args.action}")
else:
# all other actions will create a new key and upload to AWS
key_name, fingerprint = create_new_key(session, args.key_name_prefix, args.key_size)
# these actions will do a bit more by cleaning up
# the other keys after a new key is created
if ['lockdown', 'highlander', 'rotate'] in args.action:
log.info("Beginning key clean up phase")
# this list is for rotating the older keys out of circulation
existing_keypairs = get_existing_keypairs(session, args.key_name_prefix)
for existing_keypair in existing_keypairs:
remove_key(session, existing_keypair)
log.info("Complete")
if __name__ == "__main__":
main()