#!/usr/bin/env python3 """ This script will rotate the oldest AWS key and will update the .aws/credentials file. Run mulitple times to rotate all the keys. Options to set the newly crated keys as the default keys in the .aws/crednetials file is an option. """ import logging import sys import argparse import boto3 import configparser import os import datetime import dateutil _format = "%(asctime)-15s %(levelname)-8s :%(lineno)d: %(name)s.%(funcName)s - %(message)s" _level = logging.INFO logging.basicConfig(stream=sys.stdout, format=_format) LOG = logging.getLogger() LOG.setLevel(_level) # set boto logging higher to avoid chatty logging logging.getLogger('botocore').setLevel(logging.WARNING) def parse_args(): argp = argparse.ArgumentParser() argp.add_argument('-d', '--debug', action='store_true', help="Run in debug mode") argp.add_argument('-u', '--user', required=True, help="AWS username") argp.add_argument('-p', '--profile-name', default="default", help="""Profile name in credentials file (default is "default")""") argp.add_argument('-c', '--credentials-file', default="~/.aws/credentials", help="Name of AWS credentials file (default is ~/.aws/credentials)") argp.add_argument('--set-default', action='store_true', help="Set the new key as the default value as well as the specified profile") return argp.parse_args() def delete_oldest_key(iam, user, user_keys): """ Finds the oldest AWS key for the user and deletes it. """ try: oldest_key = None right_now = datetime.datetime.utcnow() right_now = right_now.replace(tzinfo=dateutil.tz.tzutc()) for user_key in user_keys: delta = right_now - user_key['CreateDate'] if not oldest_key or oldest_key < delta: oldest_key = delta old_access_key_id = user_key['AccessKeyId'] iam.delete_access_key(UserName=user, AccessKeyId=old_access_key_id) success_status = True except Exception as err: success_status = False LOG.error(err) print("Not able to delete the oldest key") print(err) return success_status def create_new_key(iam, user): """ Creates a new AWS access key for the user. """ new_key = None try: new_key = iam.create_access_key(UserName=user) except Exception as err: LOG.error(err) LOG.debug(err.response) if 'Error' in err.response: print("Key not created") if err.response['Error']['Code'] == 'LimitExceeded': print(f"User {user} already has the maximum number of keys") return new_key def rotate_key(session, user): """ Creates a new access key. If key limit is already met, the older key will be removed in favor of the new key. """ iam = session.client('iam') LOG.debug("Getting user key limit for account...") key_limit = iam.get_account_summary()['SummaryMap']['AccessKeysPerUserQuota'] LOG.debug(f"User key limit: {key_limit}") LOG.debug("Getting user's keys...") user_keys = iam.list_access_keys()['AccessKeyMetadata'] LOG.debug(f"Found {len(user_keys)} keys for user {user}") LOG.debug("Check to see if user has the limit of keys allowed...") if len(user_keys) == key_limit: delete_oldest_key(iam, user, user_keys) new_key = create_new_key(iam, user) return new_key def update_credentials_file(credentials_file, profile_name, key, set_default): """ Create or update the section with the new key information. """ LOG.debug("Updating credentials file...") credentials = configparser.ConfigParser() credentials.read(credentials_file) LOG.debug(f"credentials = {credentials.sections()}") if not profile_name in credentials.sections(): LOG.debug("Profile does not exist in credentials file; creating now...") credentials[profile_name] = {} profile_creds = credentials[profile_name] profile_creds['aws_access_key_id'] = key['AccessKey']['AccessKeyId'] profile_creds['aws_secret_access_key'] = key['AccessKey']['SecretAccessKey'] LOG.debug(f"Profile: {profile_name}, keys: {profile_creds['aws_access_key_id']}, {profile_creds['aws_secret_access_key']}") LOG.debug(f"credentials = {[x for x in credentials[profile_name]]}")) # make the keys also be the default keys if the toggle is set if set_default: if not 'default' in credentials.sections(): LOG.debug("Default profile does not exist; creating now...") credentials['default'] = {} default_creds = credentials['default'] default_creds['aws_access_key_id'] = key['AccessKey']['AccessKeyId'] default_creds['aws_secret_access_key'] = key['AccessKey']['SecretAccessKey'] LOG.debug(f"Profile: default, keys: {default_creds['aws_access_key_id'],}, {default_creds['aws_secret_access_key']}") LOG.debug(f"credentials = {[x for x in credentials['default']]}") LOG.debug("Writing updated credentials file...") with open(credentials_file, 'w') as cred_file: credentials.write(cred_file) return True def main(): args = parse_args() if args.debug: LOG.setLevel(logging.DEBUG) logging.getLogger('botocore').setLevel(logging.WARNING) LOG.debug(f"Getting AWS session and credentials for {args.profile_name}...") session = boto3.session.Session(region_name='us-east-1', profile_name=args.profile_name) credentials = session.get_credentials() credentials_file = os.path.expanduser(args.credentials_file) LOG.debug(f"credentials_file = {credentials_file}") print(f"Generating new AWS keys for user {args.user}...") new_key = rotate_key(session, args.user) if new_key: update_credentials_file(credentials_file, args.profile_name, new_key, args.set_default) print("Key rotated and credentials file updated") else: print("No new key available to update credentials file") if __name__ == "__main__": main()