Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[py-tx] embeded tx hash for unletterboxing #1684

Merged
merged 5 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 69 additions & 27 deletions python-threatexchange/threatexchange/cli/hash_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def init_argparse(cls, settings: CLISettings, ap: argparse.ArgumentParser) -> No
signal_choices = sorted(
s.get_name() for s in signal_types if issubclass(s, FileHasher)
)

ap.add_argument(
"content_type",
**common.argparse_choices_pre_type_kwargs(
Expand Down Expand Up @@ -80,25 +81,50 @@ def init_argparse(cls, settings: CLISettings, ap: argparse.ArgumentParser) -> No
)

ap.add_argument(
"--rotations",
"--R",
"--photo-preprocess",
choices=["unletterbox", "rotations"],
help=(
"Apply one of the preprocessing steps to the image before hashing. "
"'unletterbox' removes black borders, and 'rotations' generates all 8 "
"simple rotations."
),
)

ap.add_argument(
"--black-threshold",
type=int,
default=10,
help=(
"Set the black threshold for unletterboxing (default: 5)."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: documentation for default seems off. There also might be a default argparse option that will display the default for you.

blocking q: Can you tell me how you chose 10 for this?

Copy link
Contributor Author

@Mackay-Fisher Mackay-Fisher Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the implementation against the bordercropped case and found that a black_threshold of 0 consistently failed to identify all black borders. Setting the threshold to 15 gave the best results, as shown in the tests I ran (https://github.com/Mackay-Fisher/Image-Testing).

The reason a threshold of 0 doesn’t work is due to the effects of image compression and processing. In compressed image formats like JPEG, compression introduces variations in pixel values so a pixel intended to represent pure black (0, 0, 0) might be encoded as (1, 1, 1) - (15, 15, 15) (That is the largest pixel value to represent true black that I found. Although it seems like they can be collectively different). For example, we could get a pixel (8,9,15) which would return false given the threshold look at each individual boundary. Grayscaling typically reduces this but also opens up the possibility of misclassifying colors. These small deviations occur because compression algorithms prioritize reducing file size over preserving exact pixel values.

Additionally, image noise and color profile conversions can further alter pixel values slightly. This is most likely caused during the resizing operations when creating the letterboxed image. I found that by not greyscaling like the border cropped library I was better able to reduce the black threshold needed to clean the image.

By setting the threshold to 15, I accounted for the variations and it works best as the minimum threshold that consistently cropped the borders correctly without hurting the original image.

"Only applies when 'unletterbox' is selected in --preprocess."
),
)

ap.add_argument(
"--save-output",
action="store_true",
help="for photos, generate all 8 simple rotations",
help="If true, saves the processed image as a new file.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: To help a user understand what this option does, suggest naming it --save-preprocess

Since store_true doesn't take an argument, suggest this as an alternative help:

save the preprocessed image data as new files

)

def __init__(
self,
content_type: t.Type[ContentType],
signal_type: t.Optional[t.Type[SignalType]],
files: t.List[pathlib.Path],
rotations: bool = False,
photo_preprocess: t.Optional[str] = None,
black_threshold: int = 0,
save_output: bool = False,
) -> None:
self.content_type = content_type
self.signal_type = signal_type

self.photo_preprocess = photo_preprocess
self.black_threshold = black_threshold
self.save_output = save_output
self.files = files

self.rotations = rotations
if self.photo_preprocess and not issubclass(self.content_type, PhotoContent):
raise CommandError(
"--photo-preprocess flag is only available for Photo content type", 2
)

def execute(self, settings: CLISettings) -> None:
hashers = [
Expand All @@ -115,28 +141,44 @@ def execute(self, settings: CLISettings) -> None:

hashers = [self.signal_type] # type: ignore # can't detect intersection types

if not self.rotations:
if self.photo_preprocess:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You can save yourself a lot of indenting by inverting this check and then early returns

if not self.photo_process:
  # Do normal thing
  return
# We are preprocessing

fine as followup: Additionally, this also becomes much easier if we do this instead:

def pre_processed_files() -> Iterator[Path]:
  if not self.preprocess:
    yield from self.files
  for path in self.files:
     # Preprocess stuff
     with NamedTemporaryFile() pre_f:
       yield pre_f

Which then very much simplifies your execute to just

for file in self.pre_processed_files():
  for hasher in hashers:
    if hash_str := hasher.hash_from_file(file):
      print(hasher.get_name(), hash_str)

for file in self.files:
for hasher in hashers:
hash_str = hasher.hash_from_file(file)
if hash_str:
print(hasher.get_name(), hash_str)
return

if not issubclass(self.content_type, PhotoContent):
raise CommandError(
"--rotations flag is only available for Photo content type", 2
)

for file in self.files:
with open(file, "rb") as f:
image_bytes = f.read()
rotated_images = PhotoContent.all_simple_rotations(image_bytes)
for rotation_type, rotated_bytes in rotated_images.items():
with tempfile.NamedTemporaryFile() as temp_file: # Create a temporary file to hold the byte data
temp_file.write(rotated_bytes)
updated_bytes: t.List[bytes] = []
rotation_type = []
if self.photo_preprocess == "unletterbox":
updated_bytes.append(
PhotoContent.unletterbox(str(file), self.black_threshold)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine as a followup: whoops, we should make unletterbox take a Path object for consistency with the rest of the library

)
elif self.photo_preprocess == "rotations":
with open(file, "rb") as f:
image_bytes = f.read()
rotations = PhotoContent.all_simple_rotations(image_bytes)
rotation_type, updated_bytes = list(rotations.keys()), list(
rotations.values()
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: multiple assignment like this is generally an antipattern, one assignment per line.

Additionally, this is a very complicated way to enumerate this! It seems like you are looking for

  for rotation_type, rotation_bytes in updated_bytes.items():

for idx, bytes_data in enumerate(updated_bytes):
with tempfile.NamedTemporaryFile() as temp_file:
temp_file.write(bytes_data)
temp_file_path = pathlib.Path(temp_file.name)
for hasher in hashers:
hash_str = hasher.hash_from_file(temp_file_path)
if hash_str:
print(rotation_type.name, hasher.get_name(), hash_str)
print(
f"{rotation_type[idx].name if rotation_type else ''} {hasher.get_name()} {hash_str}"
)
if self.save_output:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignorable: We can simplify this logic by using the delete= keyword of NamedTemporaryfile: https://docs.python.org/3/library/tempfile.html#tempfile.NamedTemporaryFile

delete=not self.save_output

suffix = (
f"_{rotation_type[idx].name}"
if rotation_type
else "_unletterboxed"
)
output_path = file.with_stem(f"{file.stem}{suffix}")
with open(output_path, "wb") as output_file:
output_file.write(bytes_data)
print(f"Processed image saved to: {output_path}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might get a bit messy - you can include files from many locations. Additionally, do we know the format of the resulting image? Without the extension the file might not be usable.

else:
for file in self.files:
for hasher in hashers:
hash_str = hasher.hash_from_file(file)
if hash_str:
print(hasher.get_name(), hash_str)
52 changes: 49 additions & 3 deletions python-threatexchange/threatexchange/cli/tests/hash_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def test_rotations_with_non_photo_content(
"""Test that rotation flag raises error with non-photo content"""
for content_type in ["url", "text", "video"]:
hash_cli.assert_cli_usage_error(
("--rotations", content_type, str(tmp_file)),
msg_regex="--rotations flag is only available for Photo content type",
("--photo-preprocess=rotations", content_type, str(tmp_file)),
msg_regex="--photo-preprocess flag is only available for Photo content type",
)


Expand All @@ -93,7 +93,7 @@ def test_rotations_with_photo_content(hash_cli: ThreatExchangeCLIE2eHelper):
test_file = pathlib.Path("threatexchange/tests/hashing/resources/LA.png")

hash_cli.assert_cli_output(
("--rotations", "photo", str(test_file)),
("--photo-preprocess=rotations", "photo", str(test_file)),
[
"ORIGINAL pdq accb6d39648035f8125c8ce6ba65007de7b54c67a2d93ef7b8f33b0611306715",
"ROTATE90 pdq 1f70cbbc77edc5f9524faa1b18f3b76cd0a04a833e20f645d229d0acc8499c56",
Expand All @@ -105,3 +105,49 @@ def test_rotations_with_photo_content(hash_cli: ThreatExchangeCLIE2eHelper):
"FLIPMINUS1 pdq 5bb15db9e8a1f03c174a380a55aeaa2985bde9c60abce301bde48df918b5c15b",
],
)


def test_unletterbox_with_non_photo_content(
hash_cli: ThreatExchangeCLIE2eHelper, tmp_file: pathlib.Path
):
"""Test that unletterbox flag raises error with non-photo content"""
for content_type in ["url", "text", "video"]:
hash_cli.assert_cli_usage_error(
("--photo-preprocess=unletterbox", content_type, str(tmp_file)),
msg_regex="--photo-preprocess flag is only available for Photo content type",
)


def test_unletterbox_with_photo_content(hash_cli: ThreatExchangeCLIE2eHelper):
Dcallies marked this conversation as resolved.
Show resolved Hide resolved
"""Test that photo unletterboxing is properly processed"""
test_file = pathlib.Path(
"threatexchange/tests/hashing/resources/letterboxed_sample-b.jpg"
)
clean_file = pathlib.Path("threatexchange/tests/hashing/resources/sample-b.jpg")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: These tests will fail in strange ways if run from different directories! For example, try changing directory to inside the test folder and then running py.test. A hack for this is to use a path that is relative to the files current location using the magic __file__ attribute.


hash_cli.assert_cli_output(
("photo", str(clean_file)),
[
"pdq f8f8f0cee0f4a84f06370a22038f63f0b36e2ed596621e1d33e6b39c4e9c9b22",
],
)

"""Test that photo unletterboxing is chnaged based off of allowed threshold"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: speels on changed

hash_cli.assert_cli_output(
("--photo-preprocess=unletterbox", "photo", str(test_file)),
[
"pdq 58f870cce0f4e84d8e378a32028f63f4b36e26f597621e1d33e6b39c4a9c9b22",
],
)

hash_cli.assert_cli_output(
(
"--photo-preprocess=unletterbox",
"--black-threshold=25",
"photo",
str(test_file),
),
[
"pdq f8f8f0cee0f4a84f06370a22038f63f0b36e2ed596621e1d33e6b39c4e9c9b22",
],
)
21 changes: 21 additions & 0 deletions python-threatexchange/threatexchange/content_type/photo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
Wrapper around the video content type.
"""
from PIL import Image
from pathlib import Path
import io
import typing as t

from .content_base import ContentType, RotationType
from threatexchange.content_type.preprocess import unletterboxing


class PhotoContent(ContentType):
Expand Down Expand Up @@ -102,3 +104,22 @@ def all_simple_rotations(cls, image_data: bytes) -> t.Dict[RotationType, bytes]:
RotationType.FLIPMINUS1: cls.flip_minus1(image_data),
}
return rotations

@classmethod
def unletterbox(cls, file_path: str, black_threshold: int = 0) -> bytes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: This should take Pathlib.path instead of str for file path

"""
Remove black letterbox borders from the sides and top of the image based on the specified black_threshold.
Returns the cleaned image as raw bytes.
"""
with Image.open(file_path) as image:
top = unletterboxing.detect_top_border(image, black_threshold)
bottom = unletterboxing.detect_bottom_border(image, black_threshold)
left = unletterboxing.detect_left_border(image, black_threshold)
right = unletterboxing.detect_right_border(image, black_threshold)

width, height = image.size
cropped_img = image.crop((left, top, width - right, height - bottom))

with io.BytesIO() as buffer:
cropped_img.save(buffer, format=image.format)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignorable: Ah I see, we keep the original format, I like this choice.

return buffer.getvalue()
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from PIL import Image
Mackay-Fisher marked this conversation as resolved.
Show resolved Hide resolved


def is_pixel_black(pixel, threshold):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: missing typing

"""
Check if each color channel in the pixel is below the threshold
"""
r, g, b = pixel
return r < threshold and g < threshold and b < threshold
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking q: Shouldn't this be <=? Your default threshold is 0. Can it be negative?



def detect_top_border(image: Image.Image, black_threshold: int = 0) -> int:
"""
Detect the top black border by counting rows with only black pixels.
Checks each RGB channel of each pixel in each row.
Returns the first row that is not all black from the top.
"""
width, height = image.size
for y in range(height):
row_pixels = list(image.crop((0, y, width, y + 1)).getdata())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't tell from reading the pillow docs, but can you use the returned core.image object as an iterator without wrapping it in a list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I thought I responded to this but I get a mypy error if I do not wrap it in a list because it does not recognize the iterator. It does completely work because there is an iterator to the image object but for the sake of mypy I left it as a list. I can change it to have the ignore if that would be better.

if all(is_pixel_black(pixel, black_threshold) for pixel in row_pixels):
continue
return y
return height


def detect_bottom_border(image: Image.Image, black_threshold: int = 0) -> int:
"""
Detect the bottom black border by counting rows with only black pixels from the bottom up.
Checks each RGB channel of each pixel in each row.
Returns the first row that is not all black from the bottom.
"""
width, height = image.size
for y in range(height - 1, -1, -1):
row_pixels = list(image.crop((0, y, width, y + 1)).getdata())
if all(is_pixel_black(pixel, black_threshold) for pixel in row_pixels):
continue
return height - y - 1
return height


def detect_left_border(image: Image.Image, black_threshold: int = 0) -> int:
"""
Detect the left black border by counting columns with only black pixels.
Checks each RGB channel of each pixel in each column.
Returns the first column from the left that is not all black.
"""
width, height = image.size
for x in range(width):
col_pixels = list(image.crop((x, 0, x + 1, height)).getdata())
if all(is_pixel_black(pixel, black_threshold) for pixel in col_pixels):
continue
return x
return width


def detect_right_border(image: Image.Image, black_threshold: int = 0) -> int:
"""
Detect the right black border by counting columns with only black pixels from the right.
Checks each RGB channel of each pixel in each column.
Returns the first column from the right that is not all black.
"""
width, height = image.size
for x in range(width - 1, -1, -1):
col_pixels = list(image.crop((x, 0, x + 1, height)).getdata())
if all(is_pixel_black(pixel, black_threshold) for pixel in col_pixels):
continue
return width - x - 1
return width
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.