#!/usr/bin/env python3
"""Rotate an IAM user's AWS access key and record the new key locally.

The script creates a new access key for the given IAM user (deleting the
oldest existing key first when the per-user key quota is already reached)
and writes the new key into a profile of an AWS credentials file.
"""
import argparse
import configparser
import datetime
import logging
import os
import sys

import boto3
# Import the submodule explicitly: a bare ``import dateutil`` does not
# guarantee that ``dateutil.tz`` has been loaded.
import dateutil.tz

_format = "%(asctime)-15s %(levelname)-8s :%(lineno)d: %(name)s.%(funcName)s - %(message)s"
_level = logging.INFO
logging.basicConfig(stream=sys.stdout, format=_format)
LOG = logging.getLogger()
LOG.setLevel(_level)
# set boto logging higher to avoid chatty logging
logging.getLogger('botocore').setLevel(logging.WARNING)


def parse_args():
    """Parse and return the command-line arguments."""
    argp = argparse.ArgumentParser()
    argp.add_argument('-d', '--debug', action='store_true',
                      help="Run in debug mode")
    argp.add_argument('-u', '--user', required=True,
                      help="AWS username")
    argp.add_argument('-p', '--profile-name', default="default",
                      help="""Profile name in credentials file (default is "default")""")
    argp.add_argument('-c', '--credentials-file', default="~/.aws/credentials",
                      help="Name of AWS credentials file (default is ~/.aws/credentials)")
    argp.add_argument('--set-default', action='store_true',
                      help="Set the new key as the default value as well as the specified profile")
    return argp.parse_args()


def delete_oldest_key(iam, user, user_keys):
    """
    Finds the oldest AWS key for the user and deletes it.

    ``user_keys`` is the ``AccessKeyMetadata`` list returned by
    ``list_access_keys``; each entry must carry ``CreateDate`` and
    ``AccessKeyId``.

    Returns True when the key was deleted, False otherwise (including when
    ``user_keys`` is empty). Failures are logged and printed, never raised.
    """
    if not user_keys:
        LOG.error("No access keys found for user {}".format(user))
        print("Not able to delete the oldest key")
        return False

    try:
        # The earliest creation date is the oldest key; on a tie min() keeps
        # the first entry, like the original strict comparison did.
        oldest = min(user_keys, key=lambda user_key: user_key['CreateDate'])
        right_now = datetime.datetime.now(dateutil.tz.tzutc())
        LOG.debug("Deleting key {} (age {})".format(
            oldest['AccessKeyId'], right_now - oldest['CreateDate']))
        iam.delete_access_key(UserName=user, AccessKeyId=oldest['AccessKeyId'])
    except Exception as err:
        # Best-effort boundary: report the failure and let the caller decide.
        LOG.error(err)
        print("Not able to delete the oldest key")
        print(err)
        return False
    return True


def create_new_key(iam, user):
    """
    Creates a new AWS access key for the user.

    Returns the ``create_access_key`` response on success, or None when the
    key could not be created (the failure is logged and printed).
    """
    try:
        return iam.create_access_key(UserName=user)
    except Exception as err:
        LOG.error(err)
        # Only botocore ClientError carries ``response``; other exceptions
        # (network, credentials, ...) must not crash the handler.
        response = getattr(err, 'response', None) or {}
        LOG.debug(response)
        print("Key not created")
        if response.get('Error', {}).get('Code') == 'LimitExceeded':
            print("User {} already has the maximum number of keys".format(user))
    return None


def rotate_key(session, user):
    """
    Creates a new access key. If key limit is already met, the older key
    will be removed in favor of the new key.

    Returns the ``create_access_key`` response, or None when no new key
    could be created.
    """
    iam = session.client('iam')

    LOG.debug("Getting user key limit for account...")
    key_limit = iam.get_account_summary()['SummaryMap']['AccessKeysPerUserQuota']
    LOG.debug("User key limit: {}".format(key_limit))

    LOG.debug("Getting user's keys...")
    # Without UserName the API lists the keys of the *calling* identity,
    # which is not necessarily the user being rotated.
    user_keys = iam.list_access_keys(UserName=user)['AccessKeyMetadata']
    LOG.debug("Found {} keys for user {}".format(len(user_keys), user))

    LOG.debug("Check to see if user has the limit of keys allowed...")
    if len(user_keys) >= key_limit:
        # Avoid deleting the key that signs this session's requests when
        # another candidate exists; otherwise the create call that follows
        # would be made with credentials that were just revoked.
        current_key_id = getattr(session.get_credentials(), 'access_key', None)
        candidates = [
            user_key for user_key in user_keys
            if user_key['AccessKeyId'] != current_key_id
        ] or user_keys
        if not delete_oldest_key(iam, user, candidates):
            # Still at the quota: creating a key cannot succeed.
            return None

    return create_new_key(iam, user)


def update_credentials_file(credentials_file, profile_name, key, set_default):
    """
    Create or update the section with the new key information.

    ``key`` is the ``create_access_key`` response. The key pair is written to
    ``profile_name`` and, when ``set_default`` is true, also to the
    ``default`` profile. A newly created file gets mode 0600 because it holds
    secrets. Returns True; I/O and parse errors propagate to the caller.
    """
    LOG.debug("Updating credentials file...")
    # Interpolation is disabled so values containing '%' are stored verbatim.
    credentials = configparser.ConfigParser(interpolation=None)
    credentials.read(credentials_file)
    LOG.debug("credentials = {}".format(credentials.sections()))

    access_key = key['AccessKey']
    sections = [profile_name]
    # make the keys also be the default keys if the toggle is set
    if set_default and profile_name != 'default':
        sections.append('default')

    for section in sections:
        if section not in credentials:
            LOG.debug("Profile {} does not exist in credentials file; creating now...".format(section))
            credentials[section] = {}
        section_creds = credentials[section]
        section_creds['aws_access_key_id'] = access_key['AccessKeyId']
        section_creds['aws_secret_access_key'] = access_key['SecretAccessKey']
        # Never log the secret access key, not even at debug level.
        LOG.debug("Profile: {}, access key id: {}".format(section, section_creds['aws_access_key_id']))
        LOG.debug("credentials = {}".format(list(section_creds)))

    LOG.debug("Writing updated credentials file...")
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    with os.fdopen(os.open(credentials_file, open_flags, 0o600), 'w') as cred_file:
        credentials.write(cred_file)

    return True


def main():
    """Rotate the key for the requested user and update the credentials file."""
    args = parse_args()
    if args.debug:
        LOG.setLevel(logging.DEBUG)

    LOG.debug("Getting AWS session and credentials for {}...".format(args.profile_name))
    session = boto3.session.Session(region_name='us-east-1', profile_name=args.profile_name)

    credentials_file = os.path.expanduser(args.credentials_file)
    LOG.debug("credentials_file = {}".format(credentials_file))

    print("Generating new AWS keys for user {}...".format(args.user))
    new_key = rotate_key(session, args.user)
    if not new_key:
        print("No new key available to update credentials file")
        return

    try:
        update_credentials_file(credentials_file, args.profile_name, new_key, args.set_default)
    except (OSError, configparser.Error):
        # The key already exists in IAM; tell the operator which one so it
        # can be cleaned up, then let the error surface as before.
        print("New key {} was created but could not be saved to {}".format(
            new_key['AccessKey']['AccessKeyId'], credentials_file))
        raise
    print("Key rotated and credentials file updated")


if __name__ == "__main__":
    main()