一、cinder-api服务入口

D:\code-program\cinder-codejuno\api\contrib\admin_actions.py

from cinder import volume
class VolumeAdminController(AdminController):
"""AdminController for Volumes."""
@wsgi.action('os-migrate_volume')
def _migrate_volume(self, req, id, body):
"""Migrate a volume to the specified host."""
context = req.environ['cinder.context']
self.authorize(context, 'migrate_volume')
try:
volume = self._get(context, id)--------根据volume id获取卷对象
except exception.NotFound:
raise exc.HTTPNotFound()
params = body['os-migrate_volume']-----获取request请求体中参数
try:
host = params['host']-------卷要迁移到的主机
except KeyError:
raise exc.HTTPBadRequest(explanation=_("Must specify 'host'"))
force_host_copy = params.get('force_host_copy', False)------从请求体中,获取force_host_copy的值,默认情况下,force_host_copy为False,
if isinstance(force_host_copy, basestring):
try:
force_host_copy = strutils.bool_from_string(force_host_copy,-----字符串类型转化为bool类型
strict=True)
except ValueError:
raise exc.HTTPBadRequest(
explanation=_("Bad value for 'force_host_copy'"))
elif not isinstance(force_host_copy, bool):
raise exc.HTTPBadRequest(
explanation=_("'force_host_copy' not string or bool"))
self.volume_api.migrate_volume(context, volume, host, force_host_copy)----调用volume.API类里面的方法 步骤一
return webob.Response(status_int=202)

对步骤一进行详解
D:\code-program\cinder-codejuno\volume\api.py

class API(base.Base):
"""API for interacting with the volume manager.""" @wrap_check_policy
def migrate_volume(self, context, volume, host, force_host_copy):
"""Migrate the volume to the specified host.""" # We only handle "available" volumes for now
if volume['status'] not in ['available', 'in-use']:------这卷的状态进行判断,只有available', 'in-use这两种状态有效
msg = _('Volume status must be available/in-use.')
LOG.error(msg)
raise exception.InvalidVolume(reason=msg) # Make sure volume is not part of a migration
if volume['migration_status'] is not None:-----确保卷不是出于迁移的状态
msg = _("Volume is already part of an active migration")
raise exception.InvalidVolume(reason=msg) # We only handle volumes without snapshots for now-----------判断卷是否有快照,如果有,那么就抛出异常
snaps = self.db.snapshot_get_all_for_volume(context, volume['id'])
if snaps:
msg = _("volume must not have snapshots")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg) # We only handle non-replicated volumes for now
rep_status = volume['replication_status']
if rep_status is not None and rep_status != 'disabled':
msg = _("Volume must not be replicated.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg) cg_id = volume.get('consistencygroup_id', None)
if cg_id:
msg = _("Volume must not be part of a consistency group.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg) #Make juno volume migration api support icehouse.
#Default lvm backend pool is LVM_iSCSI,
# gluster backend poll is GlusterFS.
if "#" not in host:-----------为了兼容ice版本,对host进行改造
backend_driver = host.split("@")[1]
if backend_driver == "lvmdriver":
host = host + "#LVM_iSCSI"
elif backend_driver == "GLUSTERFS_DRIVER1":
host = host + "#GlusterFS"
else:
msg = _("Volume host is bad format.")
raise exception.InvalidVolume(reason=msg) # Make sure the host is in the list of available hosts
elevated = context.elevated()
topic = CONF.volume_topic
services = self.db.service_get_all_by_topic(elevated,
topic,
disabled=False)
found = False
for service in services:
svc_host = volume_utils.extract_host(host, 'backend')-------对卷要迁移到的主机的service状态,进行判断,是否有效,首先该主机的cinder-volume服务状态是up的
if utils.service_is_up(service) and service['host'] == svc_host:
found = True
if not found:
msg = (_('No available service named %s') % host)
LOG.error(msg)
raise exception.InvalidHost(reason=msg) # Make sure the destination host is different than the current one
if host == volume['host']:-------------卷要迁移到的主机必须与卷目前所在的主机不一样
msg = _('Destination host must be different than current host')
LOG.error(msg)
raise exception.InvalidHost(reason=msg) self.update(context, volume, {'migration_status': 'starting'})-----此时更新卷的状态的migration_status:starting # Call the scheduler to ensure that the host exists and that it can
# accept the volume
volume_type = {}
volume_type_id = volume['volume_type_id']
if volume_type_id:
volume_type = volume_types.get_volume_type(context, volume_type_id)
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
'volume_id': volume['id']}
self.scheduler_rpcapi.migrate_volume_to_host(context,------给exchange为cinder-volume的发送rpc请求--对步骤1.1详解
CONF.volume_topic,
volume['id'],
host,
force_host_copy,
request_spec)

对步骤1.1详解

D:\code-program\cinder-codejuno\scheduler\rpcapi.py,给rabbitmq中,exchange为cinder-volume的发送rpc.cast请求

class SchedulerAPI(object):
'''Client side of the scheduler rpc API.
def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
force_host_copy=False, request_spec=None,
filter_properties=None): cctxt = self.client.prepare(version='1.3')
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'migrate_volume_to_host',
topic=topic,
volume_id=volume_id,
host=host,
force_host_copy=force_host_copy,
request_spec=request_spec_p,
filter_properties=filter_properties)

 二、cinder-scheduler 接受rpc请求,对迁移的主机进行判断 

D:\code-program\cinder-codejuno\scheduler\manager.py
from cinder.volume import rpcapi as volume_rpcapi
# Default scheduler driver to use (string value)
#scheduler_driver=cinder.scheduler.filter_scheduler.FilterScheduler
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes.""" def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
filter_properties=None):
"""Ensure that the host exists and can accept the volume.""" def _migrate_volume_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'migration_status': None}}
self._set_volume_state_and_notify('migrate_volume_to_host',
volume_state,
context, ex, request_spec) try:
tgt_host = self.driver.host_passes_filters(context, host,-----------driver的取值是配置文件中scheduler_driver的取值--对步骤二进行详解
request_spec,
filter_properties)
except exception.NoValidHost as ex:
_migrate_volume_set_error(self, context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
_migrate_volume_set_error(self, context, ex, request_spec)
else:
volume_ref = db.volume_get(context, volume_id)-----从数据库中,获取卷的信息
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,-----对步骤三进行详解
tgt_host,----目标主机
force_host_copy) 对步骤二进行详解
D:\code-program\cinder-codejuno\scheduler\filter_scheduler.py
class FilterScheduler(driver.Scheduler):
"""Scheduler that can be used for filtering and weighing."""
def host_passes_filters(self, context, host, request_spec,
filter_properties):
"""Check if the specified host passes the filters."""
weighed_hosts = self._get_weighted_candidates(context, request_spec,------根据请求卷的特性,过滤出可用的存储节点
filter_properties)
for weighed_host in weighed_hosts:------对过滤出来的可用存储节点中,寻找是否有request请求参数中,携带的host的主机信息,如果有,则返回该主机的状态,否则抛出异常
host_state = weighed_host.obj
if host_state.host == host:
return host_state msg = (_('Cannot place volume %(id)s on %(host)s')
% {'id': request_spec['volume_id'], 'host': host})
raise exception.NoValidHost(reason=msg) 对步骤三进行详解
给cinder-volume发送rpc.cast请求,进行卷的迁移
D:\code-program\cinder-codejuno\volume\rpcapi.py
class VolumeAPI(object):
'''Client side of the volume rpc API.
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
new_host = utils.extract_host(volume['host'])----获取卷所在的host主机
cctxt = self.client.prepare(server=new_host, version='1.8')----更改rcp.client的环境信息,向卷所在的特定主机host发送rpc 请求
host_p = {'host': dest_host.host,-----卷迁移的目标主机
'capabilities': dest_host.capabilities}
cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'],
host=host_p, force_host_copy=force_host_copy)

 三、卷所在的主机的cinder-volume服务接受rpc请求 

# Driver to use for volume creation (string value)
#volume_driver=cinder.volume.drivers.lvm.LVMISCSIDriver
D:\code-program\cinder-codejuno\volume\manager.py
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.19'
target = messaging.Target(version=RPC_API_VERSION) def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
new_type_id=None):
"""Migrate the volume to the specified host (called on source host)."""
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
# and the migration status updated.
utils.require_driver_initialized(self.driver)-----后端存储驱动的初始化
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'error'}) volume_ref = self.db.volume_get(ctxt, volume_id)----根据volumeid 获取卷的信息
model_update = None
moved = False status_update = None
if volume_ref['status'] == 'retyping':
status_update = {'status': self._get_original_status(volume_ref)} self.db.volume_update(ctxt, volume_ref['id'],
{'migration_status': 'migrating'})------更新卷的状态的migration_status:migrating
if not force_host_copy and new_type_id is None:------默认force_host_copy为假,走这个分支,由后端存储驱动的迁移卷函数完成卷迁移
try:
LOG.debug("volume %s: calling driver migrate_volume",
volume_ref['id'])
moved, model_update = self.driver.migrate_volume(ctxt,-----------步骤四详解
volume_ref,
host)
if moved:
updates = {'host': host['host'],
'migration_status': None}
if status_update:
updates.update(status_update)
if model_update:
updates.update(model_update)
volume_ref = self.db.volume_update(ctxt,
volume_ref['id'],
updates)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
model_update = self.driver.create_export(ctxt, volume_ref)
if model_update:
updates.update(model_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
if not moved:----如果force_host_copy为真,走这个分支
try:
self._migrate_volume_generic(ctxt, volume_ref, host,----步骤五详解
new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': None}
if status_update:
updates.update(status_update)
model_update = self.driver.create_export(ctxt, volume_ref)
if model_update:
updates.update(model_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)

 四、 force_host_copy取值不同,所走分支不同的详解

默认情况下,force_host_copy为假,由后端存储驱动来完成卷的迁移工作,如果force_host_copy为真,那么有cinder-volume服务所在的主机完成,卷的迁移工作

force_host_copy为假的情况

对步骤四进行详解
ceph不支持卷迁移,以lvm卷为前提进行分析
D:\code-program\cinder-codejuno\volume\drivers\lvm.py
class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):
def migrate_volume(self, ctxt, volume, host, thin=False, mirror_count=0):
"""Optimize the migration if the destination is on the same server. If the specified host is another back-end on the same server, and
the volume is not attached, we can do the migration locally without
going through iSCSI.
""" false_ret = (False, None)
if volume['status'] != 'available':
return false_ret
if 'location_info' not in host['capabilities']:
return false_ret
info = host['capabilities']['location_info']
try:
(dest_type, dest_hostname, dest_vg, lvm_type, lvm_mirrors) =\
info.split(':')
lvm_mirrors = int(lvm_mirrors)
except ValueError:
return false_ret
if (dest_type != 'LVMVolumeDriver' or dest_hostname != self.hostname):
return false_ret if dest_vg != self.vg.vg_name:
vg_list = volutils.get_all_volume_groups()
try:
(vg for vg in vg_list if vg['name'] == dest_vg).next()
except StopIteration:
message = (_("Destination Volume Group %s does not exist") %
dest_vg)
LOG.error(message)
return false_ret helper = utils.get_root_helper()
dest_vg_ref = lvm.LVM(dest_vg, helper,
lvm_type=lvm_type,
executor=self._execute)
self.remove_export(ctxt, volume)
self._create_volume(volume['name'],
self._sizestr(volume['size']),
lvm_type,
lvm_mirrors,
dest_vg_ref) volutils.copy_volume(self.local_path(volume),
self.local_path(volume, vg=dest_vg),
volume['size'],
self.configuration.volume_dd_blocksize,
execute=self._execute)
self._delete_volume(volume)
model_update = self._create_export(ctxt, volume, vg=dest_vg) return (True, model_update)
如果迁移卷的dest_vg,与该节点配置的vg相同,那么就在该vg上对源卷进行一个拷贝,删除源卷的动作
如果迁移卷的dest_vg,与该节点配置的vg不相同,那么判断dest_vg有效的情况下,移除源卷的export,在dest_vg上创建一个新卷,拷贝数据到目标卷,删除源卷

force_host_copy为真的情况,在这种情况下,是把目的卷,挂载到源卷所在的存储节点上,然后执行linux dd的命令,进行数据的拷贝,因此,拷贝数据的时间长短,完全取决于卷的大小,及存储节点物理主机的cpu及io性能,所以这种情况下,会导致一种情况是,因为数据量太大,数据dd拷贝的时间太长,导致迁移失败的问题

对步骤五进行详解
# Timeout for creating the volume to migrate to when
# performing volume migration (seconds) (integer value)
#migration_create_volume_timeout_secs=300
D:\code-program\cinder-codejuno\volume\manager.py
class VolumeManager(manager.SchedulerDependentManager):
def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
rpcapi = volume_rpcapi.VolumeAPI() # Create new volume on remote host
new_vol_values = {}
for k, v in volume.iteritems():
new_vol_values[k] = v
del new_vol_values['id']
del new_vol_values['_name_id']
# We don't copy volume_type because the db sets that according to
# volume_type_id, which we do copy
del new_vol_values['volume_type']
if new_type_id:
new_vol_values['volume_type_id'] = new_type_id
new_vol_values['host'] = host['host']
new_vol_values['status'] = 'creating'
new_vol_values['migration_status'] = 'target:%s' % volume['id']
new_vol_values['attach_status'] = 'detached'
new_volume = self.db.volume_create(ctxt, new_vol_values)
rpcapi.create_volume(ctxt, new_volume, host['host'],
None, None, allow_reschedule=False)
#以上部分,在数据库中新增一条卷的记录,同时在目的存储节点上,创建一个指定存储类型的卷
# Wait for new_volume to become ready
在指定的时间段内,检查目标存储节点上的卷,状态是否正常
starttime = time.time()
deadline = starttime + CONF.migration_create_volume_timeout_secs
new_volume = self.db.volume_get(ctxt, new_volume['id'])
tries = 0
while new_volume['status'] != 'available':
tries = tries + 1
now = time.time()
if new_volume['status'] == 'error':
msg = _("failed to create new_volume on destination host")
raise exception.VolumeMigrationFailed(reason=msg)
elif now > deadline:
msg = _("timeout creating new_volume on destination host")
raise exception.VolumeMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
new_volume = self.db.volume_get(ctxt, new_volume['id']) # Copy the source volume to the destination volume
try:
if (volume['instance_uuid'] is None and
volume['attached_host'] is None):
self.driver.copy_volume_data(ctxt, volume, new_volume,---这里的driver为 volume_driver 的值,对步骤六详解
remote='dest')
# The above call is synchronous so we complete the migration
self.migrate_volume_completion(ctxt, volume['id'],-----完成迁移的后续工作,删除源卷,更新数据库状态
new_volume['id'], error=False)
else:
nova_api = compute.API()
# This is an async call to Nova, which will call the completion
# when it's done
nova_api.update_server_volume(ctxt, volume['instance_uuid'],
volume['id'], new_volume['id'])
except Exception:
with excutils.save_and_reraise_exception():
msg = _("Failed to copy volume %(vol1)s to %(vol2)s")
LOG.error(msg % {'vol1': volume['id'],
'vol2': new_volume['id']})
volume = self.db.volume_get(ctxt, volume['id'])
# If we're in the completing phase don't delete the target
# because we may have already deleted the source!
if volume['migration_status'] == 'migrating':
rpcapi.delete_volume(ctxt, new_volume)
new_volume['migration_status'] = None
对步骤六进行详解
D:\code-program\cinder-codejuno\volume\driver.py
self.driver的取值为volume_driver的值,该方法调用的是下面父类里面的方法
核心功能是把目的节点上的卷,挂载到源卷的节点上,进行Linux dd方式的数据拷贝,在数据考完完成以后,卸载目的卷
class VolumeDriver(object): def copy_volume_data(self, context, src_vol, dest_vol, remote=None):
"""Copy data from src_vol to dest_vol."""
LOG.debug(('copy_data_between_volumes %(src)s -> %(dest)s.')
% {'src': src_vol['name'], 'dest': dest_vol['name']}) properties = utils.brick_get_connector_properties()
dest_remote = True if remote in ['dest', 'both'] else False
dest_orig_status = dest_vol['status']
try:
dest_attach_info = self._attach_volume(context,
dest_vol,
properties,
remote=dest_remote)
except Exception:
with excutils.save_and_reraise_exception():
msg = _("Failed to attach volume %(vol)s")
LOG.error(msg % {'vol': dest_vol['id']})
self.db.volume_update(context, dest_vol['id'],
{'status': dest_orig_status}) src_remote = True if remote in ['src', 'both'] else False
src_orig_status = src_vol['status']
try:
src_attach_info = self._attach_volume(context,
src_vol,
properties,
remote=src_remote)
except Exception:
with excutils.save_and_reraise_exception():
msg = _("Failed to attach volume %(vol)s")
LOG.error(msg % {'vol': src_vol['id']})
self.db.volume_update(context, src_vol['id'],
{'status': src_orig_status})
self._detach_volume(context, dest_attach_info, dest_vol,
properties, force=True, remote=dest_remote) copy_error = True
mode = self.HOST_BASED
key_values = {}
try:
size_in_mb = int(src_vol['size']) * 1024 # vol size is in GB
src_device_path = src_attach_info['device']['path']
dest_device_path = dest_attach_info['device']['path']
if (not isinstance(src_device_path, six.string_types) or
not isinstance(dest_device_path, six.string_types)):
mode = self.FILE_BASED
key_values = {src_vol['id']: mode,
src_vol['id'] + 'previous_progress': '0'}
if mode == self.HOST_BASED:
key_values[src_vol['id'] + 'source'] = src_device_path
key_values[src_vol['id'] + 'dest'] = dest_device_path
key_values[src_vol['id'] + 'pid'] = None
else:
key_values[src_vol['id'] + 'dest_handle'] = None
self._add_migration_info_key(key_values) volume_utils.copy_volume(
src_device_path,
dest_device_path,
size_in_mb,
self.configuration.volume_dd_blocksize)
copy_error = False
except Exception:
with excutils.save_and_reraise_exception():
msg = _("Failed to copy volume %(src)s to %(dest)s.")
LOG.error(msg % {'src': src_vol['id'], 'dest': dest_vol['id']})
finally:
self._detach_volume(context, dest_attach_info, dest_vol,
properties, force=copy_error,
remote=dest_remote)
self._detach_volume(context, src_attach_info, src_vol,
properties, force=copy_error,
remote=src_remote)

  

 

最新文章

  1. java发送http的get、post请求
  2. java调用shell获取返回值
  3. 最快速的Android开发环境搭建ADT-Bundle及Hello World
  4. BSD Apache GPL LGPL MIT
  5. mybatis 打印sql 语句
  6. [python] No module named _sysconfigdata_nd
  7. 计算机学院大学生程序设计竞赛(2015’12)Polygon
  8. Javascript设计模式之创建对象的灵活性
  9. 亚马逊 在线测试题目 amazon
  10. POJ1573Robot Motion
  11. VS2015创建的Asp.net WebApi默认项目在CentOS7+Mono4.2.2+jexus5.8运行不起来的解决方案
  12. 专注手机端前端界面开发的ui组件和js组合
  13. python之socket编程------粘包
  14. Stanford依存句法关系解释
  15. Alpha冲刺(5/10)——2019.4.27
  16. 【BZOJ2244】[SDOI2011]拦截导弹(CDQ分治)
  17. linux 3
  18. Centos6下编译LEDE/OpenWrt
  19. Jedis连接池
  20. Power Strings----poj2406(kmp扩展 循环节)

热门文章

  1. git chechout
  2. json互相转换
  3. Java后端总结
  4. 第三章 Java面向对象(下)
  5. 7.9 NOI模拟赛 A.图 构造 dfs树 二分图
  6. 5.22 noip模拟赛
  7. JS时间和时间戳的转换
  8. 15 张精美动图全面讲解 CORS
  9. json&pickle&shelve
  10. C++ Json工具--Jsoncpp用法简介