
[Bug]: The Python DoOutputsTuple forces tagged PCollections to is_bounded=True #29196

Closed
1 of 16 tasks
ad-momo opened this issue Oct 30, 2023 · 4 comments · Fixed by #29506
Labels
bug · done & done · P2 · python

Comments

@ad-momo

ad-momo commented Oct 30, 2023

What happened?

When you apply beam.Partition to an unbounded PCollection, the tagged PCollections returned in the DoOutputsTuple have is_bounded=True.

Normally, if the source is unbounded, the PCollections produced by Partition should be unbounded too, shouldn't they?

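The tagged PCollection is created in `DoOutputsTuple.__getitem__` without an `is_bounded` argument, so it always gets the default value `True`:

```python
pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
```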
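The effect of that constructor call can be reproduced without Beam. This is a hypothetical toy model: `ToyPCollection` and `ToyDoOutputsTuple` are illustrative names, not Beam classes, and the sketch assumes, as the issue title states, that a PCollection created without an explicit `is_bounded` ends up bounded. It only shows why the tagged outputs ignore the source's boundedness.

```python
from typing import Dict, Optional, Union


class ToyPCollection:
    """Hypothetical stand-in for a PCollection; only tracks boundedness."""

    def __init__(self, tag: Optional[str] = None, is_bounded: bool = True):
        # Assumed default of True, as described in the issue title.
        self.tag = tag
        self.is_bounded = is_bounded


class ToyDoOutputsTuple:
    """Hypothetical model of the reported DoOutputsTuple.__getitem__."""

    def __init__(self, source: ToyPCollection):
        self.source = source
        self._pcolls: Dict[Optional[str], ToyPCollection] = {}

    def __getitem__(self, tag: Union[int, str, None]) -> ToyPCollection:
        if isinstance(tag, int):
            tag = str(tag)  # accept Partition-style integer tags
        if tag not in self._pcolls:
            # No is_bounded argument is passed, so self.source is ignored.
            self._pcolls[tag] = ToyPCollection(tag=tag)
        return self._pcolls[tag]


unbounded_source = ToyPCollection(is_bounded=False)
partitions = ToyDoOutputsTuple(unbounded_source)
print(partitions[0].is_bounded)  # True, although the source is unbounded
```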

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [x] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner
@ad-momo
Author

ad-momo commented Oct 30, 2023

A workaround while waiting for a response:

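```python
from typing import Union

from apache_beam.pvalue import DoOutputsTuple

# Tagged outputs should have the same is_bounded state as the source.
initial_get_item = DoOutputsTuple.__getitem__


def new_get_item(self: DoOutputsTuple, tag: Union[int, str, None]):
    # Check before the lookup whether this tag's PCollection already exists.
    tag_not_in_pcolls = tag not in self._pcolls

    pcoll = initial_get_item(self, tag)

    if tag_not_in_pcolls:
        assert self.producer is not None
        # Main (untagged) output of the producer's first sub-transform.
        pval = self.producer.parts[0].outputs[None]

        # Pass the bounded state on to the tagged output.
        pcoll.is_bounded = pval.is_bounded

    return pcoll


# Monkey-patch DoOutputsTuple so every tagged output gets this behavior.
DoOutputsTuple.__getitem__ = new_get_item
```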

@damccorm
Contributor

Looks like we output a DoOutputsTuple here:

```python
return pvalue.DoOutputsTuple(
```

but that object has no logic for determining whether its outputs are bounded:

```python
pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
```

I think we should be able to iterate over `self.producer.main_inputs` to determine boundedness.
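A minimal sketch of that idea, under two assumptions: tagged outputs are unbounded if any main input of the producer is unbounded, and they fall back to bounded when there is no producer or no main input. This is a hypothetical, Beam-free model (`ToyPCollection`, `ToyAppliedTransform`, `ToyDoOutputsTuple` and `outputs_are_bounded` are illustrative names) and not necessarily how the merged fix in #29506 implements it.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Union


@dataclass
class ToyPCollection:
    """Hypothetical stand-in for a PCollection; only tracks boundedness."""
    tag: Optional[str] = None
    is_bounded: bool = True


@dataclass
class ToyAppliedTransform:
    """Hypothetical stand-in for the transform that produced the outputs."""
    main_inputs: Dict[str, ToyPCollection] = field(default_factory=dict)


def outputs_are_bounded(producer: Optional[ToyAppliedTransform]) -> bool:
    # Bounded only if every main input is bounded; all() of an empty
    # collection is True, so "no inputs" keeps the old default.
    if producer is None:
        return True
    return all(p.is_bounded for p in producer.main_inputs.values())


class ToyDoOutputsTuple:
    """Hypothetical model of DoOutputsTuple.__getitem__ with the fix applied."""

    def __init__(self, producer: Optional[ToyAppliedTransform]):
        self.producer = producer
        self._pcolls: Dict[Optional[str], ToyPCollection] = {}

    def __getitem__(self, tag: Union[int, str, None]) -> ToyPCollection:
        if isinstance(tag, int):
            tag = str(tag)  # accept Partition-style integer tags
        if tag not in self._pcolls:
            # Derive boundedness from the producer's main inputs instead of
            # relying on the constructor default.
            self._pcolls[tag] = ToyPCollection(
                tag=tag, is_bounded=outputs_are_bounded(self.producer))
        return self._pcolls[tag]


source = ToyPCollection(is_bounded=False)
partitions = ToyDoOutputsTuple(ToyAppliedTransform({"input": source}))
print(partitions[0].is_bounded)  # False
```

Checking every main input, rather than only the producer's main output as in the workaround, means a transform with several inputs becomes unbounded as soon as one of them is.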

@ad-momo
Author

ad-momo commented Nov 21, 2023

Thanks for the response and the fix 😃

In my workaround, I assumed the tagged outputs have the same bounded state as the main output (`self.producer.parts[0].outputs[None]`), because I can't think of any case where they would differ.

By the way, I can't explain why the main output has the correct bounded state 😮

github-actions bot added this to the 2.53.0 Release milestone on Nov 21, 2023
@tvalentyn
Contributor

@damccorm Thanks for investigating and fixing this!

tvalentyn added the "done & done" label on Nov 27, 2023

3 participants