개요
파이썬에서 병렬처리를 쉽게 해줄 수 있는 라이브러리 입니다. 단순히 병렬처리 뿐만 아니라 머신러닝을 위한 다양한 기능이 있는 파이토치와 같은 프레임워크를 지향하는 것 같습니다.
저는 간단히 멀티프로세싱 로직만 써보고 해당 내용을 정리하고자 합니다.
설치
pip를 이용해서 간단히 설치해줍니다.
pip install ray
사용법 - 1
가장 기본적인 사용법을 정리합니다. 상황은 5개의 파일이 있고, 하나의 파일을 처리하는데 5초가 걸리는 시나리오를 생각합니다. 순차방식으로 처리를 하면 대략 25초 정도의 시간이 걸려야 하지만, 이를 병렬처리를 통해 효과적으로 처리해보겠습니다.
코드
import ray
import time
ray.init
@ray.remote
def process_task(file_path):
"""
독립적으로 실행할 태스크 정의
"""
# open file & processing
print("processing ... : {}".format(file_path))
time.sleep(5)
return True
# 입력 파일 정의
inputs=["/tmp/file1.txt", "/tmp/file2.txt", "/tmp/file3.txt",
"/tmp/file4.txt", "/tmp/file5.txt"]
# ray object ref 리스트 정의
tasks = [process_task.remote(path) for path in inputs]
# 병렬처리 시작
ray.get(tasks)
실행결과
$ export RAY_DEDUP_LOGS=0 && time python3 ./r1.py
2023-12-15 07:08:24,179 INFO worker.py:1673 -- Started a local Ray instance.
(process_task pid=551560) processing ... : /tmp/file4.txt
(process_task pid=551562) processing ... : /tmp/file5.txt
(process_task pid=551547) processing ... : /tmp/file3.txt
(process_task pid=551564) processing ... : /tmp/file1.txt
(process_task pid=551566) processing ... : /tmp/file2.txt
real 0m10.795s
user 0m4.835s
sys 0m7.360s
분석
먼저 독립적으로 실행할 함수를 정의하고 @ray.remote 데코레이터로 지정해주도록 합니다. 그리고 파일목록을 순회하면서 처리할 파일들을 순차적으로 remote()함수를 호출해줍니다.
remote()함수의 리턴값은 object_refs 타입으로, ray가 주고받는 레퍼런스 형식의 값입니다.
이렇게 실행하면, 5개의 프로세스를 실행해서 독립적으로 수행을 합니다. 소요시간도 25초보다 훨씬 절약된 것을 볼 수 있습니다.

한가지만 더 살펴보자면, ray.get()을 수행하는 순간 병렬처리가 시작되고, 먼저 끝나는 태스크는 대기하게 됩니다. 위에 예제에서 태스크마다 5초가 걸리는 가정을 하였는데요, 어떤 태스크가 10초가 걸린다고 한다면 모든 태스크는 10초가 될때까지 기다리게 됩니다.
모든 결과를 처리한 이후에 어떤 작업을 수행해야하는 경우 비효율적인 상황이 발생할 수 있습니다.
먼저 끝난 task의 결과물 부터 어떤 연산을 시작하게 하면 더 효과적으로 할 수 있는데요, 그렇게 할 수 있는 방법은 아래와 같습니다.
사용법 - 2
태스크가 끝남과 동시에 순차적으로 어떤 일을 처리하게 하려면 아래와 같이 ray.wiat()함수를 사용하면 됩니다.
코드
import ray
import time
ray.init
@ray.remote
def process_task(file_path):
"""
독립적으로 실행할 태스크 정의
"""
# open file & processing
print("processing ... : {}".format(file_path))
time.sleep(5)
return True
inputs=["/tmp/file1.txt", "/tmp/file2.txt", "/tmp/file3.txt",
"/tmp/file4.txt", "/tmp/file5.txt"]
tasks = [process_task.remote(path) for path in inputs]
while tasks:
done, tasks = ray.wait(tasks)
print("return value = {}".format(ray.get(done[0])))
실행결과
$ export RAY_DEDUP_LOGS=0 && time python3 ./r2.py
2023-12-15 08:07:54,242 INFO worker.py:1673 -- Started a local Ray instance.
(process_task pid=556749) processing ... : /tmp/file3.txt
(process_task pid=556743) processing ... : /tmp/file2.txt
(process_task pid=556772) processing ... : /tmp/file1.txt
(process_task pid=556777) processing ... : /tmp/file5.txt
(process_task pid=556769) processing ... : /tmp/file4.txt
return value = True
return value = True
return value = True
return value = True
return value = True
real 0m11.980s
user 0m5.551s
sys 0m7.369s
분석
대부분 동일하지만, ray.get()함수가 ray.wait()함수로 변경된 것을 볼 수 있습니다. wait()함수는 두개의 리스트를 반환하는데, 하나는 작업이 완료된 리스트, 그리고 두번째는 완료되지 않은 리스트입니다.
done 값의 경우 리스트이고, 완료된 것이기때문에 done[0]과 같이 하나의 아이템을 엑세스 할 수 있고요, 이것을 ray.get()으로 그 값을 가져오게 됩니다.
이 부분에서 좀 헷갈릴 수 있는데요, ray.get()을 하면 병렬 처리가 시작된다고 위에서 말씀드렸는데, 이번에는 단순히 리턴값을 가지고 오기만 하였습니다. 추측해본건데, object_ref의 타입에 따라서 동적으로 동작하는 듯 보입니다.
task관련 object라면 실제 병렬처리를 시작할 것이고, 단순히 리턴값에 대한 타입이면 해당 값을 바로 가져올 수 있는 기능인 것 같습니다. 그런 의미에서 get이라는 네이밍은 적당히 추상화가 잘 되어있는 듯 보이네요.

while loop를 통해서 병렬 처리가 끝난 object_ref를 알 수 있고 바로바로 다음 job을 실행할 수 있게 되었습니다.
그림으로 보자면 위와 같이 좀더 효율적으로 job을 처리한다는 것을 알 수 있습니다.
마치며
c/c++에서 스레드 프로그래밍을 할때, 특히 네트워크 프로그래밍에서 정말 좌절을 맛본기억이 있었는데요, python을 사용하면서 그래도 많이 추상화가 되어 스레드, 스레드풀 등 다양한 방법을 쉽게 접했던 기억이 있습니다.
그런데, ray라는 라이브러리는 이보다도 더 강력한 추상화 기능을 제공하여 그냥 초기화 하고, 스레드로 동작할 함수를 데코레이터로 묶어주고 get이나 wait 함수로만 호출해주면 바로 동작을 하는데 정말 편리했습니다.
간단한 배치작업들, 여러파일 작업들은 위와 같이 간단한 작업으로 전환 가능하니 한번씩 써보시면 좋을 것 같습니다.
참고
https://docs.ray.io/en/latest/ray-overview/getting-started.html
https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html?highlight=tip