Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

add bucket versioning test #41

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 168 additions & 4 deletions nephoria/testcases/s3/bucket_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/usr/bin/env python
import re
from boto.exception import S3ResponseError

import boto
from boto.exception import S3ResponseError, S3CreateError
from boto.s3.lifecycle import Lifecycle

from nephoria.testcase_utils.cli_test_runner import CliTestRunner, SkipTestException
from nephoria.testcontroller import TestController
Expand Down Expand Up @@ -67,8 +70,8 @@ def test_negative_basic(self):
raise("Should have caught exception for creating bucket with empty-string name.")
except S3ResponseError as e:
assert (e.status == 405), 'Expected response status code to be 405, actual status code is ' + str(e.status)
assert (
re.search("MethodNotAllowed", e.code)), "Incorrect exception returned when creating bucket with null name."
assert (re.search("MethodNotAllowed", e.code)),\
"Incorrect exception returned when creating bucket with null name."

self.log.debug("Testing an invalid bucket names, calls should fail.")

Expand Down Expand Up @@ -96,8 +99,169 @@ def test_creating_bucket_invalid_names(bad_bucket):
for bad_bucket in ["bucket&123", "bucket*123"]:
test_creating_bucket_invalid_names(self.bucket_prefix + bad_bucket)

# def test_bucket_acl(self):
# '''
# Tests bucket ACL management and adding/removing from the ACL with both valid and invalid usernames
# '''
#
# test_bucket = self.bucket_prefix + 'acl_bucket_test'
#
# test_user_id = self.tc.user.s3.connection.get_canonical_user_id()
# self.log.debug('Starting ACL test with bucket name: ' + test_bucket + ' and userid ' + test_user_id)
#
# try:
# acl_bucket = self.tc.user.s3.create_bucket(test_bucket)
# self.tc.user.test_user_resources['_bucket'].append(acl_bucket)
# self.log.debug('Created bucket: ' + test_bucket)
# except S3CreateError:
# self.log.debug("Can't create the bucket, already exists. Deleting it an trying again")
# try:
# self.tc.user.s3.delete_bucket(test_bucket)
# acl_bucket = self.tc.s3.create_bucket(test_bucket)
# except:
# self.log.debug("Couldn't delete and create new bucket. Failing test")
# raise("Couldn't make the test bucket: " + test_bucket)
#
# policy = acl_bucket.get_acl()
#
# if policy == None:
# raise('No acl returned')
#
# self.log.debug(policy)
# # Check that the acl is correct: owner full control.
# if len(policy.acl.grants) > 1:
# self.tc.s3.delete_bucket(test_bucket)
# raise('Expected only 1 grant in acl. Found: ' + policy.acl.grants.grants.__len__())
#
# if policy.acl.grants[0].id != test_user_id or policy.acl.grants[0].permission != 'FULL_CONTROL':
# self.tc.s3.delete_bucket(test_bucket)
# raise('Unexpected grant encountered: ' + policy.acl.grants[0].display_name + ' ' + policy.acl.grants[
# 0].permission + ' ' + policy.acl.grants[0].id)
#
# # Get the info on the owner from the ACL returned
# owner_display_name = policy.acl.grants[0].display_name
# owner_id = policy.acl.grants[0].id
#
# # upload a new acl for the bucket
# new_acl = policy
# new_user_display_name = owner_display_name
# new_user_id = owner_id
# new_acl.acl.add_user_grant(permission="READ", user_id=new_user_id, display_name=new_user_display_name)
# try:
# acl_bucket.set_acl(new_acl)
# acl_check = acl_bucket.get_acl()
# except S3ResponseError:
# raise ("Failed to set or get new acl")
#
# self.log.info("Got ACL: " + acl_check.acl.to_xml())
#
# # expected_result_base='<AccessControlList>
# # <Grant><Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
# # <ID>' + owner_id + '</ID><DisplayName>'+ owner_display_name + '</DisplayName></Grantee><Permission>FULL_CONTROL</Permission></Grant>
# # <Grant><Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser"><ID>EXPECTED_ID</ID><DisplayName>EXPECTED_NAME</DisplayName></Grantee><Permission>READ</Permission></Grant>
# # </AccessControlList>'
#
# if acl_check == None and not self.tc.user.s3.check_acl_equivalence(acl1=acl_check.acl, acl2=new_acl.acl):
# self.tc.user.s3.delete_bucket(test_bucket)
# raise ("Incorrect acl length or acl not found\n. Got bucket ACL:\n" + acl_check.acl.to_xml() +
# "\nExpected:" + new_acl.acl.to_xml())
# else:
# self.log.info("Got expected basic ACL addition")
#
# self.log.info(
# "Grants 0 and 1: " + acl_check.acl.grants[0].to_xml() + " -- " + acl_check.acl.grants[1].to_xml())
#
# # Check each canned ACL string in boto to make sure Walrus does it right
# for acl in boto.s3.acl.CannedACLStrings:
# if acl == "authenticated-read":
# continue
# self.log.info('Testing canned acl: ' + acl)
# try:
# acl_bucket.set_acl(acl)
# acl_check = acl_bucket.get_acl()
# except Exception as e:
# self.tc.user.s3.delete_bucket(test_bucket)
# raise("Got exception trying to set acl to " + acl + ": " + str(e))
#
# self.log.info("Set canned-ACL: " + acl + " -- Got ACL from service: " + acl_check.acl.to_xml())
# expected_acl = self.tc.user.get_canned_acl(bucket_owner_id=owner_id, canned_acl=acl,
# bucket_owner_display_name=owner_display_name)
#
# if expected_acl == None:
# self.tc.user.s3.delete_bucket(test_bucket)
# raise("Got None when trying to generate expected acl for canned acl string: " + acl)
#
# if not self.tc.user.check_acl_equivalence(acl1=expected_acl, acl2=acl_check.acl):
# self.tc.user.s3.delete_bucket(test_bucket)
# raise (
# "Invalid " + acl + " acl returned from Walrus:\n" + acl_check.acl.to_xml() + "\nExpected\n" + expected_acl.to_xml())
# else:
# self.log.debug("Got correct acl for: " + acl)
#
# try:
# acl_bucket.set_acl('invalid-acl')
# raise ('Did not catch expected exception for invalid canned-acl')
# except:
# self.log.debug("Caught expected exception from invalid canned-acl")
#
# self.tc.user.s3.delete_bucket(test_bucket)
# self.log.debug("Bucket ACL: PASSED")

def test_bucket_versioning(self):
test_bucket = self.bucket_prefix + "versioning_test_bucket"
self.log.info('Testing bucket versioning using bucket:' + test_bucket)
version_bucket = self.tc.user.s3.create_bucket(test_bucket)
self.tc.user.test_user_resources['_bucket'].append(version_bucket)
version_status = version_bucket.get_versioning_status().get("Versioning")

# Test the default setup after bucket creation. Should be disabled.
if version_status != None:
version_bucket.delete()
raise ("Expected versioning disabled (empty), found: " + str(version_status))
elif version_status == None:
self.log.info("Null version status returned, may be correct since it should be disabled")

# Turn on versioning, confirm that it is 'Enabled'
version_bucket.configure_versioning(True)
version_status = version_bucket.get_versioning_status().get("Versioning")
if version_status == None or version_status != "Enabled":
version_bucket.delete()
raise("Expected versioning enabled, found: " + str(version_status))
elif version_status == None:
version_bucket.delete()
raise("Null version status returned")
self.log.info("Versioning of bucket is set to: " + version_status)

# Turn off/suspend versioning, confirm.
version_bucket.configure_versioning(False)
version_status = version_bucket.get_versioning_status().get("Versioning")
if version_status == None or version_status != "Suspended":
version_bucket.delete()
raise("Expected versioning suspended, found: " + str(version_status))
elif version_status == None:
version_bucket.delete()
raise("Null version status returned")

self.log.info("Versioning of bucket is set to: " + version_status)

version_bucket.configure_versioning(True)
version_status = version_bucket.get_versioning_status().get("Versioning")
if version_status == None or version_status != "Enabled":
version_bucket.delete()
raise("Expected versioning enabled, found: " + str(version_status))
elif version_status == None:
version_bucket.delete()
raise("Null version status returned")

self.log.info("Versioning of bucket is set to: " + version_status)

# version_bucket.delete()
# self.buckets_used.remove(test_bucket)
self.log.info("Bucket Versioning: PASSED")

def clean_method(self):
pass
for bkt in self.tc.user.test_user_resources['_bucket']:
self.user.s3.delete_bucket(bkt)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion nephoria/testcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TestController(object):
def __init__(self,
hostname=None, username='root', password=None, region=None,
proxy_hostname=None, proxy_password=None,
clouduser_account='nephotest', clouduser_name='sys_admin', clouduser_credpath=None,
clouduser_account='nephotest', clouduser_name='admin', clouduser_credpath=None,
clouduser_accesskey=None, clouduser_secretkey=None,
cloudadmin_credpath=None, cloudadmin_accesskey=None, cloudadmin_secretkey=None,
timeout=10, log_level='DEBUG', environment_file=None,
Expand Down
16 changes: 7 additions & 9 deletions nephoria/usercontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def __init__(self, aws_access_key=None, aws_secret_key=None, aws_account_name=N
self.log.identifier = str(self)
self.log.debug('Successfully created User Context')

self.test_user_resources = \
{
'_instances': [],
'_volumes': [],
'_bucket': []
}

##########################################################################################
# User/Account Properties, Attributes, Methods, etc..
##########################################################################################
Expand Down Expand Up @@ -458,12 +465,3 @@ def cleanup_artifacts(self,
for account_name in self.test_resources["iam_accounts"]:
self.iam.delete_account(account_name=account_name, recursive=True)
except: pass