From 7622793f86353a5c21840f73f04129cad791565d Mon Sep 17 00:00:00 2001 From: Sebastien Goasguen Date: Thu, 22 Aug 2013 05:23:12 -0400 Subject: [PATCH] LIBCLOUD-380: Add import_keypair functions and tests --- libcloud/compute/drivers/cloudstack.py | 36 ++++++++++++++++++++++ .../test/compute/fixtures/cloudstack/dummy_rsa.pub | 1 + .../cloudstack/registerSSHKeyPair_default.json | 1 + libcloud/test/compute/test_cloudstack.py | 35 +++++++++++++++------ 4 files changed, 64 insertions(+), 9 deletions(-) create mode 100644 libcloud/test/compute/fixtures/cloudstack/dummy_rsa.pub create mode 100644 libcloud/test/compute/fixtures/cloudstack/registerSSHKeyPair_default.json diff --git a/libcloud/compute/drivers/cloudstack.py b/libcloud/compute/drivers/cloudstack.py index d7ec86f..0726501 100644 --- a/libcloud/compute/drivers/cloudstack.py +++ b/libcloud/compute/drivers/cloudstack.py @@ -19,6 +19,7 @@ from libcloud.compute.base import Node, NodeDriver, NodeImage, NodeLocation,\ NodeSize, StorageVolume from libcloud.compute.types import NodeState, LibcloudError +import os class CloudStackNode(Node): "Subclass of Node so we can expose our extension methods." @@ -658,6 +659,41 @@ class CloudStackNodeDriver(CloudStackDriverMixIn, NodeDriver): res = self._sync_request('deleteSSHKeyPair', name=name, **extra_args) return res['success'] + def ex_import_keypair_from_string(self, name, key_material): + """ + Imports a new public key where the public key is passed in as a string + + @param name: The name of the public key to import. + @type name: C{str} + + @param key_material: The contents of a public key file. + @type key_material: C{str} + + @rtype: C{dict} + """ + + res = self._sync_request('registerSSHKeyPair', name=name, publickey=key_material) + return { + 'keyName': res['keypair']['name'], + 'keyFingerprint': res['keypair']['fingerprint'] + } + + def ex_import_keypair(self, name, keyfile): + """ + Imports a new public key where the public key is passed via a filename + + @param name: The name of the public key to import. + @type name: C{str} + + @param keyfile: The filename with path of the public key to import. + @type keyfile: C{str} + + @rtype: C{dict} + """ + with open(os.path.expanduser(keyfile)) as fh: + content = fh.read() + return self.ex_import_keypair_from_string(name, content) + def ex_list_security_groups(self, **kwargs): """ Lists Security Groups diff --git a/libcloud/test/compute/fixtures/cloudstack/dummy_rsa.pub b/libcloud/test/compute/fixtures/cloudstack/dummy_rsa.pub new file mode 100644 index 0000000..1d18ad6 --- /dev/null +++ b/libcloud/test/compute/fixtures/cloudstack/dummy_rsa.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCw3Fw6AdX+3Ul3lRJIPE0Hd5oBaHnCVB1Wl325FVeJZeQiKF9Z0sw+/StWuo2ZA5ra6/A8X7tITiO7goUncdd7xLT3r3UCwKGNZXrTn8e2Kutqd9S7EN+SUh63kZmcEQsFCuC3hg0O8TzG5ROQxukYc+7PAvcYk7+KV8r3B5eh2lvp5tHTpCX/63pm4zHm5rnE38DnESeh4Dh2R8hkhnoxo9ixQCdETbufUTo5abCkKbcf8/1+qA5A13uXqBsx/KtmZX0SvyQW3hKFPGXSaYxAE/u+DZU8Myr/dDKLrGPYt6e5CSXlQLFcnz99akuVdqOP9ygPGcgwlAajOZgt+Vwn sebgoa@sebmini.local diff --git a/libcloud/test/compute/fixtures/cloudstack/registerSSHKeyPair_default.json b/libcloud/test/compute/fixtures/cloudstack/registerSSHKeyPair_default.json new file mode 100644 index 0000000..f52ee31 --- /dev/null +++ b/libcloud/test/compute/fixtures/cloudstack/registerSSHKeyPair_default.json @@ -0,0 +1 @@ +{ "registersshkeypairresponse": { "keypair": { "name": "foobar", "fingerprint": "c4:a1:e5:d4:50:84:a9:4c:6b:22:ee:d6:57:02:b8:15" } } } diff --git a/libcloud/test/compute/test_cloudstack.py b/libcloud/test/compute/test_cloudstack.py index 2f27539..943f7d5 100644 --- a/libcloud/test/compute/test_cloudstack.py +++ b/libcloud/test/compute/test_cloudstack.py @@ -14,6 +14,7 @@ # limitations under the License. import sys +import os from libcloud.utils.py3 import httplib from libcloud.utils.py3 import urlparse @@ -196,17 +197,17 @@ class CloudStackNodeDriverTest(unittest.TestCase, TestCaseMixin): location = self.driver.list_locations()[0] self.assertEquals('Sydney', location.name) - def test_start_node(self): + def test_ex_start_node(self): node = self.driver.list_nodes()[0] res = node.ex_start() self.assertEquals('Starting', res) - def test_stop_node(self): + def test_ex_stop_node(self): node = self.driver.list_nodes()[0] res = node.ex_stop() self.assertEquals('Stopped', res) - def test_list_keypairs(self): + def test_ex_list_keypairs(self): keypairs = self.driver.ex_list_keypairs() fingerprint = '00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:' + \ '00:00:00:00:00' @@ -214,29 +215,45 @@ class CloudStackNodeDriverTest(unittest.TestCase, TestCaseMixin): self.assertEqual(keypairs[0]['name'], 'cs-keypair') self.assertEqual(keypairs[0]['fingerprint'], fingerprint) - def test_create_keypair(self): + def test_ex_create_keypair(self): self.assertRaises( LibcloudError, self.driver.ex_create_keypair, 'cs-keypair') - def test_delete_keypair(self): + def test_ex_delete_keypair(self): res = self.driver.ex_delete_keypair('cs-keypair') self.assertTrue(res) - def test_list_security_groups(self): + def test_ex_import_keypair(self): + fingerprint = 'c4:a1:e5:d4:50:84:a9:4c:6b:22:ee:d6:57:02:b8:15' + path = os.path.join(os.path.dirname(__file__), "fixtures", "cloudstack", "dummy_rsa.pub") + + res = self.driver.ex_import_keypair('foobar', path) + self.assertEqual(res['keyName'], 'foobar') + self.assertEqual(res['keyFingerprint'], fingerprint) + + def test_ex_import_keypair_from_string(self): + fingerprint = 'c4:a1:e5:d4:50:84:a9:4c:6b:22:ee:d6:57:02:b8:15' + path = os.path.join(os.path.dirname(__file__), "fixtures", "cloudstack", "dummy_rsa.pub") + + res = self.driver.ex_import_keypair_from_string('foobar', open(path).read()) + self.assertEqual(res['keyName'], 'foobar') + self.assertEqual(res['keyFingerprint'], fingerprint) + + def test_ex_list_security_groups(self): groups = self.driver.ex_list_security_groups() self.assertEqual(groups[0]['name'], 'default') - def test_create_security_group(self): + def test_ex_create_security_group(self): group = self.driver.ex_create_security_group(name='MySG') self.assertEqual(group['name'], 'MySG') - def test_delete_security_group(self): + def test_ex_delete_security_group(self): res = self.driver.ex_delete_security_group(name='MySG') self.assertTrue(res) - def test_authorize_security_group_ingress(self): + def test_ex_authorize_security_group_ingress(self): res = self.driver.ex_authorize_security_group_ingress('MySG', 'TCP', '22', -- 1.8.2.3