diff --git a/api/cpp/include/yr/api/invoke_options.h b/api/cpp/include/yr/api/invoke_options.h index b2399b56429501b836536bf47934a36f40905736..b21a2329b238a2e5f4693a09bb1f8dd3ca635f08 100644 --- a/api/cpp/include/yr/api/invoke_options.h +++ b/api/cpp/include/yr/api/invoke_options.h @@ -45,6 +45,20 @@ struct GroupOptions { - `false`: Instances can have independent lifecycles. */ bool sameLifecycle = true; + /*! + @brief The strategy to create the group, defalut strategy is ``None`` + + - ``'None'``: No strategy. + + - ``'PACK'``: Pack multiple instances into the same node as much as possible. + + - ``'SPREAD`'': Distribute multiple instances across different nodes as much as possible. + + - ''`STRICT_PACK`'': All instances must be placed on the same node, otherwise creation fails. + + - ''`STRICT_SPREAD`'': All instances must be placed on different nodes, otherwise creation fails. + */ + std::string strategy = "None"; }; /** diff --git a/api/cpp/src/cluster_mode_runtime.cpp b/api/cpp/src/cluster_mode_runtime.cpp index ec665489afaabfa95f30dab9bbbb18e2133ecba7..260864caeabb9fec91724906fa8e9f0e8bf60eca 100644 --- a/api/cpp/src/cluster_mode_runtime.cpp +++ b/api/cpp/src/cluster_mode_runtime.cpp @@ -872,6 +872,7 @@ void ClusterModeRuntime::GroupCreate(const std::string &name, GroupOptions &opts libOpts.timeout = opts.timeout; libOpts.groupName = name; libOpts.sameLifecycle = opts.sameLifecycle; + libOpts.strategy = opts.strategy; auto errInfo = YR::Libruntime::LibruntimeManager::Instance().GetLibRuntime()->GroupCreate(name, libOpts); if (!errInfo.OK()) { throw YR::Exception(static_cast(errInfo.Code()), static_cast(errInfo.MCode()), errInfo.Msg()); diff --git a/api/python/yr/__init__.py b/api/python/yr/__init__.py index 9092fbb880f60a8e49d3d3feb7b82157afd2122e..2725492a979e1231b10b0aa97bc8fbc7f575f186 100644 --- a/api/python/yr/__init__.py +++ b/api/python/yr/__init__.py @@ -32,7 +32,7 @@ __all__ = [ "FunctionGroupOptions", "SchedulingAffinityType", "FunctionGroupContext", "ServerInfo", "DeviceInfo", "get_function_group_context", "create_resource_group", "remove_resource_group", "ResourceGroup", "FunctionProxy", "InstanceCreator", "InstanceProxy", "MethodProxy", "FunctionGroupHandler", - "FunctionGroupMethodProxy", "get_node_ip_address", "list_named_instances" + "FunctionGroupMethodProxy", "get_node_ip_address", "list_named_instances", "Group", "GroupOptions", ] import os @@ -86,9 +86,11 @@ from yr.runtime import ( # noqa: E402 ) from yr.config import ( # noqa: E402 Config, InvokeOptions, UserTLSConfig, FunctionGroupOptions, SchedulingAffinityType, - FunctionGroupContext, ServerInfo, DeviceInfo, ResourceGroupOptions + FunctionGroupContext, ServerInfo, DeviceInfo, ResourceGroupOptions, GroupOptions, ) +from yr.group import Group + from yr.affinity import Affinity, AffinityType, AffinityKind, LabelOperator, OperatorType # noqa: E402 from yr.metrics import Gauge, Alarm, UInt64Counter, DoubleCounter # noqa: E402 from yr.decorator.function_proxy import FunctionProxy # noqa: E402 diff --git a/api/python/yr/cluster_mode_runtime.py b/api/python/yr/cluster_mode_runtime.py index 55462554f62d21e4d36b5e52f91ddf5a3df06916..2f8f18417de90a94b847df7e4f548b612f7cb4f0 100644 --- a/api/python/yr/cluster_mode_runtime.py +++ b/api/python/yr/cluster_mode_runtime.py @@ -22,7 +22,7 @@ from typing import Any, Dict, List, Tuple, Union, Callable from yr.exception import YRInvokeError from yr.err_type import ErrorCode, ErrorInfo, ModuleCode from yr.common.types import InvokeArg, GroupInfo -from yr.config import InvokeOptions +from yr.config import InvokeOptions, GroupOptions from yr.config_manager import ConfigManager from yr.fnruntime import Fnruntime, SharedBuffer from yr.libruntime_pb2 import FunctionMeta @@ -55,8 +55,10 @@ class ClusterModeRuntime(Runtime): function_system_rt_server_ip_addr = "" function_system_rt_server_port = 0 if ConfigManager().server_address != "": - function_system_ip_addr = ConfigManager().server_address.split(":")[0] - function_system_port = int(ConfigManager().server_address.split(":")[1]) + function_system_ip_addr = ConfigManager().server_address.split(":")[ + 0] + function_system_port = int( + ConfigManager().server_address.split(":")[1]) _logger.debug( "Initialize libruntime with functionsystem address: %s:%d", function_system_ip_addr, @@ -64,8 +66,10 @@ class ClusterModeRuntime(Runtime): ) if ConfigManager().rt_server_address != "": - function_system_rt_server_ip_addr = ConfigManager().rt_server_address.split(":")[0] - function_system_rt_server_port = int(ConfigManager().rt_server_address.split(":")[1]) + function_system_rt_server_ip_addr = ConfigManager( + ).rt_server_address.split(":")[0] + function_system_rt_server_port = int( + ConfigManager().rt_server_address.split(":")[1]) _logger.debug( "Initialize libruntime with functionsystem rt-server address: %s:%d", function_system_rt_server_ip_addr, @@ -104,7 +108,8 @@ class ClusterModeRuntime(Runtime): :return: data which get from ds """ if timeout < 0 and timeout != -1: - raise RuntimeError(f"Invalid parameter, timeout: {timeout}, expect -1 or > 0") + raise RuntimeError( + f"Invalid parameter, timeout: {timeout}, expect -1 or > 0") timeout_ms = -1 if timeout == -1 else timeout * 1000 results = self.libruntime.get(ids, timeout_ms, allow_partial) @@ -122,7 +127,8 @@ class ClusterModeRuntime(Runtime): :param timeout: timeout :return: ready objects, unready objects, exception to objects """ - ready_ids, unready_ids, exception_ids = self.libruntime.wait(objs, wait_num, timeout) + ready_ids, unready_ids, exception_ids = self.libruntime.wait( + objs, wait_num, timeout) if exception_ids: ready_ids.extend(exception_ids) if len(ready_ids) > wait_num: @@ -150,7 +156,8 @@ class ClusterModeRuntime(Runtime): else: callback(res) except Exception as e: - callback(ErrorInfo(ErrorCode.ERR_INNER_SYSTEM_ERROR, ModuleCode.RUNTIME, str(e))) + callback(ErrorInfo(ErrorCode.ERR_INNER_SYSTEM_ERROR, + ModuleCode.RUNTIME, str(e))) self.libruntime.get_async(object_id, callback_wrapper) @@ -602,11 +609,13 @@ class ClusterModeRuntime(Runtime): args_list_new = [] for arg in args_list: if isinstance(arg, ObjectRef): - invoke_arg = InvokeArg(buf=bytes(), is_ref=True, obj_id=arg.id, nested_objects=set()) + invoke_arg = InvokeArg( + buf=bytes(), is_ref=True, obj_id=arg.id, nested_objects=set()) else: serialized_arg = Serialization().serialize(arg) invoke_arg = InvokeArg(buf=None, is_ref=False, obj_id="", - nested_objects=set([ref.id for ref in serialized_arg.nested_refs]), + nested_objects=set( + [ref.id for ref in serialized_arg.nested_refs]), serialized_obj=serialized_arg) args_list_new.append(invoke_arg) return args_list_new @@ -617,3 +626,18 @@ class ClusterModeRuntime(Runtime): """ if not self.__enable_flag: raise RuntimeError("runtime not enable") + + def create_group(self, group_name: str, group_opts: GroupOptions): + self.libruntime.create_group(group_name, group_opts) + + def terminate_group(self, group_name: str): + self.libruntime.terminate_group(group_name) + + def wait_group(self, group_name: str): + self.libruntime.wait_group(group_name) + + def suspend_group(self, group_name: str): + self.libruntime.suspend_group(group_name) + + def resume_group(self, group_name: str): + self.libruntime.resume_group(group_name) diff --git a/api/python/yr/config.py b/api/python/yr/config.py index 1d657dbc294e62fbf821297cb44e0206cfdd9701..f86cb65f36723066077e05f643235767ab79ef77 100644 --- a/api/python/yr/config.py +++ b/api/python/yr/config.py @@ -296,6 +296,13 @@ class FunctionGroupContext: device_name: str = "" +@dataclass +class GroupOptions: + timeout: int = -1 + same_lifecycle: bool = True + strategy: str = "" + + @dataclass class InvokeOptions: """Use to set the invoke options. @@ -424,7 +431,8 @@ class InvokeOptions: #: selected for scheduling. preferred_anti_other_labels = False - resource_group_options: ResourceGroupOptions = field(default_factory=ResourceGroupOptions) + resource_group_options: ResourceGroupOptions = field( + default_factory=ResourceGroupOptions) """ Specify the ResourceGroup option, which includes resource_group_name and bundle_index. @@ -451,7 +459,8 @@ class InvokeOptions: """ #: Function group options. - function_group_options: FunctionGroupOptions = field(default_factory=FunctionGroupOptions) + function_group_options: FunctionGroupOptions = field( + default_factory=FunctionGroupOptions) #: Set environment variables when the instance starts. env_vars: Dict[str, str] = field(default_factory=dict) #: Number of retries for stateless functions. @@ -501,6 +510,8 @@ class InvokeOptions: * If you use conda, you need to specify the environment variable `YR_CONDA_HOME` to point to installation path. """ + group_name: str = "" + def check_options_valid(self): """ Check whether the options are valid. @@ -530,7 +541,6 @@ class InvokeOptions: f"invalid type for '{actual}', actual: {type(actual_value)}, expect: {expected_type}" ) - def check_options_range(self): """ Check whether the options are in the valid range. @@ -541,7 +551,7 @@ class InvokeOptions: "max_instances", "max_invoke_latency", "min_instances", - ] + ] for attr in attrs: value = getattr(self, attr) diff --git a/api/python/yr/fnruntime.pyx b/api/python/yr/fnruntime.pyx index 446d221f21789b72b8383b919a7e392582ac0461..9963720f4286f2582635810549faaa2baf35bd80 100644 --- a/api/python/yr/fnruntime.pyx +++ b/api/python/yr/fnruntime.pyx @@ -33,7 +33,7 @@ from cython.operator import dereference, postincrement from cpython.exc cimport PyErr_CheckSignals import yr -from yr.config import SchedulingAffinityType, FunctionGroupOptions, ResourceGroupOptions +from yr.config import SchedulingAffinityType, FunctionGroupOptions, ResourceGroupOptions, GroupOptions from yr.common.types import GroupInfo, CommonStatus, Resource, Resources, BundleInfo, Option, RgInfo, ResourceGroupUnit from yr.common import constants from yr.common.utils import generate_random_id, create_new_event_loop @@ -67,7 +67,7 @@ CErrorCode, CErrorInfo, CFunctionMeta, CInternalWaitResult, CInvokeArg, CInvokeOptions, CInvokeType, CModuleCode, CLanguageType, CLibruntimeConfig, -CLibruntimeManager,move,CLibruntime, +CLibruntimeManager,move,CLibruntime, CGroupOptions, CExistenceOpt, CSetParam, CMSetParam, CCreateParam, CStackTraceInfo, CWriteMode, CCacheType, CConsistencyType, CGetParam, CGetParams, CMultipleReadResult, CDevice, CMultipleDelResult, CUInt64CounterData, CDoubleCounterData, NativeBuffer, StringNativeBuffer, CInstanceOptions, CGaugeData, CTensor, CDataType, CResourceUnit, CAlarmInfo, CAlarmSeverity, CFunctionGroupOptions, CBundleAffinity, CFunctionGroupRunningInfo, CFiberEvent, @@ -354,6 +354,15 @@ cdef CGetParams get_params_from_py(params: GetParams): c_get_params.getParams.push_back(c_get_param) return c_get_params +cdef CGroupOptions group_options_from_py(group_name: str, group_opts: GroupOptions): + cdef: + CGroupOptions c_group_options + c_group_options.groupName = group_name.encode() + c_group_options.timeout = group_opts.timeout + c_group_options.sameLifecycle = group_opts.same_lifecycle + c_group_options.strategy = group_opts.strategy + return c_group_options + cdef CFunctionGroupOptions function_group_options_from_py(function_group_opts: FunctionGroupOptions, group_size: int): cdef: CFunctionGroupOptions c_function_group_options @@ -960,6 +969,7 @@ cdef parse_invoke_opts(CInvokeOptions & opts, opt: yr.InvokeOptions, group_info: opts.minInstances = opt.min_instances opts.maxInstances = opt.max_instances opts.resourceGroupOpts = resource_group_options_from_py(opt.resource_group_options) + opts.groupName = opt.group_name.encode() if group_info is not None: opts.functionGroupOpts = function_group_options_from_py(opt.function_group_options, group_info.group_size) opts.groupName = group_info.group_name.encode() @@ -2254,6 +2264,48 @@ cdef class Fnruntime: with nogil: ret = CLibruntimeManager.Instance().GetLibRuntime().get().AddReturnObject(c_obj_ids) return ret + + def create_group(self, group_name: str, group_opts: GroupOptions): + cdef: + string c_group_name = group_name.encode() + CGroupOptions c_group_options + c_group_options = group_options_from_py(group_name, group_opts) + with nogil: + ret = CLibruntimeManager.Instance().GetLibRuntime().get().GroupCreate(c_group_name, c_group_options) + if not ret.OK(): + raise RuntimeError( + f"failed to create group, code: {ret.Code()}, " + f"module code: {ret.MCode()}, msg: {ret.Msg().decode()}") + + def wait_group(self, group_name: str): + cdef: + string c_group_name = group_name.encode() + with nogil: + ret = CLibruntimeManager.Instance().GetLibRuntime().get().GroupWait(c_group_name) + if not ret.OK(): + raise RuntimeError( + f"failed to wait group, code: {ret.Code()}, " + f"module code: {ret.MCode()}, msg: {ret.Msg().decode()}") + + def suspend_group(self, group_name: str): + cdef: + string c_group_name = group_name.encode() + with nogil: + ret = CLibruntimeManager.Instance().GetLibRuntime().get().GroupSuspend(c_group_name) + if not ret.OK(): + raise RuntimeError( + f"failed to suspend group, code: {ret.Code()}, " + f"module code: {ret.MCode()}, msg: {ret.Msg().decode()}") + + def resume_group(self, group_name: str): + cdef: + string c_group_name = group_name.encode() + with nogil: + ret = CLibruntimeManager.Instance().GetLibRuntime().get().GroupResume(c_group_name) + if not ret.OK(): + raise RuntimeError( + f"failed to resume group, code: {ret.Code()}, " + f"module code: {ret.MCode()}, msg: {ret.Msg().decode()}") cdef cluster_access_info_cpp_to_py(const CClusterAccessInfo & c_cluster_info): return { diff --git a/api/python/yr/group.py b/api/python/yr/group.py new file mode 100644 index 0000000000000000000000000000000000000000..921e00b62e792cade18d5e8c583c07441b33f766 --- /dev/null +++ b/api/python/yr/group.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# coding=UTF-8 +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from yr.config import GroupOptions +from yr import log, runtime_holder + + +@dataclass +class Group: + group_name: str = "" + group_opts: GroupOptions = None + + def invoke(self): + runtime_holder.global_runtime.get_runtime().create_group( + self.group_name, self.group_opts) + + def terminate(self): + runtime_holder.global_runtime.get_runtime().terminate_group(self.group_name) + + def wait(self): + runtime_holder.global_runtime.get_runtime().wait_group(self.group_name) + + def suspend(self): + runtime_holder.global_runtime.get_runtime().suspend_group(self.group_name) + + def resume(self): + runtime_holder.global_runtime.get_runtime().resume_group(self.group_name) diff --git a/api/python/yr/includes/libruntime.pxd b/api/python/yr/includes/libruntime.pxd index 59a093e9166dac47e8e85111440542ef7028308f..20b44e6d466044286f6f4327ba660351e8f8cf99 100644 --- a/api/python/yr/includes/libruntime.pxd +++ b/api/python/yr/includes/libruntime.pxd @@ -343,6 +343,12 @@ cdef extern from "src/dto/invoke_options.h" nogil: bool isGenerator bool isAsync + cdef cppclass CGroupOptions "YR::Libruntime::GroupOpts": + string groupName + int timeout + bool sameLifecycle + string strategy + cdef enum CBundleAffinity "YR::Libruntime::BundleAffinity": COMPACT "YR::Libruntime::BundleAffinity::COMPACT" DISCRETE "YR::Libruntime::BundleAffinity::DISCRETE" @@ -686,6 +692,15 @@ cdef extern from "src/libruntime/libruntime.h" nogil: bool SetError(const string &objId, const CErrorInfo &err); + CErrorInfo GroupCreate(const string & groupName, CGroupOptions & opts); + + CErrorInfo GroupWait(const string & groupName); + + CErrorInfo GroupSuspend(const string & groupName); + + CErrorInfo GroupResume(const string & groupName); + + cdef extern from "src/libruntime/libruntime_manager.h" nogil: cdef cppclass CLibruntimeManager "YR::Libruntime::LibruntimeManager": @ staticmethod diff --git a/api/python/yr/local_mode/local_mode_runtime.py b/api/python/yr/local_mode/local_mode_runtime.py index 11828137a3c4aeb83c45a2b754ee3826a9dd5990..6d4ebba91a85f0b3345e1715a3f9fcfdd33cd434 100644 --- a/api/python/yr/local_mode/local_mode_runtime.py +++ b/api/python/yr/local_mode/local_mode_runtime.py @@ -23,7 +23,7 @@ from concurrent.futures import Future from typing import Union, Dict, List, Any, Tuple, Callable from yr.accelerate.shm_broadcast import Handle from yr.common.types import GroupInfo -from yr.config import InvokeOptions +from yr.config import InvokeOptions, GroupOptions from yr.exception import YRInvokeError from yr.common.utils import ( generate_random_id, generate_task_id, GaugeData, UInt64CounterData, DoubleCounterData @@ -75,7 +75,8 @@ class LocalModeRuntime(Runtime, ABC): :return: data which get from ds """ if timeout < 0 and timeout != -1: - raise RuntimeError(f"Invalid parameter, timeout: {timeout}, expect -1 or > 0") + raise RuntimeError( + f"Invalid parameter, timeout: {timeout}, expect -1 or > 0") self.wait(ids, len(ids), timeout) objects = [self.__local_store.get(i) for i in ids] for obj in objects: @@ -104,7 +105,8 @@ class LocalModeRuntime(Runtime, ABC): break else: iteration_timeout = None - ready, _ = futures.wait(unready_futures, iteration_timeout, futures.FIRST_COMPLETED) + ready, _ = futures.wait( + unready_futures, iteration_timeout, futures.FIRST_COMPLETED) if not ready: break unready_futures -= ready @@ -619,3 +621,18 @@ class LocalModeRuntime(Runtime, ABC): else: unready_map[future] = [object_id] return ready_objs, unready_map, exist_exception + + def create_group(self, group_name: str, group_opts: GroupOptions): + raise RuntimeError("not support in local mode") + + def terminate_group(self, group_name: str): + raise RuntimeError("not support in local mode") + + def wait_group(self, group_name: str): + raise RuntimeError("not support in local mode") + + def suspend_group(self, group_name: str): + raise RuntimeError("not support in local mode") + + def resume_group(self, group_name: str): + raise RuntimeError("not support in local mode") diff --git a/api/python/yr/runtime.py b/api/python/yr/runtime.py index 628545ed7ee96debcc2c12e3e4a46ca0bef7b2e7..9af954b30316167894a69f434ff4e20eb477b707 100644 --- a/api/python/yr/runtime.py +++ b/api/python/yr/runtime.py @@ -22,7 +22,7 @@ from enum import Enum from typing import List, Tuple, Union, Any, Callable, Dict from yr.common.types import GroupInfo -from yr.config import InvokeOptions +from yr.config import InvokeOptions, GroupOptions from yr.libruntime_pb2 import FunctionMeta from yr.fnruntime import SharedBuffer from yr.common.utils import GaugeData, UInt64CounterData, DoubleCounterData @@ -122,7 +122,6 @@ class MSetParam: class CreateParam: """Create param.""" - #: Configure the reliability of the data. #: When the server is configured to support a secondary cache for ensuring reliability, #: such as a Redis service, this configuration can ensure the reliability of the data. @@ -618,3 +617,43 @@ class Runtime(metaclass=ABCMeta): Returns: bool """ + + @abstractmethod + def create_group(self, group_name: str, group_opts: GroupOptions): + """ + create group + Returns: + None + """ + + @abstractmethod + def terminate_group(self, group_name: str): + """ + terminate group + Returns: + None + """ + + @abstractmethod + def wait_group(self, group_name: str): + """ + wait group + Returns: + None + """ + + @abstractmethod + def suspend_group(self, group_name: str): + """ + suspend group + Returns: + None + """ + + @abstractmethod + def resume_group(self, group_name: str): + """ + resume group + Returns: + None + """ diff --git a/deploy/process/config.sh b/deploy/process/config.sh index 9f1ca1fb71a5cc079d9d1d4a715dab14696209bf..28ad6eb347c71a4575b8fdcfd88512fe79820909 100644 --- a/deploy/process/config.sh +++ b/deploy/process/config.sh @@ -123,7 +123,7 @@ ENABLE_METRICS=true METRICS_CONFIG="" METRICS_CONFIG_FILE=$(readlink -m '${BASE_DIR}/../../../function_system/config/metrics_config.json') RUNTIME_METRICS_CONFIG="" -STATE_STORAGE_TYPE="disable" +STATE_STORAGE_TYPE="datasystem" PULL_RESOURCE_INTERVAL=1000 BLOCK=false ENABLE_MULTI_MASTER="false" diff --git a/docs/deploy/deploy_processes/parameters.md b/docs/deploy/deploy_processes/parameters.md index 1483e3650573eda6e402a852fdc362f59bbf40d6..606806a7f2f52e607fe7a5019b856cc36e78ad54 100644 --- a/docs/deploy/deploy_processes/parameters.md +++ b/docs/deploy/deploy_processes/parameters.md @@ -107,7 +107,7 @@ | 参数名 | 说明 | 默认值 | 备注 | | ------- | ------- |------------------------------------------------------| ------------------- | | `-g,--driver_gateway_enable` | function-proxy 开启端口,支持 driver 直连。 | ``false`` | 选填,取值:``true``、``false``,yr_master.sh 启动固定为 ``true``。 | -| `--state_storage_type` | 状态存储类型。 | ``disable`` | 选填,取值:``datasystem``、``disable``。 | +| `--state_storage_type` | 状态存储类型。 | ``datasystem`` | 选填,取值:``datasystem``、``disable``。 | | `--min_instance_cpu_size` | 函数实例支持请求最小 cpu(单位:``1/1000`` 核)。 | ``300`` |选填。 | | `--min_instance_memory_size` | 函数实例支持请求最小 memory(单位:MB)。 | ``128`` | 选填。| | `--max_instance_cpu_size` | 函数实例支持请求最大 cpu(单位:``1/1000`` 核)。 | ``16000`` |选填。最大值:``1073741824``。 | diff --git a/src/dto/invoke_options.h b/src/dto/invoke_options.h index 92bb4ee60af8bbd423b4ec2f771000ffeb539ee8..12e2e6ffa9ec0dfceb9e209df6a17575a23858c9 100644 --- a/src/dto/invoke_options.h +++ b/src/dto/invoke_options.h @@ -168,6 +168,7 @@ struct GroupOpts { std::string groupName; int timeout; bool sameLifecycle = true; + std::string strategy; }; struct InstanceOptions { diff --git a/src/libruntime/fsclient/fs_intf.cpp b/src/libruntime/fsclient/fs_intf.cpp index 605082ec5d0449ec7aec41542fd0212f03ef5859..24649a0ade98e8ec4fdeabee6dcc10740cc3420f 100644 --- a/src/libruntime/fsclient/fs_intf.cpp +++ b/src/libruntime/fsclient/fs_intf.cpp @@ -117,6 +117,12 @@ bool FSIntf::DeleteProcessingRequestId(const std::string &requestId) return num == 1; } +bool FSIntf::ExistProcessingRequestId() +{ + absl::MutexLock lock(&this->mu); + return !this->processingRequestIds.empty(); +} + void FSIntf::HandleCallRequest(const std::shared_ptr &req, CallCallBack callback) { if (!AddProcessingRequestId(req->Immutable().requestid())) { @@ -153,6 +159,7 @@ void FSIntf::HandleCallRequest(const std::shared_ptr &req, Call resp.set_message(msg); YRLOG_DEBUG("after wait initialized, send call response, request ID: {}, code {}, message {}", req->Immutable().requestid(), resp.code(), resp.message()); + status.InterruptCheckpointing(); callback(resp); } else { callback(resp); @@ -183,7 +190,31 @@ void FSIntf::HandleCheckpointRequest(const CheckpointRequest &req, CheckpointCal { this->checkpointRecoverExecutor.Handle( [this, req, callback]() { - auto resp = this->handlers.checkpoint(req); + if (ExistProcessingRequestId()) { + CheckpointResponse resp; + resp.set_code(common::ERR_INNER_SYSTEM_ERROR); + resp.set_message("exit processing request"); + callback(resp); + return; + } + if (status.SetCheckpointing()) { + auto resp = this->handlers.checkpoint(req); + if (status.IsCheckpointingInterrupted()) { + CheckpointResponse resp; + auto [code, msg] = status.GetErrorInfo(); + resp.set_code(code); + resp.set_message(msg); + callback(resp); + } else { + callback(resp); + } + status.SetCheckpointed(); + return; + } + CheckpointResponse resp; + auto [code, msg] = status.GetErrorInfo(); + resp.set_code(code); + resp.set_message(msg); callback(resp); }, ""); diff --git a/src/libruntime/fsclient/fs_intf.h b/src/libruntime/fsclient/fs_intf.h index 3f8033acf447ef32ceafe918626fe0b99ef3b0dd..09607ca68c2b1276fb753b65096c7d6bf2087774 100644 --- a/src/libruntime/fsclient/fs_intf.h +++ b/src/libruntime/fsclient/fs_intf.h @@ -195,7 +195,7 @@ public: class CallMessageSpec : public MessgeSpec { public: - CallMessageSpec() : MessgeSpec(){}; + CallMessageSpec() : MessgeSpec() {}; explicit CallMessageSpec(const std::shared_ptr<::runtime_rpc::StreamingMessage> &msg) : MessgeSpec(msg) {} ~CallMessageSpec() override = default; CallRequest &Mutable() @@ -293,6 +293,7 @@ protected: private: bool AddProcessingRequestId(const std::string &requestId); bool DeleteProcessingRequestId(const std::string &requestId); + bool ExistProcessingRequestId(); bool cleared_{false}; FSIntfHandlers handlers; bool syncHeartbeat; @@ -318,6 +319,8 @@ private: INITIALIZED, SHUTTING_DOWN, SHUTDOWN, + CHECKPOINTING, + INTERRUPT_CHECKPOITING, }; InstanceStatus() : state(STARTED) @@ -375,6 +378,40 @@ private: return state == SHUTTING_DOWN; } + bool SetCheckpointing() + { + absl::MutexLock lock(&this->mu); + if (state != CHECKPOINTING) { + state = CHECKPOINTING; + err = std::make_pair(common::ERR_INNER_SYSTEM_ERROR, "instance is checkpointing"); + } + return state == CHECKPOINTING; + } + + void SetCheckpointed() + { + absl::MutexLock lock(&this->mu); + if (state == CHECKPOINTING || state == INTERRUPT_CHECKPOITING) { + state = INITIALIZED; + } + err = std::make_pair(common::ERR_NONE, ""); + } + + void InterruptCheckpointing() + { + absl::MutexLock lock(&this->mu); + if (state == CHECKPOINTING) { + state = INTERRUPT_CHECKPOITING; + err = std::make_pair(common::ERR_INNER_SYSTEM_ERROR, "checkpoint is interrupted"); + } + } + + bool IsCheckpointingInterrupted() + { + absl::MutexLock lock(&this->mu); + return state == INTERRUPT_CHECKPOITING; + } + void SetShutdown() { absl::MutexLock lock(&this->mu); diff --git a/src/libruntime/groupmanager/group.cpp b/src/libruntime/groupmanager/group.cpp index 38dec4f6629f97dfc440397865181f35ae56520e..c4a62205952a85649d0643ed7a3c14a6272f2c34 100644 --- a/src/libruntime/groupmanager/group.cpp +++ b/src/libruntime/groupmanager/group.cpp @@ -108,9 +108,14 @@ ErrorInfo Group::Wait() void Group::Terminate() { runFlag = false; - YRLOG_DEBUG("start terminate group ins, group name is {}, group id is {}", groupName, groupId); + std::string tmpGroupId; + { + std::lock_guard lock(groupIdMtx); + tmpGroupId = groupId; + } + YRLOG_DEBUG("start terminate group ins, group name is {}, group id is {}", groupName, tmpGroupId); KillRequest killReq; - killReq.set_instanceid(groupId); + killReq.set_instanceid(tmpGroupId); killReq.set_payload(""); killReq.set_signal(libruntime::Signal::KillGroupInstance); this->fsClient->KillAsync(killReq, [](KillResponse resp, ErrorInfo err) -> void { @@ -119,6 +124,50 @@ void Group::Terminate() SetTerminateError(); } +ErrorInfo Group::Suspend() +{ + YRLOG_DEBUG("start suspend group ins, group name is {}", groupName); + return Signal(libruntime::Signal::GroupSuspend); +} + +ErrorInfo Group::Resume() +{ + YRLOG_DEBUG("start resume group ins, group name is {}", groupName); + return Signal(libruntime::Signal::GroupResume); +} + +ErrorInfo Group::Signal(libruntime::Signal signal) +{ + if (!runFlag) { + return ErrorInfo(ErrorCode::ERR_INNER_SYSTEM_ERROR, ModuleCode::RUNTIME, "group already terminate"); + } + if (!isSendReq) { + return ErrorInfo(ErrorCode::ERR_INNER_SYSTEM_ERROR, ModuleCode::RUNTIME, "group not create"); + } + KillRequest killReq; + { + std::lock_guard lock(groupIdMtx); + killReq.set_instanceid(groupId); + if (groupId.empty()) { + return ErrorInfo(ErrorCode::ERR_INNER_SYSTEM_ERROR, ModuleCode::RUNTIME, "groupId is empty"); + } + } + killReq.set_signal(signal); + killReq.set_payload(""); + auto promise = std::promise(); + auto future = promise.get_future(); + this->fsClient->KillAsync(killReq, [&promise](KillResponse resp, ErrorInfo err) -> void { + if (resp.code() != common::ERR_NONE) { + ErrorInfo errInfo(static_cast(resp.code()), ModuleCode::RUNTIME, resp.message()); + promise.set_value(errInfo); + } else { + promise.set_value(ErrorInfo()); + } + }); + auto err = future.get(); + return err; +} + void Group::SetRunFlag() { this->runFlag = false; diff --git a/src/libruntime/groupmanager/group.h b/src/libruntime/groupmanager/group.h index 58e66aae977e625546c8bdefa1bd48bdd5176a63..ad760ced904457ce56626da9c0abe762128e40d4 100644 --- a/src/libruntime/groupmanager/group.h +++ b/src/libruntime/groupmanager/group.h @@ -27,11 +27,13 @@ using HandleReturnObjectCallback = std::function { public: Group() = default; - Group(const std::string &name) : groupName(name){}; + Group(const std::string &name) : groupName(name) {}; ~Group() = default; ErrorInfo Wait(); ErrorInfo GroupCreate(); void Terminate(); + ErrorInfo Suspend(); + ErrorInfo Resume(); void SetRunFlag(); bool IsReady(); std::string GetGroupName(); @@ -64,6 +66,7 @@ protected: std::string traceId; std::string tenantId; std::string groupId; + std::mutex groupIdMtx; GroupOpts opts; InstanceRange range; FunctionGroupOptions functionGroupOpts; @@ -71,6 +74,9 @@ protected: std::vector> createSpecs; std::shared_ptr waitManager_; std::shared_ptr memStore_; + +private: + ErrorInfo Signal(libruntime::Signal signal); }; } // namespace Libruntime diff --git a/src/libruntime/groupmanager/group_manager.cpp b/src/libruntime/groupmanager/group_manager.cpp index c8bd6e4d4a8b3c462522298deee9789474fc3ab1..6ce2f66e16a936c41c747c47d92d762995d42b29 100644 --- a/src/libruntime/groupmanager/group_manager.cpp +++ b/src/libruntime/groupmanager/group_manager.cpp @@ -94,6 +94,24 @@ void GroupManager::Terminate(const std::string &groupName) groupSpecs_.erase(groupName); } +ErrorInfo GroupManager::Suspend(const std::string &groupName) +{ + if (groups_.find(groupName) == groups_.end()) { + return ErrorInfo(ErrorCode::ERR_PARAM_INVALID, ModuleCode::RUNTIME, + "group not exist, please select correct group"); + } + return groups_[groupName]->Suspend(); +} + +ErrorInfo GroupManager::Resume(const std::string &groupName) +{ + if (groups_.find(groupName) == groups_.end()) { + return ErrorInfo(ErrorCode::ERR_PARAM_INVALID, ModuleCode::RUNTIME, + "group not exist, please select correct group"); + } + return groups_[groupName]->Resume(); +} + void GroupManager::Stop() { for (auto &pair : groups_) { diff --git a/src/libruntime/groupmanager/group_manager.h b/src/libruntime/groupmanager/group_manager.h index 2cb3b5b8e6f37a44068f898fd201b164d358d3c3..a65ef20e44efe5cf5ccb56fdeafc3be463063378 100644 --- a/src/libruntime/groupmanager/group_manager.h +++ b/src/libruntime/groupmanager/group_manager.h @@ -25,6 +25,8 @@ public: ErrorInfo GroupCreate(const std::string &groupName); ErrorInfo Wait(const std::string &groupName); void Terminate(const std::string &groupName); + ErrorInfo Suspend(const std::string &groupName); + ErrorInfo Resume(const std::string &groupName); void Stop(); bool IsGroupExist(const std::string &groupName); void AddSpec(std::shared_ptr spec); @@ -33,6 +35,7 @@ public: bool IsInsReady(const std::string &groupName); ErrorInfo Accelerate(const std::string &groupName, const AccelerateMsgQueueHandle &handle, HandleReturnObjectCallback callback); + private: mutable std::mutex groupMtx; std::unordered_map> groups_; diff --git a/src/libruntime/groupmanager/named_group.cpp b/src/libruntime/groupmanager/named_group.cpp index f753a43808734b7c50380de0657db12f745f82db..0ab160426a333a27cad18a237d5ac375b5dada7a 100644 --- a/src/libruntime/groupmanager/named_group.cpp +++ b/src/libruntime/groupmanager/named_group.cpp @@ -15,57 +15,16 @@ */ #include "src/libruntime/groupmanager/named_group.h" +#include "src/libruntime/utils/utils.h" namespace YR { namespace Libruntime { + NamedGroup::NamedGroup(const std::string &name, const std::string &inputTenantId, GroupOpts &inputOpts, std::shared_ptr client, std::shared_ptr waitManager, - std::shared_ptr memStore) - : Group(name, inputTenantId, inputOpts, client, waitManager, memStore) -{ -} - -void NamedGroup::CreateRespHandler(const CreateResponses &resps) + std::shared_ptr memStore, std::shared_ptr invokeOrderMgr) + : RangeGroup(name, inputTenantId, inputOpts, client, waitManager, memStore, invokeOrderMgr) { - YRLOG_DEBUG("recieve group create response, resp code is {}, message is {}, runflag is {}", resps.code(), - resps.message(), runFlag); - if (!runFlag) { - return; - } - groupId = resps.groupid(); - YRLOG_DEBUG("groupId id is {}", groupId); - if (resps.code() != common::ERR_NONE) { - for (auto &spec : createSpecs) { - memStore_->SetError(spec->returnIds[0].id, ErrorInfo(static_cast(resps.code()), ModuleCode::CORE, - resps.message(), true)); - } - } else { - for (int i = 0; i < resps.instanceids_size(); i++) { - YRLOG_DEBUG("instace_{} id is {}", i, resps.instanceids(i)); - createSpecs[i]->instanceId = resps.instanceids(i); - memStore_->SetInstanceId(createSpecs[i]->returnIds[0].id, resps.instanceids(i)); - } - } -} - -void NamedGroup::CreateNotifyHandler(const NotifyRequest &req) -{ - YRLOG_DEBUG("recieve group create notify, req code is {}, message is {}, runflag is {}", req.code(), req.message(), - runFlag); - if (!runFlag) { - return; - } - if (req.code() != common::ERR_NONE) { - for (auto &spec : createSpecs) { - this->memStore_->SetError(spec->returnIds[0].id, ErrorInfo(static_cast(req.code()), - ModuleCode::CORE, req.message(), true)); - } - } else { - for (auto &spec : createSpecs) { - this->memStore_->SetReady(spec->returnIds[0].id); - } - isReady = true; - } } CreateRequests NamedGroup::BuildCreateReqs() @@ -82,6 +41,7 @@ CreateRequests NamedGroup::BuildCreateReqs() options->set_groupname(groupName); options->set_timeout(opts.timeout); options->set_samerunninglifecycle(opts.sameLifecycle); + options->set_grouppolicy(ConvertStrategyToPolicy(opts.strategy)); return reqs; } diff --git a/src/libruntime/groupmanager/named_group.h b/src/libruntime/groupmanager/named_group.h index 09e452be8779395100f4dc4efb3a0426334e0d70..c9a47dfc5e752d838d85f10c63167e4a0735a34b 100644 --- a/src/libruntime/groupmanager/named_group.h +++ b/src/libruntime/groupmanager/named_group.h @@ -14,21 +14,21 @@ * limitations under the License. */ -#include "src/libruntime/groupmanager/group.h" +#include "src/libruntime/groupmanager/range_group.h" +#include "src/libruntime/invoke_order_manager.h" namespace YR { namespace Libruntime { -class NamedGroup : public Group { +class NamedGroup : public RangeGroup { public: - NamedGroup(const std::string &name) : Group(name){}; + NamedGroup(const std::string &name, GroupOpts &inputOpts) + : RangeGroup(name, "", inputOpts, nullptr, nullptr, nullptr, nullptr) {}; NamedGroup(const std::string &name, const std::string &inputTenantId, GroupOpts &inputOpts, std::shared_ptr client, std::shared_ptr waitManager, - std::shared_ptr memStore); + std::shared_ptr memStore, std::shared_ptr invokeOrderMgr); private: CreateRequests BuildCreateReqs() override; - void CreateRespHandler(const CreateResponses &resps) override; - void CreateNotifyHandler(const NotifyRequest &req) override; void SetTerminateError() override; }; } // namespace Libruntime diff --git a/src/libruntime/groupmanager/range_group.cpp b/src/libruntime/groupmanager/range_group.cpp index cdc7e3afd70ce52c66078b8d5eba3b58c0ff0a09..3c371bb7d4b87f6af5cdb01b1ee157d655127773 100644 --- a/src/libruntime/groupmanager/range_group.cpp +++ b/src/libruntime/groupmanager/range_group.cpp @@ -32,6 +32,13 @@ RangeGroup::RangeGroup(const std::string &name, const std::string &inputTenantId { } +RangeGroup::RangeGroup(const std::string &name, const std::string &inputTenantId, GroupOpts &inputOpts, + std::shared_ptr client, std::shared_ptr waitManager, + std::shared_ptr memStore, std::shared_ptr invokeOrderMgr) + : Group(name, inputTenantId, inputOpts, client, waitManager, memStore), invokeOrderMgr_(invokeOrderMgr) +{ +} + void RangeGroup::CreateRespHandler(const CreateResponses &resps) { HandleCreateResp(resps); @@ -49,8 +56,11 @@ void RangeGroup::HandleCreateResp(const CreateResponses &resps) if (!runFlag) { return; } - groupId = resps.groupid(); - YRLOG_DEBUG("group id is {}", groupId); + { + std::lock_guard lock(groupIdMtx); + groupId = resps.groupid(); + YRLOG_DEBUG("group id is {}", groupId); + } if (resps.code() != common::ERR_NONE) { this->memStore_->SetError(createSpecs[0]->returnIds[0].id, ErrorInfo(static_cast(resps.code()), ModuleCode::CORE, resps.message(), true)); diff --git a/src/libruntime/groupmanager/range_group.h b/src/libruntime/groupmanager/range_group.h index a3dbb1a91d0967505b67eef7e38738948fd038d1..dea51758be85d5635fe3871bb66c02de0f2ac529 100644 --- a/src/libruntime/groupmanager/range_group.h +++ b/src/libruntime/groupmanager/range_group.h @@ -31,6 +31,9 @@ protected: RangeGroup(const std::string &name, const std::string &inputTenantId, FunctionGroupOptions &inputOpts, std::shared_ptr client, std::shared_ptr waitManager, std::shared_ptr memStore, std::shared_ptr invokeOrderMgr); + RangeGroup(const std::string &name, const std::string &inputTenantId, GroupOpts &inputOpts, + std::shared_ptr client, std::shared_ptr waitManager, + std::shared_ptr memStore, std::shared_ptr invokeOrderMgr); protected: CreateRequests BuildCreateReqs() override; @@ -43,6 +46,7 @@ protected: void SetInstancesReady(); void NotifyInstances(); void RemoveInstances(); + InstanceRange range; std::shared_ptr invokeOrderMgr_; std::unordered_set instanceIds_; }; diff --git a/src/libruntime/invoke_spec.cpp b/src/libruntime/invoke_spec.cpp index fe2ddddbe46b5588820ef06e6a97fbe98c9126b9..46fcbcaa5e15bfce1dc05278ad2b78f904d90b35 100644 --- a/src/libruntime/invoke_spec.cpp +++ b/src/libruntime/invoke_spec.cpp @@ -290,8 +290,9 @@ void InvokeSpec::BuildInstanceInvokeRequest(const LibruntimeConfig &config) } } -std::string InvokeSpec::BuildCreateMetaData(const LibruntimeConfig &config) +std::string InvokeSpec::BuildCreateMetaData(const LibruntimeConfig &config, std::string &funcMetaStr) { + InitDesignatedInstanceId(config); libruntime::MetaData meta; meta.set_invoketype(this->invokeType); auto funcMeta = meta.mutable_functionmeta(); @@ -311,6 +312,9 @@ std::string InvokeSpec::BuildCreateMetaData(const LibruntimeConfig &config) funcMeta->set_ns((!this->functionMeta.ns.has_value() || this->functionMeta.ns->empty()) ? config.ns : this->functionMeta.ns.value_or("")); + if (!designatedInstanceID.empty()) { + funcMetaStr = funcMeta->SerializeAsString(); + } auto metaConfig = meta.mutable_config(); config.BuildMetaConfig(*metaConfig); if (!this->opts.codePaths.empty()) { diff --git a/src/libruntime/invoke_spec.h b/src/libruntime/invoke_spec.h index 4ff99294ef643676979892e80c599f04404f5019..7fef1d6762da2162a0f3b7bcbdae0c0f06f2d70d 100644 --- a/src/libruntime/invoke_spec.h +++ b/src/libruntime/invoke_spec.h @@ -56,6 +56,9 @@ extern const char *DELEGATE_DOWNLOAD; const std::string NEED_ORDER = "need_order"; const std::string RECOVER_RETRY_TIMES = "RecoverRetryTimes"; +// Suspend-state instance handler retrieval only; pending refactor +const std::string NAMED_FUNCMETA = "named_funcmeta"; + struct InvokeSpec { InvokeSpec(const std::string &jobId, const FunctionMeta &functionMeta, const std::vector &returnObjs, std::vector invokeArgs, const libruntime::InvokeType invokeType, std::string traceId, @@ -110,7 +113,7 @@ struct InvokeSpec { void BuildInstanceInvokeRequest(const LibruntimeConfig &config); - std::string BuildCreateMetaData(const LibruntimeConfig &config); + std::string BuildCreateMetaData(const LibruntimeConfig &config, std::string &funcMetaStr); std::string BuildInvokeMetaData(const LibruntimeConfig &config); @@ -120,12 +123,17 @@ struct InvokeSpec { Arg *pbArg; if (functionMeta.apiType != libruntime::ApiType::Posix) { std::string metaData; + std::string funcMetaStr; if (isCreate) { - metaData = BuildCreateMetaData(config); + metaData = BuildCreateMetaData(config, funcMetaStr); } else { metaData = BuildInvokeMetaData(config); } - + if constexpr (std::is_same::value) { + if (!funcMetaStr.empty()) { + request.mutable_createoptions()->insert({NAMED_FUNCMETA, funcMetaStr}); + } + } pbArg = request.add_args(); pbArg->set_type(Arg_ArgType::Arg_ArgType_VALUE); pbArg->set_value(metaData); diff --git a/src/libruntime/invokeadaptor/invoke_adaptor.cpp b/src/libruntime/invokeadaptor/invoke_adaptor.cpp index b487da8b02ecb1d521cdc5c89be0278766ccfac5..65c60046d434caa3c8daa249787f029439d00e78 100644 --- a/src/libruntime/invokeadaptor/invoke_adaptor.cpp +++ b/src/libruntime/invokeadaptor/invoke_adaptor.cpp @@ -1348,7 +1348,8 @@ ErrorInfo InvokeAdaptor::Kill(const std::string &instanceId, const std::string & auto killPromise = std::make_shared>(); std::shared_future killFuture = killPromise->get_future().share(); - this->fsClient->KillAsync(killReq, [killPromise](KillResponse rsp, ErrorInfo err) -> void { killPromise->set_value(rsp); }); + this->fsClient->KillAsync(killReq, + [killPromise](KillResponse rsp, ErrorInfo err) -> void { killPromise->set_value(rsp); }); ErrorInfo errInfo; if (signal == libruntime::Signal::killInstanceSync) { errInfo = WaitAndCheckResp(killFuture, instanceId, NO_TIMEOUT); @@ -1383,7 +1384,7 @@ ErrorInfo InvokeAdaptor::GroupCreate(const std::string &groupName, GroupOpts &op { if (!this->groupManager->IsGroupExist(groupName)) { auto group = std::make_shared(groupName, librtConfig->tenantId, opts, this->fsClient, - this->waitingObjectManager, this->memStore); + this->waitingObjectManager, this->memStore, this->invokeOrderMgr); this->groupManager->AddGroup(group); return this->groupManager->GroupCreate(groupName); } @@ -1437,6 +1438,16 @@ void InvokeAdaptor::GroupTerminate(const std::string &groupName) return this->groupManager->Terminate(groupName); } +ErrorInfo InvokeAdaptor::GroupSuspend(const std::string &groupName) +{ + return this->groupManager->Suspend(groupName); +} + +ErrorInfo InvokeAdaptor::GroupResume(const std::string &groupName) +{ + return this->groupManager->Resume(groupName); +} + std::pair, ErrorInfo> InvokeAdaptor::GetInstanceIds(const std::string &objId, const std::string &groupName) { @@ -1655,18 +1666,21 @@ std::pair InvokeAdaptor::GetInstance(co killReq.set_signal(libruntime::Signal::GetInstance); auto promise = std::promise>(); auto future = promise.get_future(); - this->fsClient->KillAsync(killReq, [&promise](const KillResponse &rsp, const ErrorInfo &err) -> void { - if (rsp.code() != common::ERR_NONE) { - YR::Libruntime::ErrorInfo errInfo(static_cast(rsp.code()), ModuleCode::RUNTIME, - rsp.message()); - errInfo.SetIsTimeout(err.IsTimeout()); - promise.set_value(std::make_pair(libruntime::FunctionMeta{}, errInfo)); - } else { - libruntime::FunctionMeta funcMeta; - funcMeta.ParseFromString(rsp.message()); - promise.set_value(std::make_pair(funcMeta, YR::Libruntime::ErrorInfo())); - } - }, timeoutSec); + this->fsClient->KillAsync( + killReq, + [&promise](const KillResponse &rsp, const ErrorInfo &err) -> void { + if (rsp.code() != common::ERR_NONE) { + YR::Libruntime::ErrorInfo errInfo(static_cast(rsp.code()), ModuleCode::RUNTIME, + rsp.message()); + errInfo.SetIsTimeout(err.IsTimeout()); + promise.set_value(std::make_pair(libruntime::FunctionMeta{}, errInfo)); + } else { + libruntime::FunctionMeta funcMeta; + funcMeta.ParseFromString(rsp.message()); + promise.set_value(std::make_pair(funcMeta, YR::Libruntime::ErrorInfo())); + } + }, + timeoutSec); auto [funcMeta, errorInfo] = future.get(); YRLOG_DEBUG("get instance finished, err code is {}, err msg is {}, function meta is {}", errorInfo.Code(), errorInfo.Msg(), funcMeta.DebugString()); diff --git a/src/libruntime/invokeadaptor/invoke_adaptor.h b/src/libruntime/invokeadaptor/invoke_adaptor.h index 11b4472b156e61999d9177165bb8c264355a1784..dd41f9db3e68f3caef4935c869aaf4c2cd72bdeb 100644 --- a/src/libruntime/invokeadaptor/invoke_adaptor.h +++ b/src/libruntime/invokeadaptor/invoke_adaptor.h @@ -28,8 +28,8 @@ #include "src/libruntime/dependency_resolver.h" #include "src/libruntime/err_type.h" #include "src/libruntime/fiber.h" -#include "src/libruntime/fsclient/fs_client.h" #include "src/libruntime/fmclient/fm_client.h" +#include "src/libruntime/fsclient/fs_client.h" #include "src/libruntime/groupmanager/function_group.h" #include "src/libruntime/groupmanager/group_manager.h" #include "src/libruntime/invoke_order_manager.h" @@ -39,12 +39,12 @@ #include "src/libruntime/metricsadaptor/metrics_adaptor.h" #include "src/libruntime/objectstore/memory_store.h" #include "src/libruntime/objectstore/object_store.h" +#include "src/libruntime/rgroupmanager/resource_group_create_spec.h" +#include "src/libruntime/rgroupmanager/resource_group_manager.h" #include "src/libruntime/runtime_context.h" #include "src/libruntime/utils/constants.h" #include "src/libruntime/utils/exception.h" #include "src/libruntime/utils/utils.h" -#include "src/libruntime/rgroupmanager/resource_group_create_spec.h" -#include "src/libruntime/rgroupmanager/resource_group_manager.h" #include "src/utility/notification_utility.h" namespace YR { @@ -119,8 +119,12 @@ public: virtual void GroupTerminate(const std::string &groupName); + virtual ErrorInfo GroupSuspend(const std::string &groupName); + + virtual ErrorInfo GroupResume(const std::string &groupName); + virtual std::pair, ErrorInfo> GetInstanceIds(const std::string &objId, - const std::string &groupName); + const std::string &groupName); virtual ErrorInfo SaveState(const std::shared_ptr data, const int &timeout); @@ -134,7 +138,8 @@ public: void CreateResourceGroup(std::shared_ptr spec); virtual std::pair GetInstance(const std::string &name, - const std::string &nameSpace, int timeoutSec); + const std::string &nameSpace, + int timeoutSec); void SubscribeAll(); void Subscribe(const std::string &insId); ErrorInfo Accelerate(const std::string &groupName, const AccelerateMsgQueueHandle &handle, @@ -144,10 +149,11 @@ public: virtual std::pair GetNodeId(); virtual std::pair> GetResources(void); - virtual std::pair GetResourceGroupTable(const std::string &resourceGroupId);\ + virtual std::pair GetResourceGroupTable(const std::string &resourceGroupId); virtual std::pair QueryNamedInstances(); void PushInvokeSpec(std::shared_ptr spec); void SubscribeActiveMaster(); + private: void CreateResponseHandler(std::shared_ptr spec, const CreateResponse &resp); void CreateNotifyHandler(const NotifyRequest &req); diff --git a/src/libruntime/libruntime.cpp b/src/libruntime/libruntime.cpp index 600191f6528cbd190e01c58dbeb5475214a0a93b..be653a1409d44575756ecacc4e7c3ddfedae843a 100755 --- a/src/libruntime/libruntime.cpp +++ b/src/libruntime/libruntime.cpp @@ -14,9 +14,10 @@ * limitations under the License. */ +#include "src/libruntime/libruntime.h" #include -#include "re2/re2.h" #include "invoke_order_manager.h" +#include "re2/re2.h" #include "src/dto/config.h" #include "src/dto/data_object.h" #include "src/dto/status.h" @@ -24,7 +25,6 @@ #include "src/libruntime/fmclient/fm_client.h" #include "src/libruntime/fsclient/fs_client.h" #include "src/libruntime/invokeadaptor/request_manager.h" -#include "src/libruntime/libruntime.h" #include "src/libruntime/metricsadaptor/metrics_adaptor.h" #include "src/libruntime/objectstore/memory_store.h" #include "src/libruntime/utils/serializer.h" @@ -1231,6 +1231,16 @@ void Libruntime::GroupTerminate(const std::string &groupName) return this->invokeAdaptor->GroupTerminate(groupName); } +ErrorInfo Libruntime::GroupSuspend(const std::string &groupName) +{ + return this->invokeAdaptor->GroupSuspend(groupName); +} + +ErrorInfo Libruntime::GroupResume(const std::string &groupName) +{ + return this->invokeAdaptor->GroupResume(groupName); +} + std::pair, ErrorInfo> Libruntime::GetInstances(const std::string &objId, int timeoutSec) { return this->memStore->GetInstanceIds(objId, timeoutSec); @@ -1551,8 +1561,8 @@ std::pair Libruntime::GetInstance(const { auto [meta, err] = this->invokeAdaptor->GetInstance(name, nameSpace, timeoutSec); if (err.OK() && meta.needOrder) { - this->invokeOrderMgr->RegisterInstance( - nameSpace.empty() ? this->config->ns + "-" + name : nameSpace + "-" + name); + this->invokeOrderMgr->RegisterInstance(nameSpace.empty() ? this->config->ns + "-" + name + : nameSpace + "-" + name); } return std::make_pair<>(meta, err); } diff --git a/src/libruntime/libruntime.h b/src/libruntime/libruntime.h index f942e8de2a4fd1a6af99c6677ec35d772fda1baf..fd623b6cabf87005a58dafd43b875980ff1cad47 100644 --- a/src/libruntime/libruntime.h +++ b/src/libruntime/libruntime.h @@ -578,6 +578,10 @@ public: */ virtual void GroupTerminate(const std::string &groupName); + virtual ErrorInfo GroupSuspend(const std::string &groupName); + + virtual ErrorInfo GroupResume(const std::string &groupName); + /*! @brief Get instances associated with the specified object ID within a timeout @param objId the ID of the object diff --git a/src/libruntime/rgroupmanager/resource_group_create_spec.cpp b/src/libruntime/rgroupmanager/resource_group_create_spec.cpp index 1bbaf4231791269d82802ab6b438bf8a258c5caa..77f11c55a891ed288e7d488d0ecb722f41ac9587 100644 --- a/src/libruntime/rgroupmanager/resource_group_create_spec.cpp +++ b/src/libruntime/rgroupmanager/resource_group_create_spec.cpp @@ -15,21 +15,11 @@ */ #include "resource_group_create_spec.h" +#include "src/libruntime/utils/utils.h" namespace YR { namespace Libruntime { -GroupPolicy ConvertStrategyToPolicy(const std::string &stategy) -{ - static const std::unordered_map strategyMap = { - {"PACK", GroupPolicy::Pack}, - {"STRICT_PACK", GroupPolicy::StrictPack}, - {"SPREAD", GroupPolicy::Spread}, - {"STRICT_SPREAD", GroupPolicy::StrictSpread}}; - auto it = strategyMap.find(stategy); - return (it != strategyMap.end()) ? it->second : GroupPolicy::None; -} - void ResourceGroupCreateSpec::BuildCreateResourceGroupRequest() { requestCreateRGroup.set_requestid(this->requestId); diff --git a/src/libruntime/utils/utils.cpp b/src/libruntime/utils/utils.cpp index f1071255ae2ad809094ed9f1b8dfb6061a5a4b14..d48e7e5bacfba1df0e9fa2a114b5911be99de513 100644 --- a/src/libruntime/utils/utils.cpp +++ b/src/libruntime/utils/utils.cpp @@ -241,4 +241,15 @@ bool WillSizeOverFlow(size_t a, size_t b) { return b > (std::numeric_limits::max() - a); } + +GroupPolicy ConvertStrategyToPolicy(const std::string &stategy) +{ + static const std::unordered_map strategyMap = { + {"PACK", GroupPolicy::Pack}, + {"STRICT_PACK", GroupPolicy::StrictPack}, + {"SPREAD", GroupPolicy::Spread}, + {"STRICT_SPREAD", GroupPolicy::StrictSpread}}; + auto it = strategyMap.find(stategy); + return (it != strategyMap.end()) ? it->second : GroupPolicy::None; +} } // namespace YR diff --git a/src/libruntime/utils/utils.h b/src/libruntime/utils/utils.h index 3810a5543f5cb07351dd0b665385b0b3aaff46a3..85ba9d9b3c4eef3008848390dc0b0efdc91159f5 100644 --- a/src/libruntime/utils/utils.h +++ b/src/libruntime/utils/utils.h @@ -34,6 +34,8 @@ namespace YR { using CallResult = ::core_service::CallResult; using NotifyRequest = ::runtime_service::NotifyRequest; +using GroupPolicy = ::common::GroupPolicy; + using datasystem::ConnectOptions; struct IpAddrInfo { std::string ip; @@ -58,4 +60,5 @@ bool IsLaterThan(const std::string ×tamp1, const std::string ×tamp2, d std::tm ParseTimestamp(const std::string ×tamp); int32_t ToMs(int32_t timeoutS); bool WillSizeOverFlow(size_t a, size_t b); +GroupPolicy ConvertStrategyToPolicy(const std::string &stategy); } // namespace YR diff --git a/src/proto/libruntime.proto b/src/proto/libruntime.proto index 485c9f4d33f7bcd87f41d3b99d58b513bb2e1865..0215e17d2091f18797a6a82fe606b8797b8ae948 100644 --- a/src/proto/libruntime.proto +++ b/src/proto/libruntime.proto @@ -54,6 +54,8 @@ enum Signal { Subsribe = 9; Update = 10; Unsubsribe = 11; + GroupSuspend = 16; + GroupResume = 17; UpdateAlias = 64; UpdateFrontend = 65; UpdateScheduler = 66; diff --git a/test/libruntime/group_manager_test.cpp b/test/libruntime/group_manager_test.cpp index d064d9ff0867c92255f00dc2152947983475609c..e317f08af6153225a01ac1e55ec1a4ca8e24289a 100644 --- a/test/libruntime/group_manager_test.cpp +++ b/test/libruntime/group_manager_test.cpp @@ -102,7 +102,7 @@ TEST_F(GroupManagerTest, IsGroupExistTest) opts.groupName = "groupName"; GroupOpts groupOpts{"groupName", -1}; auto group = std::make_shared("groupName", "tenantId", groupOpts, this->fsClient, this->waitManager, - this->memoryStore); + this->memoryStore, this->invokeOrderMgr); auto isGroupExist1 = groupManager->IsGroupExist(opts.groupName); ASSERT_EQ(isGroupExist1, false); @@ -179,7 +179,7 @@ TEST_F(GroupManagerTest, GroupTerminateTest) opts.groupName = "groupName"; GroupOpts groupOpts{"groupName", -1}; auto group = std::make_shared("groupName", "tenantId", groupOpts, this->fsClient, this->waitManager, - this->memoryStore); + this->memoryStore, this->invokeOrderMgr); auto spec = std::make_shared(); CreateRequest req; spec->requestCreate = req; @@ -202,7 +202,7 @@ TEST_F(GroupManagerTest, GroupWaitTest) opts.groupName = "groupName"; GroupOpts groupOpts{"groupName", -1}; auto group = std::make_shared("groupName", "tenantId", groupOpts, this->fsClient, this->waitManager, - this->memoryStore); + this->memoryStore, this->invokeOrderMgr); auto groupGet1 = groupManager->GetGroup(opts.groupName); ASSERT_EQ(groupGet1, nullptr); auto errorInfo1 = groupManager->Wait(opts.groupName); diff --git a/test/libruntime/invoke_adaptor_test.cpp b/test/libruntime/invoke_adaptor_test.cpp index 269992fc96b7b06a92fde65ca85724c3010e357c..f0f1250c63f835248b457f113e85cce446825c9c 100644 --- a/test/libruntime/invoke_adaptor_test.cpp +++ b/test/libruntime/invoke_adaptor_test.cpp @@ -160,7 +160,8 @@ TEST_F(InvokeAdaptorTest, ParseCreateRequestTest) pbArg->set_type(Arg_ArgType::Arg_ArgType_VALUE); InvokeSpec invokeSpec; invokeSpec.invokeType = libruntime::InvokeType::CreateInstance; - pbArg->set_value(invokeSpec.BuildCreateMetaData(*invokeAdaptor->librtConfig)); + std::string meta; + pbArg->set_value(invokeSpec.BuildCreateMetaData(*invokeAdaptor->librtConfig, meta)); auto pbArg2 = request.add_args(); pbArg2->set_type(common::Arg::OBJECT_REF); @@ -282,7 +283,8 @@ TEST_F(InvokeAdaptorTest, InitCallTest) auto pbArg1 = req.add_args(); pbArg1->set_type(Arg_ArgType::Arg_ArgType_VALUE); InvokeSpec invokeSpec; - pbArg1->set_value(invokeSpec.BuildCreateMetaData(*invokeAdaptor->librtConfig)); + std::string meta; + pbArg1->set_value(invokeSpec.BuildCreateMetaData(*invokeAdaptor->librtConfig, meta)); auto res1 = invokeAdaptor->InitCall(req, metaData); ASSERT_EQ(res1.code(), ::common::ERR_NONE); libruntime::MetaConfig *config = metaData.mutable_config(); @@ -963,7 +965,7 @@ TEST_F(InvokeAdaptorTest, GetInstanceIdsTest) ASSERT_EQ(err1.Code(), ErrorCode::ERR_INNER_SYSTEM_ERROR); GroupOptions opts; auto fsClient = std::make_shared(); - auto group = std::make_shared("groupname"); + auto group = std::make_shared("groupname", opts); invokeAdaptor->groupManager->AddGroup(group); auto [vec2, err2] = invokeAdaptor->GetInstanceIds("objid", "groupname"); ASSERT_EQ(vec2.size() == 1, true); diff --git a/test/st/python/test_actor.py b/test/st/python/test_actor.py index 78688641eb5e36e01f6994465651d293fec2ba35..62d622ab8f7eb016b4cb5fca778d5fe4e4f046e9 100644 --- a/test/st/python/test_actor.py +++ b/test/st/python/test_actor.py @@ -192,7 +192,8 @@ def test_cloud_init(init_yr): def init_oncloud(self, function_id, server_address, ds_address): import multiprocessing context = multiprocessing.get_context("spawn") - p = context.Process(target=init_in_process, args=(function_id, server_address, ds_address)) + p = context.Process(target=init_in_process, args=( + function_id, server_address, ds_address)) p.start() p.join() return "success" + str(p.exitcode) @@ -207,7 +208,8 @@ def test_cloud_init(init_yr): function_id = os.getenv("YR_PYTHON_FUNC_ID") server_address = os.getenv("YR_SERVER_ADDRESS") ds_address = os.getenv("YR_DS_ADDRESS") - res = yr.get(actor.init_oncloud.invoke(function_id, server_address, ds_address)) + res = yr.get(actor.init_oncloud.invoke( + function_id, server_address, ds_address)) assert "success0" in res counter = yr.get_instance('counter-actor') @@ -364,3 +366,33 @@ def test_create_and_invoke_in_different_method(init_yr): assert (res[1], 2) invoke_actor() + + +@pytest.mark.smoke +def test_group(init_yr): + group_opts = yr.GroupOptions() + group_name = "test" + g = yr.Group(group_name, group_opts) + opts = yr.InvokeOptions() + opts.group_name = group_name + ins = Counter.options(opts).invoke() + g.invoke() + res = ins.add.invoke() + assert (yr.get(res), 1) + g.terminate() + + +@pytest.mark.smoke +def test_group_suspend_resume(init_yr): + group_opts = yr.GroupOptions() + group_name = "test" + g = yr.Group(group_name, group_opts) + opts = yr.InvokeOptions() + opts.group_name = group_name + ins = Counter.options(opts).invoke() + g.invoke() + res = ins.add.invoke() + assert (yr.get(res), 1) + g.suspend() + g.resume() + g.terminate()