https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
문서에 나와있는 설명으로는 default process group을 초기화하고, distributed package 또한 초기화하는 동작을 한다.
process group을 초기화하는 방법에는 크게 두가지의 주된 방법이 있다:
- store, rank, world_size를 명시적으로 입력하는 방법
- peer들을 어떻게/어디서 찾을지에 대해 나타내는 init_method를 입력하는 방법
소스 코드를 좀 더 확인해보자.
def init_process_group(
backend: Union[str, Backend] = None,
init_method: Optional[str] = None,
timeout: timedelta = default_pg_timeout,
world_size: int = -1,
rank: int = -1,
store: Optional[Store] = None,
group_name: str = "",
pg_options: Optional[Any] = None,
):
...
문서에서는 인자에 대한 설명을 아래와 같이 확인할 수 있다:
- backend (str or Backend, optional) – 사용할 backend. 빌드-타임 설정에 따라 mpi, gloo, nccl, and ucc를 사용할 수 있음. backend가 주어지지 않는다면 gloo and nccl backend가 모두 생성됨. nccl backend를 사용하는 머신당 다수의 프로세스를 사용한다면, 각각의 프로세스는 모두 서로 배제적인 GPU를 가져야만 함. 여러 프로세스에서 GPU를 공유하는 것은 deadlock을 발생시킬 수 있음.
- init_method (str, optional) – process group을 초기화하기 위한 URL 명세. Default “env://”. store와는 상호 배제적임.
- world_size (int, optional) – 작업에 참여하는 process 수. store가 입력되었다면 반드시 입력되어야 함.
- rank (int, optional) – 현재 process의 rank (0 ~ world_size-1). store가 입력되었다면 반드시 입력되어야 함.
- store (Store, optional) – 모든 worker에서 접근할 수 있는 Key/value store. connection/address information을 교환할 때 사용됨. init_method과는 상호 배제적임.
- timeout (timedelta, optional) – process group에서 operation을 수행할 때의 timeout. Default 30분. gloo backend에 적용 가능. nccl에서는 NCCL_BLOCKING_WAIT 또는 NCCL_ASYNC_ERROR_HANDLING 환경변수가 1로 설정되어 있을 때만 적용가능. NCCL_BLOCKING_WAIT 이 설정되면, timeout 값은 예외를 발생시키기 전에 프로세스가 차단되고 comllectives가 완료되기를 기다리는 기간임. NCCL_ASYNC_ERROR_HANDLING 이 설정되면, timeout 값은 collectives가 비동기적으로 aborted되고, process가 crash되기까지의 시간임. NCCL_BLOCKING_WAIT 는 사용자에게 catch가능하고 처리가능한 에러를 제공하지만 blocking이라는 특성 때문에 성능 오버헤드가 있음. 반면에, NCCL_ASYNC_ERROR_HANDLING 는 아주 작은 성능 오버헤드만 있지만, 에러 발생시 프로세스를 crash시킴. 이러한 것들은 CUDA execution이 비동기적일뿐더러 사용자 코드를 계속 수행한다면 실패한 비동기 NCCL 연산이 이후의 corrupted 데이터에서 실행되는 CUDA 연산이 되기 때문에 더 이상 안전하지 않아서 발생함.
- group_name (str, optional, deprecated) – Group name.
- pg_options (ProcessGroupOptions, optional) – 특정 process group을 생성하는 과정에서 필요한 추가적인 옵션들을 명세. 현재는 nccl backend를 위한 ProcessGroupNCCL.Options 만 제공.
torch.distributed.init_process_group 함수는 _world, _backend, _default_pg_init_method를 global 키워드를 사용해 전역변수화하는 것으로 시작한다.
...
global _world
global _backend
global _default_pg_init_method
...
그런 다음 몇가지 인자에 대한 조건을 처리한 뒤, backend 인자에 맞게 backend를 Backend 객체로 초기화한다.
if backend:
backend = Backend(backend)
else:
backend = Backend("undefined")
MPI backend인 경우, _new_process_group_helper 함수에 다음과 같은 인자로 process_group을 초기화한다. (world_size, rank, storea를 사용하지 않음)
if backend == Backend.MPI:
if world_size != -1 or rank != -1:
warnings.warn(
"For MPI backend, world_size ({}) and rank ({}) "
"are ignored since they are assigned by the "
"MPI runtime.".format(world_size, rank)
)
default_pg = _new_process_group_helper(
-1, -1, [], backend, None, group_name=group_name, timeout=timeout
)
_update_default_pg(default_pg)
다른 backend일 경우, _new_process_group_helper 함수에 다음과 같은 인자를 전달하여 process group을 초기화한다.
else:
# backward compatible API
if store is None:
rendezvous_iterator = rendezvous(
init_method, rank, world_size, timeout=timeout
)
store, rank, world_size = next(rendezvous_iterator)
store.set_timeout(timeout)
# Use a PrefixStore to avoid accidental overrides of keys used by
# different systems (e.g. RPC) in case the store is multi-tenant.
store = PrefixStore("default_pg", store)
default_pg = _new_process_group_helper(
world_size,
rank,
[],
backend,
store,
pg_options=pg_options,
group_name=group_name,
timeout=timeout,
)
_update_default_pg(default_pg)
결국 process group 초기화는 _new_process_group_helper 함수에서 진행되는데, 자세히 살펴보자.
def _new_process_group_helper(
group_size,
group_rank,
global_ranks_in_group,
backend,
store,
pg_options=None,
group_name=None,
timeout=default_pg_timeout,
):
"""
Create a new distributed process group.
This function must be called by ALL processes in the global group, even if
the calling process is not part of the newly created group. In that case,
this function returns GroupMember.NON_GROUP_MEMBER.
This function is called with ``global_ranks_in_group == []`` for the default group.
"""
global _world
함수의 주요한 내용은 backend_config의 device_backend_map에 대한 for loop이다.
prefix_store = PrefixStore(f"{group_name}/", store)
base_pg_options = ProcessGroup.Options(backend=str(backend))
base_pg_options._timeout = timeout
pg: ProcessGroup = ProcessGroup(prefix_store, group_rank, group_size, base_pg_options)
backend_config = BackendConfig(backend)
for device, backend_str in backend_config.get_device_backend_map().items():
...
device_backend_map은 일반적으로 아래와 같다. backend를 지정해주지 않았다면, cpu는 gloo로 cuda는 nccl로 구성된다.
backend_val = Backend(backend)
self.device_backend_map = {
"cpu": backend_val,
"cuda": backend_val,
}
_new_process_group_helper는 결국 cpu 및 gpu에 대한 backend process group 객체를 생성하여 반환한다.
elif backend_str == Backend.GLOO:
# TODO: remove this check after lazy initialization is supported
# if pg_options is not None:
# raise RuntimeError("GLOO options not supported")
backend_class = ProcessGroupGloo(backend_prefix_store, group_rank, group_size, timeout=timeout)
backend_type = ProcessGroup.BackendType.GLOO
elif backend_str == Backend.NCCL:
if not is_nccl_available():
raise RuntimeError("Distributed package doesn't have NCCL " "built in")
if pg_options is not None:
assert isinstance(
pg_options, ProcessGroupNCCL.Options
), "Expected pg_options argument to be of type ProcessGroupNCCL.Options"
else:
# default pg_options for NCCL
pg_options = ProcessGroupNCCL.Options()
pg_options.is_high_priority_stream = False
pg_options._timeout = timeout
backend_class = ProcessGroupNCCL(backend_prefix_store, group_rank, group_size, pg_options)
backend_type = ProcessGroup.BackendType.NCCL
...
if issubclass(type(backend_class), ProcessGroup):
pg = backend_class
break
...
# update global state
_world.pg_map[pg] = (backend, prefix_store)
_world.pg_names[pg] = group_name
_world.pg_backend_config[pg] = str(backend_config)
return pg