Skip to content

Commit

Permalink
skip the attribution window test
Browse files Browse the repository at this point in the history
  • Loading branch information
sgandhi1311 committed Jan 9, 2025
1 parent ff1f23b commit d34f3d1
Showing 1 changed file with 123 additions and 136 deletions.
259 changes: 123 additions & 136 deletions tests/test_facebook_attribution_window.py
Original file line number Diff line number Diff line change
@@ -1,136 +1,123 @@
# TODO: https://jira.talendforge.org/browse/TDL-26640
# The test is commented as all the insights streams are not having the data.
# Please refer to the JIRA ticket for more information.

# import base
# import os

# from tap_tester import runner, connections

# from base import FacebookBaseTest


# class FacebookAttributionWindow(FacebookBaseTest):

# is_done = None

# # TODO: https://jira.talendforge.org/browse/TDL-26640
# EXCLUDE_STREAMS = {
# 'ads_insights_hourly_advertiser', # TDL-24312, TDL-26640
# 'ads_insights_platform_and_device', # TDL-26640
# 'ads_insights', # TDL-26640
# 'ads_insights_age_and_gender', # TDL-26640
# 'ads_insights_country', # TDL-26640
# 'ads_insights_dma', # TDL-26640
# 'ads_insights_region' # TDL-26640
# }

# @staticmethod
# def name():
# return "tap_tester_facebook_attribution_window"

# def streams_to_test(self):
# """ 'attribution window' is only supported for 'ads_insights' streams """

# # Fail the test when the JIRA card is done to allow stream to be re-added and tested
# if self.is_done is None:
# self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done'
# self.assert_message = ("JIRA ticket has moved to done, re-add the "
# "applicable EXCLUDE_STREAMS to the test.")
# self.is_done_2 = base.JIRA_CLIENT.get_status_category("TDL-26640") == 'done'
# # if either card is done, fail & update the test to include more streams
# self.is_done = self.is_done or self.is_done_2
# assert self.is_done != True, self.assert_message

# # return [stream for stream in self.expected_streams() if self.is_insight(stream)]
# return [stream for stream in self.expected_streams()
# if self.is_insight(stream)
# and stream != 'ads_insights_hourly_advertiser']

# def get_properties(self, original: bool = True):
# """Configuration properties required for the tap."""
# return_value = {
# 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
# 'start_date': self.start_date,
# 'end_date': self.end_date,
# 'insights_buffer_days': str(self.ATTRIBUTION_WINDOW)
# }
# if original:
# return return_value

# return_value["start_date"] = self.start_date
# return return_value

# def test_run(self):
# """
# For the test ad set up in facebook ads manager we see data
# on April 7th, start date is based on this data
# """
# # attrribution window = 7
# self.ATTRIBUTION_WINDOW = 7
# self.start_date = '2021-04-14T00:00:00Z'
# self.end_date = '2021-04-15T00:00:00Z'
# self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# # attribution window = 28
# self.ATTRIBUTION_WINDOW = 28
# self.start_date = '2021-04-30T00:00:00Z'
# self.end_date = '2021-05-01T00:00:00Z'
# self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# # attribution window = 1
# self.ATTRIBUTION_WINDOW = 1
# self.start_date = '2021-04-08T00:00:00Z'
# self.end_date = '2021-04-09T00:00:00Z'
# self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# def run_test(self, attr_window, start_date, end_date):
# """
# Test to check the attribution window
# """

# expected_streams = self.streams_to_test()

# conn_id = connections.ensure_connection(self)

# # calculate start date with attribution window
# start_date_with_attribution_window = self.timedelta_formatted(
# start_date, days=-attr_window, date_format=self.START_DATE_FORMAT
# )

# # Run in check mode
# found_catalogs = self.run_and_verify_check_mode(conn_id)

# # Select only the expected streams tables
# catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
# self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True)

# # Run a sync job using orchestrator
# self.run_and_verify_sync(conn_id)
# sync_records = runner.get_records_from_target_output()

# expected_replication_keys = self.expected_replication_keys()

# for stream in expected_streams:
# with self.subTest(stream=stream):

# replication_key = next(iter(expected_replication_keys[stream]))

# # get records
# records = [record.get('data') for record in sync_records.get(stream).get('messages')]

# # check for the record is between attribution date and start date
# is_between = False

# for record in records:
# replication_key_value = record.get(replication_key)

# # Verify the sync records respect the attribution window
# self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window),
# msg="The record does not respect the attribution window.")

# # verify if the record's bookmark value is between start date and attribution window
# if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) <= self.parse_date(start_date):
# is_between = True

# self.assertTrue(is_between)
import base
import os

from tap_tester import runner, connections

from base import FacebookBaseTest, LOGGER


class FacebookAttributionWindow(FacebookBaseTest):

is_done = None

@staticmethod
def name():
return "tap_tester_facebook_attribution_window"

def streams_to_test(self):
""" 'attribution window' is only supported for 'ads_insights' streams """

# Fail the test when the JIRA card is done to allow stream to be re-added and tested
if self.is_done is None:
self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done'
self.assert_message = ("JIRA ticket has moved to done, re-add the "
"ads_insights_hourly_advertiser stream to the test.")
assert self.is_done != True, self.assert_message

# return [stream for stream in self.expected_streams() if self.is_insight(stream)]
return [stream for stream in self.expected_streams()
if self.is_insight(stream)
and stream != 'ads_insights_hourly_advertiser']

def get_properties(self, original: bool = True):
"""Configuration properties required for the tap."""
return_value = {
'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
'start_date': self.start_date,
'end_date': self.end_date,
'insights_buffer_days': str(self.ATTRIBUTION_WINDOW)
}
if original:
return return_value

return_value["start_date"] = self.start_date
return return_value

def test_run(self):
"""
For the test ad set up in facebook ads manager we see data
on April 7th, start date is based on this data
"""
# TODO: https://jira.talendforge.org/browse/TDL-26640
if base.JIRA_CLIENT.get_status_category("TDL-26640") != 'done':
LOGGER.warning("Skipping TEST! See BUG[TDL-26640]")
self.skipTest("Skipping TEST! See BUG[TDL-26640]")

# attrribution window = 7
self.ATTRIBUTION_WINDOW = 7
self.start_date = '2021-04-14T00:00:00Z'
self.end_date = '2021-04-15T00:00:00Z'
self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# attribution window = 28
self.ATTRIBUTION_WINDOW = 28
self.start_date = '2021-04-30T00:00:00Z'
self.end_date = '2021-05-01T00:00:00Z'
self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

# attribution window = 1
self.ATTRIBUTION_WINDOW = 1
self.start_date = '2021-04-08T00:00:00Z'
self.end_date = '2021-04-09T00:00:00Z'
self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)

def run_test(self, attr_window, start_date, end_date):
"""
Test to check the attribution window
"""

expected_streams = self.streams_to_test()

conn_id = connections.ensure_connection(self)

# calculate start date with attribution window
start_date_with_attribution_window = self.timedelta_formatted(
start_date, days=-attr_window, date_format=self.START_DATE_FORMAT
)

# Run in check mode
found_catalogs = self.run_and_verify_check_mode(conn_id)

# Select only the expected streams tables
catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True)

# Run a sync job using orchestrator
self.run_and_verify_sync(conn_id)
sync_records = runner.get_records_from_target_output()

expected_replication_keys = self.expected_replication_keys()

for stream in expected_streams:
with self.subTest(stream=stream):

replication_key = next(iter(expected_replication_keys[stream]))

# get records
records = [record.get('data') for record in sync_records.get(stream).get('messages')]

# check for the record is between attribution date and start date
is_between = False

for record in records:
replication_key_value = record.get(replication_key)

# Verify the sync records respect the attribution window
self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window),
msg="The record does not respect the attribution window.")

# verify if the record's bookmark value is between start date and attribution window
if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) <= self.parse_date(start_date):
is_between = True

self.assertTrue(is_between)

0 comments on commit d34f3d1

Please sign in to comment.