# -*- coding: utf-8 -*-
"""
Coders and decoders for the various compression types in PSD.
"""
from __future__ import unicode_literals, absolute_import
import zlib
import numpy as np
from typing import Any, BinaryIO, Callable, Dict, Tuple # NOQA
from . import enums
from . import util
try:
from . import packbits # type: ignore
except ImportError:
pass
# Shared parameter/return documentation for the ``decompress_*`` functions.
# Each of those functions has a ``{}`` placeholder in its docstring which is
# filled with this text via ``str.format`` right after the function is
# defined.
_decompress_params = """
Parameters
----------
data : bytes
The raw bytes from the file.
shape : 2-tuple of int
The shape of the resulting array, ``(height, width)``.
depth : enums.ColorDepth
The bit depth of the image. See `enums.ColorDepth`.
version : enums.Version
The version of the PSD file. See `enums.Version`.
Returns
-------
image : numpy array
The image data as a Numpy array. If *depth* is 1, the array
is expanded to ``uint8`` with a byte per pixel.
"""
# Shared parameter documentation for the ``compress_*`` functions that take
# a full image array; substituted into their docstrings the same way.
_compress_params = """
Parameters
----------
fd : file-like object
Writable file-like object, open in binary mode.
image : 2-D numpy array
The image to compress. Must be unsigned integer with 8, 16 or
32 bits. If *depth* is 1, the array should have dtype
``uint8`` with a byte per pixel.
depth : enums.ColorDepth
The bit depth of the image. See `enums.ColorDepth`.
version : enums.Version
The version of the PSD file. See `enums.Version`.
"""
# Shared parameter documentation for the ``compress_constant_*`` functions,
# which write a "virtual" image made of a single repeated value.
_compress_constant_params = """
Parameters
----------
fd : file-like object
Writable file-like object, open in binary mode.
value : int
The constant value in the generated virtual image.
width : int
The width of the image, in pixels.
rows : int
The number of rows in the image, in pixels. This is
``height * num_channels``.
depth : enums.ColorDepth
The bit depth of the image. See `enums.ColorDepth`.
version : enums.Version
The version of the PSD file. See `enums.Version`.
"""
# Numpy dtype string used for the stored samples at each bit depth.
# Multi-byte samples are big-endian; 1-bit data is read a byte at a time
# and unpacked/packed separately.
color_depth_dtype_map = {
    1: 'u1',
    8: 'u1',
    16: '>u2',
    32: '>u4',
}  # type: Dict[int, unicode]

# Size in bytes of one stored element at each bit depth.
color_depth_size_map = {
    1: 1,
    8: 1,
    16: 2,
    32: 4,
}  # type: Dict[int, int]
def decompress_raw(data,     # type: bytes
                   shape,    # type: Tuple[int, int]
                   depth,    # type: int
                   version   # type: int
                   ):  # type: (...) -> np.ndarray
    """
    Interpret uncompressed bytes as a Numpy array of the requested shape.
    {}
    """
    depth = enums.ColorDepth(depth)
    element_dtype = color_depth_dtype_map[depth]
    element_size = color_depth_size_map[depth]

    # Drop any trailing bytes that do not make up a whole element, since
    # np.frombuffer requires a buffer that is a multiple of the item size.
    usable = len(data) - (len(data) % element_size)
    pixels = np.frombuffer(data[:usable], element_dtype)

    if depth == 1:
        # 1-bit data: expand every packed byte into eight 0/1 values
        pixels = np.unpackbits(pixels)

    # Give the flat buffer its 2-dimensional shape
    return pixels.reshape(shape)


decompress_raw.__doc__ = decompress_raw.__doc__.format(  # type: ignore
    _decompress_params)
def decompress_rle(data,     # type: bytes
                   shape,    # type: Tuple[int, int]
                   depth,    # type: int
                   version   # type: int
                   ):  # type: (...) -> np.ndarray
    """
    Decode run length encoded data into a Numpy array.
    {}
    """
    height = shape[0]
    width = shape[1]
    element_size = color_depth_size_map[depth]
    unpacked = packbits.decode(data, height, width, element_size, version)
    # The decoded bytes are plain raw data, so the raw decoder builds the
    # final array.
    return decompress_raw(unpacked, shape, depth, version)


decompress_rle.__doc__ = decompress_rle.__doc__.format(  # type: ignore
    _decompress_params)
def decompress_zip(data,     # type: bytes
                   shape,    # type: Tuple[int, int]
                   depth,    # type: int
                   version   # type: int
                   ):  # type: (...) -> np.ndarray
    """
    Inflate zip (zlib) encoded data into a Numpy array.
    {}
    """
    inflated = zlib.decompress(data)
    return decompress_raw(inflated, shape, depth, version)


decompress_zip.__doc__ = decompress_zip.__doc__.format(  # type: ignore
    _decompress_params)
def decompress_zip_prediction(data,     # type: bytes
                              shape,    # type: Tuple[int, int]
                              depth,    # type: int
                              version   # type: int
                              ):  # type: (...) -> np.ndarray
    """
    Decompress zip (zlib) with prediction encoded data.

    Not supported for 1- or 32-bit images; a ``ValueError`` is raised for
    those depths.
    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not supported for 1-bit images")
    elif depth == 32:
        raise ValueError(
            "zip with prediction is not implemented for 32-bit images")
    elif depth == 8:
        decoder = packbits.decode_prediction_8bit
    else:
        decoder = packbits.decode_prediction_16bit

    data = zlib.decompress(data)

    # Work on a private copy: the array produced by decompress_raw is built
    # with np.frombuffer over an immutable bytes object, and the prediction
    # has to be undone in place.
    image = util.ensure_native_endian(
        decompress_raw(data, shape, depth, version)).copy()

    # Each row is passed as a view of ``image`` so the decoder's changes are
    # kept. The previous ``image[i].flatten()`` handed the decoder a
    # temporary copy, so the prediction was never actually removed.
    # NOTE(review): assumes the packbits decoders modify their argument in
    # place (their return value has never been used) -- confirm in packbits.
    for row in image:
        decoder(row)

    return image


decompress_zip_prediction.__doc__ = \
    decompress_zip_prediction.__doc__.format(  # type: ignore
        _decompress_params)
# Dispatch table used by `decompress_image`: maps each supported
# `enums.Compression` member to the function that decodes it.
decompressors = {
enums.Compression.raw: decompress_raw,
enums.Compression.rle: decompress_rle,
enums.Compression.zip: decompress_zip,
enums.Compression.zip_prediction: decompress_zip_prediction
} # type: Dict[int, Callable]
def decompress_image(data,         # type: bytes
                     compression,  # type: int
                     shape,        # type: Tuple[int, int]
                     depth,        # type: int
                     version       # type: int
                     ):  # type: (...) -> np.ndarray
    """
    Decode image data that was stored with the given compression.

    Parameters
    ----------
    data : bytes
        The raw bytes from the file.

    compression : enums.Compression
        The compression format to use. See `enums.Compression`.

    shape : 2-tuple of int
        The shape of the resulting array, ``(height, width)``.

    depth : enums.ColorDepth
        The bit depth of the image. See `enums.ColorDepth`.

    version : enums.Version
        The version of the PSD file. See `enums.Version`.

    Returns
    -------
    image : numpy array
        The image data as a Numpy array.
    """
    # Validate all three values by converting them to their enum types
    # before picking a decoder.
    kind = enums.Compression(compression)
    bit_depth = enums.ColorDepth(depth)
    psd_version = enums.Version(version)

    decoder = decompressors[kind]
    return decoder(data, shape, bit_depth, psd_version)
def normalize_image(image,  # type: np.ndarray
                    depth   # type: int
                    ):  # type: (...) -> np.ndarray
    """
    Prepare an image array for writing.

    For a *depth* of 1 the byte-per-pixel image is flattened and its
    values are packed eight to a byte. Any other depth returns *image*
    unchanged.

    Note that the packing runs over the flattened image, so individual
    rows are only byte-aligned when the width is a multiple of 8.
    """
    if depth != 1:
        return image
    return np.packbits(image.flatten())
def compress_raw(fd,       # type: BinaryIO
                 image,    # type: np.ndarray
                 depth,    # type: int
                 version   # type: int
                 ):  # type: (...) -> None
    """
    Write a Numpy array to a file as uncompressed bytes.
    {}
    """
    image = normalize_image(image, depth)

    # Only a 2-dimensional image is ever checked for byte swapping; when a
    # swap is needed it is done one row at a time as the rows are written.
    if len(image.shape) == 2 and util.needs_byteswap(image):
        for row in image:
            fd.write(util.do_byteswap(row))
    else:
        fd.write(image)


compress_raw.__doc__ = compress_raw.__doc__.format(  # type: ignore
    _compress_params)
def compress_rle(fd,       # type: BinaryIO
                 image,    # type: np.ndarray
                 depth,    # type: int
                 version   # type: int
                 ):  # type: (...) -> None
    """
    Write a Numpy array to a file as a run length encoded stream.
    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "rle compression is not supported for 1-bit images")

    table_start = fd.tell()

    # The stream starts with a table holding the packed length of every
    # row: 2 bytes per entry for version 1 files, 4 bytes otherwise.
    if version == 1:
        entry_size, length_dtype = 2, '>u2'
    else:
        entry_size, length_dtype = 4, '>u4'

    # Skip over the table for now; it is filled in once the packed size of
    # each row is known.
    fd.seek(image.shape[0] * entry_size, 1)
    lengths = np.empty((len(image),), dtype=length_dtype)

    swap = util.needs_byteswap(image)
    for index, row in enumerate(image):
        if swap:
            row = util.do_byteswap(row)
        packed = packbits.encode(row)
        lengths[index] = len(packed)
        fd.write(packed)

    # Go back and write the length table, then restore the position to the
    # end of the packed data.
    data_end = fd.tell()
    fd.seek(table_start)
    fd.write(lengths.tobytes())
    fd.seek(data_end)


compress_rle.__doc__ = compress_rle.__doc__.format(  # type: ignore
    _compress_params)
def compress_zip(fd,       # type: BinaryIO
                 image,    # type: np.ndarray
                 depth,    # type: int
                 version   # type: int
                 ):  # type: (...) -> None
    """
    Write a Numpy array to a file as a zip (zlib) compressed stream.
    {}
    """
    image = normalize_image(image, depth)

    if not util.needs_byteswap(image):
        # Already in file byte order: compress the whole buffer in one go
        fd.write(zlib.compress(image))
        return

    # Otherwise swap and feed one row at a time through a single
    # compressor object.
    compressor = zlib.compressobj()
    for row in image:
        fd.write(compressor.compress(util.do_byteswap(row)))
    fd.write(compressor.flush())


compress_zip.__doc__ = compress_zip.__doc__.format(  # type: ignore
    _compress_params)
def compress_zip_prediction(fd,       # type: BinaryIO
                            image,    # type: np.ndarray
                            depth,    # type: int
                            version   # type: int
                            ):  # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) with prediction compressed
    stream.

    Not supported for 1- or 32-bit images; a ``ValueError`` is raised for
    those and for any other unsupported depth. The caller's *image* is not
    modified.
    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not supported for 1-bit images")
    elif depth == 32:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not implemented for 32-bit images")
    elif depth == 8:
        encoder = packbits.encode_prediction_8bit
    elif depth == 16:
        encoder = packbits.encode_prediction_16bit
    else:
        # Previously an unexpected depth fell through and failed later with
        # an unbound ``encoder`` variable.
        raise ValueError(
            "zip with prediction is not supported for "
            "%r-bit images" % (depth,))

    compressor = zlib.compressobj()
    for row in image:
        # flatten() returns a fresh 1-D copy, so the caller's image is left
        # untouched. The copy has to be kept and written: the previous code
        # encoded a temporary copy and then compressed the original,
        # unencoded row.
        # NOTE(review): assumes the packbits encoders modify their argument
        # in place (their return value has never been used) -- confirm.
        encoded = row.flatten()
        encoder(encoded)
        encoded = util.ensure_bigendian(encoded)
        fd.write(compressor.compress(encoded))
    fd.write(compressor.flush())


compress_zip_prediction.__doc__ = \
    compress_zip_prediction.__doc__.format(  # type: ignore
        _compress_params)
# Dispatch table used by `compress_image` for real (array) images: maps
# each supported `enums.Compression` member to the function that encodes it.
compressors = {
enums.Compression.raw: compress_raw,
enums.Compression.rle: compress_rle,
enums.Compression.zip: compress_zip,
enums.Compression.zip_prediction: compress_zip_prediction
} # type: Dict[int, Callable]
def compress_image(fd,            # type: BinaryIO
                   image,         # type: np.ndarray
                   compression,   # type: int
                   shape,         # type: Tuple[int, int]
                   num_channels,  # type: int
                   depth,         # type: int
                   version        # type: int
                   ):  # type: (...) -> None
    """
    Write an image to a file using the given compression type.

    Parameters
    ----------
    fd : file-like object
        Writable file-like object, open in binary mode.

    image : 2-D numpy array or scalar
        The image to compress. Must be unsigned integer with 8, 16 or
        32 bits. If *depth* is 1, the array should have dtype
        ``uint8`` with a byte per pixel. If a scalar, a "virtual"
        image will be used as if the image contained only that
        constant value.

    compression : enums.Compression
        The compression format to use. See `enums.Compression`.

    shape : 2-tuple of int
        The shape of the image ``(height, width)``. If *image* is an
        array, the *shape* is used to confirm it has the correct
        shape. If *image* is a constant, *shape* is used to generate
        the virtual constant image.

    num_channels : int
        The number of color channels in the image. If *image* is an
        array, the *num_channels* is used to confirm it has the
        correct number of channels. If *image* is a constant,
        *num_channels* is used to generate the virtual constant image.

    depth : enums.ColorDepth
        The bit depth of the image. See `enums.ColorDepth`. If
        *image* is an array, the *depth* is used to confirm its
        elements have the correct size. If *image* is a constant,
        *depth* is used to generate the virtual constant image.

    version : enums.Version
        The version of the PSD file. See `enums.Version`.

    Raises
    ------
    ValueError
        If the image dtype is not an unsigned integer, its item size
        does not match *depth*, or an array image has the wrong shape.
    """
    if isinstance(image, int):
        # Promote a plain Python int to a Numpy scalar of the width that
        # matches the requested depth.
        scalar_type = np.dtype(color_depth_dtype_map[depth]).type
        image = scalar_type(image)

    image_dtype = image.dtype
    if image_dtype.kind != 'u':
        raise ValueError("Image array dtype must be unsigned int")
    if image_dtype.itemsize != color_depth_size_map[depth]:
        raise ValueError("Image array values of wrong size")

    is_constant = np.isscalar(image) or image.shape == ()
    if is_constant:
        # Constant "virtual" image: hand over to the constant writers,
        # which never materialize the whole image for most depths.
        width = shape[1]
        rows = shape[0] * num_channels
        write_constant = constant_compressors[compression]
        return write_constant(fd, image, width, rows, depth, version)

    # A real image may arrive either as (channels, height, width) or with
    # the channels already stacked vertically.
    planar_shape = (num_channels, shape[0], shape[1])
    stacked_shape = (num_channels * shape[0], shape[1])
    if image.shape not in (planar_shape, stacked_shape):
        raise ValueError("Image is the wrong shape")

    image = np.asarray(image)
    image = util.ensure_native_endian(image)
    image = image.reshape((shape[0] * num_channels, shape[1]))
    write_image = compressors[compression]
    return write_image(fd, image, depth, version)
def _make_onebit_constant(value, # type: int
width, # type: int
rows # type: int
): # type: (...) -> np.ndarray
if value:
value == 255
else:
value = 0
return np.full((width, rows), value, np.uint8)
def _make_constant_row(value,  # type: int
                       width,  # type: int
                       depth   # type: int
                       ):  # type: (...) -> np.ndarray
    """
    Build a single image row of *width* elements, all equal to *value*,
    using the dtype that `color_depth_dtype_map` gives for *depth*.
    """
    row_dtype = color_depth_dtype_map[depth]
    return np.full(shape=(width,), fill_value=value, dtype=row_dtype)
def compress_constant_raw(fd,       # type: BinaryIO
                          value,    # type: int
                          width,    # type: int
                          rows,     # type: int
                          depth,    # type: int
                          version   # type: int
                          ):  # type: (...) -> None
    """
    Write a virtual image made of one constant value to a raw
    stream.
    {}
    """
    if depth == 1:
        # 1-bit images are built in full and sent through the ordinary raw
        # writer, which packs the bits.
        constant_image = _make_onebit_constant(value, width, rows)
        compress_raw(fd, constant_image, depth, version)
        return

    # Every row is identical, so serialize one row and write it repeatedly
    row_bytes = _make_constant_row(value, width, depth).tobytes()
    for _ in range(rows):
        fd.write(row_bytes)


compress_constant_raw.__doc__ = \
    compress_constant_raw.__doc__.format(  # type: ignore
        _compress_constant_params)
def compress_constant_rle(fd,       # type: BinaryIO
                          value,    # type: int
                          width,    # type: int
                          rows,     # type: int
                          depth,    # type: int
                          version   # type: int
                          ):  # type: (...) -> None
    """
    Write a virtual image made of one constant value to a
    runlength-encoded stream.
    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "rle compression is not supported for 1-bit images")

    # All rows are the same, so one row is encoded once and reused
    constant_row = _make_constant_row(value, width, depth)
    packed_row = packbits.encode(constant_row.tobytes())

    # The table of per-row lengths uses 2-byte entries for version 1 files
    # and 4-byte entries otherwise.
    length_dtype = '>u2' if version == 1 else '>u4'
    lengths = np.full((rows,), len(packed_row), dtype=length_dtype)

    fd.write(lengths.tobytes())
    for _ in range(rows):
        fd.write(packed_row)


compress_constant_rle.__doc__ = \
    compress_constant_rle.__doc__.format(  # type: ignore
        _compress_constant_params)
def compress_constant_zip(fd,       # type: BinaryIO
                          value,    # type: int
                          width,    # type: int
                          rows,     # type: int
                          depth,    # type: int
                          version   # type: int
                          ):  # type: (...) -> None
    """
    Write a virtual image made of one constant value to a zip
    compressed stream.
    {}
    """
    if depth == 1:
        # 1-bit images are built in full and sent through the ordinary zip
        # writer, which packs the bits before compressing.
        constant_image = _make_onebit_constant(value, width, rows)
        compress_zip(fd, constant_image, depth, version)
        return

    # Repeat the bytes of a single row to form the whole image payload
    row_bytes = _make_constant_row(value, width, depth).tobytes()
    fd.write(zlib.compress(row_bytes * rows))


compress_constant_zip.__doc__ = \
    compress_constant_zip.__doc__.format(  # type: ignore
        _compress_constant_params)
def compress_constant_zip_prediction(fd,       # type: BinaryIO
                                     value,    # type: int
                                     width,    # type: int
                                     rows,     # type: int
                                     depth,    # type: int
                                     version   # type: int
                                     ):  # type: (...) -> None
    """
    Write a virtual image containing a constant to a zip with
    prediction compressed stream.

    Not supported for 1- or 32-bit images; a ``ValueError`` is raised for
    those and for any other unsupported depth.
    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not supported for 1-bit images")
    elif depth == 32:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not implemented for 32-bit images")
    elif depth == 8:
        encoder = packbits.encode_prediction_8bit
    elif depth == 16:
        encoder = packbits.encode_prediction_16bit
    else:
        # Previously an unexpected depth fell through and failed later with
        # an unbound ``encoder`` variable.
        raise ValueError(
            "zip with prediction is not supported for "
            "%r-bit images" % (depth,))

    row = util.ensure_native_endian(_make_constant_row(value, width, depth))

    # Keep the copy that the encoder works on. The previous code called
    # ``encoder(row.flatten())`` on a temporary copy and then compressed
    # the original, unencoded row.
    # NOTE(review): assumes the packbits encoders modify their argument in
    # place (their return value has never been used) -- confirm.
    encoded = row.flatten()
    encoder(encoded)

    # Every row is encoded on its own and all rows are identical, so the
    # bytes of one encoded row are repeated for the whole image.
    row_bytes = util.ensure_bigendian(encoded).tobytes()
    fd.write(zlib.compress(row_bytes * rows))


compress_constant_zip_prediction.__doc__ = \
    compress_constant_zip_prediction.__doc__.format(  # type: ignore
        _compress_constant_params)
# Dispatch table used by `compress_image` for constant (scalar) images:
# maps each supported `enums.Compression` member to the function that writes
# a virtual constant image with that compression.
constant_compressors = {
enums.Compression.raw: compress_constant_raw,
enums.Compression.rle: compress_constant_rle,
enums.Compression.zip: compress_constant_zip,
enums.Compression.zip_prediction: compress_constant_zip_prediction
} # type: Dict[int, Callable]