Infinite sampler (open-mmlab#5996)
* add infinite sampler

* add docstring

* support DataParallel (dp)

* rename it to DistributedInfiniteGroupBatchSampler

* fix default value

* support shuffle=False

* resolve comments

* add two 90k configs

* fix dp case

* avoid backward-compatibility breaking

* fix doc

Co-authored-by: zhangshilong <[email protected]>
yhcao6 and jshilong authored Oct 25, 2021
1 parent e43df7c commit d145b0e
Showing 6 changed files with 257 additions and 14 deletions.
15 changes: 15 additions & 0 deletions configs/faster_rcnn/faster_rcnn_r50_caffe_fpn_90k_coco.py
@@ -0,0 +1,15 @@
_base_ = 'faster_rcnn_r50_caffe_fpn_1x_coco.py'

# learning policy
lr_config = dict(
policy='step',
warmup='linear',
warmup_iters=500,
warmup_ratio=0.001,
step=[60000, 80000])

# Runner type
runner = dict(_delete_=True, type='IterBasedRunner', max_iters=90000)

checkpoint_config = dict(interval=10000)
evaluation = dict(interval=10000, metric='bbox')
15 changes: 15 additions & 0 deletions configs/retinanet/retinanet_r50_fpn_90k_coco.py
@@ -0,0 +1,15 @@
_base_ = 'retinanet_r50_fpn_1x_coco.py'

# learning policy
lr_config = dict(
policy='step',
warmup='linear',
warmup_iters=500,
warmup_ratio=0.001,
step=[60000, 80000])

# Runner type
runner = dict(_delete_=True, type='IterBasedRunner', max_iters=90000)

checkpoint_config = dict(interval=10000)
evaluation = dict(interval=10000, metric='bbox')
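
Both 90k configs are the iteration-based counterpart of the standard 1x (12-epoch) schedule. A rough sanity check of where 90k comes from, assuming the usual COCO train2017 split (about 118,287 images) and the default total batch size of 16 (2 images per GPU on 8 GPUs); these numbers are assumptions, not values taken from this commit:

# Back-of-the-envelope check only.
num_images = 118287
total_batch_size = 2 * 8                 # samples_per_gpu * num_gpus
iters_per_epoch = num_images / total_batch_size
print(round(iters_per_epoch))            # ~7393 iterations per epoch
print(round(iters_per_epoch * 12))       # ~88715 iterations for a 12-epoch (1x) run
# max_iters=90000 with LR steps at 60k/80k therefore roughly mirrors the
# 1x schedule, whose LR steps at epochs 8 and 11 fall near 59k/81k iterations.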
9 changes: 6 additions & 3 deletions mmdet/apis/train.py
@@ -60,15 +60,18 @@ def train_detector(model,
f'{cfg.data.imgs_per_gpu} in this experiments')
cfg.data.samples_per_gpu = cfg.data.imgs_per_gpu

runner_type = 'EpochBasedRunner' if 'runner' not in cfg else cfg.runner[
'type']
data_loaders = [
build_dataloader(
ds,
cfg.data.samples_per_gpu,
cfg.data.workers_per_gpu,
# cfg.gpus will be ignored if distributed
len(cfg.gpu_ids),
# `num_gpus` will be ignored if distributed
num_gpus=len(cfg.gpu_ids),
dist=distributed,
seed=cfg.seed) for ds in dataset
seed=cfg.seed,
runner_type=runner_type) for ds in dataset
]

# put model on gpus
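
For reference, the runner type forwarded to build_dataloader is read from the config, with 'EpochBasedRunner' as the backward-compatible fallback when no runner key exists. A minimal sketch, using plain dicts that stand in for the real mmcv Config object:

# Sketch only: `cfg` is a plain dict here; the actual code reads an mmcv Config.
cfg_epoch = dict(runner=dict(type='EpochBasedRunner', max_epochs=12))
cfg_iter = dict(runner=dict(type='IterBasedRunner', max_iters=90000))
cfg_legacy = dict()  # old-style config without a `runner` key

for cfg in (cfg_epoch, cfg_iter, cfg_legacy):
    runner_type = 'EpochBasedRunner' if 'runner' not in cfg else cfg['runner']['type']
    print(runner_type)
# -> EpochBasedRunner, IterBasedRunner, EpochBasedRunner (fallback avoids BC breakage)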
55 changes: 45 additions & 10 deletions mmdet/datasets/builder.py
@@ -10,7 +9,9 @@
from mmcv.utils import Registry, build_from_cfg
from torch.utils.data import DataLoader

from .samplers import DistributedGroupSampler, DistributedSampler, GroupSampler
from .samplers import (DistributedGroupSampler, DistributedSampler,
GroupSampler, InfiniteBatchSampler,
InfiniteGroupBatchSampler)

if platform.system() != 'Windows':
# https://github.com/pytorch/pytorch/issues/973
@@ -87,6 +89,7 @@ def build_dataloader(dataset,
dist=True,
shuffle=True,
seed=None,
runner_type='EpochBasedRunner',
**kwargs):
"""Build PyTorch DataLoader.
@@ -103,28 +106,59 @@ def build_dataloader(dataset,
dist (bool): Distributed training/test or not. Default: True.
shuffle (bool): Whether to shuffle the data at every epoch.
Default: True.
runner_type (str): Type of runner. Default: `EpochBasedRunner`.
kwargs: any keyword argument to be used to initialize DataLoader
Returns:
DataLoader: A PyTorch dataloader.
"""
rank, world_size = get_dist_info()

if dist:
# DistributedGroupSampler will definitely shuffle the data to satisfy
# that images on each GPU are in the same group
if shuffle:
sampler = DistributedGroupSampler(
dataset, samples_per_gpu, world_size, rank, seed=seed)
else:
sampler = DistributedSampler(
dataset, world_size, rank, shuffle=False, seed=seed)
# When model is :obj:`DistributedDataParallel`,
# `batch_size` of :obj:`dataloader` is the
# number of training samples on each GPU.
batch_size = samples_per_gpu
num_workers = workers_per_gpu
else:
sampler = GroupSampler(dataset, samples_per_gpu) if shuffle else None
# When model is :obj:`DataParallel`,
# the batch size is the number of samples on all GPUs
batch_size = num_gpus * samples_per_gpu
num_workers = num_gpus * workers_per_gpu

if runner_type == 'IterBasedRunner':
# this is a batch sampler, which yields
# the indices of a mini-batch each time.
# it can be used with both `DataParallel` and
# `DistributedDataParallel`
if shuffle:
batch_sampler = InfiniteGroupBatchSampler(
dataset, batch_size, world_size, rank, seed=seed)
else:
batch_sampler = InfiniteBatchSampler(
dataset,
batch_size,
world_size,
rank,
seed=seed,
shuffle=False)
batch_size = 1
sampler = None
else:
if dist:
# DistributedGroupSampler will definitely shuffle the data to
# satisfy that images on each GPU are in the same group
if shuffle:
sampler = DistributedGroupSampler(
dataset, samples_per_gpu, world_size, rank, seed=seed)
else:
sampler = DistributedSampler(
dataset, world_size, rank, shuffle=False, seed=seed)
else:
sampler = GroupSampler(dataset,
samples_per_gpu) if shuffle else None
batch_sampler = None

init_fn = partial(
worker_init_fn, num_workers=num_workers, rank=rank,
seed=seed) if seed is not None else None
@@ -134,6 +168,7 @@ def build_dataloader(dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=num_workers,
batch_sampler=batch_sampler,
collate_fn=partial(collate, samples_per_gpu=samples_per_gpu),
pin_memory=False,
worker_init_fn=init_fn,
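
A hedged usage sketch of the updated builder: passing runner_type='IterBasedRunner' makes build_dataloader hand a batch sampler to the DataLoader (with batch_size=1 and sampler=None, as PyTorch requires when a batch sampler is supplied), so the resulting loader can be drawn from indefinitely. The `dataset` below is assumed to be an already-built mmdet dataset exposing the aspect-ratio `flag` attribute; it is not constructed in this sketch.

from mmdet.datasets import build_dataloader

# `dataset` is assumed to exist; any mmdet detection dataset works.
loader = build_dataloader(
    dataset,
    samples_per_gpu=2,
    workers_per_gpu=2,
    dist=False,
    shuffle=True,
    seed=0,
    runner_type='IterBasedRunner')  # selects InfiniteGroupBatchSampler internally

# The underlying sampler never raises StopIteration, so an IterBasedRunner can
# pull exactly `max_iters` batches without re-creating the iterator.
data_iter = iter(loader)
batch = next(data_iter)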
6 changes: 5 additions & 1 deletion mmdet/datasets/samplers/__init__.py
@@ -1,5 +1,9 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .distributed_sampler import DistributedSampler
from .group_sampler import DistributedGroupSampler, GroupSampler
from .infinite_sampler import InfiniteBatchSampler, InfiniteGroupBatchSampler

__all__ = ['DistributedSampler', 'DistributedGroupSampler', 'GroupSampler']
__all__ = [
'DistributedSampler', 'DistributedGroupSampler', 'GroupSampler',
'InfiniteGroupBatchSampler', 'InfiniteBatchSampler'
]
171 changes: 171 additions & 0 deletions mmdet/datasets/samplers/infinite_sampler.py
@@ -0,0 +1,171 @@
import itertools

import numpy as np
import torch
from mmcv.runner import get_dist_info
from torch.utils.data.sampler import Sampler


class InfiniteGroupBatchSampler(Sampler):
"""Similar to `BatchSampler` warping a `GroupSampler. It is designed for
iteration-based runners like `IterBasedRunner` and yields a mini-batch
indices each time, all indices in a batch should be in the same group.
The implementation logic is referred to
https://github.com/facebookresearch/detectron2/blob/main/detectron2/data/samplers/grouped_batch_sampler.py
Args:
dataset (object): The dataset.
batch_size (int): When model is :obj:`DistributedDataParallel`,
it is the number of training samples on each GPU.
When model is :obj:`DataParallel`, it is
`num_gpus * samples_per_gpu`.
Default: 1.
world_size (int, optional): Number of processes participating in
distributed training. Default: None.
rank (int, optional): Rank of current process. Default: None.
seed (int): Random seed. Default: 0.
shuffle (bool): Whether to shuffle the indices of a dummy `epoch`. Note
that `shuffle` cannot guarantee that sequential indices are
generated, because it needs to ensure that all indices in a
batch belong to the same group. Default: True.
""" # noqa: W605

def __init__(self,
dataset,
batch_size=1,
world_size=None,
rank=None,
seed=0,
shuffle=True):
_rank, _world_size = get_dist_info()
if world_size is None:
world_size = _world_size
if rank is None:
rank = _rank
self.rank = rank
self.world_size = world_size
self.dataset = dataset
self.batch_size = batch_size
self.seed = seed if seed is not None else 0
self.shuffle = shuffle

assert hasattr(self.dataset, 'flag')
self.flag = self.dataset.flag
self.group_sizes = np.bincount(self.flag)
# buffer used to save indices of each group
self.buffer_per_group = {k: [] for k in range(len(self.group_sizes))}

self.size = len(dataset)
self.indices = self._indices_of_rank()

def _infinite_indices(self):
"""Infinitely yield a sequence of indices."""
g = torch.Generator()
g.manual_seed(self.seed)
while True:
if self.shuffle:
yield from torch.randperm(self.size, generator=g).tolist()

else:
yield from torch.arange(self.size).tolist()

def _indices_of_rank(self):
"""Slice the infinite indices by rank."""
yield from itertools.islice(self._infinite_indices(), self.rank, None,
self.world_size)

def __iter__(self):
# once batch size is reached, yield the indices
for idx in self.indices:
flag = self.flag[idx]
group_buffer = self.buffer_per_group[flag]
group_buffer.append(idx)
if len(group_buffer) == self.batch_size:
yield group_buffer[:]
del group_buffer[:]

def __len__(self):
"""Length of base dataset."""
return self.size

def set_epoch(self, epoch):
"""Not supported in `IterationBased` runner."""
raise NotImplementedError


class InfiniteBatchSampler(Sampler):
"""Similar to `BatchSampler` warping a `DistributedSampler. It is designed
iteration-based runners like `IterBasedRunner` and yields a mini-batch
indices each time.
The implementation logic is referred to
https://github.com/facebookresearch/detectron2/blob/main/detectron2/data/samplers/grouped_batch_sampler.py
Args:
dataset (object): The dataset.
batch_size (int): When model is :obj:`DistributedDataParallel`,
it is the number of training samples on each GPU.
When model is :obj:`DataParallel`, it is
`num_gpus * samples_per_gpu`.
Default: 1.
world_size (int, optional): Number of processes participating in
distributed training. Default: None.
rank (int, optional): Rank of current process. Default: None.
seed (int): Random seed. Default: 0.
shuffle (bool): Whether to shuffle the dataset or not. Default: True.
""" # noqa: W605

def __init__(self,
dataset,
batch_size=1,
world_size=None,
rank=None,
seed=0,
shuffle=True):
_rank, _world_size = get_dist_info()
if world_size is None:
world_size = _world_size
if rank is None:
rank = _rank
self.rank = rank
self.world_size = world_size
self.dataset = dataset
self.batch_size = batch_size
self.seed = seed if seed is not None else 0
self.shuffle = shuffle
self.size = len(dataset)
self.indices = self._indices_of_rank()

def _infinite_indices(self):
"""Infinitely yield a sequence of indices."""
g = torch.Generator()
g.manual_seed(self.seed)
while True:
if self.shuffle:
yield from torch.randperm(self.size, generator=g).tolist()

else:
yield from torch.arange(self.size).tolist()

def _indices_of_rank(self):
"""Slice the infinite indices by rank."""
yield from itertools.islice(self._infinite_indices(), self.rank, None,
self.world_size)

def __iter__(self):
# once batch size is reached, yield the indices
batch_buffer = []
for idx in self.indices:
batch_buffer.append(idx)
if len(batch_buffer) == self.batch_size:
yield batch_buffer
batch_buffer = []

def __len__(self):
"""Length of base dataset."""
return self.size

def set_epoch(self, epoch):
"""Not supported in `IterationBased` runner."""
raise NotImplementedError
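
A minimal, self-contained sketch of how the new samplers behave; the toy dataset below is invented for illustration and only provides the `flag` attribute (aspect-ratio group) that InfiniteGroupBatchSampler requires.

import itertools

import numpy as np

from mmdet.datasets.samplers import InfiniteGroupBatchSampler


class ToyDataset:
    """Stub dataset exposing the group `flag` attribute the sampler asserts on."""

    def __init__(self, n=10):
        # pretend even indices are "tall" images (group 0) and odd ones "wide" (group 1)
        self.flag = np.array([i % 2 for i in range(n)], dtype=np.int64)

    def __len__(self):
        return len(self.flag)


sampler = InfiniteGroupBatchSampler(ToyDataset(), batch_size=4, seed=0)

# The sampler is infinite, so slice off just a few batches for inspection;
# every index within a printed batch shares the same group flag.
for batch in itertools.islice(iter(sampler), 3):
    print(batch)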
