source

python 다중 처리를 시도하는 Windows의 RuntimeError

nicesource 2023. 1. 8. 14:34
반응형

python 다중 처리를 시도하는 Windows의 RuntimeError

저는 윈도 머신에서 스레딩과 멀티프로세싱을 사용하여 첫 번째 정식 파이썬 프로그램을 시도하고 있습니다.python은 아래와 같은 메시지를 보내서 프로세스를 시작할 수 없습니다.문제는 메인 모듈에서 스레드를 실행하지 않는다는 것입니다.스레드는 클래스 내의 개별 모듈에서 처리됩니다.

편집: 그런데 이 코드는 ubuntu에서 정상적으로 작동합니다.윈도에서는 잘 안 돼

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

제 원래 코드는 꽤 길지만, 코드 요약본으로 에러를 재현할 수 있었습니다.이 파일은 2개의 파일로 분할됩니다.첫 번째 파일은 메인모듈이며 프로세스/스레드를 처리하고 메서드를 호출하는 모듈을 Import하는 것 외에는 거의 수행되지 않습니다.두 번째 모듈은 코드의 고기가 있는 곳입니다.


test Main.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

parallelTestModule 입니다.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'\n'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

Windows windows( 、 [ Import ( 실 ) ) ], 이렇게 돼요.if __name__ == '__main__':서브프로세스가 재귀적으로 생성되지 않도록 메인모듈에 가드를 설정합니다.

★★testMain.py:

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)

testMain의 주요 함수에 코드를 입력해 보십시오.화이

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

다음의 문서를 참조해 주세요.

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

말하자면

새로운 Python 인터프리터가 의도하지 않은 부작용(새로운 프로세스 시작 등)을 일으키지 않고 메인 모듈을 안전하게 가져올 수 있는지 확인합니다.

...을 사용하여을 사용하여if __name__ == '__main__'

이전 답변은 맞지만, 언급하는 데 도움이 될 만한 작은 문제가 있습니다.

메인 모듈이 글로벌 변수 또는 클래스 멤버 변수를 정의하고 새로운 오브젝트로 초기화(또는 사용)하는 다른 모듈을 Import하는 경우 동일한 방법으로 Import 조건을 설정해야 할 수 있습니다.

if __name__ ==  '__main__':
  import my_module

@말했듯이 또는 @Offer .if __name__ == '__main__':

제 경우엔 이렇게 끝났죠

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()

안녕하세요 여기는 멀티프로세스용 구조입니다.

from multiprocessing import Process
import time


start = time.perf_counter()


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')



p1 = Process(target=do_something, args=[1])
p2 = Process(target=do_something, args=[2])


if __name__ == '__main__':
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish-start,2 )} second(s)')

을 넣지 if __name__ == '__main__':실행하려는 프로그램을 실행하기만 하면 됩니다.

제 경우, 코드를 만들기 전에 변수를 사용한 단순한 버그입니다.위의 솔루션을 시도하기 전에 확인해 볼 가치가 있습니다.왜 이런 에러 메시지를 받았는지 아무도 모르죠

아래 솔루션은 python multiprocessing과 pytorch multiprocessing 모두에 사용할 수 있습니다.

다른 답변에서 언급한 바와 같이 수정사항은if __name__ == '__main__':그러나 저는 여러 스크립트 및 모듈을 사용하고 있기 때문에 어디서부터 시작해야 할지 몇 가지 문제에 직면했습니다.첫 번째 함수를 main 내부로 호출할 수 있으면 여러 프로세스를 생성하기 전의 모든 기능을 호출할 수 있습니다(이유는 확실하지 않습니다).

(수입 전부터) 맨 앞줄에 놓는 것이 효과가 있었다.첫 번째 함수 반환 시간 초과 오류만 호출합니다.아래는 제 코드의 첫 번째 파일이며, 여러 함수를 호출한 후 멀티프로세싱을 사용하지만 첫 번째에 메인을 넣는 것만이 유일한 수정점인 것 같습니다.

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])

    # Get demonstration data if necessary and behavior clone
    if job_data['algorithm'] != 'NPG':
        print("========================================")
        print("Collecting expert demonstrations")
        print("========================================")
        demo_paths = pickle.load(open(job_data['demo_file'], 'rb'))

        ########################################################################################
        demo_paths = demo_paths[0:3]
        print (job_data['demo_file'], len(demo_paths))
        for d in range(len(demo_paths)):
            feats = demo_paths[d]['features']
            feats = np.vstack(feats)
            demo_paths[d]['observations'] = feats

        ########################################################################################

        bc_agent = BC(demo_paths, policy=policy, epochs=job_data['bc_epochs'], batch_size=job_data['bc_batch_size'],
                      lr=job_data['bc_learn_rate'], loss_type='MSE', set_transforms=False)

        in_shift, in_scale, out_shift, out_scale = bc_agent.compute_transformations()
        bc_agent.set_transformations(in_shift, in_scale, out_shift, out_scale)
        bc_agent.set_variance_with_data(out_scale)

        ts = timer.time()
        print("========================================")
        print("Running BC with expert demonstrations")
        print("========================================")
        bc_agent.train()
        print("========================================")
        print("BC training complete !!!")
        print("time taken = %f" % (timer.time() - ts))
        print("========================================")

        # if job_data['eval_rollouts'] >= 1:
        #     score = e.evaluate_policy(policy, num_episodes=job_data['eval_rollouts'], mean_action=True)
        #     print("Score with behavior cloning = %f" % score[0][0])

    if job_data['algorithm'] != 'DAPG':
        # We throw away the demo data when training from scratch or fine-tuning with RL without explicit augmentation
        demo_paths = None

    # ===============================================================================
    # RL Loop
    # ===============================================================================

    rl_agent = DAPG(e, policy, baseline, demo_paths,
                    normalized_step_size=job_data['rl_step_size'],
                    lam_0=job_data['lam_0'], lam_1=job_data['lam_1'],
                    seed=job_data['seed'], save_logs=True
                    )

    print("========================================")
    print("Starting reinforcement learning phase")
    print("========================================")


    ts = timer.time()
    train_agent(job_name=JOB_DIR,
                agent=rl_agent,
                seed=job_data['seed'],
                niter=job_data['rl_num_iter'],
                gamma=job_data['rl_gamma'],
                gae_lambda=job_data['rl_gae'],
                num_cpu=job_data['num_cpu'],
                sample_mode='trajectories',
                num_traj=job_data['rl_num_traj'],
                num_samples= job_data['rl_num_samples'],
                save_freq=job_data['save_freq'],
                evaluation_rollouts=job_data['eval_rollouts'])
    print("time taken = %f" % (timer.time()-ts))

저도 같은 문제에 부딪혔어요.@ofter 메서드는 주의할 점이 있기 때문에 정확합니다.다음은 참고용으로 수정한 성공한 디버깅 코드입니다.


if __name__ == '__main__':
    import matplotlib.pyplot as plt
    import numpy as np
    def imgshow(img):
        img = img / 2 + 0.5
        np_img = img.numpy()
        plt.imshow(np.transpose(np_img, (1, 2, 0)))
        plt.show()

    dataiter = iter(train_loader)
    images, labels = dataiter.next()

    imgshow(torchvision.utils.make_grid(images))
    print(' '.join('%5s' % classes[labels[i]] for i in range(4)))

참고로 서브루틴은 없고 메인 프로그램만 있는데 당신과 같은 문제가 있어요.이는 프로그램 세그먼트 중간에 Python 라이브러리 파일을 가져올 때 다음을 추가해야 함을 보여줍니다.

if __name__ == '__main__':

python 3.8.5를 사용하는 yolo v5의 경우

if __name__ == '__main__':
    from yolov5 import train
    train.run()

언급URL : https://stackoverflow.com/questions/18204782/runtimeerror-on-windows-trying-python-multiprocessing

반응형