"""
API for interacting with a CloudMan instance.
"""
import time
import requests
import functools
import simplejson
from urlparse import urlparse
import bioblend
from bioblend.cloudman.launch import CloudManLauncher
from bioblend.util import Bunch
def block_till_vm_ready(func):
    """
    Decorator that waits for the launched VM to be ready before calling ``func``.

    Before the wrapped method runs, the decorator calls
    ``wait_till_instance_ready`` on the object the method is bound to (the
    first positional argument). That call blocks until the VM is ready; if
    it raises (for example a VMLaunchException on timeout or VM error), the
    wrapped method is not invoked.

    This decorator relies on the ``wait_till_instance_ready`` method defined
    in class GenericVMInstance. All methods to which this decorator is applied
    must be members of a class which inherits from GenericVMInstance; if the
    object has no such method, a VMLaunchException is raised.

    The following two optional keyword arguments are recognized by this
    decorator. They are consumed here and are NOT passed on to the wrapped
    method:

    :type vm_ready_timeout: int
    :param vm_ready_timeout: Maximum length of time to block before timing out
                             (default 300). Once the timeout is reached, a
                             VMLaunchException will be thrown.

    :type vm_ready_check_interval: int
    :param vm_ready_check_interval: The number of seconds to pause between
                                    consecutive calls when polling the VM's
                                    ready status (default 10).
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        obj = args[0]
        timeout = kwargs.pop('vm_ready_timeout', 300)
        interval = kwargs.pop('vm_ready_check_interval', 10)
        # Look the method up *before* calling it, so that only a genuinely
        # missing method is reported as such. An AttributeError raised while
        # waiting is a different failure and must not be masked by the
        # "method not defined" message.
        wait_till_ready = getattr(obj, 'wait_till_instance_ready', None)
        if wait_till_ready is None:
            raise VMLaunchException(
                "Decorated object does not define a wait_till_instance_ready method. "
                "Make sure that the object is of type GenericVMInstance.")
        wait_till_ready(timeout, interval)
        return func(*args, **kwargs)
    return wrapper
class VMLaunchException(Exception):
    """
    Error raised by this module when launching or waiting for a VM fails.

    The original payload is kept on the ``value`` attribute; ``str()`` of the
    exception is the ``repr`` of that payload.
    """

    def __init__(self, value):
        # Initialize the base class too, so ``args`` carries the payload for
        # generic exception handling (logging, pickling, re-raising).
        super(VMLaunchException, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
class CloudManConfig(object):
    """
    Container for the parameters needed to launch and set up a CloudMan
    instance. Can be validated, and saved to / loaded from JSON.
    """

    def __init__(self,
                 access_key=None,
                 secret_key=None,
                 cluster_name=None,
                 image_id=None,
                 instance_type='m1.medium',
                 password=None,
                 cloud_metadata=None,
                 cluster_type=None,
                 initial_storage_size=10,
                 key_name='cloudman_key_pair',
                 security_groups=None,
                 placement='',
                 kernel_id=None,
                 ramdisk_id=None,
                 block_till_ready=False,
                 **kwargs):
        """
        Initializes a CloudMan launch configuration object.

        :type access_key: string
        :param access_key: Access credentials.

        :type secret_key: string
        :param secret_key: Access credentials.

        :type cluster_name: string
        :param cluster_name: Name used to identify this CloudMan cluster.

        :type image_id: string
        :param image_id: Machine image ID to use when launching this
                         CloudMan instance.

        :type instance_type: string
        :param instance_type: The type of the machine instance, as understood by
                              the chosen cloud provider. (e.g., ``m1.medium``)

        :type password: string
        :param password: The administrative password for this CloudMan instance.

        :type cloud_metadata: Bunch
        :param cloud_metadata: This object must define the properties required
                               to establish a `boto <https://github.com/boto/boto/>`_
                               connection to that cloud. Note that as long as
                               the provided object defines the required fields,
                               it can really be implemented as anything (e.g.,
                               a Bunch, a database object, a custom class). If no
                               value for the ``cloud_metadata`` argument is
                               provided, the default is to use the Amazon cloud.

        :type kernel_id: string
        :param kernel_id: The ID of the kernel with which to launch the
                          instances

        :type ramdisk_id: string
        :param ramdisk_id: The ID of the RAM disk with which to launch the
                           instances

        :type key_name: string
        :param key_name: The name of the key pair with which to launch instances

        :type security_groups: list of strings
        :param security_groups: The ID of the security groups with
                                which to associate instances. Defaults to
                                ``['CloudMan']`` when omitted or ``None``.

        :type placement: string
        :param placement: The availability zone in which to launch the instances

        :type cluster_type: string
        :param cluster_type: The ``type``, either 'Galaxy', 'Data', or
                             'SGE', defines the type of cluster platform to initialize.

        :type initial_storage_size: int
        :param initial_storage_size: The initial storage to allocate for the instance.
                                     This only applies if ``cluster_type`` is set
                                     to either ``Galaxy`` or ``Data``.

        :type block_till_ready: boolean
        :param block_till_ready: Specifies whether the launch method will block
                                 till the instance is ready and only return once
                                 all initialization is complete. The default is False.
                                 If False, the launch method will return immediately
                                 without blocking. However, any subsequent calls
                                 made will automatically block if the instance is
                                 not ready and initialized. The blocking timeout
                                 and polling interval can be configured by providing
                                 extra parameters to the ``CloudManInstance.launch_instance``
                                 method.

        Any additional keyword arguments are stored unchanged on the
        ``kwargs`` attribute.
        """
        self.set_connection_parameters(access_key, secret_key, cloud_metadata)
        self.set_pre_launch_parameters(cluster_name, image_id, instance_type,
                                       password, kernel_id, ramdisk_id, key_name,
                                       security_groups, placement, block_till_ready)
        self.set_post_launch_parameters(cluster_type, initial_storage_size)
        self.set_extra_parameters(**kwargs)

    def set_connection_parameters(self, access_key, secret_key, cloud_metadata=None):
        """Store the credentials and cloud description used to connect."""
        self.access_key = access_key
        self.secret_key = secret_key
        self.cloud_metadata = cloud_metadata

    def set_pre_launch_parameters(self, cluster_name, image_id, instance_type,
                                  password, kernel_id=None, ramdisk_id=None,
                                  key_name='cloudman_key_pair',
                                  security_groups=None, placement='',
                                  block_till_ready=False):
        """Store the parameters that are needed to launch the instance."""
        self.cluster_name = cluster_name
        self.image_id = image_id
        self.instance_type = instance_type
        self.password = password
        self.kernel_id = kernel_id
        self.ramdisk_id = ramdisk_id
        self.key_name = key_name
        # Build a fresh list per call: a list literal in the signature would
        # be shared (and mutable) across every config object.
        if security_groups is None:
            security_groups = ['CloudMan']
        self.security_groups = security_groups
        self.placement = placement
        self.block_till_ready = block_till_ready

    def set_post_launch_parameters(self, cluster_type=None, initial_storage_size=10):
        """Store the parameters used to initialize the cluster after launch."""
        self.cluster_type = cluster_type
        self.initial_storage_size = initial_storage_size

    def set_extra_parameters(self, **kwargs):
        """Store any additional keyword arguments on ``self.kwargs``."""
        self.kwargs = kwargs

    class CustomTypeEncoder(simplejson.JSONEncoder):
        """
        JSON encoder that serializes CloudManConfig and Bunch objects as
        ``{'__<ClassName>__': <instance __dict__>}``.
        """

        def default(self, obj):
            if isinstance(obj, (CloudManConfig, Bunch)):
                key = '__%s__' % obj.__class__.__name__
                return {key: obj.__dict__}
            return simplejson.JSONEncoder.default(self, obj)

    @staticmethod
    def CustomTypeDecoder(dct):
        """
        ``object_hook`` counterpart of ``CustomTypeEncoder``: rebuilds
        CloudManConfig and Bunch objects from their tagged dictionaries and
        returns any other dictionary unchanged.
        """
        if '__CloudManConfig__' in dct:
            params = dict(dct['__CloudManConfig__'])
            # A saved config carries its extra keywords under ``kwargs``;
            # unpack them so a save/load round trip does not nest them one
            # level deeper each time. Explicit fields take precedence.
            restored = dict(params.pop('kwargs', None) or {})
            restored.update(params)
            return CloudManConfig(**restored)
        elif '__Bunch__' in dct:
            return Bunch(**dct['__Bunch__'])
        else:
            return dct

    @staticmethod
    def load_config(fp):
        """Read a configuration from the JSON file-like object ``fp``."""
        return simplejson.load(fp, object_hook=CloudManConfig.CustomTypeDecoder)

    def save_config(self, fp):
        """Write this configuration as JSON to the file-like object ``fp``."""
        simplejson.dump(self, fp, cls=self.CustomTypeEncoder)

    def validate(self):
        """
        Check that the configuration is complete.

        Returns ``None`` if the configuration is valid; otherwise returns a
        string describing the first problem found.
        """
        if self.access_key is None:
            return "Access key must not be null"
        if self.secret_key is None:
            return "Secret key must not be null"
        if self.cluster_name is None:
            return "Cluster name must not be null"
        if self.image_id is None:
            return "Image ID must not be null"
        if self.instance_type is None:
            return "Instance type must not be null"
        if self.password is None:
            return "Password must not be null"
        if self.cluster_type not in [None, 'SGE', 'Data', 'Galaxy']:
            return "Unrecognized cluster type ({0})".format(self.cluster_type)
        if self.key_name is None:
            return "Key-pair name must not be null"
        return None
class GenericVMInstance(object):
    """
    Base class representing a launched virtual machine and its readiness.
    """

    def __init__(self, launcher, launch_result):
        """
        Create a handle for a virtual machine.

        ``launcher`` is the object used to query the VM's status; it must
        provide a ``get_status(instance_id)`` method, and may be ``None`` if
        the VM was not launched through this library. ``launch_result`` is
        the dictionary describing the launch, from which the
        ``instance_id``, ``kp_name`` and ``kp_material`` keys are read; it
        may be ``None``.
        """
        self.vm_error = None
        self.vm_status = None
        self.host_name = None
        self.launcher = launcher
        self.launch_result = launch_result

    def _update_host_name(self, host_name):
        """Record ``host_name`` as this VM's host name."""
        if self.host_name != host_name:
            self.host_name = host_name

    @property
    def instance_id(self):
        """
        Returns the ID of this instance (e.g., ``i-87ey32dd``) if launch was
        successful or ``None`` otherwise.
        """
        return None if self.launch_result is None else self.launch_result['instance_id']

    @property
    def key_pair_name(self):
        """
        Returns the name of the key pair used by this instance. If instance was
        not launched properly, returns ``None``.
        """
        return None if self.launch_result is None else self.launch_result['kp_name']

    @property
    def key_pair_material(self):
        """
        Returns the private portion of the generated key pair. It does so only
        if the instance was properly launched and key pair generated; ``None``
        otherwise.
        """
        return None if self.launch_result is None else self.launch_result['kp_material']

    def get_machine_status(self):
        """
        Check on the underlying VM status of an instance. This can be used to
        determine whether the VM has finished booting up and if CloudMan
        is up and running.

        Return a ``state`` dict with the current ``instance_state``, ``public_ip``,
        ``placement``, and ``error`` keys, which capture the current state (the
        values for those keys default to empty string if no data is available from
        the cloud). If this object has no launcher, the returned ``error``
        value says so and the other values are empty strings.
        """
        if self.launcher:
            return self.launcher.get_status(self.instance_id)
        return {'instance_state': "",
                'public_ip': "",
                'placement': "",
                'error': "No reference to the instance object"}

    def _init_instance(self, host_name):
        """Hook invoked once the VM is ready; records its host name."""
        self._update_host_name(host_name)

    def wait_till_instance_ready(self, vm_ready_timeout=300, vm_ready_check_interval=10):
        """
        Wait until the VM state changes to ready/error or timeout elapses.
        Updates the host name once ready.

        Returns immediately if the host name is already known. Otherwise the
        machine status is polled every ``vm_ready_check_interval`` seconds.
        Raises VMLaunchException if the status reports an error or if the VM
        is still not ready after ``vm_ready_timeout`` seconds.
        """
        assert vm_ready_timeout > 0
        assert vm_ready_timeout > vm_ready_check_interval
        assert vm_ready_check_interval > 0

        if self.host_name:  # Host name available. Therefore, instance is ready
            return

        # ``range`` (rather than ``xrange``) keeps this loop working on both
        # Python 2 and Python 3; the sequence is at most timeout/interval long.
        for time_left in range(vm_ready_timeout, 0, -vm_ready_check_interval):
            status = self.get_machine_status()
            if status['error'] != '':
                msg = "Error launching an instance: {0}".format(status['error'])
                bioblend.log.error(msg)
                raise VMLaunchException(msg)
            if status['public_ip'] != '':
                self._init_instance(status['public_ip'])
                return
            bioblend.log.warning(
                "Instance not ready yet (it's in state '{0}'); waiting another {1} seconds..."
                .format(status['instance_state'], time_left))
            time.sleep(vm_ready_check_interval)

        raise VMLaunchException("Waited too long for instance to become ready. Instance Id: %s"
                                % self.instance_id)
class CloudManInstance(GenericVMInstance):
    """
    Client for the web API of a single CloudMan instance.

    API calls are GET requests to ``<url>/cloud/root/<endpoint>``, sent with
    HTTP basic authentication using an empty user name and the CloudMan
    password. Most public methods are wrapped with ``block_till_vm_ready``
    and therefore wait for the VM to be ready before doing any work.
    """

    def __init__(self, url, password, **kwargs):
        """
        Create an instance of the CloudMan API class, which is to be used when
        manipulating that given CloudMan instance.

        The ``url`` is a string defining the address of CloudMan, for
        example "http://115.146.92.174". The ``password`` is CloudMan's password,
        as defined in the user data sent to CloudMan on instance creation.

        The keyword arguments ``launcher``, ``launch_result`` and
        ``cloudman_config`` are used internally by ``launch_instance``. When
        a ``cloudman_config`` is given, its password is used instead of
        ``password``.
        """
        self.initialized = False
        launch_result = kwargs.get('launch_result', None)
        if launch_result is not None:  # Used internally by the launch_instance method
            # ``.get`` avoids a KeyError when only ``launch_result`` is passed.
            super(CloudManInstance, self).__init__(kwargs.get('launcher', None),
                                                   launch_result)
        else:
            super(CloudManInstance, self).__init__(None, None)
        self.config = kwargs.pop('cloudman_config', None)
        if not self.config:
            self.password = password
        else:
            self.password = self.config.password
        self._set_url(url)

    def __repr__(self):
        if self.cloudman_url:
            return "CloudMan instance at {0}".format(self.cloudman_url)
        else:
            return "Waiting for this CloudMan instance to start..."

    def _update_host_name(self, host_name):
        """
        Overrides the super-class method and makes sure that the ``cloudman_url``
        is kept in sync with the host name.
        """
        self._set_url(host_name)

    def _init_instance(self, hostname):
        """
        Record the host name and, if the launch configuration names a cluster
        type, initialize the cluster.
        """
        super(CloudManInstance, self)._init_instance(hostname)
        # ``config`` is None when the object was not created from a
        # CloudManConfig; there is nothing to initialize in that case.
        if self.config and self.config.cluster_type:
            self.initialize(self.config.cluster_type, self.config.initial_storage_size)

    def _set_url(self, url):
        """
        Keeps the CloudMan URL as well and the hostname in sync.
        """
        if url:
            # Make sure the URL scheme is defined (otherwise requests will not work)
            if not urlparse(url).scheme:
                url = "http://" + url
            # Parse the (possibly corrected) URL to extract the hostname
            parse_result = urlparse(url)
            super(CloudManInstance, self)._update_host_name(parse_result.hostname)
        self.url = url

    @property
    def galaxy_url(self):
        """
        Returns the base URL for this instance, which by default happens to be
        the URL for Galaxy application.
        """
        return self.url

    @property
    def cloudman_url(self):
        """
        Returns the URL for accessing this instance of CloudMan, or ``None``
        if the base URL is not known yet.
        """
        if self.url:
            return '/'.join([self.url, 'cloud'])
        return None

    @staticmethod
    def launch_instance(cfg, **kwargs):
        """
        Launches a new instance of CloudMan on the specified cloud infrastructure.

        :type cfg: CloudManConfig
        :param cfg: A CloudManConfig object containing the initial parameters
                    for this launch.

        If ``cfg.block_till_ready`` is set and ``cfg.cluster_type`` is given,
        this call blocks until the instance is ready and the cluster has been
        initialized. In that case the optional ``vm_ready_timeout`` and
        ``vm_ready_check_interval`` keyword arguments control the wait.

        Raises VMLaunchException if the configuration is invalid or if the
        launcher reports an error.
        """
        validation_result = cfg.validate()
        if validation_result is not None:
            raise VMLaunchException("Invalid CloudMan configuration provided: {0}"
                                    .format(validation_result))
        launcher = CloudManLauncher(cfg.access_key, cfg.secret_key, cfg.cloud_metadata)
        result = launcher.launch(cfg.cluster_name, cfg.image_id, cfg.instance_type,
                                 cfg.password, cfg.kernel_id, cfg.ramdisk_id, cfg.key_name,
                                 cfg.security_groups, cfg.placement)
        if result['error'] is not None:
            # The message needs a placeholder: "%"-formatting a string that
            # has none raises TypeError instead of the intended exception.
            raise VMLaunchException("Error launching cloudman instance: {0}"
                                    .format(result['error']))
        instance = CloudManInstance(None, None, launcher=launcher, launch_result=result,
                                    cloudman_config=cfg)
        if cfg.block_till_ready and cfg.cluster_type:
            # Forward only the keywords understood by block_till_vm_ready;
            # anything else would be rejected by ``initialize``.
            wait_options = dict((key, kwargs[key])
                                for key in ('vm_ready_timeout', 'vm_ready_check_interval')
                                if key in kwargs)
            instance.initialize(cfg.cluster_type, cfg.initial_storage_size, **wait_options)
        return instance

    def update(self):
        """
        Update the local object's fields to be in sync with the actual state
        of the CloudMan instance the object points to. This method should be
        called periodically to ensure you are looking at the current data.

        .. versionadded:: 0.2.2
        """
        ms = self.get_machine_status()
        # Check if the machine is running and update IP and state
        self.vm_status = ms.get('instance_state', None)
        self.vm_error = ms.get('error', None)
        public_ip = ms.get('public_ip', None)
        # Update url if we don't have it or is different than what we have.
        # Compare against the host name: ``self.url`` carries a scheme prefix
        # and would never equal a bare IP address.
        if public_ip and self.host_name != public_ip:
            self._set_url(public_ip)
        # See if the cluster has been initialized
        if self.vm_status == 'running' or self.url:
            ct = self.get_cluster_type()
            if ct.get('cluster_type', None):
                self.initialized = True
        if self.vm_error:
            bioblend.log.error(self.vm_error)

    @block_till_vm_ready
    def get_cloudman_version(self):
        """
        Returns the cloudman version from the server. Versions prior to
        CloudMan 2 do not support this call, and therefore the default is to
        return 1 whenever the version cannot be obtained.
        """
        try:
            r = self._make_get_request("cloudman_version")
            return r['version']
        except Exception:
            # Deliberate best effort: any failure is treated as "old server".
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return 1

    @block_till_vm_ready
    def initialize(self, cluster_type, initial_storage_size=None, shared_bucket=None):
        """
        Initialize CloudMan platform. This needs to be done before the cluster
        can be used.

        The ``cluster_type``, either 'Galaxy', 'Data', or 'SGE', defines the type
        of cluster platform to initialize.

        Returns the server response, or ``None`` if this object already
        considers the cluster initialized (in which case no request is made).
        """
        if self.initialized:
            return None
        # The storage-size parameter is named differently before CloudMan 2.
        storage_key = 'g_pss' if self.get_cloudman_version() < 2 else 'pss'
        r = self._make_get_request("initialize_cluster",
                                   parameters={'startup_opt': cluster_type,
                                               storage_key: initial_storage_size,
                                               'shared_bucket': shared_bucket})
        self.initialized = True
        return r

    @block_till_vm_ready
    def get_cluster_type(self):
        """
        Get the ``cluster type`` for this CloudMan instance. See the
        CloudMan docs about the available types. Returns a dictionary,
        for example: ``{u'cluster_type': u'SGE'}``.

        As a side effect, marks this object as initialized when the server
        reports a cluster type.
        """
        cluster_type = self._make_get_request("cluster_type")
        if cluster_type['cluster_type']:
            self.initialized = True
        return cluster_type

    @block_till_vm_ready
    def get_status(self):
        """
        Get status information on this CloudMan instance.
        """
        return self._make_get_request("instance_state_json")

    @block_till_vm_ready
    def get_nodes(self):
        """
        Get a list of nodes currently running in this CloudMan cluster.
        """
        instance_feed_json = self._make_get_request("instance_feed_json")
        return instance_feed_json['instances']

    @block_till_vm_ready
    def get_cluster_size(self):
        """
        Get the size of the cluster in terms of the number of nodes; this count
        includes the master node.
        """
        return len(self.get_nodes())

    @block_till_vm_ready
    def get_static_state(self):
        """
        Get static information on this CloudMan instance,
        i.e. state that doesn't change over the lifetime of the cluster.
        """
        return self._make_get_request("static_instance_state_json")

    @block_till_vm_ready
    def get_master_ip(self):
        """
        Returns the public IP of the master node in this CloudMan cluster
        """
        status_json = self.get_static_state()
        return status_json['master_ip']

    @block_till_vm_ready
    def get_master_id(self):
        """
        Returns the instance ID of the master node in this CloudMan cluster
        """
        status_json = self.get_static_state()
        return status_json['master_id']

    @block_till_vm_ready
    def add_nodes(self, num_nodes, instance_type='', spot_price=''):
        """
        Add a number of worker nodes to the cluster, optionally specifying
        the type for new instances. If ``instance_type`` is not specified,
        instance(s) of the same type as the master instance will be started.
        Note that the ``instance_type`` must match the type of instance
        available on the given cloud.

        ``spot_price`` applies only to AWS and, if set, defines the maximum
        price for Spot instances, thus turning this request for more instances
        into a Spot request.
        """
        payload = {'number_nodes': num_nodes,
                   'instance_type': instance_type,
                   'spot_price': spot_price}
        return self._make_get_request("add_instances", parameters=payload)

    @block_till_vm_ready
    def remove_nodes(self, num_nodes, force=False):
        """
        Remove worker nodes from the cluster.

        The ``num_nodes`` parameter defines the number of worker nodes to remove.
        The ``force`` parameter (defaulting to False), is a boolean indicating
        whether the nodes should be forcibly removed rather than gracefully removed.
        """
        payload = {'number_nodes': num_nodes, 'force_termination': force}
        return self._make_get_request("remove_instances", parameters=payload)

    @block_till_vm_ready
    def remove_node(self, instance_id, force=False):
        """
        Remove a specific worker node from the cluster.

        The ``instance_id`` parameter defines the ID, as a string, of a worker node
        to remove from the cluster.

        NOTE(review): ``force`` is accepted for interface compatibility but is
        not included in the request sent to the server, so it currently has
        no effect -- confirm whether the ``remove_instance`` endpoint supports
        a force option before wiring it in.
        """
        payload = {'instance_id': instance_id}
        return self._make_get_request("remove_instance", parameters=payload)

    @block_till_vm_ready
    def reboot_node(self, instance_id):
        """
        Reboot a specific worker node.

        The ``instance_id`` parameter defines the ID, as a string, of a worker node
        to reboot.
        """
        payload = {'instance_id': instance_id}
        return self._make_get_request("reboot_instance", parameters=payload)

    @block_till_vm_ready
    def autoscaling_enabled(self):
        """
        Returns a boolean indicating whether autoscaling is enabled.
        """
        return bool(self.get_status()['autoscaling']['use_autoscaling'])

    @block_till_vm_ready
    def enable_autoscaling(self, minimum_nodes=0, maximum_nodes=19):
        """
        Enable cluster autoscaling, allowing the cluster to automatically add,
        or remove, worker nodes, as needed.

        The number of worker nodes in the cluster is bounded by the ``minimum_nodes``
        (default is 0) and ``maximum_nodes`` (default is 19) parameters.
        Does nothing if autoscaling is already enabled.
        """
        if not self.autoscaling_enabled():
            payload = {'as_min': minimum_nodes, 'as_max': maximum_nodes}
            self._make_get_request("toggle_autoscaling", parameters=payload)

    @block_till_vm_ready
    def disable_autoscaling(self):
        """
        Disable autoscaling, meaning that worker nodes will need to be manually
        added and removed. Does nothing if autoscaling is already disabled.
        """
        if self.autoscaling_enabled():
            self._make_get_request("toggle_autoscaling")

    @block_till_vm_ready
    def adjust_autoscaling(self, minimum_nodes=None, maximum_nodes=None):
        """
        Adjust the autoscaling configuration parameters.

        The number of worker nodes in the cluster is bounded by the optional
        ``minimum_nodes`` and ``maximum_nodes`` parameters. If a parameter is
        not provided then its configuration value does not change.
        Does nothing if autoscaling is not enabled.
        """
        if self.autoscaling_enabled():
            payload = {'as_min_adj': minimum_nodes, 'as_max_adj': maximum_nodes}
            self._make_get_request("adjust_autoscaling", parameters=payload)

    @block_till_vm_ready
    def get_galaxy_state(self):
        """
        Get the current status of Galaxy running on the cluster.
        Returns a dictionary with a single ``status`` key.
        """
        payload = {'srvc': 'Galaxy'}
        status = self._make_get_request("get_srvc_status", parameters=payload)
        return {'status': status['status']}

    @block_till_vm_ready
    def terminate(self, terminate_master_instance=True, delete_cluster=False):
        """
        Terminate this CloudMan cluster. There is an option to also terminate the
        master instance (all worker instances will be terminated in the process
        of cluster termination), and delete the whole cluster.

        .. warning::
            Deleting a cluster is irreversible - all of the data will be
            permanently deleted.
        """
        payload = {'terminate_master_instance': terminate_master_instance,
                   'delete_cluster': delete_cluster}
        # Use a short timeout: the cluster may go away before it answers.
        return self._make_get_request("kill_all", parameters=payload, timeout=15)

    def _make_get_request(self, url, parameters=None, timeout=None):
        """
        Private function that makes a GET request to the nominated ``url``,
        with the provided GET ``parameters``. Optionally, set the ``timeout``
        to stop waiting for a response after a given number of seconds. This is
        particularly useful when terminating a cluster as it may terminate
        before sending a response.

        Returns the decoded JSON body of the response, or the raw response
        text if the body is not valid JSON.
        """
        req_url = '/'.join([self.cloudman_url, 'root', url])
        r = requests.get(req_url, params=parameters, auth=("", self.password),
                         timeout=timeout)
        try:
            return r.json()
        except ValueError:
            # Not a JSON response; hand back the body as text instead.
            return r.text