Accelerate
是一个库,只需添加四行代码,就可以在任何分布式 configuration
中运行相同的 PyTorch
代码:
x+ from accelerate import Accelerator
+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+ model, optimizer, training_dataloader, scheduler
+ )
for batch in training_dataloader:
optimizer.zero_grad()
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
+ accelerator.backward(loss)
optimizer.step()
scheduler.step()
上述代码可以通过 Accelerate
的 CLI
接口在任何系统上启动:
xxxxxxxxxx
accelerate launch {my_script.py}
安装:
xxxxxxxxxx
pip install accelerate
conda install -c conda-forge accelerate
pip install git+https://github.com/huggingface/accelerate
配置:
xxxxxxxxxx
accelerate config
然后 accelerate
会向你询问一些问题从而生成配置。
检查配置:
xxxxxxxxxx
accelerate env
为了使用 Accelerate
,你只需要修改四件事:
首先,导入 Accelerator
并创建一个 accelerator
对象:
xxxxxxxxxx
from accelerate import Accelerator
accelerator = Accelerator()
然后,移除针对你的模型和输入数据的所有 .to(device)
或 .cuda()
的调用。accelerator
将会为你正确处理这个问题,并为你把所有这些对象放在正确的设备上。
如果你知道你在做什么,你可以保留那些 .to(device)
的调用,但你应该使用 accelerator
对象提供的设备: accelerator.device
。
要完全停用自动的 device placement
,在初始化 Accelerator
时传递 device_placement=False
。
接着,将所有与训练有关的对象(optimizer, model, training dataloader, learning rate scheduler
)传递给accelerator.prepare()
方法。这将确保一切都为训练做好准备。
xxxxxxxxxx
model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, lr_scheduler
)
具体而言,training dataloader
将被分片到所有可用的 GPU/TPU
核心上,这样每个设备都能看到训练数据集的不同部分。此外,所有进程的随机数状态将在每次迭代开始时通过 dataloader
进行同步,以确保数据以相同的方式被混洗(如果你决定使用shuffle=True
或任何类型的 random sampler
)。
训练的实际 batch size
将是使用的设备数量乘以你在脚本中设置的 batch size
。另外,你可以在创建 accelerator
对象时使用 split_batches=True
参半,此时无论你在多少个 GPU
上运行你的脚本,实际 batch size
都会保持不变。
你需要再开始实际的 training loop
之前执行 accelerator.prepare()
。
只有当 scheduler
需要再每个 optimizer step
中被 stepped
时,才需要把 learning rate scheduler
传递给 prepare()
。
任何获取 training dataloader length
的方法(例如,你需要记录 total training step
)都应该在 accelerator.prepare()
之后进行。
你可能想、也可能不想把你的validation dataloader
发送到 prepare()
,这取决于你是否想运行分布式评估。
最后,用 accelerator.backward(loss)
替换 loss.backward()
。
现在,你的脚本将在你的本地机器上运行,也可以在多个 GPU
或 TPU
上运行。你可以使用你喜欢的工具来启动分布式训练,或者你可以使用 Accelerate launcher
启动。
分布式评估:
可以进行常规评估,此时你需要将 validation dataloader
保持在 accelerator.prepare()
之外。并且,你需要手动将 input
数据放在 accelerator.device
上。
也可以进行分布式评估,此时你需要将 validation dataloader
放置在 accelerator.prepare()
之内:
xxxxxxxxxx
validation_dataloader = accelerator.prepare(validation_dataloader)
就像 training dataloader
,这意味着在分布式评估时,每个设备将仅看到部分 evaluation
数据。这意味着你需要把 predictions
进行 group
。可以通过 accelerator.gather_for_metrics()
方法来实现:
xxxxxxxxxx
for inputs, targets in validation_dataloader:
predictions = model(inputs)
# Gather all predictions and targets
all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
# Example of use with a Datasets.Metric
metric.add_batch(all_predictions, all_targets)
类似 training dataloader
,把 validation dataloader
传入 prepare()
可能会改变该 dataloader
:如果你在 GPU
上运行,则它的长度将被除以 batch size
将被乘以 split_batches=True
。
任何获取 validation dataloader length
的方法都应该在 accelerator.prepare()
之后进行。
数据集末尾的一些数据可能是重复的,所以这个 batch
的数据可以平均分配给所有的工作者。因此,应该通过gather_for_metrics()
方法计算指标,以便在收集时自动删除重复的数据。如果出于某种原因,你不希望自动完成这项工作,可以用 accelerator.gather()
来收集所有进程的数据,然后手动完成。
gather()
和 gather_for_metrics()
要求每个进程上的张量是相同尺寸的。如果你在每个进程上有不同尺寸的张量(例如,当动态填充到一个 batch
的最大长度时),你应该使用 accelerator.gather.pad_across_processes()
方法将张量填充到跨进程的最大尺寸。
启动分布式脚本:你可以使用常规命令来启动你的分布式训练(如 PyTorch
的 torch.distributed.launch
),它们与 Accelerate
完全兼容。这里唯一需要注意的是: Accelerate
使用 environment
来确定所有有用的信息,所以 torch.distributed.launch
应与标志 --use_env
一起使用。
Accelerate
还提供了一个 CLI
工具,它统一了所有的 launcher
,所以你只需要记住一个命令:
xxxxxxxxxx
accelerate config
你需要回答问题,然后 Accelerate
将在你的 cache folder
创建一个 default_config.yaml
文件。这个缓存目录是(根据优先级递减):
HF_HOME
的内容,以 accelerate
为后缀。XDG_CACHE_HOME
的内容,以 huggingface/accelerate
为后缀。~/.cache/huggingface/accelerate
。你也可以通过标志 --config_file
来指定你要保存的文件的位置。
然后,你可以通过运行来测试你的设置是否一切顺利:
xxxxxxxxxx
accelerate test
这将启动一个简短的脚本,测试分布式环境。你也可以在测试期间指定配置文件的位置:
xxxxxxxxxx
accelerate test --config_file path_to_config.yaml
如果测试通过,你可以通过如下的命令来执行你的脚本:
xxxxxxxxxx
accelerate launch path_to_script.py --args_for_the_script
也可以指定配置文件的位置:
xxxxxxxxxx
accelerate launch --config_file path_to_config.yaml path_to_script.py --args_for_the_script
从 notebook
中启动:在 Accelerate 0.3.0
中引入了一个 notebook_launcher()
从而帮助你在 notebook
上启动训练。
只要在 notebook
的一个 cell
中定义一个负责整个 train and/or evaluation
的函数,然后用以下代码执行一个 cell
:
xxxxxxxxxx
from accelerate import notebook_launcher
notebook_launcher(training_function)
注意:你的 Accelerator
对象应该只在 training_function
中定义,这是因为初始化应该只在 launcher
内完成。
在 TPU
上训练:如果你想在 TPU
上启动你的脚本,有一些注意事项是你应该注意的。在幕后,TPU
将为你的 training step
(前向传播、反向传播、以及 optimizer step
)中发生的所有操作创建一个 graph
。这就是为什么你的第一个训练步总是非常长,因为建立和编译这个 graph
需要一些时间。
好消息是,这个编译将被缓存,所以第二步和所有后续的 step
将更快。坏消息是,这只适用于你的所有 step
做完全相同的操作,这意味着:
batch
必须有用相同的张量尺寸。step
中存在循环,那么循环次数在每个 step
必须相同)。如果上述任何一项在两个 step
之间发生变化,都会触发新的编译,这将再次花费大量时间。在实践中,这意味着:你必须特别注意让你的输入中的所有张量具有相同的形状(所以没有动态填充),并且不应该使用具有 for
循环的层,其中 for
循环的根据 input
的不同而具有不同长度(如 LSTM
)。否则,训练会慢得令人难受。
可以针对 TPU
执行一些特殊的代码:
xxxxxxxxxx
from accelerate import DistributedType
if accelerator.distributed_type == DistributedType.TPU:
# do something of static shape
else:
# go crazy and be dynamic
最后要注意的是:如果你的模型有 tied weight
(比如语言模型将 embedding matrix
的权重与 decoder
的权重绑定),将这个模型移动到 TPU
(无论是你自己移动、还是由 prepare()
移动)会破坏绑定。你将需要在之后重新绑定权重。
在单个进程上执行的语句:有些语句只需要在特定的进程上执行而无需在所有进程上执行,如数据下载、记录日志、以及打印进度条。此时可以执行:
xxxxxxxxxx
if accelerator.is_local_main_process:
# Is executed once per server
from tqdm.auto import tqdm
progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)
local
意思是每台机器上运行:如果你在两台服务器上训练,其中每台服务器有几个 GPU
,则代码将在每台服务器上执行一次。
如果你希望对所有进程仅执行一次(如,上传模型到 model hub
),则可以执行:
xxxxxxxxxx
if accelerator.is_main_process:
# Is executed once only
对于 print
语句,你希望在每台机器上执行一次,则可以用 accelerator.print
代替 print
函数。
延迟执行:当你运行你的常规脚本时,指令是按顺序执行的。使用 Accelerate
在几个 GPU
上同时部署你的脚本会带来一个复杂的问题:虽然每个进程都是按顺序执行所有指令,但有些可能比其他的快。
你可能需要等待所有进程达到一定程度后再执行某条指令。例如,在确定每个进程都完成了训练之前,你不应该保存一个模型。要做到这一点,可以执行:
xxxxxxxxxx
accelerator.wait_for_everyone()
这条指令将阻塞所有先到的进程,直到所有其他进程都到达该点(如果你只在一个 GPU
或 CPU
上运行你的脚本,这不会有任何作用)。
保存/加载模型:保存训练好的模型可能需要一些调整:
首先,你应该等待所有的进程到达脚本中的 “延迟执行” 所描述的那个点。
然后,你应该在保存模型之前 unwrap
你的模型。这是因为在通过 prepare()
方法时,你的模型可能被 wrap
从而用于分布式训练。如:
xxxxxxxxxx
accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model.state_dict(), filename)
如果你的脚本包含加载 checkpoint
的逻辑,我们也建议你在 unwrapped model
中加载你的权重(这只在 prepare()
后使用加载函数时有用)。如:
xxxxxxxxxx
unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.load_state_dict(torch.load(filename))
保存/加载整个状态:当训练你的模型时,你可能想保存模型、优化器、随机数生成器、以及潜在的 LR scheduler
的当前状态,以便在同一个脚本中恢复训练。你可以分别使用 save_state()
和 load_state()
来做到这一点,只需简单地传入一个保存位置。
如果你通过 register_for_checkpointing()
注册了任何其他需要存储的 stateful item
,它们也会被保存和/或加载。
示例:
xxxxxxxxxx
from accelerate import Accelerator
import torch
accelerator = Accelerator()
my_scheduler = torch.optim.lr_scheduler.StepLR(my_optimizer, step_size=1, gamma=0.99)
my_model, my_optimizer, my_training_dataloader = accelerate.prepare(my_model, my_optimizer, my_training_dataloader)
# Register the LR scheduler
accelerate.register_for_checkpointing(my_scheduler)
# Save the starting state
accelerate.save_state("my/save/path")
device = accelerator.device
my_model.to(device)
# Perform training
for epoch in range(num_epochs):
for batch in my_training_dataloader:
my_optimizer.zero_grad()
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = my_model(inputs)
loss = my_loss_function(outputs, targets)
accelerator.backward(loss)
my_optimizer.step()
my_scheduler.step()
# Restore previous state
accelerate.load_state("my/save/path")
梯度裁剪:如果你在脚本中使用梯度剪裁,你应该把对 torch.nn.utils.clip_grad_norm_
或 torch.nn.utils.clip_grad_value_
的调用分别替换为 accelerator.clipgrad_norm()
和 accelerator.clipgrad_value()
。
混合精度训练:如果你用 Accelerate
在混合精度下训练,那么模型内的计算将以混合精度进行,而模型外的每一次计算都将以 full precision
执行。例如,loss
的计算通常在模型外,且涉及 softmax
。然而,你可能想把你的 loss
计算放在 accelerator.autocast
上下文管理器中:
xxxxxxxxxx
with accelerator.autocast():
loss = complex_loss_function(outputs, target)
混合精度训练的另一个注意事项是:梯度会在开始时跳过一些更新,有时在训练过程中也会跳过。这是因为动态损失缩放 dynamic loss scaling
策略,在训练过程中会有一些时刻,梯度已经溢出,loss scaling factor
会减少,从而避免在下一步再次发生这种情况。
这意味着你可能会在没有梯度更新的时候就更新你的 learning rate scheduler
。这在一般情况下是没有问题的,但是当你的训练数据非常少,或者你的 scheduler
的第一个学习率值非常重要时,可能会有影响。在这种情况下,你可以跳过 learning rate scheduler
的更新:
xxxxxxxxxx
if not accelerator.optimizer_step_was_skipped:
lr_scheduler.step()
梯度累积:要执行梯度累积,请使用 accumulate()
并指定 gradient_accumulation_steps
。在多设备训练时,这也会自动确保梯度同步或不同步,检查是否真的应该执行该 step
,并自动计算损失:
xxxxxxxxxx
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)
for input, label in training_dataloader:
with accelerator.accumulate(model):
predictions = model(input)
loss = loss_function(predictions, label)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
相比之下,传统的梯度累加方法会用更冗长的代码:
xxxxxxxxxx
+ from accelerate import Accelerator
+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+ model, optimizer, training_dataloader, scheduler
+ )
for index, batch in enumerate(training_dataloader):
inputs, targets = batch
- inputs = inputs.to(device)
- targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
loss = loss / gradient_accumulation_steps
+ accelerator.backward(loss)
if (index+1) % gradient_accumulation_steps == 0:
optimizer.step()
scheduler.step()
optimizer.zero_grad()
DeepSpeed
:DeepSpeed
支持是实验性的,所以底层 API
将在不久的将来发展,可能会有一些轻微的破坏性变化。具体而言, Accelerate
还不支持你自己编写的 DeepSpeed
配置,这将在下一个版本中添加。
使用 accelerate launch
:
xxxxxxxxxx
accelerate launch {script_name.py} --arg1 --arg2 ...
指定单个 GPU
:
xxxxxxxxxx
CUDA_VISIBLE_DEVICES="0" accelerate launch {script_name.py} --arg1 --arg2 ...
在两个 GPU
上混合精度训练:
xxxxxxxxxx
accelerate launch --multi_gpu --mixed_precision=fp16 --num_processes=2 {script_name.py} {--arg1} {--arg2} ...
建议总是在 accelerate launch
之前执行 accelerate config
,这样就无需再 accelerate launch
中指定各种配置。
在 notebook
中 launch
:
CUDA
的代码在一个函数中,该函数被传递给 notebook_launcher()
。num_processes
为训练的设备数量(如,GPU, CPU, TPU
数量)。TPU
,在 training loop
函数之外声明你的模型。如:
xxxxxxxxxx
from accelerate import notebook_launcher
args = ("fp16", 42, 64)
notebook_launcher(training_loop, args, num_processes=2)
对于 TPU
:
xxxxxxxxxx
model = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id))
args = (model, "fp16", 42, 64)
notebook_launcher(training_loop, args, num_processes=8)
启用 FSDP
:
首先进行配置:accelerate config
。FSDP
配置的一个例子:
xxxxxxxxxx
compute_environment: LOCAL_MACHINE
deepspeed_config: {}
distributed_type: FSDP
downcast_bf16: 'no'
fsdp_config:
fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
fsdp_backward_prefetch_policy: BACKWARD_PRE
fsdp_offload_params: false
fsdp_sharding_strategy: 1
fsdp_state_dict_type: FULL_STATE_DICT
fsdp_transformer_layer_cls_to_wrap: GPT2Block
machine_rank: 0
main_process_ip: null
main_process_port: null
main_training_function: main
mixed_precision: 'no'
num_machines: 1
num_processes: 2
use_cpu: false
然后开始训练:
xxxxxxxxxx
accelerate launch examples/nlp_example.py
这些配置参数的含义为:
Sharding Strategy
:
FULL_SHARD
:对 optimizer states, gradients, parameters
都进行分片。SHARD_GRAD_OP
:仅对 optimizer states, gradients
进行分片。NO_SHARD
:不进行分片。Offload Params
:一个布尔值,指定是否将 parameters
和 gradients
卸载到 CPU
。
Auto Wrap Policy
:可以为 TRANSFORMER_BASED_WRAP, SIZE_BASED_WRAP, NO_WRAP
。
Transformer Layer Class to Wrap
:当使用 TRANSFORMER_BASED_WRAP
时,指定特定的 transformer layer class name
(大小写敏感)从而执行 wrap
。如 BertLayer, GPTJBlock, T5Block,...
。
Min Num Params
:使用 SIZE_BASED_WRAP
的最小参数数量。
Backward Prefetch
:可以为 BACKWARD_PRE, BACKWARD_POST, NO_PREFETCH
。
State Dict Type
:可以为 FULL_STATE_DICT, LOCAL_STATE_DICT, SHARDED_STATE_DICT
。
有几个需要注意的地方:
PyTorch FSDP
会自动 wrap
子模块,对参数进行扁平化处理,并将参数分片。由于这个原因,任何在 model wrapping
之前创建的 optimizer
都会被破坏,并占用更多的内存。因此,强烈建议在创建 optimizer
之前准备好模型,这也是很有效的。Accelerate
将自动 wrap
模型,并在单个模型的情况下为你创建一个优化器,并发出警告信息:
xxxxxxxxxx
FSDP Warning: When using FSDP, it is efficient and recommended to call prepare for the model before creating the optimizer.
下面是使用 FSDP
时准备模型和优化器的推荐方法:
xxxxxxxxxx
model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True)
+ model = accelerator.prepare(model)
optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr)
- model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
- model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
- )
+ optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
+ optimizer, train_dataloader, eval_dataloader, lr_scheduler
+ )
在单个模型的情况下,如果你用多个 parameter groups
创建了优化器,并且用它们一起调用 prepare
,那么 parameter groups
将被丢失,并显示以下警告:
xxxxxxxxxx
FSDP Warning: When using FSDP, several parameter groups will be conflated into a single one due to nested module wrapping and parameter flattening.
这是因为,由于嵌套的 FSDP
模块的参数扁平化为一维数组,在 wrapping
前创建的 parameter groups
在 wrapping
后将没有意义。
在有多个模型的情况下,有必要在创建优化器之前准备好模型,否则会抛出一个错误。然后将优化器以与相应模型相同的顺序传递给 prepare()
方法,否则 accelerator.save_state()
和 accelerator.load_state()
将导致错误/意外的行为。
这个功能与 Transformers library
的 run_translation.py
脚本中的 --predict_with_generate
不兼容。
对于更多的控制,用户可以利用 FullyShardedDataParallelPlugin
。在创建这个类的实例后,用户可以把它传递给 Accelerator
类的实例。
启用 DeepSpeed
:DeepSpeed
实现了 ZeRO
论文中描述的一切。目前,它提供了如下的支持:Optimizer state partitioning
(ZeRO stage 1
)、Gradient partitioning
(ZeRO stage 2
)、Parameter partitioning
(ZeRO stage 3
)、Custom mixed precision training handling
、一系列基于 CUDA
扩展的快速优化器、ZeRO-Offload
到 CPU
和 Disk/NVMe
。
DeepSpeed ZeRO-2
主要只用于训练,因为它的功能对推理没有用处。DeepSpeed ZeRO-3
也可以用于推理,因为它允许在多个 GPU
上加载巨大的模型。
Accelerate
通过两种方式集成 DeepSpeed
:
deepspeed
配置文件来集成。它支持 DeepSpeed
的所有核心功能,并为用户提供了很大的灵活性。用户可能需要根据配置来改变几行代码。deepspeed_plugin
来集成。这支持 DeepSpeed
功能的子集,并对其余的配置使用默认选项。用户不需要改变任何代码。什么被集成了?
训练:DeepSpeed ZeRO
训练支持完整的 ZeRO stages 1, 2 and 3
、以及 optimizer states, gradients and parameters
的 CPU/Disk offload
。
Stage 1
:将 optimizer states
分片到数据并行 workers/GPUs
上。Stage 2
:将 optimizer states + gradients
分片到数据并行 workers/GPUs
上。Stage 3
:将 optimizer states + gradients + model parameters
分片到数据并行 workers/GPUs
上。Optimizer Offload
:将 optimizer states + gradients
卸载到 CPU/Disk
,建立在 ZERO Stage 2
之上。Param Offload
:将 model parameters
卸载到 CPU/Disk
,建立在 ZERO Stage 3
之上。注意:关于 Disk Offload
,磁盘应该是 NVME
的,以便有好的速度,但技术上可以在任何磁盘上工作。
推断:DeepSpeed ZeRO Inference
支持 ZeRO Stage 3
与 ZeRO-Infinity
。它使用与训练相同的 ZeRO
协议,但它不使用优化器和 lr scheduler
。
如何工作:
首先安装 DeepSpeed version >=0.6.5
。
然后配置:accelerate config
。一个配置的例子:
xxxxxxxxxx
compute_environment: LOCAL_MACHINE
deepspeed_config:
gradient_accumulation_steps: 1
gradient_clipping: 1.0
offload_optimizer_device: none
offload_param_device: none
zero3_init_flag: true
zero_stage: 2
distributed_type: DEEPSPEED
fsdp_config: {}
machine_rank: 0
main_process_ip: null
main_process_port: null
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 2
use_cpu: false
最后执行训练:accelerate launch examples/nlp_example.py
。
配置参数的含义:
zero_stage
:0
表示禁用,1
表示 optimizer state partitioning
,2
表示 optimizer+gradient state partitioning
,3
表示 optimizer+gradient+parameter partitioning
。gradient_accumulation_steps
:一个整数,表示在 averaging
和 applying
这些梯度之前,积累梯度的 training steps
数量。gradient_clipping
:一个浮点数,指定启用梯度剪裁的值。offload_optimizer_device
:none
表示禁用 optimizer offloading
,cpu
表示 offload optimizer
到 CPU
,nvme
表示offload optimizer
到 NVMe SSD
。仅适用于 ZeRO >= Stage-2
。offload_param_device
:none
表示禁用 parameter offloading
,cpu
表示 offload parameter
到 CPU
,nvme
表示offload parameter
到 NVMe SSD
。仅适用于 ZeRO Stage-3
。zero3_init_flag
:决定是否启用 deepspeed.zero.Init
来构建大规模模型。只适用于 ZeRO Stage-3
。zero3_save_16bit_model
:决定是否在使用 ZeRO Stage-3
时保存 16
位模型权重。mixed_precision
:no
用于 FP32
训练,fp16
用于 FP16
混合精度训练,bf16
用于 BF16
混合精度训练。当使用配置文件时,需要修改一些代码:
DeepSpeed Optimizers and Schedulers
:
如果是 DeepSpeed Optim + DeepSpeed Scheduler
:用户必须使用 enhance.utils.DummyOptim
和enhance.utils.DummyScheduler
来取代他们代码中的 PyTorch/Custom
优化器和调度器:
xxxxxxxxxx
# Creates Dummy Optimizer if `optimizer` was spcified in the config file else creates Adam Optimizer
optimizer_cls = (
torch.optim.AdamW
if accelerator.state.deepspeed_plugin is None
or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config
else DummyOptim
)
optimizer = optimizer_cls(optimizer_grouped_parameters, lr=args.learning_rate)
# Creates Dummy Scheduler if `scheduler` was spcified in the config file else creates `args.lr_scheduler_type` Scheduler
if (
accelerator.state.deepspeed_plugin is None
or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config
):
lr_scheduler = get_scheduler(
name=args.lr_scheduler_type,
optimizer=optimizer,
num_warmup_steps=args.num_warmup_steps,
num_training_steps=args.max_train_steps,
)
else:
lr_scheduler = DummyScheduler(
optimizer, total_num_steps=args.max_train_steps, warmup_num_steps=args.num_warmup_steps
)
Custom Optim + Custom Scheduler
:当 DeepSpeed
配置文件中没有 optimizer key
和 scheduler key
的情况。在这种情况下,不需要用户修改代码,通过 DeepSpeed Plugin
使用集成时就是这种情况。
Custom Optim + DeepSpeed Scheduler
:这种情况下,用户必须使用accelerate.utils.DummyScheduler
来替换代码中的 PyTorch/Custom scheduler
。
DeepSpeed Optim + Custom Scheduler
:这将导致一个错误,因为当使用 DeepSpeed Optim
时必须使用 DeepSpeed Scheduler
。
DeepSpeed
配置文件中存在一些 "auto"
值,这些值是由 prepare
方法根据所提供的模型、dataloaders
、dummy optimizer
和 dummy schedulers
自动处理的。那些不是 "auto"
的字段必须由用户明确指定。如 zero_stage2_config.json
文件:
xxxxxxxxxx
{
"fp16": {
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
"optimizer": {
"type": "AdamW",
"params": {
"lr": "auto",
"weight_decay": "auto",
"torch_adam": true,
"adam_w_mode": true
}
},
"scheduler": {
"type": "WarmupDecayLR",
"params": {
"warmup_min_lr": "auto",
"warmup_max_lr": "auto",
"warmup_num_steps": "auto",
"total_num_steps": "auto"
}
},
"zero_optimization": {
"stage": 2,
"allgather_partitions": true,
"allgather_bucket_size": 2e8,
"overlap_comm": true,
"reduce_scatter": true,
"reduce_bucket_size": "auto",
"contiguous_gradients": true
},
"gradient_accumulation_steps": 1,
"gradient_clipping": "auto",
"steps_per_print": 2000,
"train_batch_size": "auto",
"train_micro_batch_size_per_gpu": "auto",
"wall_clock_breakdown": false
}
保存和加载:
对于 ZeRO Stage-1
和 ZeRO Stage-2
,模型的保存和加载不需要改动。
对于 ZeRO Stage-3
,state_dict
仅只包含占位符,因为模型的权重被分片到多个 GPU
。ZeRO Stage-3
有两个选项:
保存整个 16
位的模型权重,然后使用 model.load_state_dict(torch.load(pytorch_model.bin))
来直接加载。为此,要么在 DeepSpeed
配置文件中把 zero_optimization.stage3_gather_16bit_weights_on_model_save
设为True
,要么在 DeepSpeed Plugin
中把 zero3_save_16bit_model
设为 True
。
请注意,这个选项需要在一个 GPU
上整合权重,这可能会很慢,而且对内存要求很高,所以只有在需要时才使用这个功能。
示例:
xxxxxxxxxx
unwrapped_model = accelerator.unwrap_model(model)
# New Code #
# Saves the whole/unpartitioned fp16 model when in ZeRO Stage-3 to the output directory if
# `stage3_gather_16bit_weights_on_model_save` is True in DeepSpeed Config file or
# `zero3_save_16bit_model` is True in DeepSpeed Plugin.
# For Zero Stages 1 and 2, models are saved as usual in the output directory.
# The model name saved is `pytorch_model.bin`
unwrapped_model.save_pretrained(
args.output_dir,
is_main_process=accelerator.is_main_process,
save_function=accelerator.save,
state_dict=accelerator.get_state_dict(model),
)
为了获得 32
位的权重,首先使用 model.save_checkpoint()
保存模型:
xxxxxxxxxx
success = model.save_checkpoint(PATH, ckpt_id, checkpoint_state_dict)
status_msg = "checkpointing: PATH={}, ckpt_id={}".format(PATH, ckpt_id)
if success:
logging.info(f"Success {status_msg}")
else:
logging.warning(f"Failure {status_msg}")
这将在 checkpoint
目录下创建 ZeRO model
和 optimizer
的 partitions
以及 zero_to_fp32.py
脚本。你可以使用这个脚本来做离线整合,这不需要配置文件或 GPU
。如:
xxxxxxxxxx
cd /path/to/checkpoint_dir
./zero_to_fp32.py . pytorch_model.bin
# Processing zero checkpoint at global_step1
# Detected checkpoint of type zero stage 3, world_size: 2
# Saving fp32 state dict to pytorch_model.bin (total_numel=60506624)
要想加载 32
位的模型,做法如下:
xxxxxxxxxx
from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint
unwrapped_model = accelerator.unwrap_model(model)
fp32_model = load_state_dict_from_zero_checkpoint(unwrapped_model, checkpoint_dir)
如果你仅仅想得到 state_dict
,做法如下:
xxxxxxxxxx
from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir)
注意,加载时需要大约 2
倍于 final checkpoint
大小的内存。
ZeRO Inference
:DeepSpeed ZeRO Inference
支持 ZeRO stage 3
。通过 accelerate
的集成,你只需要 prepare
模型和 dataloader
,如下:
xxxxxxxxxx
model, eval_dataloader = accelerator.prepare(model, eval_dataloader)
注意事项:
DeepSpeed
的 Pipeline Parallelism
。mpu
,限制了 Megatron-LM
中支持的张量并行。目前 Accelerate
支持如下的 tracker
:TensorBoard, WandB, CometML, MLFlow
,如:
xxxxxxxxxx
from accelerate import Accelerator
from accelerate.utils import LoggerType
accelerator = Accelerator(log_with="all") # For all available trackers in the environment
accelerator = Accelerator(log_with="wandb")
accelerator = Accelerator(log_with=["wandb", LoggerType.TENSORBOARD])
然后需要初始化 tracker
:
xxxxxxxxxx
hps = {"num_iterations": 5, "learning_rate": 1e-2}
accelerator.init_trackers("my_project", config=hps)
然后记录日志:
xxxxxxxxxx
accelerator.log({"train_loss": 1.12, "valid_loss": 0.8}, step=1)
最后在训练结束时调用:accelerator.end_training()
。
你也可以通过 accelerator.get_tracker
来获取内置的 tracker
对象:
xxxxxxxxxx
wandb_tracker = accelerator.get_tracker("wandb")
if accelerator.is_main_process:
wandb_run.log_artifact(some_artifact_to_log)
处理大模型:常规的加载预训练模型的方式:
xxxxxxxxxx
import torch
my_model = ModelClass(...) # step 1
state_dict = torch.load(checkpoint_file) # step 2
my_model.load_state_dict(state_dict) # step 3
这对于常规大小的模型而言很有效,但是无法处理大型模型:在 step 1
我们在 RAM
中加载一个完整版本的模型,并花一些时间随机初始化权重(这将在 step 3
被丢弃);在 step 2
,我们在 RAM
中加载另一个完整版本的模型,并使用预训练的权重。
Accelerate
提供一些工具来帮助处理大模型(这些 API
是实验性质的,未来可能会发生改变):
init_empty_weights
上下文管理器:初始化一个模型而无需使用任何内存。这依赖于 PyTorch 1.9
中引入的 meta device
。
xxxxxxxxxx
from accelerate import init_empty_weights
with init_empty_weights():
my_model = ModelClass(...)
在该上下文管理器中,每当有一个 parameter
被创建时,它就被立即移动到 meta device
。
sharded checkpoints
:有可能你的模型太大从而无法装入内存,这并不意味着它不能被加载:如果你有一个或几个 GPU
,这就有更多的内存可用于存储你的模型。此时需要你的 checkpoint
被拆分为几个小文件,即 checkpoint shards
。
Accelerate
将处理 checkpoint shards
,但是要满足如下格式:你的 checkpoint shards
应该放在一个文件夹中,并且有几个包含部分 state dict
的文件、以及一个 index.json
文件(将 parameter name
映射到包含该 parameter weights
的文件)。如:
xxxxxxxxxx
first_state_dict.bin
index.json
second_state_dict.bin
其中 index.json
内容为:
xxxxxxxxxx
{
"linear1.weight": "first_state_dict.bin",
"linear1.bias": "first_state_dict.bin",
"linear2.weight": "second_state_dict.bin",
"linear2.bias": "second_state_dict.bin"
}
load_checkpoint_and_dispatch
:在 empty model
中加载一个 checkpoint
。它支持 full checkpoints
(包含整个 state dict
的单一文件)以及 sharded checkpoints
。它还会在你可用的设备( GPU
、CPU
)上自动分配这些权重,所以如果你正在加载一个 sharded checkpoints
,最大的RAM
用量将是最大分片的大小。
例如:
xxxxxxxxxx
git clone https://huggingface.co/sgugger/sharded-gpt-j-6B
cd sharded-gpt-j-6B
git-lfs install
git pull
初始化模型:
xxxxxxxxxx
from accelerate import init_empty_weights
from transformers import AutoConfig, AutoModelForCausalLM
checkpoint = "EleutherAI/gpt-j-6B"
config = AutoConfig.from_pretrained(checkpoint)
with init_empty_weights():
model = AutoModelForCausalLM.from_config(config)
加载权重:
xxxxxxxxxx
from accelerate import load_checkpoint_and_dispatch
model = load_checkpoint_and_dispatch(
model, "sharded-gpt-j-6B", device_map="auto", no_split_module_classes=["GPTJBlock"]
)
通过 device_map="auto"
,Accelerate
根据可用资源自动决定将模型的每一层放在哪里:
GPU
上的最大可用空间。CPU
上。RAM
,我们将剩余的权重作为内存映射的张量存储在硬盘上。no_split_module_classes=["GPTJBlock"]
表示属于 GPTJBlock
的模块不应该在不同的设备上分割。你应该在这里设置所有包括某种残差连接的 block
。
可以通过 model.hf_device_map
查看模型的权重的设备。
分布式训练的复现:
设置随机数种子:
xxxxxxxxxx
from accelerate import set_seed
set_seed(42)
它在内部设置了五种随机数种子:
xxxxxxxxxx
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# ^^ safe to call this function even if cuda is not available
if is_tpu_available():
xm.set_rng_state(seed)
设置 batch size
:当使用 Accelerate
训练时,传递给 dataloader
的 batch size
是 batch size/GPU
,因此 final batch size
是 batch size * device num
。
设置学习率:学习率应该和 device num
成正比,如:
xxxxxxxxxx
learning_rate = 1e-3
accelerator = Accelerator()
learning_rate *= accelerator.num_processes
optimizer = AdamW(params=model.parameters(), lr=learning_rate)
梯度同步:在 DDP
中,PyTorch
在一些特定的点上进行进程间通信。然而在梯度累积时,你会累积 n
个 loss
并跳过 .backward()
。这可能会导致明显的减速,因为所有的进程都需要与它们进行更多次的通信。
可以通过 no_sync
上下文管理器来避免:
xxxxxxxxxx
ddp_model, dataloader = accelerator.prepare(model, dataloader)
for index, batch in enumerate(dataloader):
inputs, targets = batch
# Trigger gradient synchronization on the last batch
if index != (len(dataloader)-1):
- with ddp_model.no_sync():
+ with accelerator.no_sync(model):
# Gradients only accumulate
outputs = ddp_model(inputs)
loss = loss_func(outputs, targets)
accelerator.backward(loss)
else:
# Gradients finally sync
outputs = ddp_model(inputs)
loss = loss_func(outputs)
accelerator.backward(loss)
或者直接使用 accelerator.accumulate
:
xxxxxxxxxx
ddp_model, dataloader = accelerator.prepare(model, dataloader)
for batch in dataloader:
with accelerator.accumulate(model):
optimizer.zero_grad()
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
进程间同步:
xxxxxxxxxx
accelerator.wait_for_everyone()
这将阻塞所有进程直到所有进程都达到该点。
用途:
加载数据集:
xxxxxxxxxx
with accelerator.main_process_first():
datasets = load_dataset("glue", "mrpc")
这等价于:
xxxxxxxxxx
# First do something on the main process
if accelerator.is_main_process:
datasets = load_dataset("glue", "mrpc")
else:
accelerator.wait_for_everyone()
# And then send it to the rest of them
if not accelerator.is_main_process:
datasets = load_dataset("glue", "mrpc")
else:
accelerator.wait_for_everyone()
存取 state_dict
:
xxxxxxxxxx
if accelerator.is_main_process:
model = accelerator.unwrap_model(model)
torch.save(model.state_dict(), "weights.pth")
with accelerator.main_process_first():
state = torch.load("weights.pth")
model.load_state_dict(state)
在 global main
进程上 tokenizing
,然后传播到每个 worker
:
xxxxxxxxxx
datasets = load_dataset("glue", "mrpc")
with accelerator.main_process_first():
tokenized_datasets = datasets.map(
tokenize_function,
batched=True,
remove_columns=["idx", "sentence1", "sentence2"],
)
Accelerator
最佳实践:
print
语句应该由accelerator.print()
代替,每个进程打印一次。
xxxxxxxxxx
- print("My thing I want to print!")
+ accelerator.print("My thing I want to print!")
每个 server
执行一次的语句,应该使用 accelerator.is_local_main_process
:
xxxxxxxxxx
if accelerator.is_local_main_process:
do_thing_once_per_server()
或者使用 accelerator.on_local_main_process()
装饰器:
xxxxxxxxxx
on_local_main_process .
def do_my_thing():
"Something done once per server"
do_thing_once_per_server()
所有 server
中仅执行一次的语句,应该使用 accelerator.is_main_process
:
xxxxxxxxxx
if accelerator.is_main_process:
do_thing_once()
或者使用 accelerator.on_main_process()
装饰器:
xxxxxxxxxx
on_main_process .
def do_my_thing():
"Something done once per server"
do_thing_once()
在指定的进程(局部编号或全局编号)上执行的语句,也可以使用如下的装饰器:
xxxxxxxxxx
on_local_process(local_process_idx=0) .
def do_my_thing():
"Something done on process index 0 on each server"
do_thing_on_index_zero_on_each_server()
on_process(process_index=0) .
def do_my_thing():
"Something done on process index 0"
do_thing_on_index_zero()
同步控制:使用 accelerator.wait_for_everyone()
来确保所有进程在继续之前,先到达该点。例如,在模型保存之前很有用。
保存和加载:使用 accelerator.unwrap_model()
来删除所有在分布式过程中添加的特殊的 model wrapper
。
xxxxxxxxxx
model = MyModel()
model = accelerator.prepare(model)
# Unwrap
model = accelerator.unwrap_model(model)
使用 accelerator.save()
而不是 torch.save()
:
xxxxxxxxxx
state_dict = model.state_dict()
- torch.save(state_dict, "my_state.pkl")
+ accelerator.save(state_dict, "my_state.pkl")
使用 accelerator.clipgrad_norm()
而不是 torch.nn.utils.clip_grad_norm_()
;使用 accelerator.clipgrad_value()
而不是torch.nn.utils.clip_grad_value()
。
梯度累积:要执行梯度累积,请使用 accelerator.accumulate()
并指定 gradient_accumulation_steps
。即使在多设备训练时,它也会自动处理。
xxxxxxxxxx
- accelerator = Accelerator()
+ accelerator = Accelerator(gradient_accumulation_steps=2)
for (input, label) in training_dataloader:
+ with accelerator.accumulate(model):
predictions = model(input)
loss = loss_function(predictions, labels)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
class accelerate.Accelerator
:Accelerator
类,用于分布式训练或混合精度训练。
xxxxxxxxxx
class accelerate.Accelerator(
device_placement: bool = Trues,
plit_batches: bool = False,
fp16: bool = None,
mixed_precision: typing.Union[accelerate.utils.dataclasses.PrecisionType, str] = None,
gradient_accumulation_steps: int = 1,
cpu: bool = False,
deepspeed_plugin: DeepSpeedPlugin = None,
fsdp_plugin: FullyShardedDataParallelPlugin = None,
megatron_lm_plugin: MegatronLMPlugin = None,
rng_types: typing.Union[typing.List[typing.Union[str, accelerate.utils.dataclasses.RNGType]], NoneType] = None,
log_with: typing.Union[typing.List[typing.Union[str, accelerate.utils.dataclasses.LoggerType, accelerate.tracking.GeneralTracker]], NoneType] = None,
logging_dir: typing.Union[str, os.PathLike, NoneType] = None,
dispatch_batches: typing.Optional[bool] = None,
even_batches: bool = True,
step_scheduler_with_optimizer: bool = True,
kwargs_handlers: typing.Optional[typing.List[accelerate.utils.dataclasses.KwargsHandler]] = None,
dynamo_backend: typing.Union[accelerate.utils.dataclasses.DynamoBackend, str] = None
)
参数:
device_placement
:一个布尔值,默认为 True
,指定 accelerator
是否应该将对象放在 device
上(由 dataloader, model
等等产生的张量)。
split_batches
:一个布尔值,默认为 False
,指定 accelerator
是否应该将 dataloaders
产生的 batches
在设备上进行分割。
True
,实际使用的 batch size
在任何类型的分布式进程中都是一样的,但它必须是你使用的 num_processes
(即,进程数量)的整数倍。False
,实际使用的 batch size
将是你脚本中设置的 batch size
乘以进程数。mixed_precision
:一个字符串,指定是否使用混合精度训练(fp16
或 bfloat16
)。可以为 'no', 'fp16', 'bf16'
。默认为环境变量 ACCELERATE_MIXED_PRECISION
中的值,或者通过 accelerate.launch
传入的选项。
'fp16'
要求 pytorch 1.6
及其以上版本,'bf16'
要求 pytorch 1.10
及其以上版本。
gradient_accumulation_steps
:一个整数,指定在累积梯度之前应该经过多少个 step
。默认为 1
,表示没有梯度累积。一个大于 1
的数字应该与 Accelerator.accumulate
相结合。
cpu
:一个布尔值,指定是否强制脚本在 CPU
上执行。如果设置为 True
,将忽略 GPU
的可用性,并且仅强制在一个进程上执行。默认为 False
。
deepspeed_plugin
:一个 DeepSpeedPlugin
,用于调整 DeepSpeed
的相关的参数。也可以通过 accelerate config
来直接调整 DeepSpeed
。
fsdp_plugin
:一个 FullyShardedDataParallelPlugin
,用于调整 FSDP
的相关的参数。也可以通过 accelerate config
来直接调整 FSDP
。
megatron_lm_plugin
:一个 MegatronLMPlugin
,用于调整 FSDPMegatronLM
的相关的参数。也可以通过 accelerate config
来直接调整 MegatronLM
。
rng_types
:一个关于字符串或 RNGType
的列表,它指定了一个关于随机数生成器的列表,用于在 dataloaders
的每个 iteration
开始时进行同步。应该是如下的一个或几个:
"torch"
:基本的 torch
的随机数生成器。"cuda"
:CUDA
随机数生成器(仅限于 GPU
)。"xla"
:XLA
随机数生成器(仅咸鱼 TPU
)。"generator"
:sampler
(或 batch sampler
)的 torch.Generator
、或 iterable dataset
的 torch.Generator
。如果 PyTorch
版本 <=1.5.1
,将默认为 ["torch"]
;如果 PyTorch
版本 >=1.6
,则默认为 ["generator"]
。
log_with
:一个关于字符串、LoggerType
、GeneralTracker
的列表,指定 loggers
。可以为如下的一个或几个:"all", "tensorboard", "wandb", "comet_ml"
。
如果选择了 "all"
,就会接收环境中所有可用的 trackers
并初始化它们。也可以接受用于自定义 tracker
的 GeneralTracker
的实现,并且可以与 "all"
结合使用。
logging_dir
:一个字符串或 os.PathLike
,指定用于日志的目录的路径。
dispatch_batches
:一个布尔值,如果为 "True"
,Accelerator
准备的 dataloader
只在 global main
进程上进行迭代,然后将 batch
分割并广播给每个 worker
进程。对于底层数据集是 IterableDataset
的 DataLoader
,默认为 True
,否则为False
。
even_batches
:一个布尔值,如果设置为 True
,在所有进程的 total batch size
不能完全分割数据集的情况下,数据集开头的样本将被重复,这样 batch
就可以在所有 worker
之间平均分配。默认为 True
。
step_scheduler_with_optimizer
:一个布尔值,如果学习率 scheduler
与优化器同时 step
,则设置为 True
;否则设置为 False
。默认为 True
。
kwargs_handlers
:一个关于 KwargHandler
的列表,用于自定义如何创建与分布式训练或混合精度相关的对象。
dynamo_backend
:一个字符串或 DynamoBackend
,设置一个 dynamo
后端从而利用 Torch dynamo
优化你的训练。默认为 'no'
。
属性:
device
:一个 Torch.device
对象,表示要使用的设备。distributed_type
:一个 DistributedType
对象,表示分布式训练配置。local_process_index
:一个整数,表示当前机器上的进程编号。mixed_precision
:一个字符串,表示配置好的混合精度模式。num_processes
:一个整数,表示用于训练的进程总数。optimizer_step_was_skipped
:一个布尔值,表示当学习率不应该被改变的情况下,优化器的更新是否被跳过(因为混合精度中的梯度溢出)。process_index
:一个整数,表示当前进程在所有进程中的总编号。state
:一个 AcceleratorState
,表示分布式的 setup state
。sync_gradients
:一个布尔值,表示目前梯度是否在所有进程中被同步。use_distributed
:一个布尔值,表示当前配置是否用于分布式训练。方法:
accumulate(model)
:一个上下文管理器,它 wrap
模型并自动进行梯度累积。
参数:model
:一个 torch.nn.Module
对象,它是被 Accelerator.prepare
准备好之后的模型。
autocast()
:如果启用的话,将在这个上下文管理器中的代码块内应用自动混合精度。否则不会发生任何变化。
backward(loss, **kwargs)
:根据 Accelerator.gradient_accumulation_steps
对梯度进行调整,并根据配置来调用正确的backward()
。应该用来代替 loss.backward()
。
clear()
:是 Accelerate.free_memory
的别名,释放所有内部对象的引用并调用垃圾收集器。你应该在两个不同 models/optimizers
训练之间调用这个方法。
clip_grad_norm_(parameters, max_norm, norm_type = 2 ) -> torch.Tensor
:参数梯度的总范数(将所有参数视为单个向量来看待)的范数截断。应该用来代替 torch.nn.utils.clip_grad_norm_
。
参数:
parameters
:待截断梯度的参数列表。max_norm
:梯度阈值。norm_type
:范数类型。clip_grad_value_(parameters, clip_value ) -> torch.Tensor
:参数梯度的数值截断(绝对值)。应该用来代替 torch.nn.utils.clip_grad_value_
。
参数:
parameters
:待截断取值的参数列表。clip_value
:参数阈值(绝对值)。end_training()
:运行任何特殊的 end training behavior
,比如只在 global main
进程上停止 tracker
。如果使用实验跟踪,应始终在脚本的最后调用 end_training()
。
free_memory()
:参考 clear()
。
gather(tensor) -> torch.Tensor, or a nested tuple/list/dictionary of torch.Tensor
:跨所有进程收集 tensor
的取值,并在第一个维度上将其拼接起来。在进行评估时,对所有进程的预测进行 regroup
是很有用的。
注意:这种收集发生在所有进程中。
参数:tensor
:一个张量或张量的集合,表示需要跨所有进程来收集取值的张量。
返回:返回收集后的结果,类型与 tensor
相同。
gather_for_metrics(tensor)
:与 gather()
作用相同,但是 gather_for_metrics
可能会丢弃 last batch
中重复的数组。它经常被用于收集 inputs
和 targets
来计算指标。
参数和返回值:参考 gather()
。
get_state_dict( model, unwrap = True )
:以 full precision
来返回一个模型的 state_dict
,这个模型是被 accelerator.prepare()
处理过的。
参数:
model
:一个 PyTorch
模型,它被 accelerator.prepare()
处理过。unwrap
:一个布尔值,指定是否返回原始的 state_dict
。如果为 False
,则返回 wrapped state_dict
。默认为 True
。get_tracker(name: str)
:基于 name
从 self.trackers
中返回一个 tracker
,仅在 global main
进程上有效。
参数:name
:一个字符串,指定 tracker
的名字。
init_trackers(project_name: str, config: Optional[dict] = None, init_kwargs: Optional[dict] = {})
:为存储在 self.log_with
中的所有 trackers
执行初始化。
参数:
project_name
:一个字符串,指定 project
的名字。config
:一个字典,指定 starting configuration
。init_kwargs
:一个字典,它将被传递给 tracker
的初始化方法。join_uneven_inputs(joinables, even_batches = None )
:一个上下文管理器,用于在 uneven
的输入上进行分布式训练或分布式评估。它作为 torch.distributed.algorithms.join
的 wrapper
,当 total batch size
无法整除 dataset length
时很有用。
仅支持多 GPU
上的 Distributed Data Parallel training
。对于其它配置,则没有效果。
参数:
joinables
:关于 torch.distributed.algorithms.Joinable
的列表,它为 torch.distributed.algorithms.Joinable
所子类化的模型或优化器,如 Accelerator.prepare
所准备的 PyTorch Module
。
even_batches
:一个布尔值,它覆盖 Accelerator
中设置的 even_batches
的值。如果未提供,则默认使用 Accelerator
中的 even_batches
的值。
对于 iterable-style
的 dataloader
,该参数不生效。
示例:
xxxxxxxxxx
from accelerate import Accelerator
accelerator = Accelerator(even_batches=True)
ddp_model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
with accelerator.join_uneven_inputs([ddp_model], even_batches=False):
for input, output in dataloader:
outputs = model(input)
loss = loss_func(outputs)
loss.backward()
optimizer.step()
optimizer.zero_grad()
load_state(input_dir: str)
:加载 model, optimizer, scaler, RNG generators, registered objects
的当前状态。必须与 accelerator.save_state()
配合工作。
参数:input_dir
:一个字符串或 os.PathLike
对象,指定存放 state
的目录。
local_main_process_first()
:让 local main
进程先进入一个 with block
,其它进程将在 local main
进程退出后进入 with block
。
log(values: dict, step: Optional[int] = None, log_kwargs: Optional[dict] = {})
:记录 values
到 self.trackers
中的所有 trackers
,仅在 global main
进程上生效。
参数:
values
:一个字典,仅包含 int/float/str
数据类型,表示待记录的值。step
:一个整数,指定 run step
。log_kwargs
:一个字典,它将被传递给 tracker
的 log
函数。main_process_first()
:让 global main
进程先进入一个 with block
,其它进程将在global main
进程退出后进入 with block
。
no_sync(model)
:一个上下文管理器,它通过调用 torch.nn.parallel.DistributedDataParallel.no_sync
来禁用跨DDP
进程的梯度同步。如果模型不在 DDP
中,这个上下文管理器不做任何事情。
参数:model
:一个 torch.nn.Module
对象,它被 accelerator.prepare()
处理过。
示例:
xxxxxxxxxx
from accelerate import Accelerator
accelerator = Accelerator()
dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
input_a = next(iter(dataloader))
input_b = next(iter(dataloader))
with accelerator.no_sync():
outputs = model(input_a)
loss = loss_func(outputs)
accelerator.backward(loss)
# No synchronization across processes, only accumulate gradients
outputs = model(input_b)
loss = loss_func(outputs)
accelerator.backward(loss)
# Synchronization across all processes
optimizer.step()
optimizer.zero_grad()
on_last_process(func)
:一个装饰器,将仅仅在最后一个进程上运行被装饰的函数。
on_local_main_process(func)
:一个装饰器,将仅仅在 local main
进程上运行被装饰的函数。
on_local_process(local_process_idx)
:一个装饰器,将仅仅在 local process index
进程上运行被装饰的函数。
on_main_process(func)
:一个装饰器,将仅仅在 global main
进程上运行被装饰的函数。
on_process(process_idx)
:一个装饰器,将仅仅在 global process index
进程上运行被装饰的函数。
pad_across_processes(tensor, dim = 0, pad_index = 0, pad_first = False )
:递归地将所有设备的张量(位于嵌套的 list/tuple/dict
中)填充到相同的 size
,以便它们可以安全地被收集起来。
参数:
tensor
:torch.Tensor
的 list/tuple/dict
,指定被收集的数据。dim
:一个整数,指定填充哪一维。默认为 0
。pad_index
:一个整数,指定用什么值来填充。pad_first
:一个布尔值,指定是否从开始填充。默认为 False
,表示从尾部填充。prepare(*args, device_placement = None )
:为分布式训练和混合精度准备 args
中传递的所有对象,然后以相同的顺序返回。
如果你使用一个 model
来用于推断,并且没有任何形式的混合精度,那么你不需要 prepare
该 model
。
参数:
args
:一个列表,可以包含如下类型的对象:torch.utils.data.DataLoader
、torch.nn.Module
、torch.optim.Optimizer
、torch.optim.lr_scheduler.LRScheduler
。device_placement
:一个关于布尔值的列表,要求长度与 args
相同,分别指定每个被 prepare
的对象是否 automatic device placement
。prepare_data_loader(data_loader: DataLoaderde, dvice_placement = None )
:准备一个 PyTorch DataLoader
用于分布式训练。推荐使用 prepare()
函数。
参数:参考 prepare()
函数。
prepare_model(model: Module, dvice_placement = None )
:准备一个 PyTorch model
用于分布式训练。推荐使用 prepare()
函数。
参数:参考 prepare()
函数。
prepare_optimizer(optimizer: Optimizer, dvice_placement = None )
:准备一个 PyTorch Optimizer
用于分布式训练。推荐使用 prepare()
函数。
参数:参考 prepare()
函数。
prepare_scheduler(scheduler: _LRScheduler, dvice_placement = None )
:准备一个 PyTorch Scheduler
用于分布式训练。推荐使用 prepare()
函数。
参数:参考 prepare()
函数。
print(*args, **kwargs )
:用于替代 python
内置的 print()
函数从而仅在每个 server
上打印一次。
reduce(tensor, reduction = 'sum' ) -> torch.Tensor, or a nested tuple/list/dictionary of torch.Tensor
:跨所有进程来 reduce
指定的张量。注意,所有的进程都将得到被 reduce
之后的值。
参数:
tensor
:一个 Tensor
或 Tensor
的集合,指定要被 reduce
的张量。reduction
:一个字符串,指定 reduce
方式,可以为 'sum', 'mean', 'none'
。如果为 'none'
,则不做任何操作。默认为 'sum'
。返回:与 tensor
类型相同,表示被 reduce
之后的值。
register_for_checkpointing(*objects)
:注册对象,从而在 save_state/load_state
将该对象保存或加载。
注意,该方法应该在同一脚本中加载或保存 state
时利用。它不是被设计用来在不同的脚本中使用的。
参数:objects
:被注册的对象,每个对象必须有一个 load_state_dict
方法和一个 state_dict
方法。
save(obj, f)
:在每台机器上保存 obj
一次,用于替代 torch.save
。
参数:
obj
:被保存的对象。f
:一个字符串或 os.PathLike
对象,指定存储路径。save_state(output_dir: str)
:保存 model, optimizer, scaler, RNG generator, registered object
等对象的当前状态。
参数:output_dir
:一个字符串或 os.PathLike
对象,指定存储路径。
unscale_gradients(optimizer = None )
:在使用 AMP
的混合精度训练中 unscale
梯度。在所有其他 setting
中,该方法不做任何事情。
参数:optimizer
:一个 Optimizer
或一组 Optimizer
,指定哪些 optimizer
需要 unscale
梯度。如果未设置,则默认为传递给 prepare()
方法的所有 optimizers
。
unwrap_model(model, keep_fp32_wrapper: bool = False )
:unwrap model
,其中 model
是经过 prepare()
处理过的。该方法常用于保存 model
。
参数:
model
:一个 torch.nn.Module
,指定需要被 unwrap
的模型。keep_fp32_wrapper
:一个布尔值,指定是否需要移除 mixed precision hook
(如果有的话)。默认为 False
。wait_for_everyone()
:将停止当前进程的执行,直到其他所有进程都达到该点(因此,当脚本只在一个进程中运行时,这没有任何作用)。在保存模型前执行该方法很有用。
class accelerate.state.AcceleratorState
:单例类,它存储当前训练环境的信息。该类是不可变的,在第一次初始化之后就保存不变。
xxxxxxxxxx
class accelerate.state.AcceleratorState(
mixed_precision: str = None,
cpu: bool = False,
dynamo_backend = None,
deepspeed_plugin = None,
fsdp_plugin = None,
megatron_lm_plugin = None,
_from_accelerator: bool = False,
**kwargs)
参数参考 Accelerator
。
属性:
device
:一个 torch.device
,表示要使用的设备。distributed_type
:一个 DistributedType
,表示当前使用的分布式环境的类型。initialized
:一个布尔值,表示 AcceleratorState
是否已经从 Accelerator
得到初始化。local_process_index
:一个整数,表示当前进程在当前 server
上的索引。mixed_precision
:一个字符串,表示当前脚本是否会使用混合精度,如果是的话,正在执行的混合精度的类型。num_processes
:一个整数,表示当前并行启动的进程的数量。process_index
:一个整数,表示当前进程的 global index
。class accelerate.state.GradientState()
:单例类,它存储梯度同步相关的信息从而用于梯度累积。该类是不可变的,在第一次初始化之后就保存不变。
属性:
end_of_dataloader
:一个布尔值,表示我们是否已经到达了当前 dataloader
的结束。remainder
:一个整数,表示填充 dataloader
所需要增加的额外样本的数量。sync_gradients
:一个布尔值,表示梯度是否应该在所有设备上同步。accelerate config
命令:启动一系列提示,为你的训练系统创建并保存 default_config.yml
配置文件。该命令应该总是最先执行。
xxxxxxxxxx
accelerate config [arguments] # 或者 accelerate-config [arguments]
命令参数:
--config_file CONFIG_FILE (str)
:配置文件的存储路径。默认为 default_config.yaml
文件名,存放在 cache location
。-h, --help (bool)
:展示帮助信息。accelerate config default
命令:启动一系列提示,为你的训练系统创建并保存 default_config.yml
配置文件,但是会在命令行中配置一些参数,如 --mixed_precision
等等。
xxxxxxxxxx
accelerate config default [arguments] # 或者 accelerate-config default [arguments]
用法参考 accelerate config
。
accelerate config update
:命令:用一组新的参数更新已有的配置文件中的对应项。
xxxxxxxxxx
accelerate config update [arguments] # 或者 accelerate-config update [arguments]
用法参考 accelerate config
。
accelerate env
:列出配置文件的内容。
xxxxxxxxxx
accelerate env [arguments] # 或者 accelerate-env [arguments]
用法参考 accelerate config
。
accelerate launch
:launch
一个脚本。
xxxxxxxxxx
accelerate launch [arguments] {training_script} --{training_script-argument-1} --{training_script-argument-2} ...
位置参数:
{training_script}
:脚本的完整路径。--{training_script-argument-1}
:脚本的参数。可选参数:
-h, --help (bool)
:展示帮助信息。--config_file CONFIG_FILE (str)
:替换默认的配置文件。-m, --module (bool)
:将 launch script
解释为一个 Python
模块,即通过 python -m
执行。--no_python (bool)
:跳过在脚本前加上 "python"
,直接执行它。当脚本不是 Python
脚本时很有用。--debug (bool)
:当发生故障时,是否打印出 torch.distributed stack trace
。-q, --quiet (bool)
:将子进程的错误信息从 launch stack trace
切换到仅展示相关的信息。仅用于 DeepSpeed
和单进程。下面的参数可以通过 accelerate config
来配置。也可以在 launch
时配置(或更新):
硬件选择参数:
--cpu (bool)
:是否强制在 CPU
上进行训练。--multi_gpu (bool)
:是否应该启动分布式 GPU
训练。--mps (bool)
:否应该在 MacOS
机器上使用支持 MPS
的 GPU
设备。--tpu (bool)
:是否应该启动 TPU
训练。资源选择参数:
--mixed_precision {no,fp16,bf16} (str)
:是否使用混合精度训练。 BF16
训练仅在 Nvidia Ampere GPU
和 PyTorch 1.10
或更高版本上支持。--num_processes NUM_PROCESSES (int)
:要并行启动的进程总数。--num_machines NUM_MACHINES (int)
:本次训练中使用的机器总数。--num_cpu_threads_per_process NUM_CPU_THREADS_PER_PROCESS (int)
:每个进程的 CPU
线程数。可以进行调优以获得最佳性能。训练方式选择参数:
--use_deepspeed (bool)
:是否使用 DeepSpeed
进行训练。--use_fsdp (bool)
:是否使用 FullyShardedDataParallel
进行训练。--use_megatron_lm (bool)
:是否使用 Megatron-LM
进行训练。分布式 GPU
参数:以下参数只有在传递了 multi_gpu
或者通过 accelerate config
配置了 multi-gpu training
时才有用。
--gpu_ids (str)
:在这台机器上应该使用哪些 GPU
(通过 id
指定)进行训练,以逗号分隔的方式列出。--same_network (bool)
:用于多节点训练的所有机器是否存在于同一个 local network
。--machine_rank MACHINE_RANK (int)
:启动这个脚本的机器的 rank
(即,编号)。--main_process_ip MAIN_PROCESS_IP (str)
:rank 0
的机器的 IP
地址。--main_process_port MAIN_PROCESS_PORT (int)
:与 rank 0
的机器通信的端口。--rdzv_conf (str)
:额外的 rendezvous
配置(<key1>=<value1>,<key2>=<value2>,…
) 。--max_restarts (int)
:worker group
最多重启多少次(之后不再重启而是失败)。--monitor_interval (float)
:监控 worker
状态的时间间隔,单位是秒。TPU
参数:以下参数只有在传递了 tpu
或者通过 accelerate config
配置了 tpu training
时才有用。
--main_training_function MAIN_TRAINING_FUNCTION (str)
:脚本中要执行的主函数的名称。--downcast_bf16 (bool)
:当在 TPU
上使用 bf16
精度时,是否 float
和 double
张量都被类型转换到 bfloat16
,还是 double
张量仍为 float32
。DeepSpeed
参数:以下参数只有在传递了 use_deepspeed
或者通过 accelerate config
配置了 deepspeed
时才有用。
--deepspeed_config_file (str)
:DeepSpeed
配置文件。--zero_stage (int)
:DeepSpeed
的 ZeRO
优化阶段。--offload_optimizer_device (str)
:决定在哪里(none|cpu|nvme
)卸载优化器状态。--offload_param_device (str)
:决定在哪里(none|cpu|nvme
)卸载 parameters
。--gradient_accumulation_steps (int)
:训练脚本中使用的gradient_accumulation_steps
的数量。--gradient_clipping (float)
:训练脚本中使用的梯度剪裁值。--zero3_init_flag (str)
:决定是否( true|false
)启用 deepspeed.zero.Init
来构建大规模模型。只适用于DeepSpeed ZeRO Stage-3
。--zero3_save_16bit_model (str)
:决定在使用 ZeRO Stage-3
时是否(true|false
)保存 16
位模型权重。只适用于DeepSpeed ZeRO Stage-3
。--deepspeed_hostfile (str)
:用于配置多节点计算资源的DeepSpeed hostfile
。--deepspeed_exclusion_filter (str)
:当使用多节点配置时,DeepSpeed exclusion filter
字符串。--deepspeed_inclusion_filter (str)
:当使用多节点配置时,DeepSpeed inclusionfilter
字符串。--deepspeed_multinode_launcher (str)
:要使用的 DeepSpeed
多节点 launcher
。Fully Sharded Data Parallelism
参数:以下参数只有在传递了 use_fdsp
或者通过 accelerate config
配置了 Fully Sharded Data Parallelism
时才有用。
--fsdp_offload_params (str)
: 决定是否( true|false
)将 parameters
和梯度卸载到 CPU
。--fsdp_min_num_params (int)
:FSDP
默认的 Default Auto Wrapping
的 parameters
的最少数量。--fsdp_sharding_strategy (int)
:FSDP
的分片策略。--fsdp_auto_wrap_policy (str)
:FSDP
的 auto wrap policy
。--fsdp_transformer_layer_cls_to_wrap (str)
:要 wrap
的 Transformer layer class name
(区分大小写),例如:BertLayer, GPTJBlock, T5Block ...
。--fsdp_backward_prefetch_policy (str)
:FSDP
的 backward prefetch policy
。--fsdp_state_dict_type (str)
:FSDP
的 state dict
类型。Megatron-LM
参数:以下参数只有在传递了 use_megatron_lm
或者通过 accelerate config
配置了 Megatron-LM
时才有用。
--megatron_lm_tp_degree ('')
:Megatron-LM
的张量并行(Tensor Parallelism: TP
)度。--megatron_lm_pp_degree ('')
:Megatron-LM
的管道平行(Pipeline Parallelism: PP
)度。--megatron_lm_num_micro_batches ('')
:当管道并行度大于 1
时,Megatron-LM
的 micro batch
数量。--megatron_lm_sequence_parallelism ('')
:当张量并行度大于 1
时,决定是否(true|false
)启用序列并行 Sequence Parallelism
。--megatron_lm_recompute_activations ('')
:决定是否(true|false
)启用 Selective Activation Recomputation
。--megatron_lm_use_distributed_optimizer ('')
:决定是否(true|false
)使用分布式优化器,将优化器状态和梯度分片到 Data Pralellel: DP
的 ranks
。--megatron_lm_gradient_clipping ('')
:Megatron-LM
基于全局 L2
范数的梯度裁剪值( 0
表示禁用)。AWS SageMaker
参数:以下参数仅当在 SageMake
中训练时才有用。
--aws_access_key_id AWS_ACCESS_KEY_ID (str)
:用于启动 Amazon SageMaker
训练工作的 AWS_ACCESS_KEY_ID
。--aws_secret_access_key AWS_SECRET_ACCESS_KEY (str)
:用于启动 Amazon SageMaker
训练工作的AWS_SECRET_ACCESS_KEY
。accelerate tpu-config
:配置 tpu
训练。
xxxxxxxxxx
accelerate tpu-config [arguments]
可选参数:
-h, --help (bool)
:展示帮助信息。配置参数:下面参数也可以通过 accelerate config
来配置:
--config_file (str)
:accelerate
配置文件的路径。--tpu_name (str)
:要使用的 TPU
的名称。如果没有指定,将使用配置文件中指定的 TPU
。--tpu_zone (str)
:要使用的 TPU
的 zone
。如果没有指定,将使用配置文件中指定的 zone
。TPU
参数:下面的参数用于配置 TPU
。
--command_file (str)
:一个文件的路径,该文件包含启动时在 pod
上运行的命令。--command (str)
:要在 pod
上运行的命令。可以传递多次。--install_accelerate (bool)
:是否在 pod
上安装 accelerate
。默认为 False
。--accelerate_version (str)
:在 pod
上安装 accelerate
的版本。如果不指定,将使用最新的 pypi
版本。指定 'dev'
可以从 GitHub
安装。--debug (bool)
:如果设置,将打印将运行的命令,而不是运行它。accelerate test
:执行 accelerate/test_utils/test_script.py
从而确保 Accelerate
被正确地配置。
xxxxxxxxxx
accelerate test [arguments] # 或 accelerate-test [arguments]
可选参数:
--config_file CONFIG_FILE (str)
:配置文件的存储路径。默认为 default_config.yaml
文件名,存放在 cache location
。-h, --help (bool)
:展示帮助信息。class accelerate.tracking.GeneralTracker()
:所有 Tracker
的基类。
方法(每个方法都应该接受 **kwargs
参数):
finish()
:应该运行位于 tracking API
中的任何 finalizing function
。如果API
中没有这类 finalizing function
,则不要重写 finish()
方法。
log(values: dict, step: typing.Optional[int], **kwargs )
:记录当前 run
的日志。
参数:
values
:一个字典,指定 key-value
的要被记录的内容。注意,key
为字符串,而value
必须是字符串、浮点数、或整数类型。step
:一个整数,指定当前的 run step
。store_init_configuration(values: dict )
:将 values
记录为超参数。
参数:参考 log()
。
class accelerate.tracking.TensorBoardTracker
:Tensorboard Tracker
。应该在你的脚本开始的地方就被初始化。
xxxxxxxxxx
class accelerate.tracking.TensorBoardTracker(
run_name: str, logging_dir: typing.Union[str, os.PathLike, NoneType] = None, **kwargs
)
参数:
run_name
:一个字符串,指定当前 experiment run
的名字。logging_dir
:一个字符串或 os.PathLike
,指定 TensorBoard logs
存储的位置。kwargs
:关键字参数,传递给 tensorboard.SummaryWriter.__init__
方法。class accelerate.tracking.WandBTracker(run_name: str, **kwargs )
:WandB Tracker
。应该在你的脚本开始的地方就被初始化。
参数:
run_name
:一个字符串,指定当前 experiment run
的名字。kwargs
:关键字参数,传递给 wandb.init
方法。class accelerate.tracking.CometMLTracker(run_name: str, **kwargs )
:comet_ml Tracker
。应该在你的脚本开始的地方就被初始化。API key
必须存储在 Comet
配置文件中。
参数:
run_name
:一个字符串,指定当前 experiment run
的名字。kwargs
:关键字参数,传递给 Experiment.__init__
方法。accelerate.notebook_launcher( function, args = (), num_processes = None, mixed_precision = 'no', use_port = '29500')
:启动一个训练函数。如果当前环境中允许的话(如,具有多核的 TPU
),使用几个进程。
要使用这个 notebook_launcher
,在调用之前,notebook session
中必须对 CUDA
设备没有任何调用。如果有任何调用,你将需要重启 notebook
,并确保没有 cell
使用任何 CUDA
设备。
参数:
function
:一个可调用对象,指定要执行的训练函数。如果它接受参数,第一个参数应该是运行进程的 index
。args
:一个元组,指定传递给函数的参数的元组(函数将接收到 *args
)。num_processes
:一个整数,指定训练时使用的进程的数量。如果有 TPU
,则在 Colab/Kaggle
中默认为 8
,否则为可用的GPU
数量。mixed_precision
:一个字符串,指定混合精度训练。默认为 'no'
。use_port
:一个字符串,指定启动多 GPU
训练时用于进程间通信的端口。默认为 '29500'
。示例:
xxxxxxxxxx
# Assume this is defined in a Jupyter Notebook on an instance with two GPUs
from accelerate import notebook_launcher
def train(*args):
# Your training function here
...
notebook_launcher(train, args=(arg1, arg2), num_processes=2, mixed_precision="fp16")
accelerate.debug_launcher(function, args = (), num_processes = 2)
:在 CPU
上使用几个进程启动一个训练函数从而用于调试。
debug_launcher
仅用于调试,不应该用于真实的训练。它将仅使用 CPU
。
参数:参考 notebook_launcher
。
accelerate
有自己的 logging
工具从而用于分布式系统。使用方法为:用 accelerate.logging
代替 Python logging
。如:
xxxxxxxxxx
- import logging
+ from accelerate.logging import get_logger
- logger = logging.getLogger(__name__)
+ logger = get_logger(__name__)
accelerate.logging.get_logger(name: str, log_level: str = None )
:返回一个 logging.Logger
,它可以用于多进程环境。
参数:
name
:一个字符串,指定 logger
名字。log_level
:一个字符串,指定 log level
。默认为 LOG_LEVEL
环境变量指定的。如果没有 LOG_LEVEL
环境变量,则默认为 INFO
。如果一个 log
应该在所有进程上都记录,那么使用 main_process_only=False
;否则仅在全局主进程上记录。
xxxxxxxxxx
from accelerate.logging import get_logger
logger = get_logger(__name__)
logger.info("My log", main_process_only=False) # 所有进程都记录
logger.debug("My log", main_process_only=True) # 仅全局主进程记录
logger = get_logger(__name__, accelerate_log_level="DEBUG")
logger.info("My log") # level 太低,被过滤
logger.debug("My second log") # level 符合条件,记录
accelerate.init_empty_weights(include_buffers: bool = False)
:一个上下文管理器,在这个管理器下,模型被初始化为所有 parameters
都在 meta device
上,因此创建一个空模型。当仅仅初始化模型就会耗尽可用的内存时,这很有用。
参数:include_buffers
:一个布尔值,指定在模型初始化时是否将所有的 buffers
也放在 meta device
上。
示例:
xxxxxxxxxx
import torch.nn as nn
from accelerate import init_empty_weights
# Initialize a model with 100 billions parameters in no time and without using any RAM.
with init_empty_weights():
tst = nn.Sequential(*[nn.Linear(10000, 10000) for _ in range(1000)])
accelerate.cpu_offload( model: Module, execution_device: typing.Optional[torch.device] = None, offload_buffers: bool = False, state_dict: typing.Union[typing.Dict[str, torch.Tensor], NoneType] = None, preload_module_classes: typing.Optional[typing.List[str]] = None)
:将模型的所有 parameters
卸载到 CPU
上。此时,仅在 CPU
中保留一份 state dict
(GPU
中没有了)。在前向传播过程中,parameters
将从 state dict
中提取,并在需要时放在 execution_device
上,然后再次卸载。
xxxxxxxxxx
accelerate.cpu_offload(
model: Module, execution_device: typing.Optional[torch.device] = None,
offload_buffers: bool = False, state_dict: typing.Union[typing.Dict[str, torch.Tensor], NoneType] = None,
preload_module_classes: typing.Optional[typing.List[str]] = None)
参数:
model
:一个 torch.nn.Module
,指定被卸载的模型。execution_device
:一个 torch.device
,指定执行模型前向传播的设备(应该是 GPU
)。将默认为模型的第一个 parameter device
。offload_buffers
:一个布尔值,指定是否也同时卸载模型的 buffer
。state_dict
:一个字典,指定模型的 state dict
(它将被保持在 CPU
)。preload_module_classes
:一个关于字符串的列表,它们的实例应该在前向传播开始时就加载权重(包括实例中包含的子模块)。accelerate.disk_offload()
:将模型的所有 parameters
卸载到磁盘上。此时,模型的所有 parameter
将作为内存映射 array
被卸载到一个给定的目录中。在前向传播过程中,parameters
将从该目录中访问,并在需要时放在 execution_device
上,然后再次卸载。
xxxxxxxxxx
accelerate.disk_offload(
model: Module, offload_dir: typing.Union[str, os.PathLike],
execution_device: typing.Optional[torch.device] = None, offload_buffers: bool = False,
preload_module_classes: typing.Optional[typing.List[str]] = None)
参数:
offload_dir
:一个字符串或 os.PathLike
,指定卸载权重到哪个目录。model/execution_device/offload_buffers/preload_module_classes
:参考 cpu_offload()
。accelerate.dispatch_model()
:根据一个给定的设备映射来 dispatch
一个模型。模型的各层可能分布在 GPU
上,也可能卸载在CPU
甚至磁盘上。
xxxxxxxxxx
accelerate.dispatch_model(
model: Module,device_map: typing.Dict[str, typing.Union[str, int, torch.device]],
main_device: typing.Optional[torch.device] = None,
state_dict: typing.Union[typing.Dict[str, torch.Tensor], NoneType] = None,
offload_dir: typing.Union[str, os.PathLike, NoneType] = None,
offload_index: typing.Union[typing.Dict[str, str], NoneType] = None,
offload_buffers: bool = False, preload_module_classes: typing.Optional[typing.List[str]] = None)
参数:
model
:一个 torch.nn.Module
,指定要 dispatch
的模型。
device_map
:一个字典,将模型 state_dict
中的模块名称映射到它们应该放置的设备。注意,设备名称可以是 "disk"
,即使它不是 torch.device
的正确值。
它不需要细化到每个 parameter/buffer
名称,一旦一个给定的模块名称在里面,它的每个子模块都将被发送到同一个设备。要让 Accelerate
自动计算出最优化的设备映射,请设置 device_map="auto"
。
main_device
:一个字符串、整数、或 torch.device
,指定主执行设备。默认为 device_map
中不同于 "cpu"
或 "disk"
的第一个设备。
state_dict
:一个字典,指定模型哪些部分的 state dict
被保留在 CPU
上。
offload_dir
:一个字符串或 os.PathLike
,指定卸载模型权重的文件夹(或者模型权重已经被卸载的地方)。
offload_index
:一个字典,从权重名称到权重信息( dtype/shape
或 safetensors
文件名)。默认为 save_folder
中保存的 index
。
offload_buffers/preload_module_classes
:参考 cpu_offload()
。
accelerate.load_checkpoint_and_dispatch()
:加载一个 checkpoint
(可能是被分片后的)到模型。可能在加载时将权重发送到一个给定的设备上,并添加各种 hooks
,这些 hooks
这个模型正常运行(即使在设备间分割)。
xxxxxxxxxx
accelerate.load_checkpoint_and_dispatch(
model: Module, checkpoint: typing.Union[str, os.PathLike],
device_map: typing.Union[str, typing.Dict[str, typing.Union[str, int, torch.device]], NoneType] = None,
max_memory: typing.Union[typing.Dict[typing.Union[int, str], typing.Union[int, str]], NoneType] = None,
no_split_module_classes: typing.Optional[typing.List[str]] = None,
offload_folder: typing.Union[str, os.PathLike, NoneType] = None, offload_buffers: bool = False,
dtype: typing.Union[str, torch.dtype, NoneType] = None, offload_state_dict: typing.Optional[bool] = None,
preload_module_classes: typing.Optional[typing.List[str]] = None )
参数:
model
:一个 torch.nn.Module
,指定需要加载 checkpoint
的模型。checkpoint
:一个字符串或 os.PathLike
对象,指定 checkpoint
的路径名。可以为:包含整个模型 state dict
的文件的路径、一个 .json
文件的路径(该 .json
文件包含 sharded checkpoint
的索引)、一个路径包含唯一的 .index.json
文件和 shards checkpoint
。max_memory
:一个字典,指定每个设备的最大内存。如果不设置,将默认为每个 GPU
的最大内存,以及可用的 CPU RAM
。no_split_module_classes
:一个关于字符串的列表,指定哪些 layer
不能跨设备分割(如,包含残差连接的层)。dtype
:一个字符串或 torch.dtype
,指定权重在加载时被转换为什么类型。offload_folder
:一个字符串或 os.PathLike
对象,如果 device_map
包含 "disk"
,那么 offload_folder
指定卸载权重的目录。offload_state_dict
:一个布尔值,如果为 True
,则临时卸载 CPU state dict
到硬盘上从而防止 CPU out of memory
。如果 device map
包含 "disk"
,则默认为 True
。device_map/offload_buffers/preload_module_classes
:参考 cpu_offload()
。示例:
xxxxxxxxxx
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
from huggingface_hub import hf_hub_download
from transformers import AutoConfig, AutoModelForCausalLM
# Download the Weights
checkpoint = "EleutherAI/gpt-j-6B"
weights_location = hf_hub_download(checkpoint, "pytorch_model.bin")
# Create a model and initialize it with empty weights
config = AutoConfig.from_pretrained(checkpoint)
with init_empty_weights():
model = AutoModelForCausalLM.from_config(config)
# Load the checkpoint and dispatch it to the right devices
model = load_checkpoint_and_dispatch(
model, weights_location, device_map="auto", no_split_module_classes=["GPTJBlock"]
)
class accelerate.hooks.ModelHook()
:hook
包含一些回调函数,这些回调函数在模块的前向传播之前或之后被调用。
与 PyTorch
现有的 hook
不同的是,这里的 hook
会沿着 kwargs
传递。
属性:
no_grad(bool, optional, defaults to False)
:是否在 torch.no_grad()
上下文管理器中执行实际的前向传播过程。方法:
detach_hook(module)
:当 hook
从一个模块中 detach
时执行该方法。
参数:module
:一个 torch.nn.Module
对象,指定模块。
init_hook(module)
:当 hook
attach
到一个模块时执行该方法。
参数:module
:一个 torch.nn.Module
对象,指定模块。
post_forward(module, output) -> any
:在模块的前向传播之后立即执行。
参数:
module
:一个 torch.nn.Module
对象,指定模块。output
:任何对象,表示模块前向传播的结果。返回:后处理的 output
。
pre_forward(module, *args, **kwargs) -> Tuple[Tuple[Any], Dict[Str, Any]]
:在模块的前向传播之前执行。
参数:
module
:一个 torch.nn.Module
对象,指定模块。args, kwargs
:位置参数和关键字参数,被传递给 module
。返回值:处理过的 args
和 kwargs
的元组。
class accelerate.hooks.AlignDevicesHook()
:一个通用的 ModelHook
,确保 inputs
和模型权重在相关模块的前向传播中处于同一设备上,可能在前向传播之后卸载权重。
xxxxxxxxxx
class accelerate.hooks.AlignDevicesHook(
execution_device: typing.Union[int, str, torch.device, NoneType] = None,
offload: bool = False, io_same_device: bool = False, weights_map: typing.Optional[typing.Mapping] = None,
offload_buffers: bool = False, place_submodules: bool = False
)
参数:
execution_device
:一个 torch.device
,指定在前向传播之前,inputs
和模型权重应该放在哪个设备上。offload
:一个布尔值,指定权重是否应该在前向传播后被卸载。io_same_device
:一个布尔值,指定 outputs
是否应放在与 inputs
相同的设备上。weights_map
:一个 Mapping
(可能是惰性的),指定当模型权重被卸载时,从参数名到张量值的映射。offload_buffers
:一个布尔值,指定卸载时是否也包含模块的 buffers
。place_submodules
:一个布尔值,指定在 init_hook
事件中是否将子模块放在 execution_device
上。class accelerate.hooks.SequentialHook(*hooks)
:一个 hook
,可以包含几个 hook
,并在每个事件中对这些子 hook
进行迭代。
accelerate.hooks.add_hook_to_module( module: Module, hook: ModelHook, append: bool = False) -> torch.nn.Module
:在一个给定的模块中添加一个 hook
。
这将重写模块的 forward()
方法,使其包含 hook
。如果要删除这一行为并恢复原来的 forward()
方法,请使用remove_hook_from_module()
。
module
:一个 torch.nn.Module
对象,指定需要 attach hook
的模块。hook
:一个 ModelHook
,指定 hook
。append
:一个布尔值,指定当前的 hook
是否与 module
上已有的 hook
串起来(即,hook list
)、或者替代已有的 hook
。默认为替代行为(False
)。返回相同的 module
,但是已经 attach
了 hook
。注意,module
已经被原地修改了。
accelerate.hooks.attach_execution_device_hook( module: Module, execution_device: typing.Union[int, str, torch.device], preload_module_classes: typing.Optional[typing.List[str]] = None)
:递归地将 AlignDevicesHook
附加到一个给定模型的所有子模块,以确保它们有正确的执行设备。
参数:
module
:一个 torch.nn.Module
对象,指定需要 attach hook
的模块。execution_device
:一个整数、字符串、或 torch.device
,指定前向传播之前,inputs
和模型权重应该放到哪个设备。preload_module_classes
:参考 cpu_offload()
。accelerate.hooks.attach_align_device_hook( module: Module, execution_device: typing.Optional[torch.device] = None, offload: bool = False, weights_map: typing.Optional[typing.Mapping] = None, offload_buffers: bool = False, module_name: str = '', preload_module_classes: typing.Optional[typing.List[str]] = None)
:递归地将 AlignDevicesHook
附加到一个给定模型的所有子模块。
参数:
module/execution_device/preload_module_classes
:参考 attach_execution_device_hook()
。offload/weights_map/offload_buffers
:参考 AlignDevicesHook()
初始化方法。module_name
:一个字符串,指定模块的名字。accelerate.hooks.attach_align_device_hook_on_blocks(module: Module, execution_device: typing.Union[torch.device, typing.Dict[str, torch.device], NoneType] = None, offload: typing.Union[bool, typing.Dict[str, bool]] = False, weights_map: typing.Mapping = None, offload_buffers: bool = False, module_name: str = '', preload_module_classes: typing.Optional[typing.List[str]] = None)
:根据需要将AlignDevicesHook
附加到一个给定模型的所有 blocks
上。
参数:参考 attach_align_device_hook()
。
accelerate.hooks.remove_hook_from_module(module: Module, recurse = False ) -> torch.nn.Module
:移除模块上附加的任何 hook
。
参数:
module
:一个 torch.nn.Module
对象,指定需要 detach hook
的模块。recurse
:一个布尔值,指定是否递归地移除。返回相同的 module
,但是已经 detach
了 hook
。注意,module
已经被原地修改了。
accelerate.hooks.remove_hook_from_submodules(module: Module)
:递归地删除一个给定模型的子模块上的所有 hook
。
参数:module
:一个 torch.nn.Module
对象,指定需要 detach hook
的模块。
class accelerate.DeepSpeedPlugin
:用于集成 DeepSpeed
的插件。
xxxxxxxxxx
class accelerate.DeepSpeedPlugin(
hf_ds_config: typing.Any = None, gradient_accumulation_steps: int = None,
gradient_clipping: float = None, zero_stage: int = None, is_train_batch_min: str = True,
offload_optimizer_device: bool = None, offload_param_device: bool = None,
zero3_init_flag: bool = Nonezero3_save_16bit_model: bool = None
)
方法:
deepspeed_config_process( prefix = '', mismatches = None, config = None, must_match = True, **kwargs )
:用 kwargs
的内容来处理 DeepSpeed config
。class accelerate.utils.DummyOptim(params, lr = 0.001, weight_decay = 0, **kwargs)
:Dummy optimizer
。 当在 deepspeed
配置文件中指定 optimizer config
时,如果遵循常规的训练循环则用该 Dummy optimizer
。
参数:
lr
:一个浮点数,指定学习率。params
:一个可迭代对象或字典,指定 parameters
或 parameter group
。weight_decay
:一个浮点数,指定权重衰减。kwargs
:关键字参数。class accelerate.utils.DummyScheduler( optimizer, total_num_steps = None, warmup_num_steps = 0, **kwargs )
:Dummy scheduler
。当在 deepspeed
配置文件中指定 scheduler config
时,如果遵循常规的训练循环则用该 Dummy scheduler
。
参数:
optimizer
:一个 torch.optim.optimizer.Optimizer
:指定优化器。total_num_steps
:一个整数,指定总的 step
数。warmup_num_steps
:一个整数,指定 warmup
的 step
数。kwargs
:关键字参数。class accelerate.utils.DeepSpeedEngineWrappe(engine)
:用于 deepspeed.runtime.engine.DeepSpeedEngine
的 wrapper
。它用于常规的训练循环。
参数:engine
:一个 deepspeed.runtime.engine.DeepSpeedEngine
,指定被 wrap
的 deepspeed engine
。
class accelerate.utils.DeepSpeedOptimizerWrapper(optimizer)
:用于 deepspeed optimizer
的 wrapper
。
参数:optimizer
:一个 torch.optim.optimizer.Optimizer
,指定被 wrap
的优化器。
class accelerate.utils.DeepSpeedSchedulerWrapper(scheduler, optimizers)
:用于 deepspeed scheduler
的 wrapper
。
参数:
scheduler
:一个 torch.optim.lr_scheduler.LambdaLR
,指定被 wrap
的 scheduler
。optimizers
:一个torch.optim.Optimizer
或它的列表,指定被 wrap
的 optimizer
。class accelerate.utils.MegatronLMPlugin
:Megatron-LM
的插件,用于实现张量并行、管道并行、序列并行和数据并行。还可以启用 selective activation recomputation
和 optimized fused kernel
。
xxxxxxxxxx
class accelerate.utils.MegatronLMPlugin(
tp_degree: int = None,
pp_degree: int = None,
num_micro_batches: int = None,
gradient_clipping: float = None,
sequence_parallelism: bool = None,
recompute_activation: bool = None,
use_distributed_optimizer: bool = None,
pipeline_model_parallel_split_rank: int = None,
num_layers_per_virtual_pipeline_stage: int = None,
is_train_batch_min: str = True,
train_iters: int = None,
train_samples: int = None,
weight_decay_incr_style: str = 'constant',
start_weight_decay: float = None,
end_weight_decay: float = None,
lr_decay_style: str = 'linear',
lr_decay_iters: int = None,
lr_decay_samples: int = None,
lr_warmup_iters: int = None,
lr_warmup_samples: int = None,
lr_warmup_fraction: float = None,
min_lr: float = 0,
consumed_samples: typing.List[int] = None,
no_wd_decay_cond: typing.Optional[typing.Callable] = None,
scale_lr_cond: typing.Optional[typing.Callable] = None,
lr_mult: float = 1.0,
megatron_dataset_flag: bool = False,
seq_length: int = None,
encoder_seq_length: int = None,
decoder_seq_length: int = None,
tensorboard_dir: str = None,
set_all_logging_options: bool = False,
eval_iters: int = 100,
eval_interval: int = 1000,
return_logits: bool = False,
custom_train_step_class: typing.Optional[typing.Any] = None,
custom_train_step_kwargs: typing.Union[typing.Dict[str, typing.Any], NoneType] = None,
custom_model_provider_function: typing.Optional[typing.Callable] = None,
custom_prepare_model_function: typing.Optional[typing.Callable] = None,
other_megatron_args: typing.Union[typing.Dict[str, typing.Any], NoneType] = None
)
class accelerate.utils.MegatronLMDummyScheduler(optimizer, total_num_steps = None, warmup_num_steps = 0, **kwargs )
:MegatronLM Dummy scheduler
。当在 megatron
配置文件中指定 scheduler config
时,如果遵循常规的训练循环则用该 Dummy scheduler
。
参数:参考 DummyScheduler
。
class accelerate.utils.MegatronLMDummyDataLoader(**dataset_kwargs)
:Dummy DataLoader
,仅用于常规的训练循环。
class accelerate.utils.AbstractTrainStep(name)
:用于 batching
的抽象类。
class accelerate.utils.GPTTrainStep(args)
:GPT train step
类。
参数:args
:指定 Megatron-LM
的参数。
class accelerate.utils.BertTrainStep(args)
:Bert train step
类。
参数:args
:指定 Megatron-LM
的参数。
class accelerate.utils.T5TrainStep(args)
:T5 train step
类。
参数:args
:指定 Megatron-LM
的参数。
accelerate.utils.avg_losses_across_data_parallel_group(losses)
:跨 data parallel group
来对损失函数值取平均。
参数:losses
:张量的列表,指定哪些 loss
需要跨 data parallel group
取平均。