diff --git a/sources/facebook_ads/__init__.py b/sources/facebook_ads/__init__.py index 7cf972c9f..6af670d48 100644 --- a/sources/facebook_ads/__init__.py +++ b/sources/facebook_ads/__init__.py @@ -1,6 +1,6 @@ """Loads campaigns, ads sets, ads, leads and insight data from Facebook Marketing API""" -from typing import Iterator, Sequence +from typing import Iterator, Sequence, List, Dict from facebook_business import FacebookAdsApi from facebook_business.api import FacebookResponse @@ -68,7 +68,7 @@ def facebook_ads_source( We also provide a transformation `enrich_ad_objects` that you can add to any of the resources to get additional data per object via `object.get_api` Args: - account_id (str, optional): Account id associated with add manager. See README.md + account_id (str, optional): Account id associated with ads manager. See README.md access_token (str, optional): Access token associated with the Business Facebook App. See README.md chunk_size (int, optional): A size of the page and batch request. You may need to decrease it if you request a lot of fields. Defaults to 50. request_timeout (float, optional): Connection timeout. Defaults to 300.0. @@ -133,6 +133,7 @@ def facebook_insights_source( batch_size: int = 50, request_timeout: int = 300, app_api_version: str = None, + filtering: List[Dict] = None ) -> DltResource: """Incrementally loads insight reports with defined granularity level, fields, breakdowns etc. @@ -149,14 +150,14 @@ def facebook_insights_source( fields (Sequence[str], optional): A list of fields to include in each reports. Note that `breakdowns` option adds fields automatically. Defaults to DEFAULT_INSIGHT_FIELDS. attribution_window_days_lag (int, optional): Attribution window in days. The reports in attribution window are refreshed on each run.. Defaults to 7. time_increment_days (int, optional): The report aggregation window in days. use 7 for weekly aggregation. Defaults to 1. - breakdowns (TInsightsBreakdownOptions, optional): A presents with common aggregations. See settings.py for details. Defaults to "ads_insights_age_and_gender". + breakdowns (TInsightsBreakdownOptions, optional): A presents with common aggregations. See settings.py for details. Defaults to "ads_insights". action_breakdowns (Sequence[str], optional): Action aggregation types. See settings.py for details. Defaults to ALL_ACTION_BREAKDOWNS. level (TInsightsLevels, optional): The granularity level. Defaults to "ad". action_attribution_windows (Sequence[str], optional): Attribution windows for actions. Defaults to ALL_ACTION_ATTRIBUTION_WINDOWS. batch_size (int, optional): Page size when reading data from particular report. Defaults to 50. request_timeout (int, optional): Connection timeout. Defaults to 300. app_api_version(str, optional): A version of the facebook api required by the app for which the access tokens were issued ie. 'v17.0'. Defaults to the facebook_business library default version - + filtering(List[Dict], optional): Allows you to filter the results returned by restricting on a column. Available operators are GREATER_THAN, IN, LESS_THAN, GREATER_THAN_OR_EQUAL. Returns: DltResource: facebook_insights @@ -206,6 +207,7 @@ def facebook_insights( ).to_date_string(), } ], + "filtering": filtering, } job = execute_job(account.get_insights(params=query, is_async=True)) yield list(map(process_report_item, job.get_result())) diff --git a/sources/facebook_ads/settings.py b/sources/facebook_ads/settings.py index 8e71cfb69..b862ad9f3 100644 --- a/sources/facebook_ads/settings.py +++ b/sources/facebook_ads/settings.py @@ -136,7 +136,23 @@ "28d_view", ) -ALL_ACTION_BREAKDOWNS = ("action_type", "action_target_id", "action_destination") +ALL_ACTION_BREAKDOWNS = ( + "action_type", + "action_target_id", + "action_destination", + "action_device", + "conversion_destination", + "matched_persona_id", + "matched_persona_name", + "signal_source_bucket", + "standard_event_content_type", + "action_canvas_component_name", + "action_carousel_card_id", + "action_carousel_card_name", + "action_reaction", + "action_video_sound", + "action_video_type" +) INSIGHTS_BREAKDOWNS_OPTIONS: Dict[TInsightsBreakdownOptions, Any] = { "ads_insights": {"breakdowns": (), "fields": ()}, diff --git a/sources/facebook_ads_pipeline.py b/sources/facebook_ads_pipeline.py index 59cf5de34..34c7f11b7 100644 --- a/sources/facebook_ads_pipeline.py +++ b/sources/facebook_ads_pipeline.py @@ -117,6 +117,38 @@ def load_insights() -> None: print(info) +def load_actions_with_filter() -> None: + """Load action insights with a filter""" + pipeline = dlt.pipeline( + pipeline_name='facebook_insights_fb_pixel_purchase', + destination='duckdb', + dataset_name='facebook_insights_fb_pixel_purchases', + progress="log" + ) + + number_of_days = 3 + + fb_ads_insights = facebook_insights_source( + initial_load_past_days=number_of_days, + time_increment_days=1, + attribution_window_days_lag=7, + action_breakdowns=("action_type",), + action_attribution_windows=('7d_click', '1d_view'), + batch_size=50, + filtering=[ + {"field": "action_type", + "operator":"IN", + "value":["offsite_conversion.fb_pixel_purchase"]} + ] + ) + # In this example we filter by action type offsite_conversion.fb_pixel_purchase. + # This greatly speeds up the extraction since other action types (e.g. ad impressions) return a very large amount of records. + # Depending on your use case those action types may not be necessary or are better accessed without an action_type breakdown. + + info = pipeline.run(fb_ads_insights) + print(info) + + if __name__ == "__main__": # load_all_ads_objects() merge_ads_objects() @@ -124,3 +156,4 @@ def load_insights() -> None: # load_only_disapproved_ads() # load_and_enrich_objects() # load_insights() + # load_actions_with_filter