NumPy数组的滑动窗口

带有示例的CoLab记事本


可以使用Python编程语言在NumPy数组上创建滚动窗口(滚动窗口、滑动窗口、移动窗口),而无需显式循环。本文讨论了在NumPy数组上创建一维、二维、三维和N维滑动窗口的方法。这样一来,数据处理速度可提高数千倍,与C编程语言的速度相当。


滑动窗口用于:图像处理,人工神经网络,Internet协议TCP,基因组数据处理,预测时间序列等。


免责声明:源代码中可能有错误!如果发现错误,请给我写信。





介绍


本文是我在StackOverflow网站上的回答的延续。我最初使用滑动窗口的实验在这里和这里。


在二维图像数组上滑动二维窗口的实际实现,位于"使用多边形手动标注图像"项目的logic_tools.py文件的roll函数中。


一维滑动窗口的算法已在此处、此处和此处实现。


, , (strides, ).


- Pandas, Pandas, , . , . , Cython, - , NumPy.




1. Numpy中ND数组的滚动1D窗口


Numpy中ND阵列的滚动1D窗口


:


# Rolling 1D window for ND array
def roll(a,      # ND array
         b,      # rolling 1D window array
         dx=1):  # step size (horizontal)
    """Return a strided view of `a` holding every position of a 1D window.

    The window slides along the last axis of `a`. Only the shape of `b` is
    used (its values are never read), and `b` is expected to be 1D because
    a single stride is appended for the window axis.

    Parameters
    ----------
    a : numpy.ndarray
        Source array with at least one dimension.
    b : numpy.ndarray
        1D window array; supplies the window length.
    dx : int, optional
        Distance, in elements, between consecutive window positions.

    Returns
    -------
    numpy.ndarray
        View of shape ``a.shape[:-1] + (positions,) + b.shape`` that shares
        memory with `a`. When the window is longer than the last axis of
        `a`, ``positions`` is 0 and the view is empty.
    """
    # Exact integer floor division (no float round-trip); clamp at zero so a
    # window that does not fit yields an empty view instead of a negative
    # dimension.
    positions = max((a.shape[-1] - b.shape[-1]) // dx + 1, 0)
    shape = a.shape[:-1] + (positions,) + b.shape
    # Moving to the next window skips dx elements; inside a window the
    # elements keep the original spacing of the last axis.
    strides = a.strides[:-1] + (a.strides[-1] * dx,) + a.strides[-1:]
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

函数numpy.lib.stride_tricks.as_strided使用给定的形状(shape)和步幅(strides)创建数组的视图(view)。


(shape) , , . (strides) .


形状(shape)由以下部分组成:


  • a.shape[:-1] — ND数组(N > 1)除最后一维之外的形状。如果N == 1,则该元组为空:t == ()。
  • (int((a.shape[-1] - b.shape[-1]) / dx) + 1,) — 滚动窗口沿最后一维[-1]的位置数。步长dx可以取:1, 2, 3 等。
  • b.shape — 滚动窗口的形状。

步幅(strides)由以下部分组成:


  • a.strides[:-1] — ND数组(N > 1)除最后一维之外各维的步幅。如果N == 1,则该元组为空:t == ()。
  • (a.strides[-1] * dx,) — 相邻两个窗口位置之间的字节数。例如,如果int占4个字节,那么当dx == 2时,步幅为4 * 2 = 8个字节。
  • a.strides[-1:] — 窗口内相邻元素之间的字节数。例如,如果int占4个字节,则为(4,)。



2. Numpy中ND数组的滚动2D窗口


Numpy中ND阵列的滚动2D窗口


2D 2D :


  • ;
  • ;
  • ( , , ..).

, 2D - . , , , , .. , , .


# Rolling 2D window for ND array
def roll(a,      # ND array
         b,      # rolling 2D window array
         dx=1,   # horizontal step, abscissa, number of columns
         dy=1):  # vertical step, ordinate, number of rows
    """Return a strided view of `a` holding every position of a 2D window.

    The window slides over the last two axes of `a`. Only the shape of `b`
    is used (its values are never read), and `b` is expected to be 2D
    because two strides are appended for the window axes.

    Parameters
    ----------
    a : numpy.ndarray
        Source array with at least two dimensions.
    b : numpy.ndarray
        2D window array; supplies the window height and width.
    dx : int, optional
        Step along the last axis (columns), in elements.
    dy : int, optional
        Step along the second-to-last axis (rows), in elements.

    Returns
    -------
    numpy.ndarray
        View of shape ``a.shape[:-2] + (rows, cols) + b.shape`` that shares
        memory with `a`. An axis on which the window does not fit gets
        length 0, so the view is empty.
    """
    # Number of window positions per axis; clamp at zero so an oversized
    # window yields an empty view instead of a negative dimension.
    rows = max((a.shape[-2] - b.shape[-2]) // dy + 1, 0)
    cols = max((a.shape[-1] - b.shape[-1]) // dx + 1, 0)
    shape = a.shape[:-2] + (rows, cols) + b.shape
    # The first pair of strides moves between window positions, the second
    # pair walks inside one window with the array's own spacing.
    strides = a.strides[:-2] + \
              (a.strides[-2] * dy, a.strides[-1] * dx) + \
              a.strides[-2:]
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

注意:这里每个维度上的窗口位置数是用整除运算符 // 计算的,例如 ((a.shape[-2] - b.shape[-2]) // dy + 1,)。对于非负的差值和正的步长,下面两种写法:


    (int((a.shape[-1] - b.shape[-1]) / dx) + 1,)


    ((a.shape[-1] - b.shape[-1]) // dx + 1,)

给出相同的结果。


() , (a.strides[-2] * dy,) 2D .


counts, coords :


def show_results(a, b, dx=1, dy=1):
    """Print how many 2D window positions in `a` equal `b`, and where.

    Parameters
    ----------
    a : numpy.ndarray
        Array to search; the window slides over its last two axes.
    b : numpy.ndarray
        2D pattern to look for.
    dx : int, optional
        Step along the last axis (columns).
    dy : int, optional
        Step along the second-to-last axis (rows).
    """
    n = a.ndim  # number of dimensions
    # roll() appends the two window axes after the n axes of positions, so
    # reducing over axes n and n+1 asks "does the whole window equal b?".
    bool_array = np.all(roll(a, b, dx, dy) == b, axis=(n, n + 1))
    counts = np.count_nonzero(bool_array)
    # Window indices become coordinates in `a` when multiplied by the step.
    # Only the last two axes are stepped; any leading axes keep a factor of
    # 1, which also makes the product broadcast when a.ndim > 2.
    scale = [1] * (n - 2) + [dy, dx]
    coords = np.transpose(np.nonzero(bool_array)) * scale
    print("Found {counts} elements with coordinates:\n{coords}".format(
        counts=counts, coords=coords))

函数np.all沿4D数组中属于滚动2D窗口的两个维度进行归约。将坐标coords乘以[dy, dx],即可得到原始数组中的坐标。




3. Numpy中ND数组的滚动3D窗口


Numpy中ND阵列的滚动3D窗口


() - . , 3D ND- .


3D 3D — ( ) . CoLab 3D - , (, , ..).


# Rolling 3D window for ND array
def roll(a,      # ND array
         b,      # rolling 3D window array
         dx=1,   # horizontal step, abscissa, number of columns
         dy=1,   # vertical step, ordinate, number of rows
         dz=1):  # transverse step, applicate, number of layers
    """Return a strided view of `a` holding every position of a 3D window.

    The window slides over the last three axes of `a`. Only the shape of
    `b` is used (its values are never read), and `b` is expected to be 3D
    because three strides are appended for the window axes.

    Parameters
    ----------
    a : numpy.ndarray
        Source array with at least three dimensions.
    b : numpy.ndarray
        3D window array; supplies the window depth, height and width.
    dx : int, optional
        Step along the last axis (columns), in elements.
    dy : int, optional
        Step along the second-to-last axis (rows), in elements.
    dz : int, optional
        Step along the third-to-last axis (layers), in elements.

    Returns
    -------
    numpy.ndarray
        View of shape ``a.shape[:-3] + (layers, rows, cols) + b.shape``
        that shares memory with `a`. An axis on which the window does not
        fit gets length 0, so the view is empty.
    """
    # Number of window positions per axis; clamp at zero so an oversized
    # window yields an empty view instead of a negative dimension.
    layers = max((a.shape[-3] - b.shape[-3]) // dz + 1, 0)
    rows = max((a.shape[-2] - b.shape[-2]) // dy + 1, 0)
    cols = max((a.shape[-1] - b.shape[-1]) // dx + 1, 0)
    # multidimensional "sausage" with 3D cross-section
    shape = a.shape[:-3] + (layers, rows, cols) + b.shape
    # The first triple of strides moves between window positions, the
    # second triple walks inside one window with the array's own spacing.
    strides = a.strides[:-3] + \
              (a.strides[-3] * dz, a.strides[-2] * dy, a.strides[-1] * dx) + \
              a.strides[-3:]
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

按如下方式计算匹配数量counts及其坐标coords:


def show_results(a, b, dx=1, dy=1, dz=1):
    """Print how many 3D window positions in `a` equal `b`, and where.

    Parameters
    ----------
    a : numpy.ndarray
        Array to search; the window slides over its last three axes.
    b : numpy.ndarray
        3D pattern to look for.
    dx : int, optional
        Step along the last axis (columns).
    dy : int, optional
        Step along the second-to-last axis (rows).
    dz : int, optional
        Step along the third-to-last axis (layers).
    """
    n = a.ndim  # number of dimensions of the searched array
    # roll() appends the three window axes after the n axes of positions,
    # so reducing over axes n..n+2 asks "does the whole window equal b?".
    bool_array = np.all(roll(a, b, dx, dy, dz) == b, axis=(n, n + 1, n + 2))
    counts = np.count_nonzero(bool_array)
    # Window indices become coordinates in `a` when multiplied by the step.
    # Only the last three axes are stepped; any leading axes keep a factor
    # of 1, which also makes the product broadcast when a.ndim > 3.
    scale = [1] * (n - 3) + [dz, dy, dx]
    coords = np.transpose(np.nonzero(bool_array)) * scale
    print("Found {counts} elements with coordinates:\n{coords}".format(
        counts=counts, coords=coords))



4. ND数组的滚动MD窗口,其中M ≤ N


Numpy中ND阵列的滚动MD窗口


下面将函数roll和show_results推广到ND数组的滚动MD窗口,其中M和N满足:M ≤ N。


# Rolling MD window for ND array
def roll(a,        # ND array
         b,        # rolling MD window array
         d=None):  # steps array
    """Return a strided view of `a` holding every position of an MD window.

    The window slides over the last M axes of `a`, where M is ``b.ndim``.
    Only the shape of `b` is used; its values are never read.

    Parameters
    ----------
    a : numpy.ndarray
        Source array with N dimensions.
    b : numpy.ndarray
        Window array with M <= N dimensions.
    d : array-like of int, optional
        M positive step sizes. The order is reversed relative to the axes:
        ``d[0]`` is the step along the last axis, ``d[1]`` along the
        second-to-last, and so on. Defaults to all ones.

    Returns
    -------
    numpy.ndarray or None
        View of shape ``a.shape[:N-M] + positions + b.shape`` that shares
        memory with `a`. An axis on which the window does not fit gets
        length 0. On invalid arguments an error message is printed and
        None is returned.
    """
    # Make several verifications
    n = a.ndim  # array dimensions
    m = b.ndim  # rolling window dimensions
    if m > n:  # check if M ≤ N
        print("Error: rolling window dimensions is larger than the array dims")
        return None
    if d is None:  # steps are equal to 1 by default
        d = np.ones(m, dtype=np.uint32)
    else:
        d = np.asarray(d)  # also accept plain lists and tuples
        # Either condition alone makes the steps unusable, hence `or`.
        if d.ndim != 1 or d.size != m:
            print("Error: steps number must be equal to rolling window dimensions")
            return None
        if not np.issubdtype(d.dtype, np.integer) or not (d > 0).all():
            print("Error: steps must be integer and > 0")
            return None

    # Steps are given last-axis-first; flip them into axis order and use a
    # signed type so the arithmetic below cannot wrap around.
    s = np.flip(d).astype(np.intp)
    lead = n - m  # number of leading axes the window does not slide over
    sub = np.subtract(a.shape[lead:], b.shape)
    # Integer floor division, clamped at zero so that an oversized window
    # yields an empty view instead of a wrapped or negative dimension.
    steps = tuple(int(count) for count in np.maximum(sub // s + 1, 0))
    shape = a.shape[:lead] + steps + b.shape

    # Byte distance between neighbouring window positions on every axis.
    section = tuple(int(step) for step in np.multiply(a.strides[lead:], s))
    strides = a.strides[:lead] + section + a.strides[lead:]

    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

roll . :


  • steps = tuple(np.divide(sub, s).astype(np.uint32) + 1) — 滚动窗口在每个维度上的位置数。
  • section = tuple(np.multiply(a.strides[-m:], s)) — 窗口每移动一步的步幅(以字节计),即"截面"的步幅。
  • 将"截面"的步幅section插入ND数组的步幅之中:strides = a.strides[:-m] + section + a.strides[-m:]。

counts coords :


def show_results(a, b, d=None):
    """Print how many MD window positions in `a` equal `b`, and where.

    `a` is the N-dimensional array to search, `b` the M-dimensional
    pattern and `d` the M step sizes (all ones when omitted), listed with
    the step for the last axis first.
    """
    array_dims, window_dims = a.ndim, b.ndim
    steps = np.ones(window_dims, dtype=np.uint32) if d is None else d
    # The window axes are the M trailing axes of the (N+M)D rolled array.
    window_axes = tuple(range(array_dims, array_dims + window_dims))
    matches = np.all(roll(a, b, steps) == b, axis=window_axes)
    counts = np.count_nonzero(matches)
    # Axes the window does not slide over keep a factor of 1; the others
    # use the steps flipped back into axis order.
    untouched = np.ones(array_dims - window_dims, dtype=int)
    scale = np.concatenate((untouched, np.flip(steps)))
    hits = np.transpose(np.nonzero(matches))
    coords = hits * scale
    print("Found {counts} elements with coordinates:\n{coords}".format(
        counts=counts, coords=coords))

show_results :


  • () bool_array . numpy.all m , True. , bool_array — (N+M)D , np.all m MD :

    bool_array = roll(a, b, d) == b  # get (N+M)D boolean array
    # np.all over M dimensions of the rolling MD window for (N+M)D array
    bool_array = np.all(bool_array, axis=tuple(range(n, n + m)))

  • M < N. M < N 1D , N-M ( 1). M == N, , :
    # flip 1D array of step sizes and concatenate it with remaining dimensions
    s = np.concatenate((np.ones(n-m, dtype=int), np.flip(d)))



5. 适用于任意M和N的ND数组滚动MD窗口


ND阵列的滚动MD窗口已扩展


MD ND , M > N? , ! ND , MD M > N.


MD ND . MD ND M N. roll show_results.


def get_results(a, b, d=None):
    """Count the MD window positions in `a` that equal `b` and locate them.

    `a` is the N-dimensional array to search, `b` the M-dimensional
    pattern and `d` the M step sizes (all ones when omitted), listed with
    the step for the last axis first.

    Returns a tuple ``(counts, coords)``: the number of matching window
    positions and an array with one row of coordinates per match.
    """
    array_dims, window_dims = a.ndim, b.ndim
    steps = np.ones(window_dims, dtype=np.uint32) if d is None else d
    # Comparing the rolled view with b gives an (N+M)D boolean array whose
    # M trailing axes run over the elements of one window.
    window_axes = tuple(range(array_dims, array_dims + window_dims))
    matches = np.all(roll(a, b, steps) == b, axis=window_axes)
    counts = np.count_nonzero(matches)
    # Axes the window does not slide over keep a factor of 1; the others
    # use the steps flipped back into axis order.
    untouched = np.ones(array_dims - window_dims, dtype=int)
    scale = np.concatenate((untouched, np.flip(steps)))
    coords = np.transpose(np.nonzero(matches)) * scale
    return (counts, coords)

def show_intersections(a, b, d=None):
    """Print the matches of window `b` in array `a` for any M and N.

    When ``b.ndim <= a.ndim`` the search is delegated to `get_results`.
    Otherwise the extra leading axes of `b` are collapsed into a stack of
    N-dimensional layers, each layer is searched separately, and the
    per-layer counts and coordinates are combined.

    `d` holds the step sizes (all ones when omitted). When `b` has more
    dimensions than `a` and `d` has exactly ``a.ndim`` entries, it is
    padded in front with ones up to ``b.ndim`` entries.
    """
    n, m = a.ndim, b.ndim
    if d is None:  # default: step 1 everywhere
        steps = np.ones(m, dtype=np.uint32)
    elif m > n and d.size == n:
        steps = np.concatenate((np.ones(m - n, dtype=int), d))
    else:
        steps = d

    if m <= n:
        counts, coords = get_results(a, b, steps)
    else:
        extra = m - n  # dimensions b has in excess of a
        layers = np.prod(b.shape[:extra])
        # Collapse the excess axes: MD array -> (N+1)D stack of ND layers.
        stacked = b.reshape((layers,) + b.shape[extra:])
        per_layer = [get_results(a, stacked[i], steps[extra:])
                     for i in range(layers)]
        counts = sum(found for found, _ in per_layer)
        located = [where for _, where in per_layer]
        # With no layers at all there is nothing to join.
        coords = np.concatenate(located) if located else None
    print("Found {counts} elements with coordinates:\n{coords}".format(
        counts=counts, coords=coords))

函数get_results与上文的show_results几乎相同,区别在于它返回结果,而不是打印结果。


show_intersections . M <= N, show_intersections get_results, . M > N, b a.


t = m - n MD b ND a. b a: layers = np.prod(b.shape[:t]). ( , reshape) b MD (N+1)D :


    # Reshape MD array into (N+1)D array.
    temp = b.reshape((layers,) + b.shape[t:])

接着,对这个(N+1)D数组中的每一个ND层分别调用get_results,共layers层:


    # Get results for every layer in the intersection
    for i in range(layers):
        results = get_results(a, temp[i], d_tmp[t:])

最后,把每一层找到的匹配数量counts以及这些匹配的坐标coords合并起来:


    # Get results for every layer in the intersection
    for i in range(layers):
        results = get_results(a, temp[i], d_tmp[t:])
        counts += results[0]
        if coords is None:
            coords = results[1]
        else:
            coords = np.concatenate((coords, results[1]))

所有示例都在CoLab记事本中


感谢您的关注!


All Articles