Why is there no speed-up when using Python multiprocessing for an embarrassingly parallel problem within a for-loop, with shared numpy data?

Problem description

I want to speed up an embarrassingly parallel problem related to Bayesian inference. The aim is to infer coefficients u for a set of images x, given a matrix A, such that X = A*U. X has dimensions mxn, A mxp and U pxn. For each column of X, one has to infer the optimal corresponding column of the coefficients U. In the end, this information is used to update A. I use m = 3000, p = 1500 and n = 100. So, as it is a linear model, the inference of the coefficient matrix U consists of n independent calculations. Thus, I tried to work with the multiprocessing module of Python, but there is no speed-up.

Here is what I did:

The main structure, without parallelization, is:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

The multiprocessing implementation:

I tried to implement multiprocessing. I have an 8-core machine that I can use.

  1. There are 3 for-loops. The only one that seems "parallelizable" is the third one, where the coefficients are inferred:
    • Generate a Queue and stack the iteration numbers from 0 to batch_size-1 into the Queue
    • Spawn 8 processes and let them work through the Queue

So, I replaced this third loop with the following:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
            work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

Here is the Wrap_mp class:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()
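
The helper functions ndarray_to_shmem and shmem_as_ndarray are not shown in the question. As a rough idea of what they might look like, here is a minimal sketch (not the asker's actual code) that backs the array with a lock-free multiprocessing.RawArray and views it through np.frombuffer; it assumes float64 data, which matches the np.empty default used for U:

import ctypes
import multiprocessing as mp
import numpy as np

def ndarray_to_shmem(arr):
    """ Copy a numpy array into a flat shared-memory buffer (assumes float64). """
    # RawArray lives in shared memory and carries no synchronizing lock.
    shm = mp.RawArray(ctypes.c_double, arr.size)
    np.frombuffer(shm, dtype=np.float64)[:] = arr.ravel()
    return shm

def shmem_as_ndarray(shm, dtype=np.float64):
    """ Expose the shared buffer as a flat numpy array without copying. """
    return np.frombuffer(shm, dtype=dtype)

With these helpers there is no locking at all, which is acceptable here because every worker writes a distinct column of U; if workers could touch the same elements, a synchronized multiprocessing.Array (accessed through get_obj()) would be needed instead.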

And here is the function infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

        except Empty:
            break

Now the problems are the following:

  1. For the given data dimensions, the multiprocessing version is not faster than the single-threaded one.
  2. The process IDs increase with every iteration. Does this mean that new processes are constantly being created? Doesn't this generate a huge overhead? How can I avoid that? Is it possible to create 8 different processes for the whole for-loop and just keep updating them with the data?
  3. Does the way I share the coefficients U among the processes slow the calculation down? Is there another, better way to do it?
  4. Would a pool of processes be better?

I am really thankful for any sort of help! I started working with Python a month ago and am pretty lost now.

Recommended answer

Every time you create a Process you are creating a new process. If you're doing that within your for loop, then yes, you are starting new processes every time through the loop. It sounds like what you want to do is initialize your Queue and Processes outside of the loop, then fill the Queue inside the loop.
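
A minimal Python 3 sketch of that structure (hypothetical names, and a stand-in computation in place of Crwlasso_cd): the workers are started once, pull column indices from a task queue inside the training loop, send their columns back through a result queue, and are only shut down with sentinel values at the very end.

import multiprocessing as mp
import numpy as np

def worker(task_queue, result_queue, A):
    """ Process tasks until the None sentinel arrives; one task = one column. """
    for index, x, theta in iter(task_queue.get, None):
        # Stand-in for the real per-column inference (Crwlasso_cd in the question).
        u_col = np.dot(A.T, x) + theta
        result_queue.put((index, u_col))

if __name__ == '__main__':
    m, p, batch_size, niter = 300, 150, 10, 3
    A = np.random.randn(m, p)
    task_queue, result_queue = mp.Queue(), mp.Queue()

    # Start the workers once, outside the training loop.
    processes = [mp.Process(target=worker, args=(task_queue, result_queue, A))
                 for _ in range(mp.cpu_count())]
    for proc in processes:
        proc.start()

    U = np.empty((p, batch_size))
    for t in range(niter):
        X = np.random.randn(m, batch_size)   # stand-in data batch
        theta = np.dot(A.T, X)
        # Fill the queue inside the loop ...
        for k in range(batch_size):
            task_queue.put((k, X[:, k], theta[:, k]))
        # ... and collect exactly batch_size results back.
        for _ in range(batch_size):
            index, u_col = result_queue.get()
            U[:, index] = u_col

    # Shut the workers down once, at the very end.
    for _ in processes:
        task_queue.put(None)
    for proc in processes:
        proc.join()

Note that in the real algorithm A is updated after every batch, so it would have to be sent along with each task or kept in shared memory (e.g. through the Wrap_mp wrapper above); the sketch ignores that to keep the process-lifecycle point visible.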

I've used multiprocessing.Pool before, and it's useful, but it doesn't offer much over what you've already implemented with a Queue.
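
For comparison, a Pool-based version of just the third loop could look roughly like this (again a sketch with a stand-in computation; Pool pickles A and the columns on every map call, so the data is copied to the workers rather than shared):

import multiprocessing as mp
import numpy as np

def infer_column(args):
    """ Hypothetical per-column inference; stands in for Crwlasso_cd. """
    x, A, theta = args
    return np.dot(A.T, x) + theta

if __name__ == '__main__':
    m, p, batch_size = 300, 150, 10
    A = np.random.randn(m, p)
    X = np.random.randn(m, batch_size)
    theta = np.dot(A.T, X)

    with mp.Pool(processes=mp.cpu_count()) as pool:
        columns = pool.map(infer_column,
                           [(X[:, k], A, theta[:, k]) for k in range(batch_size)])
    U = np.column_stack(columns)

Whether this beats the Queue version depends mostly on how expensive each Crwlasso_cd call is compared with the cost of pickling A (m*p floats) for every task.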
