Python GPU排队脚本

最近任务的模态太多了,要跑很多次训练脚本

于是就写了这个脚本,主要功能就是检测GPU的剩余显存,如果空闲就跑列表里没启动的任务,任务这边用命令行调用的方式,还是比较灵活的

估计不咋完善,因为是今天晚上临时写的,后面再慢慢加功能或者debug啥的吧

import subprocess
import time
import os
import signal


def get_gpu_memory():
    """
    使用 nvidia-smi 获取所有 GPU 的显存占用信息。
    返回一个字典,key 为 GPU ID,value 为显存占用(单位:MB)。
    """
    gpu_memory = {}
    # 获取 nvidia-smi 输出,包含显存使用信息
    command = "nvidia-smi --query-gpu=index,memory.used,memory.total --format=csv,noheader,nounits"
    result = subprocess.check_output(command, shell=True).decode("utf-8").strip().splitlines()

    for line in result:
        index, memory_used, memory_total = map(int, line.split(","))
        gpu_memory[index] = memory_used / 1024  # 转换为 GB
    return gpu_memory


def get_available_gpu(gpu_ids, min_memory=1.0):
    """
    获取指定 GPU ID 列表中显存占用少于 min_memory (GB) 的 GPU ID。
    """
    gpu_memory = get_gpu_memory()
    available_gpus = [gpu_id for gpu_id in gpu_ids if gpu_memory.get(gpu_id, 0) < min_memory]
    return available_gpus


def run_train_command(command, gpu_id):
    """
    运行训练命令,指定使用的 GPU,并返回子进程对象。
    """
    full_command = f"CUDA_VISIBLE_DEVICES={gpu_id} {command}"
    print(f"Running command: {full_command}")
    # 使用 subprocess.Popen 启动训练任务,以便可以后续监控
    process = subprocess.Popen(full_command, shell=True, preexec_fn=os.setsid)
    return process


def wait_for_gpu(min_memory=1.0, gpu_ids_to_check=[0, 1]):
    """
    检查并等待至少一个 GPU 空闲(显存小于指定值)。
    """
    while True:
        available_gpus = get_available_gpu(gpu_ids_to_check, min_memory)
        if available_gpus:
            print(f"Available GPUs found: {available_gpus}")
            return available_gpus
        # print("No available GPUs. Waiting...")
        time.sleep(5)  # 每 5 秒检查一次

def terminate_processes(processes):
    """
    终止所有运行的子进程。
    """
    print("\nTerminating all running processes...")
    for process in processes:
        try:
            # 使用 os.killpg 终止整个进程组
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            print(f"Terminated process {process.pid}")
        except Exception as e:
            print(f"Failed to terminate process {process.pid}: {e}")

def main():
    # 需要执行的训练命令列表(可以根据需求修改)
    train_commands = [
        "python train.py --sequence A70",
        "python train.py --sequence AD",
        "python train.py --sequence AZ",
        "python train.py --sequence V70",
        "python train.py --sequence AZ"
    ]

    # 指定你希望检查的 GPU ID 列表
    gpu_ids_to_check = [0, 1]  # 假设你要检查 GPU 0 和 GPU 1

    # 持续循环直到所有训练命令执行完毕
    processes = []  # 用于保存所有的子进程

    try:
        while train_commands:
            # 获取空闲 GPU
            available_gpus = wait_for_gpu(min_memory=1.0, gpu_ids_to_check=gpu_ids_to_check)

            # 执行训练任务
            command = train_commands.pop(0)  # 取出队列中的第一个训练命令
            gpu_id = available_gpus[0]  # 选择一个空闲的 GPU 执行任务

            # 运行任务并返回子进程对象
            process = run_train_command(command, gpu_id)
            processes.append(process)  # 保存子进程

            # 等待短暂的时间,避免过快地进行下一个任务
            time.sleep(60)

        # 等待所有任务执行完毕
        for process in processes:
            process.wait()  # 阻塞,直到子进程完成
            print("A training task has finished.")

        print("All tasks completed.")
    except KeyboardInterrupt:
        # 捕获 Ctrl+C 中断
        print("\nKeyboardInterrupt detected! Cleaning up...")
        terminate_processes(processes)

    finally:
        # 确保无论如何都清理子进程
        terminate_processes(processes)


if __name__ == "__main__":
    main()

发表评论