
PowerBIDatasetRefreshOperator task fails whereas dataset refresh succeeds #44618

Open
Ohashiro opened this issue Dec 3, 2024 · 23 comments · May be fixed by #45513
Labels
area:providers · kind:bug · provider:microsoft-azure

Comments

@Ohashiro
Contributor

Ohashiro commented Dec 3, 2024

Apache Airflow Provider(s)

microsoft-azure

Versions of Apache Airflow Providers

apache-airflow-providers-microsoft-azure==11.1.0

Apache Airflow version

2.10.2

Operating System

linux

Deployment

Google Cloud Composer

Deployment details

No response

What happened

We use the operator PowerBIDatasetRefreshOperator to refresh our PowerBI datasets. Sometimes, the task quickly fails with the following error:

airflow.exceptions.AirflowException: An error occurred: Unable to fetch the details of dataset refresh with Request Id: <request id>
[screenshot: Airflow task log showing the error above]

However, on the Power BI side, the dataset is refreshed (timezone GMT+1; the refresh time and the log time roughly match):
[screenshot: Power BI refresh history showing the successful refresh]

If I understand correctly, this corresponds to an error in this function, which would mean the failure happened while fetching the refresh status even though the refresh itself was running. The error may come from the request to the Microsoft API, which could perhaps be mitigated with a retry (we have not tested this yet).

What you think should happen instead

The task should succeed, since the dataset refresh succeeded.

How to reproduce

On our side, we use the following configuration.

refresh_powerbi_dataset_success = PowerBIDatasetRefreshOperator(
    task_id="refresh_powerbi_dataset_success",
    conn_id="our-conn-id",
    dataset_id="our-dataset-id",
    group_id="our-group-id",
)

The dataset ID and group ID correspond to a valid Power BI dataset and workspace.

Anything else

This bug occurs about once every three task runs (the task fails but the refresh succeeds). Sometimes it fails several times in a row and then works again; other times it works on the first run. This suggests the configuration itself is fine, since the operator often works.
I haven't been able to identify any specific pattern.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

@Ohashiro Ohashiro added the area:providers, kind:bug, and needs-triage labels Dec 3, 2024
@dosubot dosubot bot added the provider:microsoft-azure label Dec 3, 2024
@dabla
Contributor

dabla commented Dec 18, 2024

Can you try specifying a retry with the following parameters on the PowerBIDatasetRefreshOperator?

retry_exponential_backoff=True,  # to increase the delay after each failed attempt
retry_delay=60,
retries=5,
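
For reference, a minimal sketch of these task-level retry settings applied to the operator from the report (connection and IDs are placeholders; retry_delay is written as a timedelta here, the canonical form for BaseOperator):

# Sketch only: Airflow task-level retries on the operator; values are illustrative.
from datetime import timedelta

from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator

refresh_powerbi_dataset_success = PowerBIDatasetRefreshOperator(
    task_id="refresh_powerbi_dataset_success",
    conn_id="our-conn-id",
    dataset_id="our-dataset-id",
    group_id="our-group-id",
    retries=5,
    retry_delay=timedelta(seconds=60),
    retry_exponential_backoff=True,  # increase the delay after each failed attempt
)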

@Ohashiro
Contributor Author

Thank you for the suggestion! (Sorry, I didn't know these fields were available.)
I just tried it and still get some task failures (with the same error). I will try tweaking the parameters a little to see if it fixes the issue.
Do you know which request these retries apply to?

@dabla
Contributor

dabla commented Dec 19, 2024

Those parameters apply to the whole operator. You could also try specifying the check_interval and timeout parameters, which both default to 60 seconds. I think timeout should be greater than check_interval, otherwise polling will stop because of the timeout.

@Ohashiro
Contributor Author

OK I see, so the retry is not specific to the "get status" request.
From what I understand, the issue seems to be that the very first request (to get the refresh status) fails (maybe because the refresh id is not yet available for this endpoint) and the task exits.
If I understand correctly, the retry on the operator allows us to re-run the whole process (so re-trigger a refresh), instead of retrying the request to get the refresh status already running.

@dabla
Contributor

dabla commented Dec 27, 2024

OK I see, so the retry is not specific to the "get status" request. From what I understand, the issue seems to be that the very first request (to get the refresh status) fails (maybe because the refresh id is not yet available for this endpoint) and the task exits. If I understand correctly, the retry on the operator allows us to re-run the whole process (so re-trigger a refresh), instead of retrying the request to get the refresh status already running.

Indeed, you're correct about that. I still don't understand your issue completely: from what I see in the code of the PowerBIDatasetRefreshOperator, it first triggers the dataset refresh and then polls and waits for the refresh to complete (which by default has a timeout of 60 seconds). So it could indeed happen that the refresh isn't ready or started yet, but I would still expect it to be retried within the timeout interval, unless of course the timeout has expired, which is something you can parametrize.

@dabla
Contributor

dabla commented Dec 27, 2024

I understand the issue. I've checked the code of the PowerBITrigger: the check_interval is set to 60 seconds, but the timeout is also 60 seconds by default, which means the status is only fetched once and then it fails as in your example log. So try reducing the check_interval to something like 10 seconds, or even a bit less. Maybe we should do a PR that adds a check that check_interval must be smaller than timeout and sets it to 10 seconds by default.
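
To illustrate the suggestion (values are only examples), keep check_interval well below timeout so the status is polled several times before the trigger gives up:

# Example only: poll every 10 seconds, allow up to 10 minutes for the refresh.
refresh_powerbi_dataset = PowerBIDatasetRefreshOperator(
    task_id="refresh_powerbi_dataset",
    conn_id="our-conn-id",
    dataset_id="our-dataset-id",
    group_id="our-group-id",
    check_interval=10,  # seconds between status polls
    timeout=600,  # overall wait in seconds; must be larger than check_interval
)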

@Ohashiro
Contributor Author

Ohashiro commented Jan 2, 2025

Hello!
Thank you very much for taking the time to look at the code and making this suggestion.
I tried reducing the check_interval to 5 seconds; unfortunately, it did not seem to fix the issue on our side.

Also, I thought that the default timeout was 1 week because of this line:

But maybe I got this wrong. Am I mistaken?

Regarding the bug, I think the issue may come from the very first request to get the refresh status, at this line:

dataset_refresh_status, dataset_refresh_error = await fetch_refresh_status_and_error()

This request is made by the Power BI hook (using the method get_refresh_details_by_refresh_id), and I think that sometimes the API is not yet up to date with the latest refresh IDs and can't find the status of our refresh ID. So we fall into this case:

and the hook raises the error "Unable to fetch the details of dataset refresh with Request Id" (which is also what I find in my Airflow logs). Does that make sense?

I did try adding a retry on this request (and it seemed to fix the bug), but I understand that you don't want to introduce this type of custom retry mechanism. Maybe another option could be to wait check_interval seconds BEFORE sending the first request. What do you think?

@dabla
Contributor

dabla commented Jan 2, 2025

Hello! Thank you very much for taking the time to look at the code and making this suggestion. I tried to reduce the check_interval to 5, unfortunately it did not seem to fix the issue on our side.

Also, I thought that the default timeout was 1 week because of this line:

But maybe I got this wrong. Am I mistaken?

Regarding the bug, I think the issue may come from the very first request to get the refresh status, at this line:

dataset_refresh_status, dataset_refresh_error = await fetch_refresh_status_and_error()

This request is made by the PowerBi hook (using the method get_refresh_details_by_refresh_id) and I think that, sometimes, the API is not yet "up-to-date" with the latest refresh IDs and can't find the status of our refresh ID. So, we fall in this case:

and the hook raises the error "Unable to fetch the details of dataset refresh with Request Id" (which is also what I find in my Airflow logs). Do you think it makes sense?

I did try to add a retry on this request (and it seemed to fix the bug), but I understand that you don't want to introduce this type of custom retry mechanism. Maybe another option could be to wait "check_interval" seconds BEFORE sending the first request. What do you think?

@Ohashiro from what I can see from my smartphone (no laptop atm), your explanation totally makes sense. Waiting before doing the actual invocation could work, but I would suggest catching the PowerBIDatasetRefreshException and re-invoking the method after waiting for the interval when this error occurs within the code.
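
A rough sketch of that idea, waiting check_interval seconds and re-invoking the status call when the exception occurs (the method name, the max_attempts limit and the import path are assumptions for illustration, not provider code):

import asyncio

from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIDatasetRefreshException  # import path assumed


async def fetch_refresh_status_and_error_with_retry(self, max_attempts: int = 3) -> tuple[str, str]:
    """Fetch the refresh status, retrying while the refresh id is not visible yet."""
    last_error: Exception | None = None
    for _ in range(max_attempts):
        try:
            refresh_details = await self.hook.get_refresh_details_by_refresh_id(
                dataset_id=self.dataset_id,
                group_id=self.group_id,
                refresh_id=self.dataset_refresh_id,
            )
            return refresh_details["status"], refresh_details["error"]
        except PowerBIDatasetRefreshException as error:
            last_error = error
            await asyncio.sleep(self.check_interval)  # wait before re-invoking the status call
    raise last_error if last_error else RuntimeError("max_attempts must be at least 1")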

@dabla
Contributor

dabla commented Jan 2, 2025

@Ohashiro also thank you for investigating this thoroughly. It would also be nice to have a unit test which reproduces this behaviour: on the first invocation a PowerBIDatasetRefreshException is raised, the code waits for the interval, and on the second invocation the call succeeds. This should be doable by mocking the Power BI hook.
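
A possible shape for such a test, assuming pytest-asyncio and that the trigger gains the retry described above (module paths, constructor arguments and event payloads follow the snippets in this thread but are otherwise assumptions):

from unittest import mock

import pytest

from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIDatasetRefreshException
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger


@pytest.mark.asyncio
async def test_refresh_status_retried_after_initial_failure():
    trigger = PowerBITrigger(
        conn_id="powerbi-conn",
        dataset_id="dataset-id",
        group_id="group-id",
        check_interval=0,  # keep the test fast
        timeout=30,
    )
    # First status lookup fails, the second one reports a completed refresh.
    with mock.patch.object(trigger.hook, "trigger_dataset_refresh", return_value="refresh-id"):
        with mock.patch.object(
            trigger.hook,
            "get_refresh_details_by_refresh_id",
            side_effect=[
                PowerBIDatasetRefreshException("Unable to fetch the details of dataset refresh"),
                {"status": "Completed", "error": None},
            ],
        ):
            event = await trigger.run().__anext__()
    assert event.payload["status"] != "error"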

@Ohashiro
Contributor Author

Ohashiro commented Jan 3, 2025

@dabla
Thank you for your feedback! I'll be working on it today.

I have a quick question regarding best practices. Given that the Power BI hook can raise PowerBIDatasetRefreshException in different scenarios (not only when it can't find the refresh ID), do you think it would be best to create a new dedicated exception for our case (e.g. PowerBIDatasetRefreshIdNotFoundException), or would you rather check whether the exception message matches "Unable to fetch the details of dataset refresh with Request Id"?

@Ohashiro
Contributor Author

Ohashiro commented Jan 3, 2025

I opened a PR with both the retry in case of an exception on the first request and the corresponding unit test. I'd be happy to have your feedback on this when you have some time!

@nathadfield nathadfield removed the needs-triage label Jan 3, 2025
@ambika-garg
Contributor

Hi @Ohashiro, thank you for bringing this up and creating the PR to address it. After reviewing the conversation, I see the issue lies in the get_refresh_history function of the hook class: sometimes it fails to return the dataset refresh histories, leading the dataset refresh to be marked as failed even though it actually succeeds. Please correct me if I’ve misunderstood.

So I suggest we add the retry mechanism to the get_refresh_history function only, as below. It will retry fetching the histories and, if it still fails, throw the exception. That would also mean we don't need the extra exception class you created (PowerBIDatasetRefreshStatusException).

@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_random_exponential(),
    reraise=True,
    retry=tenacity.retry_if_exception(should_retry_creation),
)
async def get_refresh_history(
    self,
    dataset_id: str,
    group_id: str,
) -> list[dict[str, str]]:
    """
    Retrieve the refresh history of the specified dataset from the given group ID.

    :param dataset_id: The dataset ID.
    :param group_id: The workspace ID.

    :return: List of refresh details for the dataset.
    """
    try:
        response = await self.run(
            url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
            path_parameters={
                "group_id": group_id,
                "dataset_id": dataset_id,
            },
        )
    except Exception as error:
        raise PowerBIDatasetRefreshException(f"Failed to retrieve refresh history due to error: {error}")

    refresh_histories = response.get("value")
    if not refresh_histories:  # Retry if refresh_histories is None or empty
        raise PowerBIDatasetRefreshException("Refresh histories are empty; retrying...")

    return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories]
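
Note that should_retry_creation is referenced above but not shown; a minimal predicate along these lines (name kept from the snippet, logic assumed) could back it:

# Assumption: only the provider's refresh exception is worth retrying.
# PowerBIDatasetRefreshException is assumed to come from the provider's powerbi hook module.
def should_retry_creation(exception: BaseException) -> bool:
    """Return True when the failed call should be retried by tenacity."""
    return isinstance(exception, PowerBIDatasetRefreshException)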

@dabla
Contributor

dabla commented Jan 6, 2025

Hi @Ohashiro, thank you for bringing this up and creating the PR to address it. After reviewing the conversation, I see the issue lies in get refresh history function in hook class, as sometimes it fails to return the "dataset refresh histories" leading the dataset refresh to be marked as fail, even if it actually succeeds. Please correct me if I’ve misunderstood.

So, I suggest, we should add the retry mechanism to the get refresh history function only as below, it will retry to fetch histories, if it still fails, then we just throw the exception. That would also mean we don't need any extra exception class as you created "PowerBIDatasetRefreshStatusExecption"

@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_random_exponential(),
    reraise=True,
    retry=tenacity.retry_if_exception(should_retry_creation),
)
async def get_refresh_history(
    self,
    dataset_id: str,
    group_id: str,
) -> list[dict[str, str]]:
    """
    Retrieve the refresh history of the specified dataset from the given group ID.

    :param dataset_id: The dataset ID.
    :param group_id: The workspace ID.

    :return: List of refresh details for the dataset.
    """
    try:
        response = await self.run(
            url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
            path_parameters={
                "group_id": group_id,
                "dataset_id": dataset_id,
            },
        )
    except Exception as error:
        raise PowerBIDatasetRefreshException(f"Failed to retrieve refresh history due to error: {error}")

    refresh_histories = response.get("value")
    if not refresh_histories:  # Retry if refresh_histories is None or empty
        raise PowerBIDatasetRefreshException("Refresh histories are empty; retrying...")

    return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories]

I don't think it's good practice to use the tenacity retry mechanism when operators (i.e. task instances) have their own retry mechanism managed by Airflow tasks, but I could be wrong. I also went back to the code, and I still don't understand why the call would fail again when the task instance for that operator is retried a second time by Airflow: some time has passed between the first and the second attempt, so I would expect the second call to succeed, but apparently it still doesn't.

Maybe it's because of this part:

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Make async connection to the PowerBI and polls for the dataset refresh status."""
        # this is always called, even if it succeeded during the first attempt and the task is retried because the call below fails; maybe something has to be done to avoid that when a retry is executed
        self.dataset_refresh_id = await self.hook.trigger_dataset_refresh(
            dataset_id=self.dataset_id,
            group_id=self.group_id,
        )

        async def fetch_refresh_status_and_error() -> tuple[str, str]:
            """Fetch the current status and error of the dataset refresh."""
            # this is the call that fails, because it probably happens too fast after the one above
            refresh_details = await self.hook.get_refresh_details_by_refresh_id(
                dataset_id=self.dataset_id,
                group_id=self.group_id,
                refresh_id=self.dataset_refresh_id,
            )
            return refresh_details["status"], refresh_details["error"]

So maybe, @Ohashiro, a delay before calling get_refresh_details_by_refresh_id would be a possible easy fix to avoid the issue on the second call, even though I don't like it that much and it doesn't guarantee the call will eventually succeed.

What I suggest is to refactor the PowerBITrigger and PowerBIDatasetRefreshOperator. The PowerBITrigger should get an extra dataset_refresh_id parameter in the constructor, so the run method can be called with or without the dataset_refresh_id parameter. That way it can handle both scenarios and return corresponding TriggerEvents depending on the executed flow (i.e. is a dataset refresh being triggered, or do we want to get the dataset refresh details). The code modifications in PowerBITrigger could look like this:

    def __init__(
        self,
        conn_id: str,
        dataset_id: str,
        group_id: str,
        timeout: float = 60 * 60 * 24 * 7,
        proxies: dict | None = None,
        api_version: APIVersion | str | None = None,
        check_interval: int = 60,
        wait_for_termination: bool = True,
        dataset_refresh_id: str | None = None,  # add dataset_refresh_id parameter
    ):
        super().__init__()
        self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
        self.dataset_id = dataset_id
        self.timeout = timeout
        self.group_id = group_id
        self.check_interval = check_interval
        self.wait_for_termination = wait_for_termination
        self.dataset_refresh_id = dataset_refresh_id

    def serialize(self):
        """Serialize the trigger instance."""
        return (
            "airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
            {
                "conn_id": self.conn_id,
                "proxies": self.proxies,
                "api_version": self.api_version,
                "dataset_id": self.dataset_id,
                "group_id": self.group_id,
                "timeout": self.timeout,
                "check_interval": self.check_interval,
                "wait_for_termination": self.wait_for_termination,
                "dataset_refresh_id": self.dataset_refresh_id,  # IMPORTANT: do not forget to add the parameter in the serialize method as well
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Make async connection to Power BI and poll for the dataset refresh status."""
        if not self.dataset_refresh_id:
            dataset_refresh_id = await self.hook.trigger_dataset_refresh(
                dataset_id=self.dataset_id,
                group_id=self.group_id,
            )

            # Just yield a TriggerEvent with the dataset_refresh_id; the operator then re-defers the trigger
            # with that dataset_refresh_id, so the PowerBITrigger knows it only has to fetch the refresh details.
            yield TriggerEvent(
                {
                    "status": "success",
                    "message": f"The dataset refresh {dataset_refresh_id} has been triggered.",
                    "dataset_refresh_id": dataset_refresh_id,
                }
            )

        async def fetch_refresh_status_and_error() -> tuple[str, str]:
            """Fetch the current status and error of the dataset refresh."""
            refresh_details = await self.hook.get_refresh_details_by_refresh_id(
                dataset_id=self.dataset_id,
                group_id=self.group_id,
                refresh_id=self.dataset_refresh_id,
            )
            return refresh_details["status"], refresh_details["error"]

Then in the PowerBIDatasetRefreshOperator, both TriggerEvents should be handled accordingly, which means the PowerBITrigger will be deferred twice: once to trigger the dataset refresh and once to poll get_refresh_details_by_refresh_id. On the first attempt, if the trigger succeeds, the dataset_refresh_id should be persisted as an XCom within the operator, so that when the second call fails the operator can directly retry the second trigger call.

This is of course a more complex approach due to the deferrable aspect, but imho it would be more in line with how operators should work, instead of hiding failures and doing retries outside the Airflow flow using the tenacity library. From what I see in the code base, tenacity is only used in the retries module of Airflow and the CLI commands, not in operators/hooks/triggers.

@Ohashiro
Contributor Author

Ohashiro commented Jan 6, 2025

Hello! @dabla thank you for your investigation and suggestion!

Regarding your first point:

I also went back to the code and I still don't understand why the call would still fail when the task instance for that operator is retried the second time by Airflow, as some time would have passed between the first and the second attempt, I would expect the second call to succeed but apparently it still doesn't.

From what I understand, when the task fails (for example, in our case, because the refreshId was not found), the operator cancels the refresh (using cancel_dataset_refresh hook method). So when the task is retried by Airflow, a new refresh is triggered with a new refreshId.

async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None:

Regarding the fix implementation, the delay solution we discussed seems to work well (which imho confirms the root cause of the bug), but I agree that this fix is more of a quick fix than a clean one.

I can work on the refactor you suggested to check, I just have a few questions:

so that when the seconds calls fails the operator can directly retry the second Trigger call

Regarding the retry mechanism you are suggesting in the operator, how would you do it?
I just wrote the following draft; can you confirm this is what you expected? However, I'm not sure how you'd handle the retry part... Would you let the operator fail so that, when the task is retried, it tries to fetch the refresh ID directly? Or would you make the operator retry the trigger within the same task?

Regarding the trigger, if I understand correctly, the run method would look like that:

async def run(self) -> AsyncIterator[TriggerEvent]:

   if not self.dataset_refresh_id:
      # Just yield a TriggerEvent with the dataset_refresh_id, that will then be used by the operator to retrigger it with the corresponding dataset_refresh_id so the PowerBITrigger knows it has to only get the refresh details in case of failure, then the refresh details would then be executed.
      yield TriggerEvent(
         ...
      )

   else:
      # Handle the "while" loop looking for the refresh status

And regarding the operator, if I understand correctly, it would look like that:

class PowerBIDatasetRefreshOperator(BaseOperator):
   def execute(self, context: Context):
        """Refresh the Power BI Dataset."""
        if self.wait_for_termination:
            self.defer(
                trigger=PowerBITrigger(...),
                method_name=self.push_refreshId.__name__,
            )

   def push_refreshId():
      # push the refresh Id to xcom
      self.xcom_push(
                context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"]
            )
      self.defer(
                trigger=PowerBITrigger(...),
                method_name=self.execute.__name__,
            )

   def execute():
      # exit the operator as currently done

@dabla
Contributor

dabla commented Jan 6, 2025

Hello! @dabla thank you for your investigation and suggestion!

Regarding your first point:

I also went back to the code and I still don't understand why the call would still fail when the task instance for that operator is retried the second time by Airflow, as some time would have passed between the first and the second attempt, I would expect the second call to succeed but apparently it still doesn't.

From what I understand, when the task fails (for example, in our case, because the refreshId was not found), the operator cancels the refresh (using cancel_dataset_refresh hook method). So when the task is retried by Airflow, a new refresh is triggered with a new refreshId.

async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None:

Regarding the fix implementation, the delay solution we discussed seems to work well (which, imho, confirm the bug root cause), but I agree that this fix is more of a "quick fix" than a clean one.

I can work on the refactor you suggested to check, I just have a few questions:

so that when the seconds calls fails the operator can directly retry the second Trigger call

Regarding the retry mechanism you are suggesting in the operator, how would you do it? I just wrote the following draft, do you confirm this is what you expected? However I'm not sure how you'd handle the retry part... Would you let the operator fail and when the task is retried, it tried to fetch the refreshId directly? Or would you make the operator retry the Trigger in the same task?

Regarding the trigger, if I understand correctly, the run method would look like that:

async def run(self) -> AsyncIterator[TriggerEvent]:

   if not self.dataset_refresh_id:
      # Just yield a TriggerEvent with the dataset_refresh_id, that will then be used by the operator to retrigger it with the corresponding dataset_refresh_id so the PowerBITrigger knows it has to only get the refresh details in case of failure, then the refresh details would then be executed.
      yield TriggerEvent(
         ...
      )

   else:
      # Handle the "while" loop looking for the refresh status

And regarding the operator, if I understand correctly, it would look like that:

class PowerBIDatasetRefreshOperator(BaseOperator):
   def execute(self, context: Context):
        """Refresh the Power BI Dataset."""
        if self.wait_for_termination:
            self.defer(
                trigger=PowerBITrigger(...),
                method_name=self.push_refreshId.__name__,
            )

   def push_refreshId():
      # push the refresh Id to xcom
      self.xcom_push(
                context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"]
            )
      self.defer(
                trigger=PowerBITrigger(...),
                method_name=self.execute.__name__,
            )

   def execute():
      # exit the operator as currently done

Indeed, that's the main issue: because the refresh-details call fails and because of how the operator is implemented today, instead of directly getting the refresh details on the second attempt, it triggers a new dataset refresh and then tries to get its refresh details again, which, as you mentioned, fails again, so the main problem persists.

That's why I suggested the refactor, so that the PowerBITrigger can handle both cases separately and the operator can then retry the second case directly instead of redoing the whole flow.

After a remark from my colleague @joffreybienvenu-infrabel, I also saw that tenacity is actually used in the KubernetesPodOperator, so I was wrong that it isn't used by operators. But I still think it's better to use the retry mechanism implemented by the TaskInstance instead of bypassing it and retrying directly within the hook/operator; imho that's not good practice, but again I could be wrong.

Also, regarding your retry question: the retry mechanism is implemented by default by the task handling in Airflow, which is why I want to avoid using tenacity (I see it more as a hack or quick fix than an actual good solution), as there is already a solution for that in Airflow. So if you do the refactor as I suggested, the retry mechanism will work fine provided the xcom_push still works when a task fails afterwards, so that's something to be tested.

@Ohashiro
Contributor Author

Ohashiro commented Jan 6, 2025

Btw, I just (locally) implemented your refactor, and separating the 2 flows (refresh trigger and get refresh status) into 2 deferred trigger runs seems to leave enough time between the refresh creation and the first "get refresh status" request. It seems that I don't get the error anymore (not that the bug is really fixed, but in my environment the duration between both events is enough to prevent the error).

the operator can then retry the second case directly instead of redoing the whole flow

Regarding this, how/where would you retry the second case?

@dabla
Contributor

dabla commented Jan 6, 2025

Btw, I just (locally) implemented your refactor and the separation between the 2 flows (refresh trigger and get refresh status) in 2 deferrable triggers seems to let enough time between the refresh creation and the first "get refresh status" request. It seems that I don't get the error anymore (not that the bug is really fixed, but in my environment, the duration between both events is enough to prevent the error).

the operator can then retry the second case directly instead of redoing the whole flow

Regarding this, how/where would you retry the second case?

Indeed, that's also a consequence of using the trigger twice: as more time automatically passes between both invocations, the error will probably not occur anymore and everything will succeed in one attempt.

For the second case, in the operator's execute method you should do an xcom_pull to check whether there is an existing dataset_refresh_id: if not, you know you have to execute the whole flow; if it's there, you know you should only trigger the second part.

@Ohashiro
Contributor Author

Ohashiro commented Jan 6, 2025

For the second case, in the operator, you should do in the execute method an xcom_pull to see if there is an existing dataset_refresh_id or not, and if not you know you have to execute the whole flow, if it's there then you know you should only trigger the second part.

After a bit of investigation, I think it won't be possible to pass XCom messages between different task executions of the same operator (if that is what you meant). According to the Airflow XComs documentation, "If the first task run is not succeeded then on every retry task XComs will be cleared to make the task run idempotent." From what I understand, it is not possible for the 2nd task run (in a task retry) to access the 1st run's XCom message.
However, it should be possible to retry the operator's execute function without retrying the task, as done in DmsStartReplicationOperator:

def retry_execution(self, context, event=None):
    self.replication_config_arn = event.get("replication_config_arn")
    self.log.info("Retrying replication %s.", self.replication_config_arn)
    self.execute(context)

Though I'm not sure if this is what you'd prefer.

After looking at the codebase, I see retry mechanisms in some hooks (e.g. BaseDatabricksHook, LivyHook, GlueJobHook, EmrContainerHook...), handlers (e.g. S3TaskHandler) and in some operators as well (e.g. the Kubernetes operators your colleague pointed out, or some AWS operators such as BedrockCustomizeModelOperator, DmsDeleteReplicationConfigOperator...). I didn't look at all the files, so there must be others.
The retry can be used to wait for a status (e.g. the ElastiCacheReplicationGroupHook hook, method wait_for_availability, link) or to wait for an API request to succeed. In our case, I think we fall into these cases: we could add a retry in PowerBIHook.get_refresh_details_by_refresh_id before raising the PowerBIDatasetRefreshException?
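
For the hook option, the decoration could look roughly like this (a sketch only: retry settings are illustrative and the existing method body is elided):

import tenacity


# Sketch: retry the status lookup a few times before letting the exception propagate.
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=5),
    retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException),
    reraise=True,
)
async def get_refresh_details_by_refresh_id(self, dataset_id: str, group_id: str, refresh_id: str) -> dict:
    ...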

Not sure which solution is best to be honest. What do you think?

  • Should I set the retry within the operator, with a retry_execution method or a while loop in the execute method (or something else)?
  • Should I handle the retry in the existing while loop from the trigger?
  • Or should I set a retry in the hook directly?

@dabla
Contributor

dabla commented Jan 7, 2025

However, it should be possible to retry the operator "execute" function without retrying the task, as done in DmsStartReplicationOperator

You're right about the XComs being flushed when a task fails. I like the approach above: defining a dedicated retry method that is passed as the next_method argument of the trigger could be an option. If it's too difficult, then I would do what @ambika-garg proposed, which I think you also proposed at the beginning of this issue.

@Ohashiro
Contributor Author

Ohashiro commented Jan 7, 2025

Hi @dabla
I quickly tested what we discussed: the triggers return to the execute_complete method, which checks the message and, if it detects that there was an issue fetching the refresh ID, redirects to a retry_execution method that counts the retries and re-executes the execute method. Of course the code is not very clean, but here is an overview:

def execute(self, context: Context):
    """Refresh the Power BI Dataset."""
    if self.wait_for_termination:
        self.defer(
            trigger=PowerBITrigger(
                conn_id=self.conn_id,
                group_id=self.group_id,
                dataset_id=self.dataset_id,
                timeout=self.timeout,
                proxies=self.proxies,
                api_version=self.api_version,
                check_interval=self.check_interval,
                wait_for_termination=self.wait_for_termination,
            ),
            method_name=self.get_refresh_status.__name__,
        )

def get_refresh_status(self, context: Context, event: dict[str, str] | None = None):
    """Push the refresh Id to XCom then runs the Triggers to wait for refresh completion."""

    if event:
        if event["status"] == "error" and "Unable to fetch the details of dataset refresh with Request Id" not in event["message"] and "not found" not in event["message"]:
            raise AirflowException(event["message"])

        self.xcom_push(context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"])

    dataset_refresh_id = self.xcom_pull(context=context, key="powerbi_dataset_refresh_Id")
    if dataset_refresh_id:
        self.defer(
            trigger=PowerBITrigger(
                conn_id=self.conn_id,
                group_id=self.group_id,
                dataset_id=self.dataset_id,
                dataset_refresh_id=dataset_refresh_id,
                timeout=self.timeout,
                proxies=self.proxies,
                api_version=self.api_version,
                check_interval=self.check_interval,
                wait_for_termination=self.wait_for_termination,
            ),
            method_name=self.execute_complete.__name__,
        )

def retry_execution(self, context: Context):
    retries = self.xcom_pull(context=context, key="retries")
    if retries and retries >= self.max_retries:
        raise AirflowException("Max number of retries reached!")

    if not retries:
        retries = 0
    self.xcom_push(context=context, key="retries", value=retries+1)

    self.get_refresh_status(context)

def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
    """
    Return immediately - callback for when the trigger fires.

    Relies on trigger to throw an exception, otherwise it assumes execution was successful.
    """
    if event:
        if event["status"] == "error":
            if "Unable to fetch the details of dataset refresh with Request Id" in event["message"] or "not found" in event["message"]:
                self.retry_execution(context)
            else:
                raise AirflowException(event["message"])

        self.xcom_push(context=context, key="powerbi_dataset_refresh_status", value=event["status"])

Note: in addition to these changes, we should add a new way to handle the refresh cancellation. By default, if the trigger encounters an exception, it cancels the refresh (which is not compatible with the retry made by the operator). If we keep this solution, we have to change this behavior.

I think this solution can work but might add a little too much complexity to the operator compared to a simple retry, though I think that this separation between the trigger refresh and the status fetch is nice.
What's your opinion?

@dabla
Copy link
Contributor

dabla commented Jan 8, 2025

The separation of the 2 flows handled by the PowerBITrigger is indeed a good thing, so the effort there is not lost and it is in fact a cleaner design. Now, if it's too complex to handle the retry within the operator, then I propose you use tenacity on that refresh_history method of the PowerBIHook.

@Ohashiro
Contributor Author

Ohashiro commented Jan 9, 2025

Hi @dabla
I opened a PR with the following changes: the separation between the 2 flows and the retry on the fetch_refresh_status_and_error function.
I'd be happy to have your feedback on this PR when you have some time!

@dabla
Copy link
Contributor

dabla commented Jan 10, 2025

Hi @dabla I opened a PR with the following changes: the separation between the 2 flows and the retry on the func fetch_refresh_status_and_error. I'd be happy to have your feedbacks on this PR when you have some time !

@Ohashiro Just reviewed it, looks good to me, nice job!
