Skip to content

mosaic

Mosaic

Source code in phomo/mosaic.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class Mosaic:
    @classmethod
    def from_file_and_dir(
        cls,
        master_file: PathLike,
        tile_dir: PathLike,
        *args,
        master_crop_ratio: Optional[float] = None,
        master_size: Optional[Tuple[int, int]] = None,
        master_mode: Optional[str] = None,
        tile_crop_ratio: Optional[float] = None,
        tile_size: Optional[Tuple[int, int]] = None,
        tile_mode: Optional[str] = None,
        **kwargs,
    ) -> "Mosaic":
        """Construct a `Mosaic` from a master image file and a directory containing the file images.

        Args:
            master_file: The master image file.
            tile_dir: the directory containing the tile images.

        Returns:
            A `Mosaic` to construct the `master_file` using the tile images in the `tile_dir`.
        """
        master = Master.from_file(
            master_file,
            crop_ratio=master_crop_ratio,
            img_size=master_size,
            mode=master_mode,
        )
        pool = Pool.from_dir(
            tile_dir, tile_size=tile_size, crop_ratio=tile_crop_ratio, mode=tile_mode
        )
        return cls(master, pool, *args, **kwargs)

    def __init__(
        self,
        master: Master,
        pool: Pool,
        n_appearances: int = 1,
    ) -> None:
        """Construct a regular grid mosaic.

        Note:
            The Pool's tiles should all be the same size.

        Args:
            master: `Master` image to reconstruct.
            pool: Tile image pool with which to reconstruct the `Master` image.
            n_appearances: Number of times a tile can appear in the mosaic.

        Examples:
            Building a mosaic.

            >>> pool = Pool.from_dir("tiles")
            >>> master = Master.from_file("master.png")
            >>> mosaic = Mosaic(master, pool, n_appearances=1)
            >>> mosaic.build(mosaic.d_matrix())
        """
        self.master = master
        if len(set([array.size for array in pool.array])) != 1:
            raise ValueError("Pool tiles sizes are not identical.")
        self.pool = pool
        self.tile_shape = (self.pool.array[0].shape[0], self.pool.array[0].shape[1])
        self.n_appearances = n_appearances
        self.grid = Grid(self.master, (self.size[1], self.size[0]), self.tile_shape)

    @property
    def size(self) -> Tuple[int, int]:
        """The size of the mosaic image.

        It can be different from the master image size as an integer number of
        tiles should fit within it.

        Returns:
            The width and height of the mosaic image.
        """
        return (
            self.master.array.shape[1]
            - self.master.array.shape[1] % self.tile_shape[1],
            self.master.array.shape[0]
            - self.master.array.shape[0] % self.tile_shape[0],
        )

    @property
    def n_leftover(self) -> int:
        """The number of tiles which will be unused when building the mosaic."""
        return len(self.pool) * self.n_appearances - len(self.grid.slices)

    def _d_matrix_worker(
        self, array: np.ndarray, metric_func: MetricCallable, **kwargs
    ) -> np.ndarray:
        """Parallel worker. Computes one row of the distance matrix."""
        # if the tile grid was subdivided the master array can be smaller
        # than the tiles, need to resize to match the shapes
        if array.shape[:-1] != self.tile_shape:
            # this isn't exact because we are upscalling the master array
            # we should be shrinking all the tile arrays but that is slower
            array = resize_array(array, (self.tile_shape[1], self.tile_shape[0]))
        return metric_func(array, self.pool.array, **kwargs)

    def d_matrix(
        self,
        workers: int = 1,
        metric: Union[str, MetricCallable] = "norm",
        **kwargs,
    ) -> np.ndarray:
        """Compute the distance matrix between all the master's tiles and the
        pool tiles.

        Args:
            workers: The number of worker to use.
            metric: The distance metric used for the distance matrix. Either
                provide a string, for implemented metrics see ``phomo.metrics.METRICS``.
                Or a callable, which should take two ``np.ndarray``s and return a float.
            **kwargs: Passed to `metric`.

        Returns:
            Distance matrix, shape: (number of master arrays, number of tiles in the pool).
        """
        if isinstance(metric, str):
            if metric not in METRICS.keys():
                raise KeyError(
                    "'%s' not in available metrics: %s",
                    metric,
                    repr(list(METRICS.keys())),
                )
            LOGGER.info("Using metric '%s'", metric)
            metric_func = METRICS[metric]
        else:
            LOGGER.info("Using user provided distance metric function.")
            metric_func = metric

        # Compute the distance matrix.
        worker = partial(self._d_matrix_worker, metric_func=metric_func, **kwargs)
        if workers != 1:
            LOGGER.info("Computing distance matrix with %i workers.", workers)
            with MpPool(processes=workers) as pool:
                d_matrix = np.array(
                    list(
                        tqdm(
                            pool.imap(
                                worker,
                                self.grid.arrays,
                                chunksize=len(self.grid) // workers,
                            ),
                            total=len(self.grid.slices),
                            desc="Building distance matrix",
                        )
                    )
                )
        else:
            # get rid of pool overhead if serial computation is desired.
            LOGGER.info("Computing distance matrix in serial.")
            d_matrix = np.array(
                [
                    worker(array)
                    for array in tqdm(self.grid.arrays, desc="Building distance matrix")
                ]
            )
        LOGGER.debug("d_matrix shape: %s", d_matrix.shape)
        return d_matrix

    def d_matrix_cuda(self, metric: str = "norm") -> np.ndarray:
        """Compute the distance matrix using CUDA for GPU acceleration.

        Args:
            metric: The distance metric used for the distance matrix. Either "norm" or "greyscale".

        Returns:
            Distance matrix, shape: (number of master arrays, number of tiles in the pool).
        """

        try:
            from numba import cuda
        except ImportError:
            raise ImportError(
                "Numba is required for CUDA support, run \"pip install 'phomo[cuda]'\" to install it."
            )

        if metric not in ["norm", "greyscale"]:
            raise ValueError(
                f"Invalid metric '{metric}'. When using gpu `metric' must be 'norm' or 'greyscale'."
            )

        LOGGER.info("Computing distance matrix with CUDA.")

        # when the grid has been subdivided the master arrays will be smaller, so we grow them to match
        # the tile size
        grid_arrays = [
            array
            if array.shape == self.tile_shape
            else resize_array(array, self.tile_shape)
            for array in self.grid.arrays
        ]
        pool_arrays = self.pool.array
        if metric == "greyscale":
            grid_arrays = [array.sum(axis=-1, keepdims=True) for array in grid_arrays]
            pool_arrays = [array.sum(axis=-1, keepdims=True) for array in pool_arrays]

        # Transfer the master and pool arrays to the GPU.
        master_arrays_device = cuda.to_device(grid_arrays)
        pool_arrays_device = cuda.to_device(pool_arrays)

        # Allocate memory for the distance matrix on the GPU.
        d_matrix_device = cuda.device_array((len(grid_arrays), len(pool_arrays)))

        # Define the CUDA kernel for computing the distance matrix.
        @cuda.jit
        def compute_d_matrix_kernel(master_arrays, pool_arrays, d_matrix):
            i, j = cuda.grid(2)  # type: ignore
            if i < master_arrays.shape[0] and j < pool_arrays.shape[0]:
                distance = 0.0
                for x in range(master_arrays.shape[1]):
                    for y in range(master_arrays.shape[2]):
                        for c in range(master_arrays.shape[3]):
                            diff = master_arrays[i, x, y, c] - pool_arrays[j, x, y, c]
                            distance += diff * diff
                d_matrix[i, j] = math.sqrt(distance)

        # Define the number of threads per block and blocks per grid.
        threads_per_block = (16, 16)
        blocks_per_grid_x = math.ceil(len(grid_arrays) / threads_per_block[0])
        blocks_per_grid_y = math.ceil(len(pool_arrays) / threads_per_block[1])
        blocks_per_grid = (blocks_per_grid_x, blocks_per_grid_y)

        # Launch the kernel.
        compute_d_matrix_kernel[blocks_per_grid, threads_per_block](  # type: ignore
            master_arrays_device, pool_arrays_device, d_matrix_device
        )

        LOGGER.debug("d_matrix shape: %s", d_matrix_device.shape)
        # Copy the result back to the host.
        return d_matrix_device.copy_to_host()

    def build_greedy(self, d_matrix: np.ndarray) -> Image.Image:
        """Construct the mosaic image using a greedy tile assignement algorithm.

        This leads to less accurate mosaics, but is significantly faster than the
        optimal assignement algorithm, especialy when the distance matrix is large.

        Args:
            d_matrix: The computed distance matrix.

        Returns:
            The `PIL.Image` instance of the mosaic.
        """
        mosaic = np.zeros((self.size[1], self.size[0], 3))

        # Keep track of tiles and sub arrays.
        placed_master_arrays = set()
        placed_tiles = set()
        n_appearances = [0] * len(self.pool)

        pbar = tqdm(total=d_matrix.shape[0], desc="Building mosaic")
        # from: https://stackoverflow.com/questions/29046162/numpy-array-loss-of-dimension-when-masking
        sorted_master_slices_i, sorted_tiles = np.unravel_index(
            np.argsort(d_matrix, axis=None), d_matrix.shape
        )
        for slices_i, tile in zip(sorted_master_slices_i, sorted_tiles):
            if slices_i in placed_master_arrays or tile in placed_tiles:
                continue
            slices = self.grid.slices[slices_i]
            tile_array = self.pool.array[tile]
            # if the grid has been subdivided then the tile should be shrunk to
            # the size of the subdivision
            array_size = (
                slices[1].stop - slices[1].start,
                slices[0].stop - slices[0].start,
            )
            if tile_array.shape[:-1] != array_size[::-1]:
                tile_array = resize_array(tile_array, array_size)

            # shift slices back so that the centering of the mosaic within the
            # master image is removed
            slices = self.grid.remove_origin(slices)
            mosaic[slices[0], slices[1]] = tile_array
            placed_master_arrays.add(slices_i)
            n_appearances[tile] += 1
            if n_appearances[tile] == self.n_appearances:
                placed_tiles.add(tile)
            pbar.update(1)
        pbar.close()
        return Image.fromarray(np.uint8(mosaic))

    def build(self, d_matrix: np.ndarray) -> Image.Image:
        """Construct the mosaic image by solving the linear sum assignment problem.
        See: https://en.wikipedia.org/wiki/Assignment_problem

        Args:
            d_matrix: The computed distance matrix.

        Returns:
            The `PIL.Image` instance of the mosaic.

        Examples:
            Building a mosaic.

            >>> mosaic.build(mosaic.d_matrix())

            On a GPU.

            >>> mosaic.build(mosaic.d_matrix_cuda())
        """
        mosaic = np.zeros((self.size[1], self.size[0], 3))

        # expand the dmatrix to allow for repeated tiles
        if self.n_appearances > 0:
            d_matrix = np.tile(d_matrix, self.n_appearances)

        LOGGER.info("Computing optimal tile assignment.")
        row_ind, col_ind = linear_sum_assignment(d_matrix)
        pbar = tqdm(total=d_matrix.shape[0], desc="Building mosaic")
        for row, col in zip(row_ind, col_ind):
            slices = self.grid.slices[row]
            tile_array = self.pool.array[col % len(self.pool.array)]
            # if the grid has been subdivided then the tile should be shrunk to
            # the size of the subdivision
            array_size = (
                slices[1].stop - slices[1].start,
                slices[0].stop - slices[0].start,
            )
            if tile_array.shape[:-1] != array_size[::-1]:
                tile_array = resize_array(tile_array, array_size)

            # shift slices back so that the centering of the mosaic within the
            # master image is removed
            slices = self.grid.remove_origin(slices)
            mosaic[slices[0], slices[1]] = tile_array
            pbar.update(1)
        pbar.close()

        return Image.fromarray(np.uint8(mosaic))

    def __repr__(self) -> str:
        # indent these guys
        master = repr(self.master).replace("\n", "\n    ")
        pool = repr(self.pool).replace("\n", "\n    ")
        grid = repr(self.grid).replace("\n", "\n    ")
        return f"""{self.__class__.__module__}.{self.__class__.__name__} at {hex(id(self))}:
    n_appearances: {self.n_appearances}
    mosaic size: {self.size}
    tile shape: {self.tile_shape}
    leftover tiles: {self.n_leftover}
    {grid}
    {master}
    {pool}"""

n_leftover: int property

The number of tiles which will be unused when building the mosaic.

size: Tuple[int, int] property

The size of the mosaic image.

It can be different from the master image size as an integer number of tiles should fit within it.

Returns:

Type Description
Tuple[int, int]

The width and height of the mosaic image.

__init__(master, pool, n_appearances=1)

Construct a regular grid mosaic.

Note

The Pool's tiles should all be the same size.

Parameters:

Name Type Description Default
master Master

Master image to reconstruct.

required
pool Pool

Tile image pool with which to reconstruct the Master image.

required
n_appearances int

Number of times a tile can appear in the mosaic.

1

Examples:

Building a mosaic.

>>> pool = Pool.from_dir("tiles")
>>> master = Master.from_file("master.png")
>>> mosaic = Mosaic(master, pool, n_appearances=1)
>>> mosaic.build(mosaic.d_matrix())
Source code in phomo/mosaic.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(
    self,
    master: Master,
    pool: Pool,
    n_appearances: int = 1,
) -> None:
    """Construct a regular grid mosaic.

    Note:
        The Pool's tiles should all be the same size.

    Args:
        master: `Master` image to reconstruct.
        pool: Tile image pool with which to reconstruct the `Master` image.
        n_appearances: Number of times a tile can appear in the mosaic.

    Examples:
        Building a mosaic.

        >>> pool = Pool.from_dir("tiles")
        >>> master = Master.from_file("master.png")
        >>> mosaic = Mosaic(master, pool, n_appearances=1)
        >>> mosaic.build(mosaic.d_matrix())
    """
    self.master = master
    if len(set([array.size for array in pool.array])) != 1:
        raise ValueError("Pool tiles sizes are not identical.")
    self.pool = pool
    self.tile_shape = (self.pool.array[0].shape[0], self.pool.array[0].shape[1])
    self.n_appearances = n_appearances
    self.grid = Grid(self.master, (self.size[1], self.size[0]), self.tile_shape)

build(d_matrix)

Construct the mosaic image by solving the linear sum assignment problem. See: https://en.wikipedia.org/wiki/Assignment_problem

Parameters:

Name Type Description Default
d_matrix ndarray

The computed distance matrix.

required

Returns:

Type Description
Image

The PIL.Image instance of the mosaic.

Examples:

Building a mosaic.

>>> mosaic.build(mosaic.d_matrix())

On a GPU.

>>> mosaic.build(mosaic.d_matrix_cuda())
Source code in phomo/mosaic.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def build(self, d_matrix: np.ndarray) -> Image.Image:
    """Construct the mosaic image by solving the linear sum assignment problem.
    See: https://en.wikipedia.org/wiki/Assignment_problem

    Args:
        d_matrix: The computed distance matrix.

    Returns:
        The `PIL.Image` instance of the mosaic.

    Examples:
        Building a mosaic.

        >>> mosaic.build(mosaic.d_matrix())

        On a GPU.

        >>> mosaic.build(mosaic.d_matrix_cuda())
    """
    mosaic = np.zeros((self.size[1], self.size[0], 3))

    # expand the dmatrix to allow for repeated tiles
    if self.n_appearances > 0:
        d_matrix = np.tile(d_matrix, self.n_appearances)

    LOGGER.info("Computing optimal tile assignment.")
    row_ind, col_ind = linear_sum_assignment(d_matrix)
    pbar = tqdm(total=d_matrix.shape[0], desc="Building mosaic")
    for row, col in zip(row_ind, col_ind):
        slices = self.grid.slices[row]
        tile_array = self.pool.array[col % len(self.pool.array)]
        # if the grid has been subdivided then the tile should be shrunk to
        # the size of the subdivision
        array_size = (
            slices[1].stop - slices[1].start,
            slices[0].stop - slices[0].start,
        )
        if tile_array.shape[:-1] != array_size[::-1]:
            tile_array = resize_array(tile_array, array_size)

        # shift slices back so that the centering of the mosaic within the
        # master image is removed
        slices = self.grid.remove_origin(slices)
        mosaic[slices[0], slices[1]] = tile_array
        pbar.update(1)
    pbar.close()

    return Image.fromarray(np.uint8(mosaic))

build_greedy(d_matrix)

Construct the mosaic image using a greedy tile assignement algorithm.

This leads to less accurate mosaics, but is significantly faster than the optimal assignement algorithm, especialy when the distance matrix is large.

Parameters:

Name Type Description Default
d_matrix ndarray

The computed distance matrix.

required

Returns:

Type Description
Image

The PIL.Image instance of the mosaic.

Source code in phomo/mosaic.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def build_greedy(self, d_matrix: np.ndarray) -> Image.Image:
    """Construct the mosaic image using a greedy tile assignement algorithm.

    This leads to less accurate mosaics, but is significantly faster than the
    optimal assignement algorithm, especialy when the distance matrix is large.

    Args:
        d_matrix: The computed distance matrix.

    Returns:
        The `PIL.Image` instance of the mosaic.
    """
    mosaic = np.zeros((self.size[1], self.size[0], 3))

    # Keep track of tiles and sub arrays.
    placed_master_arrays = set()
    placed_tiles = set()
    n_appearances = [0] * len(self.pool)

    pbar = tqdm(total=d_matrix.shape[0], desc="Building mosaic")
    # from: https://stackoverflow.com/questions/29046162/numpy-array-loss-of-dimension-when-masking
    sorted_master_slices_i, sorted_tiles = np.unravel_index(
        np.argsort(d_matrix, axis=None), d_matrix.shape
    )
    for slices_i, tile in zip(sorted_master_slices_i, sorted_tiles):
        if slices_i in placed_master_arrays or tile in placed_tiles:
            continue
        slices = self.grid.slices[slices_i]
        tile_array = self.pool.array[tile]
        # if the grid has been subdivided then the tile should be shrunk to
        # the size of the subdivision
        array_size = (
            slices[1].stop - slices[1].start,
            slices[0].stop - slices[0].start,
        )
        if tile_array.shape[:-1] != array_size[::-1]:
            tile_array = resize_array(tile_array, array_size)

        # shift slices back so that the centering of the mosaic within the
        # master image is removed
        slices = self.grid.remove_origin(slices)
        mosaic[slices[0], slices[1]] = tile_array
        placed_master_arrays.add(slices_i)
        n_appearances[tile] += 1
        if n_appearances[tile] == self.n_appearances:
            placed_tiles.add(tile)
        pbar.update(1)
    pbar.close()
    return Image.fromarray(np.uint8(mosaic))

d_matrix(workers=1, metric='norm', **kwargs)

Compute the distance matrix between all the master's tiles and the pool tiles.

Parameters:

Name Type Description Default
workers int

The number of worker to use.

1
metric Union[str, MetricCallable]

The distance metric used for the distance matrix. Either provide a string, for implemented metrics see phomo.metrics.METRICS. Or a callable, which should take two np.ndarrays and return a float.

'norm'
**kwargs

Passed to metric.

{}

Returns:

Type Description
ndarray

Distance matrix, shape: (number of master arrays, number of tiles in the pool).

Source code in phomo/mosaic.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def d_matrix(
    self,
    workers: int = 1,
    metric: Union[str, MetricCallable] = "norm",
    **kwargs,
) -> np.ndarray:
    """Compute the distance matrix between all the master's tiles and the
    pool tiles.

    Args:
        workers: The number of worker to use.
        metric: The distance metric used for the distance matrix. Either
            provide a string, for implemented metrics see ``phomo.metrics.METRICS``.
            Or a callable, which should take two ``np.ndarray``s and return a float.
        **kwargs: Passed to `metric`.

    Returns:
        Distance matrix, shape: (number of master arrays, number of tiles in the pool).
    """
    if isinstance(metric, str):
        if metric not in METRICS.keys():
            raise KeyError(
                "'%s' not in available metrics: %s",
                metric,
                repr(list(METRICS.keys())),
            )
        LOGGER.info("Using metric '%s'", metric)
        metric_func = METRICS[metric]
    else:
        LOGGER.info("Using user provided distance metric function.")
        metric_func = metric

    # Compute the distance matrix.
    worker = partial(self._d_matrix_worker, metric_func=metric_func, **kwargs)
    if workers != 1:
        LOGGER.info("Computing distance matrix with %i workers.", workers)
        with MpPool(processes=workers) as pool:
            d_matrix = np.array(
                list(
                    tqdm(
                        pool.imap(
                            worker,
                            self.grid.arrays,
                            chunksize=len(self.grid) // workers,
                        ),
                        total=len(self.grid.slices),
                        desc="Building distance matrix",
                    )
                )
            )
    else:
        # get rid of pool overhead if serial computation is desired.
        LOGGER.info("Computing distance matrix in serial.")
        d_matrix = np.array(
            [
                worker(array)
                for array in tqdm(self.grid.arrays, desc="Building distance matrix")
            ]
        )
    LOGGER.debug("d_matrix shape: %s", d_matrix.shape)
    return d_matrix

d_matrix_cuda(metric='norm')

Compute the distance matrix using CUDA for GPU acceleration.

Parameters:

Name Type Description Default
metric str

The distance metric used for the distance matrix. Either "norm" or "greyscale".

'norm'

Returns:

Type Description
ndarray

Distance matrix, shape: (number of master arrays, number of tiles in the pool).

Source code in phomo/mosaic.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def d_matrix_cuda(self, metric: str = "norm") -> np.ndarray:
    """Compute the distance matrix using CUDA for GPU acceleration.

    Args:
        metric: The distance metric used for the distance matrix. Either "norm" or "greyscale".

    Returns:
        Distance matrix, shape: (number of master arrays, number of tiles in the pool).
    """

    try:
        from numba import cuda
    except ImportError:
        raise ImportError(
            "Numba is required for CUDA support, run \"pip install 'phomo[cuda]'\" to install it."
        )

    if metric not in ["norm", "greyscale"]:
        raise ValueError(
            f"Invalid metric '{metric}'. When using gpu `metric' must be 'norm' or 'greyscale'."
        )

    LOGGER.info("Computing distance matrix with CUDA.")

    # when the grid has been subdivided the master arrays will be smaller, so we grow them to match
    # the tile size
    grid_arrays = [
        array
        if array.shape == self.tile_shape
        else resize_array(array, self.tile_shape)
        for array in self.grid.arrays
    ]
    pool_arrays = self.pool.array
    if metric == "greyscale":
        grid_arrays = [array.sum(axis=-1, keepdims=True) for array in grid_arrays]
        pool_arrays = [array.sum(axis=-1, keepdims=True) for array in pool_arrays]

    # Transfer the master and pool arrays to the GPU.
    master_arrays_device = cuda.to_device(grid_arrays)
    pool_arrays_device = cuda.to_device(pool_arrays)

    # Allocate memory for the distance matrix on the GPU.
    d_matrix_device = cuda.device_array((len(grid_arrays), len(pool_arrays)))

    # Define the CUDA kernel for computing the distance matrix.
    @cuda.jit
    def compute_d_matrix_kernel(master_arrays, pool_arrays, d_matrix):
        i, j = cuda.grid(2)  # type: ignore
        if i < master_arrays.shape[0] and j < pool_arrays.shape[0]:
            distance = 0.0
            for x in range(master_arrays.shape[1]):
                for y in range(master_arrays.shape[2]):
                    for c in range(master_arrays.shape[3]):
                        diff = master_arrays[i, x, y, c] - pool_arrays[j, x, y, c]
                        distance += diff * diff
            d_matrix[i, j] = math.sqrt(distance)

    # Define the number of threads per block and blocks per grid.
    threads_per_block = (16, 16)
    blocks_per_grid_x = math.ceil(len(grid_arrays) / threads_per_block[0])
    blocks_per_grid_y = math.ceil(len(pool_arrays) / threads_per_block[1])
    blocks_per_grid = (blocks_per_grid_x, blocks_per_grid_y)

    # Launch the kernel.
    compute_d_matrix_kernel[blocks_per_grid, threads_per_block](  # type: ignore
        master_arrays_device, pool_arrays_device, d_matrix_device
    )

    LOGGER.debug("d_matrix shape: %s", d_matrix_device.shape)
    # Copy the result back to the host.
    return d_matrix_device.copy_to_host()

from_file_and_dir(master_file, tile_dir, *args, master_crop_ratio=None, master_size=None, master_mode=None, tile_crop_ratio=None, tile_size=None, tile_mode=None, **kwargs) classmethod

Construct a Mosaic from a master image file and a directory containing the file images.

Parameters:

Name Type Description Default
master_file PathLike

The master image file.

required
tile_dir PathLike

the directory containing the tile images.

required

Returns:

Type Description
Mosaic

A Mosaic to construct the master_file using the tile images in the tile_dir.

Source code in phomo/mosaic.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@classmethod
def from_file_and_dir(
    cls,
    master_file: PathLike,
    tile_dir: PathLike,
    *args,
    master_crop_ratio: Optional[float] = None,
    master_size: Optional[Tuple[int, int]] = None,
    master_mode: Optional[str] = None,
    tile_crop_ratio: Optional[float] = None,
    tile_size: Optional[Tuple[int, int]] = None,
    tile_mode: Optional[str] = None,
    **kwargs,
) -> "Mosaic":
    """Construct a `Mosaic` from a master image file and a directory containing the file images.

    Args:
        master_file: The master image file.
        tile_dir: the directory containing the tile images.

    Returns:
        A `Mosaic` to construct the `master_file` using the tile images in the `tile_dir`.
    """
    master = Master.from_file(
        master_file,
        crop_ratio=master_crop_ratio,
        img_size=master_size,
        mode=master_mode,
    )
    pool = Pool.from_dir(
        tile_dir, tile_size=tile_size, crop_ratio=tile_crop_ratio, mode=tile_mode
    )
    return cls(master, pool, *args, **kwargs)