python multiprocessing getting empty result

This is the regular function without using multiprocessing

def runMonte(loanpool, structured_securities, tolerance, NSIM):
    structured_securities.addTranche(0.8, 0.05, 0)
    structured_securities.addTranche(0.2, 0.08, 1)
    tranche_percent = [0.8, 0.2]
    coeff = [1.2, 0.8]  # Tranche A has coeff of 1.2, Tranche B has coeff of 0.8
    rates = [0.05, 0.08]  # Tranche A has rate of 5%, Tranche B has rate of 8%
    yields = []
    while True:
        new_rates = []
        for index, tranche in enumerate(structured_securities.trancheList):
            tranche.rate = rates[index]  # give each tranche a new rate based on the original or modified rate
        average_DIRR_AL = simulateWaterfall(loanpool, structured_securities, NSIM)
        yields.append(Tranche.calculateYield(average_DIRR_AL[0][0], average_DIRR_AL[0][1]))
        yields.append(Tranche.calculateYield(average_DIRR_AL[1][0], average_DIRR_AL[1][1]))
        for index, tranches in enumerate(structured_securities.trancheList):
            new_rates.append(Tranche.newTrancheRate(tranches.rate, coeff[index], yields[index]))
        diffs = Tranche.diff(tranche_percent, rates, new_rates)
        if diffs < tolerance:
            break
        rates = new_rates  # modify the tranche rate to reflect the yields, return to the while loop

    for index, tranche in enumerate(structured_securities.trancheList):
        average_DIRR_AL[index].append(Tranche.DIRR_Rating(average_DIRR_AL[index][0]))
        average_DIRR_AL[index].append(tranche.rate)

    return average_DIRR_AL

And I am getting the result that I wanted, as the following:

[[0.0025041053262339907, 28.31594686263471, 'Baa1', 0.06685305303811526], [0.005659780599164244, 11.074511263338902, 'Baa3', 0.06579310384106068]]

When I use the multiprocessing as followed, I just got empty result I use the below function to call the multiprocessing functions (just modify parts of my runMonte)

def runMonteParallel(loanpool, structured_securities, tolerance, NSIM, num_processes):
    structured_securities.addTranche(0.8, 0.05, 0)
    structured_securities.addTranche(0.2, 0.08, 1)
    tranche_percent = [0.8, 0.2]
    coeff = [1.2, 0.8]  # Tranche A has coeff of 1.2, Tranche B has coeff of 0.8
    rates = [0.05, 0.08]  # Tranche A has rate of 5%, Tranche B has rate of 8%
    yields = []
    while True:
        new_rates = []
        for index, tranche in enumerate(structured_securities.trancheList):
            tranche.rate = rates[index]  # give each tranche a new rate based on the original or modified rate
        average_DIRR_AL = runSimulationParallel(loanpool, structured_securities, NSIM, num_processes)
        yields.append(Tranche.calculateYield(average_DIRR_AL[0][0], average_DIRR_AL[0][1]))
        yields.append(Tranche.calculateYield(average_DIRR_AL[1][0], average_DIRR_AL[1][1]))
        for index, tranches in enumerate(structured_securities.trancheList):
            new_rates.append(Tranche.newTrancheRate(tranches.rate, coeff[index], yields[index]))
        diffs = Tranche.diff(tranche_percent, rates, new_rates)
        if diffs < tolerance:
            break
        rates = new_rates  # modify the tranche rate to reflect the yields, return to the while loop

    for index, tranche in enumerate(structured_securities.trancheList):
        average_DIRR_AL[index].append(Tranche.DIRR_Rating(average_DIRR_AL[index][0]))
        average_DIRR_AL[index].append(tranche.rate)

    return average_DIRR_AL

Below is the multiprocessing function:

# doWork function can be any function with any argument
def doWork(input, output):
    while True:
        try:
            f, args = input.get(timeout=1)
            res = f(*args)
            output.put(res)
        except:
            output.put('Done')
            break


def runSimulationParallel(loan_pool, structured_securities, NSIM, num_processes):
    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()

    # add 20 runMC function items into input_queue
    for i in range(num_processes):
        input_queue.put((simulateWaterfall, (loan_pool, structured_securities, NSIM / num_processes)))
        # create 5 child processes

    processes = []  # initialize an empty list of process
    for i in range(num_processes):
        p = multiprocessing.Process(target=doWork, args=(input_queue, output_queue))
        p.start()
        processes.append(p)  # append all the processes

    res = []  # result
    # return the result list
    while True:
        r = output_queue.get()
        if r != 'Done':
            # r = np.array(r)
            res.append(r)
        else:
            break

    for p in processes:
        p.terminate()  # stop the process after done

    for p in processes:
        p.join()  # calling main processes to wait until all the processes are finished

    print(res) # print the result to test
    sum_DIRR_AL_results = np.zeros((len(structured_securities.trancheList), 2))
    for simulations in res:
        for i, tranche in enumerate(simulations):
            sum_DIRR_AL_results[i] += tranche[i]

    DIRR_AL = sum_DIRR_AL_results / num_processes
    DIRR_AL = DIRR_AL.tolist()
    return DIRR_AL

Here are the results returned:

[]
[]
[]
[]
# I stepped into it to print the res, but only empty [] are returned
Seconds taken: 1.8176448345184326
[[0.0, 0.0, 'Aaa', 0.06493333333333333], [0.0, 0.0, 'Aaa', 0.0649362962962963]]

Below is the simulateWaterfall function that got called in both regular and multiprocessing functions

def simulateWaterfall(loanpool, structured_securities, NSIM):
    if not isinstance(loanpool, LoanPool) or not isinstance(structured_securities, StructuredSecurities):
        logging.error('Please enter the correct class type')
    rating_metrics = []  # intialize an empty list to get the doWaterfall results
    # create an array filled with zeros, could be used to filled in later
    sum_DIRR_AL = np.zeros((len(structured_securities.trancheList), 2))
    for i in range(NSIM):
        # remember to reset each time of the simulation
        loanpool.reset()
        structured_securities.reset()
        # obtain the results of rating metrics from the doWaterfall
        rating_metrics.append(doWaterfall(loanpool, structured_securities)[3])
    for simulations in rating_metrics:
        for i, tranche_metric in enumerate(simulations):
            if tranche_metric[2] != math.inf:  # if AL not infinite
                sum_DIRR_AL[i] += [tranche_metric[1], tranche_metric[2]]
            else:
                # if AL is infinite, get rid of the AL, only add up the DIRR to get the average
                sum_DIRR_AL[i] += [tranche_metric[1], 0]
    # return the average DIRR and WAL values for each tranche
    # use ndarray here because it allows to divide directly instead of doing a list comprehension
    DIRR_AL = sum_DIRR_AL / NSIM
    DIRR_AL = DIRR_AL.tolist()  # convert the array to list
    return DIRR_AL

Updated: I found out the issue with the empty result. input_queue could not accept a float, but NSIM / num_processes is a float, so I did int(NSIM / num_processes) and the result list is no longer empty. However, I think my logic of getting the result from multi-processing is wrong (for loop is wrong, did not get me the result that I wanted), could anyone take a look at that? The result from multi-processing is now:

[[0.00243029267885915, 0.00243029267885915, 'Baa1', 0.06493480609938325], [11.079752982920553, 11.079752982920553, 'Ca', 0.07170816092109777]]

What I would expect is somehting like this:

[[0.0025041053262339907, 28.31594686263471, 'Baa1', 0.06685305303811526], [0.005659780599164244, 11.074511263338902, 'Baa3', 0.06579310384106068]]

Leave a Comment