分布式框架Ray入门学习笔记
0.内容概览
1.Ray安装
# 1.Core
>> pip install -U "ray"
# 2.Core, Dashboard, Cluster Launcher
>> pip install -U "ray[default]"
# 3.Core, Data
>> pip install -U "ray[data]"
# 4.Core, Train
>> pip install -U "ray[train]"
# 5.Core, Tune
>> pip install -U "ray[tune]"
# 6.Core, Dashboard, Cluster Launcher, Serve
>> pip install -U "ray[serve]"
# 7.Core, Dashboard, Cluster Launcher, Serve with gRPC support
>> pip install -U "ray[serve-grpc]"
# 8.Core, Tune, RLlib
>> pip install -U "ray[rllib]"
# 9.Core, Dashboard, Cluster Launcher, Data, Train, Tune, Serve, RLlib
>> pip install -U "ray[all]"
2.Ray Core
Ray Core提供了少量核心原语(即tasks,actors,objects),用于分布式应用地构建和拓展,下面通过简单的示例展示如何将函数和类转换为Ray tasks和actors,及如何使用Ray objects。
2.1 运行一个Task
Ray让你能在集群中作为一个远程任务来运行函数,要做到这个,仅需使用装饰器 @ray.remote
来声明你想要远程运行此函数,然后使用 remote()
来调用这个函数,这个远程调用就会返回一个 future
,即 Ray object
引用,最后,使用 ray.get()
来获取具体值。
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
# 1.初始化Ray
ray.init()
# 2.定义一个远程函数
@ray.remote
def greet(name):
return "Hello, {}!".format(name)
if __name__ == "__main__":
# 3.启动远程任务
remote_greet = greet.remote("World")
# 4.获取远程任务结果
result = ray.get(remote_greet)
# 5.打印结果
print(result)
运行函数:
>> python hello_world.py
2024-12-11 17:04:45,419 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
Hello, World!
2.2 调用Actor
Ray提供 actors
来让你能够跨actor实例间并行计算,当你实例化一个类,也即是一个Ray actor,Ray将会在集群中启动一个该类的远程实例。这个actor然后就可以执行远程方法调用并维持自身内部状态:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
# 1.初始化Ray
ray.init()
# 2.定义一个Counter actor
@ray.remote
class Counter:
def __init__(self):
self.i = 0
def get(self):
return self.i
def incr(self, value):
self.i += value
if __name__ == '__main__':
# 3.创建一个Counter actor
c = Counter.remote()
# 4.调用actor的方法
for _ in range(10):
c.incr.remote(1)
# 5.获取actor的状态
print(ray.get(c.get.remote())) # Output: 10
2.3 传递object
如上所示,Ray在它的分布式目标存储中对task和actor进行存储,后面返回可以被检索的object引用。object引用可以通过 ray.put
来显式创建,object引用可以作为参数值的替换被传递给tasks。
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
# 1.初始化Ray
ray.init()
import numpy as np
# 2.定义一个远程函数
@ray.remote
def sum_matrix(matrix):
return np.sum(matrix)
if __name__ == '__main__':
# 3.调用远程函数使用字面量值
print(ray.get(sum_matrix.remote(np.ones((100, 100))))) # 10000.0
# 4.将一个大矩阵放入对象存储中,并调用远程函数
matrix_ref = ray.put(np.ones((1000, 1000)))
print(ray.get(sum_matrix.remote(matrix_ref))) # 1000000.0
3. 核心概念
3.1 Tasks
Tasks:Ray使得任意函数能够异步地执行在单独地Python工作器上,这些函数被称为Ray远程函数,它们地异步调用称为Ray任务,这里是一个示例:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
# 1.一个常规的python函数
def normal_function():
return 1
# 2.通过添加装饰器`@ray.remote`,一个常规的函数就
# 变成了一个Ray远程函数
@ray.remote
def my_function():
return 1
# 3.定义另外一个远程函数,该函数会等待10秒钟
@ray.remote
def slow_function():
time.sleep(10)
return 1
if __name__ == '__main__':
# 4.使用remote方法调用远程函数,这将立即返回一个对象引用(a future)
# 然后创建一个将被执行在worker进程中的任务
obj_ref = my_function.remote()
# 5.使用ray.get()方法获取结果
assert ray.get(obj_ref) == 1
# 6.Ray任务被并行执行了,所有计算都在后台进行,
# 由ray的内部事件循环驱动
for _ in range(4):
slow_function.remote()
3.1.1 指定资源请求
运行时,show_function
会异步被执行,主进程不会等待。Ray还允许指定在任务中的资源请求,如CPU、GPU或一些自定义资源,如:
# 1.指定需要资源
@ray.remote(num_cpu=4, num_gpus=2)
def my_function():
return 1
# 2.重写默认资源需求
my_function.options(num_cpus=3).remote()
3.1.2 传递对象引用
除了传值之外,远程函数还可以传对象引用,当任务执行时,函数体内部的参数将是底层值,例如:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
def my_function():
return 1
@ray.remote
def function_with_an_arguement(value):
return value + 1
if __name__ == '__main__':
obj_ref1 = my_function.remote()
print(ray.get(obj_ref1))
assert ray.get(obj_ref1) == 1
obj_ref2 = function_with_an_arguement.remote(obj_ref1)
print(ray.get(obj_ref2))
assert ray.get(obj_ref2) == 2
注意下列行为:
- (1) 当第二个任务依赖于第一个任务的输出时,Ray将会在执行完第一个任务后再执行第二个任务;
- (2) 若两个任务被安排在不同机器上,第一个任务的输出将通过网络发送到第二个任务所在机器上。
3.1.3 等待部分结果
对Ray任务的结果调用 ray.get
将会阻塞任务,直至任务执行完毕。在启动一系列任务后,如果想知道哪一个任务执行完毕,但是又不想阻塞它们,可以使用 ray.wait()
接口,例如:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
def slow_function():
time.sleep(10)
return 1
if __name__ == '__main__':
object_refs = [slow_function.remote() for _ in range(2)]
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
print(ray.get(ready_refs[0]))
3.1.4 多个返回值
默认Ray任务仅返回一个目标引用,然而,你可以配置Ray任务的 num_returns
选项来返回多个目标引用:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
def return_sigle():
return 0, 1, 2
@ray.remote(num_returns=3)
def return_multiple():
return 0, 1, 2
# 迭代器
@ray.remote(num_returns=3)
def return_multiple_as_generator():
for i in range(3):
yield i
if __name__ == '__main__':
object_ref = return_sigle.remote()
assert ray.get(object_ref) == (0, 1, 2)
print(ray.get(object_ref)) # Output: (0, 1, 2)
object_ref0, object_ref1, object_ref2 = return_multiple.remote()
print(ray.get(object_ref0))
print(ray.get(object_ref1))
print(ray.get(object_ref2))
assert ray.get(object_ref0) == 0 # Output: 0
assert ray.get(object_ref1) == 1 # Output: 1
assert ray.get(object_ref2) == 2 # Output: 2
a, b, c = return_multiple_as_generator.remote()
print(ray.get(a))
print(ray.get(b))
print(ray.get(c))
assert ray.get(a) == 0 # Output: 0
assert ray.get(b) == 1 # Output: 1
assert ray.get(c) == 2 # Output: 2
3.1.5 取消任务
Ray任务可以通过对返回的目标引用调用 ray.cancel()
被取消掉:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
def blocking_operation():
time.sleep(10e6)
if __name__ == '__main__':
obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)
try:
ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
print("Task cancelled successfully.")
3.2 Actors
Actors:Actor将Ray API从函数(tasks)扩展到类,一个Actor本质上是一个有状态的工作器(或服务)。当新Actor被实例化后,会创建一个新工作器,Actor方法被调度(scheduled)到该特定地工作器,同时actor的方法可访问和改变该工作器的状态。
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_counter(self):
return self.value
if __name__ == '__main__':
counter = Counter.remote()
for i in range(10):
counter.increment.remote()
print(ray.get(counter.get_counter.remote())) # Output: 10
3.2.1 指定需要资源
你同样可以指定actors所需要资源:
# Specify required resources for an actor.
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor:
pass
3.2.2 调用Actor
我们可以通过 remote
操作调用actor的方法来跟actor交互,然后对每一个目标引用使用 ray.get
方法来检索实际值:
# 调用actor
obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
不同actors上的方法调用可以被并行执行,相同actor上的方法调用则会按照调用顺序串行执行,并且相同actor上的方法将彼此间共享状态,如:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_counter(self):
return self.value
if __name__ == '__main__':
# 1.创建十个Counter的actor对象
counters = [Counter.remote() for i in range(10)]
# 2.调用每一个actor对象的increment方法,并打印返回值
results = ray.get([c.increment.remote() for c in counters])
print(results) # [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
# 3.调用第一个actor对象的increment方法五次,任务被串行执行,并且共享状态
results = ray.get([counters[0].increment.remote() for i in range(5)])
print(results) # [1, 2, 3, 4, 5]
3.2.3 传递Actor句柄
Actor句柄可以传递到其它任务中,我们可以定义使用Actor句柄的远程函数(或Actor方法):
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
ray.init()
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_counter(self):
return self.value
@ray.remote
def f(counter):
for i in range(10):
time.sleep(0.1)
counter.increment.remote()
if __name__ == '__main__':
counter = Counter.remote()
[f.remote(counter) for _ in range(3)]
for _ in range(10):
time.sleep(0.1)
print(ray.get(counter.get_counter.remote()))
输出结果:
2024-12-12 11:14:49,250 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
0
6
9
12
15
18
21
24
27
30
3.2.4 取消Actor
可以通过对对象引用调用 ray.cancel()
来取消Actor任务:
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
import asyncio
ray.init()
@ray.remote
class Actor:
async def f(self):
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
print("actor cancelled")
if __name__ == '__main__':
actor = Actor.remote()
ref = actor.f.remote()
time.sleep(1)
ray.cancel(ref)
try:
ray.get(ref)
except ray.exceptions.RayTaskError:
print("Object reference was cancelled.")
输出结果:
2024-12-12 11:23:56,867 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(Actor pid=31976) actor cancelled
(Actor pid=31976) *** SIGSEGV received at time=1733973838 on cpu 6 ***
(Actor pid=31976) PC: @ 0x7f21c4b3c1d7 (unknown) boost::fibers::algo::round_robin::pick_next()
(Actor pid=31976) @ 0x7f21c6474090 3328 (unknown)
(Actor pid=31976) @ 0x7f21c4b3c088 48 boost::fibers::wait_queue::suspend_and_wait()
(Actor pid=31976) @ 0x7f21c4b3b475 64 boost::fibers::mutex::lock()
(Actor pid=31976) @ 0x7f21c4ad26ed 96 std::_Function_handler<>::_M_invoke()
(Actor pid=31976) @ 0x7f21c4acbf55 96 boost::fibers::worker_context<>::run_()
(Actor pid=31976) @ 0x7f21c4acbcd0 80 boost::context::detail::fiber_entry<>()
(Actor pid=31976) @ 0x7f21c4b3c5af (unknown) make_fcontext
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: *** SIGSEGV received at time=1733973838 on cpu 6 ***
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: PC: @ 0x7f21c4b3c1d7 (unknown) boost::fibers::algo::round_robin::pick_next()
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c6474090 3328 (unknown)
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c4b3c088 48 boost::fibers::wait_queue::suspend_and_wait()
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c4b3b475 64 boost::fibers::mutex::lock()
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c4ad26ed 96 std::_Function_handler<>::_M_invoke()
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c4acbf55 96 boost::fibers::worker_context<>::run_()
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c4acbcd0 80 boost::context::detail::fiber_entry<>()
(Actor pid=31976) [2024-12-12 11:23:58,455 E 31976 32015] logging.cc:361: @ 0x7f21c4b3c5af (unknown) make_fcontext
(Actor pid=31976) Fatal Python error: Segmentation fault
(Actor pid=31976)
Object reference was cancelled.
3.3 Objects
Objects:在Ray中,任务和Actors在对象上创建和计算,我们将这些对象称为远程对象,因为它们可以存储在集群中任何位置,并且我们使用对象引用来引用它们。远程对象缓存在Ray分布式共享内存对象存储中,并且集群中每个节点都有一个对象存储。在集群设置中,与谁持有对象引用无关,远程对象可以存在于一个或多个节点上。
对象引用本质上是一个指针或唯一ID,可用于引用远程对象而无需查看其值,如果你熟悉future,Ray对象引用在概念上是相似的,对象引用可通过两种方式创建:
- (1) 远程函数调用返回得到;
- (2) 使用
ray.put()
返回得到;
import ray
ray.init()
y = 1
obj_ref = ray.put(y)
3.3.1 获取对象数据
你可以使用 ray.get()
方法从一个对象引用获取远程对象的结果。如果当前节点的对象存储未包含对象,则下载该对象。
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
import time
from ray.exceptions import GetTimeoutError
ray.init()
@ray.remote
def long_running_function():
time.sleep(8)
if __name__ == '__main__':
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]
obj_ref = long_running_function.remote()
try:
ray.get(obj_ref, timeout=4)
except GetTimeoutError:
print("`get` time out.")
3.3.2 传递对象
Ray对象引用可以在Ray应用程序中自由传递,这意味着它们可以作为参数传递给任务、actor方法、甚至存储在其它对象中。对象通过分布式引用计数被跟踪,一旦对象所有引用被删除掉,它们的数据自动释放。
有两种不同方式可以传递对象到ray的任务或方法,根据对象传递方法,ray将会决定是否在执行任务前解引用对象。
- (1) 将对象作为顶层参数传递:当一个对象直接作为顶层参数传递给一个任务,Ray将会解引用对象。这意味着Ray将获取所有顶层对象引用参数的底层数据,直到对象数据完全可用时才执行任务。
'''
Author: chenjingyu
Date: 2024-12-11 16:49:46
Contact: 2458006466@qq.com
Description: hello world
'''
import ray
@ray.remote
def echo(a: int, b: int, c: int):
print(a, b, c)
if __name__ == '__main__':
# 1.传字面量参数(1, 2, 3)给echo函数
echo.remote(1, 2, 3)
# 2.将值(1, 2, 3)放入ray的对象存储中,再传给echo函数
a, b, c = ray.put(1), ray.put(2), ray.put(3)
echo.remote(a, b, c)
- (2) 将对象作为嵌套参数传递:当对象在嵌套对象中传递时(如,Python列表中),Ray不会解引用它。这意味着任务将需要调用
ray.get()
引用来获取具体值。但是,若任务从不调用ray.get()
,则对象值永远不需要传输到运行任务的机器上。我们建议尽可能将对象作为顶层参数传递,但嵌套参数对于将对象传递给其它任务而无需查看数据时很有用:
import ray
@ray.remote
def echo(a: int, b: int, c: int):
print(a, b, c)
@ray.remote
def echo_and_get(x_list):
print("args: ", x_list)
print("values: ", ray.get(x_list))
if __name__ == '__main__':
a, b, c = ray.put(1), ray.put(2), ray.put(3)
echo_and_get.remote([a, b, c])
3.3.3 对象的闭包捕获
你还可以通过闭包像任务传递对象。当你有一个大型对象,想要多个任务或actors之间逐字共享,并且不想将其作为参数重复传递时,这种方法会很方便。但注意,定义一个通过对象引用关闭任务将通过引用计数固定该对象,因此,该对象在作业完成前不会被驱逐。
import ray
a, b, c = ray.put(1), ray.put(2), ray.put(3)
@ray.remote
def print_via_capture():
print(ray.get([a, b, c]))
if __name__ == '__main__':
print_via_capture.remote()
3.4 Environment Dependencies
你的ray应用可能存在于Ray脚本之外的依赖项,例如:
- (1) 你的ray脚本可能import/depend一些python包;
- (2) 你的ray脚本可能查询一些可访问的特殊环境变量;
- (3) 你的ray脚本可能import一些脚本之外的文件;
为了处理这个问题,你可以(1) 使用Ray的 Cluster Launcher
预先在集群里准备你的依赖项(例如,使用一个容器镜像),或 (2) 使用Ray的 runtime environment
在线安装它们;
对于生产使用或不变环境,我们推荐安装你的依赖项到一个容器镜像中,并使用 cluster Launcher
指定镜像。对于动态环境(如,生产环境和实验环境),我们推荐使用运行时环境。
3.5 Scheduling
对于每个任务或actor,Ray将会选择一个节点运行它,调度决策是基于如下因素:
3.5.1 Resources
每个任务或actor有特定的资源请求。给定这些,一个节点可以是下面几种状态之一:
-
Feasible:节点有运行任务或actor所需资源。存在下面两个子状态来决定当前这些资源的可用性:
- 可用:节点有所需资源,并且当前它们是空闲状态的;
- 不可用:节点有所需资源,但是它们当前正在被别的任务或actors使用中;
-
Infeasible:节点没有所需资源。例如,一个纯CPU节点对于一个GPU任务而言就是不可行的。
资源需求是硬性需求,这意味着只有可行节点才有资格运行任务或actor。如果没有可行节点,Ray会选择一个可用节点或按照下面讨论的因素,等待一个不可用节点变成可用节点。如果所i有节点都可行,任务或actor将不能被调度直到集群中添加了可用节点为止。
import ray
ray.init()
if __name__ == '__main__':
num_gpus = int(ray.available_resources()["GPU"])
print(f"num_gpus: {num_gpus}")
num_cpus = int(ray.available_resources()["CPU"])
print(f"num_cpus: {num_cpus}")
3.5.2 Scheduling Strategies
任务或actors支持使用 scheduling_strategy
选项来指定用于决定在所有可行节点中最好节点的策略。当前支持的策略有如下几种:
DEFAULT:DEFAULT是Ray使用的默认策略。Ray将任务或actors调度到一组前k个节点上。具体来说,节点排序首先倾向于已经有安排任务或actors的节点(局部性),然后倾向于那些较低资源利用率的节点(负载均衡)。在前k个组中,节点被随机选取来进一步提升负载均衡并减轻大型集群中冷启动造成的延迟。
实现方面,Ray基于逻辑资源利用率对集群中每个节点计算一个分数,如果利用率低于一个阈值(由操作系统环境变量 RAY_scheduler_spread_threshold
控制,默认是0.5),得分就是0,反之,分数就是自身资源利用率(分数为1表示节点已充分利用)。Ray通过从得分最低的前k个节点中随机选取最佳节点来进行调度。k是(集群中节点数乘以环境变量 RAY_scheduler_top_k_fraction
)和环境变量 Ray_scheduler_top_k_absolute
之间的最大值,默认情况为节点总数的20%。
当前ray处理不需要任何资源(即 num_cpus=0
没有其他资源)的actors,方法是随机选择集群中的节点,而不考虑资源利用率。由于节点是随机选择的,因此不需要任何资源的actors会有效地分布在集群中。
@ray.remote
def func():
return 1
@ray.remote(num_cpus=1)
class Actor:
pass
# If unspecified, "DEFAULT" scheduling strategy is used.
func.remote()
actor = Actor.remote()
# Explicitly set scheduling strategy to "DEFAULT".
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()
# Zero-CPU (and no other resources) actors are randomly assigned to nodes.
actor = Actor.options(num_cpus=0).remote()
"SPREAD":"SPREAD"策略将会尝试在可用节点间传播任务或actors
@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
return 2
@ray.remote(num_cpus=1)
class SpreadActor:
pass
# Spread tasks across the cluster.
[spread_func.remote() for _ in range(10)]
# Spread actors across the cluster.
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]