aws-utils/rotate-keys

185 lines
6.0 KiB
Plaintext
Raw Permalink Normal View History

#!/usr/bin/env python3
2019-10-28 17:26:51 +00:00
"""
This script will rotate the oldest AWS key and will update the .aws/credentials file. Run
multiple times to rotate all the keys.
Setting the newly created keys as the default keys in the .aws/credentials file is
optional (see --set-default).
"""
import logging
import sys
import argparse
import boto3
import configparser
import os
import datetime
import dateutil
# Log record layout: timestamp, level, line number, logger.function, message.
_format = (
    "%(asctime)-15s %(levelname)-8s :%(lineno)d: "
    "%(name)s.%(funcName)s - %(message)s"
)
_level = logging.INFO

# Route log output to stdout and keep a handle on the root logger.
logging.basicConfig(format=_format, stream=sys.stdout)
LOG = logging.getLogger()
LOG.setLevel(_level)

# botocore logs a lot below WARNING; raise its threshold to keep output readable.
logging.getLogger('botocore').setLevel(logging.WARNING)
def parse_args(argv=None):
    """
    Parse the command-line options of the key rotation script.

    argv is an optional list of argument strings. When it is None (the
    default) argparse reads sys.argv[1:], so existing callers that pass
    nothing keep their behavior.

    Returns the argparse.Namespace holding debug, user, profile_name,
    credentials_file and set_default.
    """
    argp = argparse.ArgumentParser()
    argp.add_argument('-d', '--debug', action='store_true', help="Run in debug mode")
    argp.add_argument('-u', '--user', required=True, help="AWS username")
    argp.add_argument('-p', '--profile-name', default="default",
                      help="""Profile name in credentials file (default is "default")""")
    argp.add_argument('-c', '--credentials-file', default="~/.aws/credentials",
                      help="Name of AWS credentials file (default is ~/.aws/credentials)")
    argp.add_argument('--set-default', action='store_true',
                      help="Set the new key as the default value as well as the specified profile")
    return argp.parse_args(argv)
def delete_oldest_key(iam, user, user_keys):
    """
    Finds the oldest AWS key for the user and deletes it.

    iam is the IAM client used to call delete_access_key, user is the IAM
    user name, and user_keys is a list of key metadata dicts that each carry
    'AccessKeyId' and 'CreateDate'.

    Returns True when the key was deleted and False otherwise. Failures are
    logged and printed instead of being raised.
    """
    if not user_keys:
        # Nothing to choose from; report it rather than failing on an unbound name.
        LOG.error(f"No access keys supplied for user {user}")
        print("Not able to delete the oldest key")
        return False
    try:
        # The oldest key is the one with the earliest creation date. Comparing
        # the CreateDate values directly needs no "now" timestamp, and on a tie
        # the first key in the list wins.
        oldest_key = min(user_keys, key=lambda user_key: user_key['CreateDate'])
        iam.delete_access_key(UserName=user, AccessKeyId=oldest_key['AccessKeyId'])
    except Exception as err:
        # The function reports a status flag; it does not propagate errors.
        LOG.error(err)
        print("Not able to delete the oldest key")
        print(err)
        return False
    return True
def create_new_key(iam, user):
    """
    Creates a new AWS access key for the user.

    iam is the IAM client used to call create_access_key and user is the
    IAM user name.

    Returns the create_access_key response on success, or None when the call
    fails. Failures are logged; when the error carries an 'Error' entry in
    its response, a short message is also printed.
    """
    try:
        return iam.create_access_key(UserName=user)
    except Exception as err:
        LOG.error(err)
        # Not every exception has a ``response`` attribute, so read it
        # defensively instead of raising AttributeError inside the handler.
        response = getattr(err, 'response', None) or {}
        LOG.debug(response)
        if 'Error' in response:
            print("Key not created")
            if response['Error'].get('Code') == 'LimitExceeded':
                print(f"User {user} already has the maximum number of keys")
    return None
def rotate_key(session, user):
    """
    Creates a new access key. If key limit is already met, the older key will be removed
    in favor of the new key.

    session is the boto3 session used to build the IAM client and user is
    the IAM user whose keys are rotated.

    Returns whatever create_new_key returns: the new key response, or None
    when no key could be created.
    """
    iam = session.client('iam')

    LOG.debug("Getting user key limit for account...")
    key_limit = iam.get_account_summary()['SummaryMap']['AccessKeysPerUserQuota']
    LOG.debug(f"User key limit: {key_limit}")

    LOG.debug("Getting user's keys...")
    # Name the user explicitly: without UserName, IAM lists the keys of the
    # identity that signed the request, which need not be ``user``.
    user_keys = iam.list_access_keys(UserName=user)['AccessKeyMetadata']
    LOG.debug(f"Found {len(user_keys)} keys for user {user}")

    LOG.debug("Check to see if user has the limit of keys allowed...")
    # ">=" rather than "==" so a user already at or above the quota still
    # gets a slot freed before the create call.
    if len(user_keys) >= key_limit:
        # NOTE(review): if the session is signed with the oldest key, deleting
        # it first would presumably invalidate the session before the new key
        # is created - confirm against how this script is run.
        # The delete status is not checked; a failed delete surfaces through
        # the create call below.
        delete_oldest_key(iam, user, user_keys)

    return create_new_key(iam, user)
def update_credentials_file(credentials_file, profile_name, key, set_default):
    """
    Create or update the section with the new key information.

    credentials_file is the path of the INI-style credentials file; it is
    read, updated in memory and written back in full.
    profile_name is the section that receives the new key pair and is
    created when absent.
    key is the create_access_key response; key['AccessKey'] must hold
    'AccessKeyId' and 'SecretAccessKey'.
    set_default, when true, also writes the same key pair to the 'default'
    section.

    Returns True once the file has been written. The secret access key is
    never written to the log.
    """
    access_key_id = key['AccessKey']['AccessKeyId']
    secret_access_key = key['AccessKey']['SecretAccessKey']

    LOG.debug("Updating credentials file...")
    # Interpolation is disabled so a '%' in any stored value stays literal.
    credentials = configparser.ConfigParser(interpolation=None)
    credentials.read(credentials_file)
    LOG.debug(f"credentials = {credentials.sections()}")

    if profile_name not in credentials.sections():
        LOG.debug("Profile does not exist in credentials file; creating now...")
        credentials[profile_name] = {}
    profile_creds = credentials[profile_name]
    profile_creds['aws_access_key_id'] = access_key_id
    profile_creds['aws_secret_access_key'] = secret_access_key
    # Only the key id is logged; the secret must not end up in log output.
    LOG.debug(f"Profile: {profile_name}, access key id: {profile_creds['aws_access_key_id']}")
    LOG.debug(f"credentials = {[x for x in credentials[profile_name]]}")

    # make the keys also be the default keys if the toggle is set
    if set_default:
        if 'default' not in credentials.sections():
            LOG.debug("Default profile does not exist; creating now...")
            credentials['default'] = {}
        default_creds = credentials['default']
        default_creds['aws_access_key_id'] = access_key_id
        default_creds['aws_secret_access_key'] = secret_access_key
        LOG.debug(f"Profile: default, access key id: {default_creds['aws_access_key_id']}")
        LOG.debug(f"credentials = {[x for x in credentials['default']]}")

    LOG.debug("Writing updated credentials file...")
    # The 0o600 mode only takes effect when this call creates the file; an
    # existing file keeps its current permissions.
    fd = os.open(credentials_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as cred_file:
        credentials.write(cred_file)
    return True
def main():
    """
    Script entry point.

    Parses the command line, opens a boto3 session for the requested
    profile, rotates the user's access key and, when a new key was
    produced, stores it in the credentials file.
    """
    options = parse_args()
    if options.debug:
        LOG.setLevel(logging.DEBUG)
        # Keep botocore quiet even when the script itself runs in debug mode.
        logging.getLogger('botocore').setLevel(logging.WARNING)

    LOG.debug(f"Getting AWS session and credentials for {options.profile_name}...")
    session = boto3.session.Session(profile_name=options.profile_name, region_name='us-east-1')
    # The result is not used below; the call is kept as in the original flow.
    credentials = session.get_credentials()

    credentials_path = os.path.expanduser(options.credentials_file)
    LOG.debug(f"credentials_file = {credentials_path}")

    print(f"Generating new AWS keys for user {options.user}...")
    rotated_key = rotate_key(session, options.user)
    if not rotated_key:
        print("No new key available to update credentials file")
        return

    update_credentials_file(credentials_path, options.profile_name, rotated_key, options.set_default)
    print("Key rotated and credentials file updated")


if __name__ == "__main__":
    main()