Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'gob/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
ceoloide committed Feb 6, 2020
2 parents b42be20 + 006853f commit 5cbc67b
Show file tree
Hide file tree
Showing 4 changed files with 602 additions and 0 deletions.
116 changes: 116 additions & 0 deletions orchestra/google/marketing_platform/example_dags/example_bq_to_ga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
###########################################################################
#
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###########################################################################

"""Example DAG with tasks for listing DCM reports and report file
"""
from datetime import timedelta
from airflow import (
DAG,
models
)
from airflow.utils import dates
from orchestra.google.marketing_platform.operators.google_analytics import (
GoogleAnalyticsDeletePreviousDataUploadsOperator,
GoogleAnalyticsModifyFileHeadersDataImportOperator,
GoogleAnalyticsDataImportUploadOperator
)
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
# TODO(rebeccasg): update the references to these operators once composer
# version updates

default_args = {
'owner': 'airflow',
"start_date": dates.days_ago(1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=10)
}

conn_id = 'dt'
bq_dataset = models.Variable.get('bq_dataset')
gcs_bucket = models.Variable.get('gcs_bucket')
sql = models.Variable.get('sql')
output_table = models.Variable.get('output_table')
keep_header_on_gcs_export = models.Variable.get('keep_header_on_gcs_export')
gcs_filename = models.Variable.get('gcs_filename')
ga_data_import_custom_dimensions_header_mapping = \
models.Variable.get('ga_data_import_custom_dimensions_header_mapping',
deserialize_json=True)
ga_info = models.Variable.get('ga_info', deserialize_json=True)
ga_account_id = ga_info['ga_account_id']
ga_web_property_id = ga_info['ga_web_property_id']
ga_custom_data_source_id = ga_info['ga_custom_data_source_id']

dag = DAG(
'bq_to_ga360',
default_args=default_args,
schedule_interval=timedelta(1))

run_bq_query = BigQueryOperator(
task_id='run_bq_query',
sql=sql,
destination_dataset_table='%s.%s' % (bq_dataset, output_table),
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id=conn_id,
dag=dag
)

export_bq_data_to_gcs = BigQueryToCloudStorageOperator(
task_id='export_bq_data_to_gcs',
bigquery_conn_id=conn_id,
gcs_conn_id=conn_id,
source_project_dataset_table='%s.%s' % (bq_dataset, output_table),
destination_cloud_storage_uris=["gs://{0}/{1}".format(gcs_bucket, gcs_filename)],
export_format='CSV',
print_header=keep_header_on_gcs_export,
dag=dag
)

modify_column_headers_for_ga = GoogleAnalyticsModifyFileHeadersDataImportOperator(
task_id='format_column_headers_for_ga',
gcp_conn_id=conn_id,
storage_bucket=gcs_bucket,
storage_name_object=gcs_filename,
custom_dimension_header_mapping=ga_data_import_custom_dimensions_header_mapping,
dag=dag
)

upload_data_to_ga = GoogleAnalyticsDataImportUploadOperator(
task_id='upload_data_to_ga',
gcp_conn_id=conn_id,
storage_bucket=gcs_bucket,
storage_name_object=gcs_filename,
account_id=ga_account_id,
web_property_id=ga_web_property_id,
custom_data_source_id=ga_custom_data_source_id,
dag=dag
)

delete_previous_ga_uploads = GoogleAnalyticsDeletePreviousDataUploadsOperator(
task_id='delete_previous_ga_uploads',
gcp_conn_id=conn_id,
account_id=ga_account_id,
web_property_id=ga_web_property_id,
custom_data_source_id=ga_custom_data_source_id,
dag=dag
)

run_bq_query >> export_bq_data_to_gcs >> modify_column_headers_for_ga >> upload_data_to_ga >> delete_previous_ga_uploads
169 changes: 169 additions & 0 deletions orchestra/google/marketing_platform/hooks/google_analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Hook for accessing the API.
This hook handles authentication and provides a lightweight wrapper around the
Google Analytics API.
"""

import logging
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


logger = logging.getLogger(__name__)


class GoogleAnalyticsManagementHook(GoogleCloudBaseHook):
"""Hook for connecting to the Google Analytics Management API.
"""

def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None,
api_name='analytics',
api_version='v3'):
"""Constructor.
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
:type delegate_to: str
:param api_name: The name of the GA API.
:type api_name: str
:param api_version: The version of the GA API.
:type api_version: str
"""
super(GoogleAnalyticsManagementHook, self).__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to)
self.api_name = api_name
self.api_version = api_version
self._service = None

def get_service(self):
"""Retrieves the GA service object.
Returns:
The GA service object.
"""
if self._service is None:
http_authorized = self._authorize()
self._service = build(
self.api_name, self.api_version, http=http_authorized)
return self._service

def upload_file(self,
file_location,
account_id,
web_property_id,
custom_data_source_id,
mime_type='application/octet-stream',
resumable_upload=False):

"""Uploads file to GA via the Data Import API
:param file_location: The path and name of the file to upload.
:type file_location: str
:param account_id: The GA account Id to which the data upload belongs.
:type account_id: str
:param web_property_id: UA-string associated with the upload.
:type web_property_id: str
:param custom_data_source_id: Custom Data Source Id to which this data
import belongs.
:type custom_data_source_id: str
:param mime_type: Label to identify the type of data in the HTTP request
:type mime_type: str
:param resumable_upload: flag to upload the file in a resumable fashion,
using a series of at least two requests
:type resumable_upload: bool
"""

media = MediaFileUpload(file_location,
mimetype=mime_type,
resumable=resumable_upload)

logger.info('Uploading file to GA file for accountId:%s,'
'webPropertyId:%s'
'and customDataSourceId:%s ',
account_id, web_property_id, custom_data_source_id)

# TODO(): handle scenario where upload fails
self.get_service().management().uploads().uploadData(
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id,
media_body=media).execute()

def delete_upload_data(self,
account_id,
web_property_id,
custom_data_source_id,
delete_request_body
):
"""Deletes the uploaded data for a given account/property/dataset
:param account_id: The GA account Id to which the data upload belongs.
:type account_id: str
:param web_property_id: UA-string associated with the upload.
:type web_property_id: str
:param custom_data_source_id: Custom Data Source Id to which this data
import belongs.
:type custom_data_source_id: str
:param delete_request_body: Dict of customDataImportUids to delete
:type delete_request_body: dict
"""

logger.info('Deleting previous uploads to GA file for accountId:%s,'
'webPropertyId:%s'
'and customDataSourceId:%s ',
account_id, web_property_id, custom_data_source_id)

# TODO(): handle scenario where deletion fails
self.get_service().management().uploads().deleteUploadData(
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id,
body=delete_request_body).execute()

def get_list_of_uploads(self,
account_id,
web_property_id,
custom_data_source_id):
"""Get list of data upload from GA
:param account_id: The GA account Id to which the data upload belongs.
:type account_id: str
:param web_property_id: UA-string associated with the upload.
:type web_property_id: str
:param custom_data_source_id: Custom Data Source Id to which this data
import belongs.
:type custom_data_source_id: str
"""
logger.info('Getting list of uploads for accountId:%s,'
'webPropertyId:%s'
'and customDataSourceId:%s ',
account_id, web_property_id, custom_data_source_id)

# TODO(): handle scenario where request fails
response = self.get_service().management().uploads().list(
accountId=account_id,
webPropertyId=web_property_id,
customDataSourceId=custom_data_source_id).execute()

return response
12 changes: 12 additions & 0 deletions orchestra/google/marketing_platform/operators/display_video_360.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class GoogleDisplayVideo360CreateReportOperator(GoogleMarketingPlatformBaseOpera
Can receive a json string representing the report or reference to a
template file. Template references are recognized by a string ending in
'.json'.
api_version: The DV360 API version.
gcp_conn_id: The connection ID to use when fetching connection info.
delegate_to: The account to impersonate, if any.
Expand All @@ -62,19 +63,22 @@ class GoogleDisplayVideo360CreateReportOperator(GoogleMarketingPlatformBaseOpera

def __init__(self,
report,
api_version='v1',
gcp_conn_id='google_cloud_default',
delegate_to=None,
*args,
**kwargs):
super(GoogleDisplayVideo360CreateReportOperator, self).__init__(*args, **kwargs)
self.report = report
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.hook = None

def execute(self, context):
if self.hook is None:
self.hook = GoogleDisplayVideo360Hook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to)

Expand All @@ -89,6 +93,7 @@ class GoogleDisplayVideo360RunReportOperator(GoogleMarketingPlatformBaseOperator
"""Runs a stored query to generate a report.
Attributes:
api_version: The DV360 API version.
query_id: The ID of the query to run. (templated)
gcp_conn_id: The connection ID to use when fetching connection info.
delegate_to: The account to impersonate, if any.
Expand All @@ -98,11 +103,13 @@ class GoogleDisplayVideo360RunReportOperator(GoogleMarketingPlatformBaseOperator

def __init__(self,
query_id,
api_version='v1',
gcp_conn_id='google_cloud_default',
delegate_to=None,
*args,
**kwargs):
super(GoogleDisplayVideo360RunReportOperator, self).__init__(*args, **kwargs)
self.api_version = api_version
self.conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.service = None
Expand All @@ -111,6 +118,7 @@ def __init__(self,
def execute(self, context):
if self.service is None:
hook = GoogleDisplayVideo360Hook(
api_version=self.api_version,
gcp_conn_id=self.conn_id,
delegate_to=self.delegate_to
)
Expand Down Expand Up @@ -223,6 +231,7 @@ class GoogleDisplayVideo360DeleteReportOperator(GoogleMarketingPlatformBaseOpera
"""Deletes Display & Video 360 queries and any associated reports.
Attributes:
api_version: The DV360 API version.
query_id: The DV360 query id to delete. (templated)
query_title: The DV360 query title to delete. (templated)
Any query with a matching title will be deleted.
Expand All @@ -235,6 +244,7 @@ class GoogleDisplayVideo360DeleteReportOperator(GoogleMarketingPlatformBaseOpera
ui_color = '#ffd1dc'

def __init__(self,
api_version='v1',
query_id=None,
query_title=None,
ignore_if_missing=False,
Expand All @@ -243,6 +253,7 @@ def __init__(self,
*args,
**kwargs):
super(GoogleDisplayVideo360DeleteReportOperator, self).__init__(*args, **kwargs)
self.api_version = api_version
self.query_id = query_id
self.query_title = query_title
self.ignore_if_missing = ignore_if_missing
Expand All @@ -254,6 +265,7 @@ def execute(self, context):
if self.hook is None:
self.hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
delegate_to=self.delegate_to)

if self.query_id is not None:
Expand Down
Loading

0 comments on commit 5cbc67b

Please sign in to comment.