背景
私が調べた限り、Ray v2.4.0時点で、多数のノードにまたがって動作するmutexは実装されていない。
一応、Actorが共有リソースと排他処理を実現しているので、これで十分な場合もあるだろう。しかしActorはあるノードの中で1個のプロセスとして動作するらしく1、どのノードのどのプロセスからActorのメソッドを呼び出しても特定のプロセス内で呼び出されてしまう。私はプロセスごとにローカルかつ排他的に呼び出したい処理があったので、Actorの持つメソッドという方法では実現できなかった。
単一のノード内で動かす場合はAsyncIOなどが動作するという話も見かけたのだが、私の場合は残念ながら分散処理をさせたいのでこれも不適当だ。
というわけで、lock、unlockが可能なmutexを作成することにした。幸いなことに、Actorは必ず排他的に処理を実行できるので、これを使えばmutexのような動作をさせることはできる。
動作確認はWindows 10のPC2台で行っている。RayのWindows版は現状ベータ版相当であることは留意されたい。
実装
0.1秒に一度mutexのlock状態を確認するだけの単純な仕組みである。いわゆるビジーウェイトによる極めていい加減な実装であるが、私はRayの中身についてろくに理解できていないので、これが今の私にできうる最大限であるし、私の用途ではこの程度の雑な実装でも特に問題にならないだろう。
リモート関数taskのうち前半のtime.sleep(2)
は並列に実行されるが、その後のlock-unlock内に括られているtime.sleep(1)
は排他処理となっている。
import ray import time import os @ray.remote(num_cpus=0) def task(num, mutex): time.sleep(2) print(f"task {num}") lock(mutex) time.sleep(1) unlock(mutex) print(f"done {num}.") return num @ray.remote class RayMutex: def __init__(self): self.lock = False def is_locked(self): return self.lock def set_unlocked(self): self.lock = False def set_locked(self): if self.lock: return True self.lock = True return False def unlock(mutex): mutex.set_unlocked.remote() def lock(mutex, interval=0.1): waiting = True while waiting: while ray.get(mutex.is_locked.remote()): time.sleep(interval) waiting = ray.get(mutex.set_locked.remote()) if __name__ == "__main__": os.environ["RAY_DEDUP_LOGS"] = "0" ray.init(address="auto") mutex = RayMutex.remote() futures = [ task.remote(i, mutex) for i in range(0, 10) ] res = [ ray.get(f) for f in futures ]
- 例えば、Actorが(ip=192.168.0.10, pid=12345)に割り当てられたとする。このとき、別のノードのタスク(ip=192.168.0.20, pid=67890)からActorのメソッドを呼び出したとしても、そのメソッドは(ip=192.168.0.10, pid=12345)の中で実行される。これは私の望む動作ではなかった。↩