#!/usr/bin/env python

import argparse, Queue, logging, random, sys
import multiprocessing, signal, re
from multiprocessing.pool import ThreadPool
import boto.s3.connection

# Version string of this script (not referenced by the code visible here)
version = "0.2"

# Make sure we have a semi-recent version of boto installed.
#
# Only the leading numeric components of boto.__version__ are compared, so a
# suffixed version string such as "2.9.0-dev" is judged by its numbers rather
# than being rejected because int() cannot parse the suffix.  Only the
# failures this check is about are caught; anything else (including
# SystemExit / KeyboardInterrupt) propagates normally.
try:
    import boto.s3.connection
    _botoVersion = tuple(
        int(part) for part in re.findall(r"\d+", boto.__version__)[:3])
except (ImportError, AttributeError, TypeError):
    # boto missing, or its version attribute absent / not a string
    _botoVersion = ()

# An empty or short tuple compares lower than (2, 5, 2), so "unknown" is
# treated the same as "too old"
if _botoVersion < (2, 5, 2):
    sys.stderr.write("ERROR: s3wipe requires boto v2.5.2 or later!\n")
    sys.exit(1)

# Argparse "type" callable for the --path option
def s3Path(path):
    """Split an 's3://bucket[/path]' string into a (bucket, path) tuple.

    The path element is None when only a bucket is given.  Anything that
    does not fit the format (including a trailing slash or an empty path
    component) raises argparse.ArgumentTypeError.
    """
    parsed = re.match('s3://([^/]+)(?:/([^/]+(?:/[^/]+)*))?$', path)

    # Guard clause: reject malformed input first
    if parsed is None:
        raise argparse.ArgumentTypeError(
            "must be in the 's3://bucket[/path]' format")

    bucketName, keyPath = parsed.groups()
    return (bucketName, keyPath)

# Fetch our command line arguments
def getArgs(argv=None):
    """Parse the s3wipe command line and return the argparse namespace.

    argv -- optional list of argument strings.  When None (the default)
            argparse reads sys.argv[1:], so existing callers are unaffected.

    The returned namespace has these attributes:
      path      -- (bucket, path) tuple produced by s3Path (required)
      id, key   -- AWS credentials, None when not given
      dryrun    -- True to only print what would be deleted
      quiet     -- True to suppress all non-error output
      batchsize -- number of keys per batch delete (default 100)
      maxqueue  -- maximum size of the deletion queue (default 10000)
      delbucket -- True to also delete the bucket for a bucket-only path

    Invalid arguments make argparse print usage and exit.
    """
    parser = argparse.ArgumentParser(
        prog="s3wipe",
        description="Recursively delete all keys in an S3 path",
        # Widen the option column so the longer flags stay on one line
        formatter_class=lambda prog:
            argparse.HelpFormatter(prog,max_help_position=27))

    parser.add_argument("--path", type=s3Path,
        help="S3 path to delete (e.g. s3://bucket/path)", required=True)
    parser.add_argument("--id",
        help="Your AWS access key ID", required=False)
    parser.add_argument("--key",
        help="Your AWS secret access key", required=False)
    parser.add_argument("--dryrun",
        help="Don't delete.  Print what we would have deleted",
        action='store_true')
    parser.add_argument("--quiet",
        help="Suppress all non-error output", action='store_true')
    parser.add_argument("--batchsize",
        help="# of keys to batch delete (default 100)",
        type=int, default=100)
    parser.add_argument("--maxqueue",
        help="Max size of deletion queue (default 10k)",
        type=int, default=10000)
    parser.add_argument("--delbucket",
        help="If S3 path is a bucket path, delete the bucket also",
        action='store_true')

    return parser.parse_args(argv)


# Configure logging and publish the module-wide logger
def loggerSetup(args):
    """Configure the logging module and set the global `logger`.

    args -- parsed options; only args.quiet is read.  With --quiet the
            level passed to logging.basicConfig is ERROR, otherwise DEBUG.

    Also replaces sys.excepthook so uncaught exceptions are reported
    through the logger, and raises the "boto" logger's level to CRITICAL.
    """
    global logger

    # --quiet limits output to errors; otherwise everything is logged
    verbosity = logging.ERROR if args.quiet else logging.DEBUG

    logging.basicConfig(
        level=verbosity,
        format="%(asctime)s %(levelname)s: %(message)s",
        datefmt="[%Y-%m-%d@%H:%M:%S]")

    # The named logger used throughout this script
    logger = logging.getLogger("s3wipe")

    # Report unhandled exceptions through the logger, traceback included
    def logUncaught(excType, excValue, excTraceback):
        logger.error(
            "Uncaught exception",
            exc_info=(excType, excValue, excTraceback))

    sys.excepthook = logUncaught

    # Suppress boto debug logging, since it is very chatty
    botoLog = logging.getLogger("boto")
    botoLog.setLevel(logging.CRITICAL)


# Our deletion worker, called by Threadpool
def deleter(args, rmQueue, numThreads):
    """Drain rmQueue forever, deleting the queued keys in batches.

    main() hands this function to ThreadPool as the worker initializer, so
    every pool thread runs this loop and never returns.

    args       -- parsed options; reads id, key, path, batchsize and dryrun
    rmQueue    -- Queue.Queue of boto key objects waiting to be deleted
    numThreads -- number of deleter threads; only used to thin out the
                  progress messages

    A key is marked done on the queue (task_done) only once its batch has
    been processed, so rmQueue.join() cannot return while a thread still
    holds undeleted keys.  A batch is flushed when it reaches
    args.batchsize, when the queue is empty, or when no new key has arrived
    for idleFlushSecs.  A batch whose delete request raises is retried on
    the next flush and abandoned (with an error logged) after maxAttempts
    failures, so a permanent failure cannot block join() forever.
    """

    # Set up per-thread boto objects
    myconn = boto.s3.connection.S3Connection(
        aws_access_key_id=args.id,
        aws_secret_access_key=args.key)
    bucket, path = args.path
    mybucket = myconn.get_bucket(bucket)

    maxAttempts = 5      # delete attempts per batch before giving up
    idleFlushSecs = 1    # wait this long for a key before flushing leftovers

    rmKeys = []
    failures = 0

    while True:
        # Snatch a key off our deletion queue and add it to our local
        # deletion list.  The timeout lets a partially filled batch get
        # flushed once the listers stop producing keys.
        try:
            rmKeys.append(rmQueue.get(timeout=idleFlushSecs))
        except Queue.Empty:
            pass

        if not rmKeys:
            continue

        # Keep accumulating while more keys are waiting and the batch is
        # still below the configured size
        if len(rmKeys) < args.batchsize and not rmQueue.empty():
            continue

        numKeys = len(rmKeys)
        try:
            if args.dryrun:
                for key in rmKeys:
                    logger.info("Would have deleted '%s'" % key.name)
                numDeleted = numKeys
            else:
                result = mybucket.delete_keys(rmKeys)
                # delete_keys reports per-key failures in the result
                # instead of raising, so surface them here
                for err in result.errors:
                    logger.error("Failed to delete '%s': %s (%s)",
                        err.key, err.message, err.code)
                numDeleted = len(result.deleted)
        except Exception:
            failures += 1
            if failures < maxAttempts:
                logger.warning(
                    "Batch delete of %s keys failed (attempt %s of %s), "
                    "will retry", numKeys, failures, maxAttempts,
                    exc_info=True)
                continue
            logger.error(
                "Giving up on %s keys after %s failed delete attempts",
                numKeys, failures, exc_info=True)
            numDeleted = 0

        with keysDeleted.get_lock():
            keysDeleted.value += numDeleted

        # One task_done() per key taken off the queue for this batch
        for _ in range(numKeys):
            rmQueue.task_done()
        rmKeys = []
        failures = 0

        # Print some progress info.  Each batch logs with probability
        # 1/(numThreads+1) so the threads together do not flood the log.
        if random.randint(0,numThreads) == numThreads and not args.dryrun:
            logger.info("Deleted %s out of %s keys found thus far.",
                keysDeleted.value, keysFound.value)

# ThreadPool initializer for the listing threads
def listInit(arg1, arg2):
    """Publish the parsed options and the deletion queue as module globals.

    arg1 is stored as the global `args` and arg2 as the global `rmQueue`,
    which is where lister() reads them from.
    """
    global args, rmQueue
    args, rmQueue = arg1, arg2

# Our listing worker, which will poll the s3 bucket mercilessly and
# insert all objects found into the deletion queue.
def lister(subDir):
    """Enqueue every key version whose name starts with subDir.name.

    Reads the globals `args` and `rmQueue` set by listInit(), and bumps the
    shared `keysFound` counter once per enqueued entry.  rmQueue.put()
    blocks while the queue is full.
    """

    # Every listing thread gets a boto connection of its own
    s3 = boto.s3.connection.S3Connection(
        aws_access_key_id=args.id,
        aws_secret_access_key=args.key)
    bucketName, _unusedPath = args.path

    # Walk all versions under this prefix and hand each one to the
    # deletion queue
    versions = s3.get_bucket(bucketName).list_versions(prefix=subDir.name)
    for entry in versions:
        rmQueue.put(entry)
        with keysFound.get_lock():
            keysFound.value += 1

# Our main function
def main():
    """Delete every key version under the S3 path given on the command line.

    Steps: parse arguments, set up logging, list the root-level entries
    under the path, start one lister thread per distinct entry name and
    twice as many deleter threads, then wait until the deletion queue has
    been fully processed.  With --delbucket and a bucket-only path, the
    bucket itself is removed afterwards if it lists as empty.

    Exits with status 1 when the bucket cannot be opened, and with status 0
    on SIGINT.
    """
    global keysFound, keysDeleted

    # Parse arguments
    args = getArgs()

    # Set up the logging object
    loggerSetup(args)

    rmQueue = Queue.Queue(maxsize=args.maxqueue)

    # Catch ctrl-c to exit cleanly
    signal.signal(signal.SIGINT, lambda x,y: sys.exit(0))

    # Our thread-safe variables, used for progress tracking
    keysFound = multiprocessing.Value("i",0)
    keysDeleted = multiprocessing.Value("i",0)

    bucket, path = args.path
    logger.info("Deleting from bucket: %s, path: %s" % (bucket,path))
    logger.info("Getting subdirs to feed to list threads")

    # Our main boto object.  Really only used to start the
    # watcher threads on a per-subdir basis
    conn = boto.s3.connection.S3Connection(
        aws_access_key_id=args.id,
        aws_secret_access_key=args.key)

    try:
        mybucket = conn.get_bucket(bucket)

    except boto.s3.connection.S3ResponseError as e:
        # Only the first line of the S3 error is logged
        shortErr = str(e).split('\n')[0]
        logger.error(shortErr)
        sys.exit(1)

    # Turning versioning on changes the bucket's configuration, so a dry
    # run must leave it alone
    if not args.dryrun:
        mybucket.configure_versioning(True)

    # Poll the root-level directories in the s3 bucket, and
    # start a reader thread for each one of them.
    # The listing can contain several entries with the same name (one per
    # version of a root-level key), and lister() re-lists every version
    # under a name, so each name is handed to the listers only once.
    # NOTE(review): path is used as a raw key prefix, so 's3://bucket/foo'
    # presumably also matches keys under 'foobar/' -- confirm this is
    # intended.
    subDirs = []
    seenNames = set()
    for entry in mybucket.list_versions(prefix=path, delimiter="/"):
        if entry.name not in seenNames:
            seenNames.add(entry.name)
            subDirs.append(entry)

    # NOTE(review): thread counts grow with the number of root-level
    # entries and are not capped.
    listThreads = len(subDirs)
    deleteThreads = listThreads*2

    # Now start all of our delete & list threads
    if listThreads > 0:
        # deleter() is passed as the initializer on purpose: it loops
        # forever inside each pool thread, consuming rmQueue
        logger.info("Starting %s delete threads..." % deleteThreads)
        deleterPool = ThreadPool(processes=deleteThreads,
            initializer=deleter, initargs=(args, rmQueue, deleteThreads))

        logger.info("Starting %s list threads..." % listThreads)
        listerPool = ThreadPool(processes=listThreads,
            initializer=listInit, initargs=(args, rmQueue))

        # Feed the root-level subdirs to our listing threads, which
        # will in turn populate the deletion queue, which feeds the
        # deletion threads
        listerPool.map(lister, subDirs)
        rmQueue.join()

    logger.info("Done deleting keys")

    # The bucket is only removed for a bucket-only path (path is None)
    if args.delbucket and path is None:
        if list(mybucket.list_versions(delimiter='/')):
            logger.info("Bucket not empty.  Not removing (this can happen "  +
                "when deleting large amounts of files.  It sometimes takes " +
                "the S3 service a while (minutes to days) to catch up.)")

        else:
            if args.dryrun:
                logger.info("Bucket is empty.  Would have removed bucket")
            else:
                logger.info("Bucket is empty.  Attempting to remove bucket")
                conn.delete_bucket(mybucket)

# Run main() only when this file is executed as a script, not on import
if __name__ == "__main__":
    main()
