feat: unify streaming and batch OVER window #10021

Closed
1 task done
zhenzhongxu opened this issue Sep 4, 2024 · 4 comments
Labels
feature Features or general enhancements

Comments

@zhenzhongxu
Contributor

zhenzhongxu commented Sep 4, 2024

Is your feature request related to a problem?

Ibis has a streaming-specific OVER window:

over_window_streaming = bid_table.filter(_ is not None)[_.price.mean().over(range=(-ibis.interval(seconds=10), 0), order_by=_.datetime).name("avg_price")]

and a batch OVER window:

over_window_batch = bid_table.filter(_ is not None).mutate(avg_price=_.price.mean().over(rows=(-2,0), order_by=_.datetime))

These generate different expression trees, yet the syntax differs only in minor details (mutate plus a time range versus a row range). Is there an opportunity to converge the two APIs and avoid confusing users? (I am putting myself in the user's shoes.)
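To make the "time range versus row range" distinction concrete, here is a minimal pure-Python sketch of the two frame semantics. It does not use Ibis; the toy data and the helper names `rows_mean` and `range_mean` are assumptions made for illustration only.

```python
from datetime import datetime, timedelta

# Toy bids as (timestamp, price), already sorted by timestamp.
bids = [
    (datetime(2024, 9, 4, 12, 0, 0), 10.0),
    (datetime(2024, 9, 4, 12, 0, 4), 20.0),
    (datetime(2024, 9, 4, 12, 0, 8), 30.0),
    (datetime(2024, 9, 4, 12, 0, 30), 40.0),
]


def rows_mean(rows, preceding):
    """Row-based frame: the current row plus up to `preceding` rows before it."""
    out = []
    for i in range(len(rows)):
        frame = rows[max(0, i - preceding): i + 1]
        out.append(sum(price for _, price in frame) / len(frame))
    return out


def range_mean(rows, width):
    """Range-based frame: rows whose timestamp lies in [current - width, current]."""
    out = []
    for ts, _ in rows:
        frame = [price for t, price in rows if ts - width <= t <= ts]
        out.append(sum(frame) / len(frame))
    return out


rows_result = rows_mean(bids, preceding=2)                     # like rows=(-2, 0)
range_result = range_mean(bids, width=timedelta(seconds=10))   # like a 10-second range
```

On this toy data the two frames agree on the first three rows and differ on the last one: the row-based frame still averages the two earlier bids, while the range-based frame sees only the bid that arrived after a 22-second gap.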

This issue originally surfaced in ibis-project/ibis-substrait#1117

What is the motivation behind your request?

Improve stream/batch unification on Ibis

Describe the solution you'd like

I'd like to see a single unified API for both batch and streaming OVER windows, with backend-specific rewrite logic that turns it into expressions that work across backends.
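As a rough sketch of what "one spec, two frame kinds" could look like, the snippet below renders a SQL frame clause from a single object whose boundary type decides between ROWS and RANGE. `WindowSpec` and `frame_clause` are hypothetical names, not Ibis APIs, and the interval syntax shown is only one common SQL spelling; real backends differ.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Union


@dataclass(frozen=True)
class WindowSpec:
    """Hypothetical unified window spec (not an Ibis class)."""
    order_by: str
    preceding: Union[int, timedelta]  # int -> row count, timedelta -> time range


def frame_clause(spec: WindowSpec) -> str:
    """Render an OVER clause; the boundary type selects RANGE or ROWS."""
    if isinstance(spec.preceding, timedelta):
        seconds = int(spec.preceding.total_seconds())
        frame = f"RANGE BETWEEN INTERVAL '{seconds}' SECOND PRECEDING AND CURRENT ROW"
    else:
        frame = f"ROWS BETWEEN {spec.preceding} PRECEDING AND CURRENT ROW"
    return f"OVER (ORDER BY {spec.order_by} {frame})"


batch_sql = frame_clause(WindowSpec(order_by="datetime", preceding=2))
stream_sql = frame_clause(WindowSpec(order_by="datetime", preceding=timedelta(seconds=10)))
```

The point of the sketch is only that the caller writes the same call in both cases; any backend-specific rewriting would live behind a function like `frame_clause`.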

What version of ibis are you running?

9.0.0

What backend(s) are you using, if any?

DuckDB, Flink

Code of Conduct

  • I agree to follow this project's Code of Conduct
@zhenzhongxu zhenzhongxu added the feature Features or general enhancements label Sep 4, 2024
@ncclementi
Contributor

@zhenzhongxu can you provide a minimal reproducible example so we can run both and explore the APIs? I wonder whether these are just different ways of achieving the same goal rather than an inconsistency.

In the meantime, maybe @chloeh13q has some input here, since I believe she worked on the window operation support for Flink.

@zhenzhongxu
Contributor Author

zhenzhongxu commented Sep 5, 2024

@ncclementi thanks for the note. I looked deeper and realized that the range-based window (streaming) and the row-based window (batch) already share a unified API and expression tree. My initial assumption was wrong: I had confused OVER windows with the windowing TVFs (table-valued functions). We can go ahead and resolve this ticket for now.

import ibis
from ibis import _

bid_schema = ibis.schema(
    {
        "auction": "int64",
        "bidder": "int64",
        "price": "float64",
        "datetime": "timestamp(3)"
    }
)
bid_table = ibis.table(name="Bid", schema=bid_schema)

range-based / streaming:
[image not preserved]

row-based / batch:
[image not preserved]

@github-project-automation github-project-automation bot moved this from backlog to done in Ibis planning and roadmap Sep 5, 2024
@zhenzhongxu
Contributor Author

zhenzhongxu commented Sep 5, 2024

On another note, Ibis generates the correct expression for both row-based and range-based OVER windows. It looks like ibis-substrait errors out when generating the corresponding Substrait plan for the range window. @gforsyth, this looks like a separate issue from the one we discovered in the original ibis-substrait thread. I'll move the discussion downstream again; we can close this ticket.

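from ibis_substrait.compiler.core import SubstraitCompiler
compiler = SubstraitCompiler()  # setup not shown in the original snippet; class name taken from the traceback
w = ibis.window(group_by=_.auction, range=(-ibis.interval(seconds=10), 0))
window_avg_price = bid_table.mutate(avg_price=_.price.mean().over(w))
compiler.compile(window_avg_price)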
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[4], line 1
----> 1 plan = compiler.compile(window_avg_price)
      2 with open("window_avg_price.proto", "wb") as f:
      3     f.write(plan.SerializeToString())

File /opt/conda/envs/composable-data-arch/lib/python3.12/site-packages/ibis_substrait/compiler/core.py:222, in SubstraitCompiler.compile(self, expr, **kwargs)
    217 from .translate import translate
    219 expr_schema = expr.schema()
    220 rel = stp.PlanRel(
    221     root=stalg.RelRoot(
--> 222         input=translate(expr.op(), compiler=self, **kwargs),
    223         names=translate(expr_schema).names,
    224     )
    225 )
    226 ver = vparse(__substrait_version__)
    227 return stp.Plan(
    228     version=stp.Version(
    229         major_number=ver.major,
   (...)
    256     relations=[rel],
    257 )

File /opt/conda/envs/composable-data-arch/lib/python3.12/functools.py:909, in singledispatch.<locals>.wrapper(*args, **kw)
...
File /opt/conda/envs/composable-data-arch/lib/python3.12/site-packages/ibis_substrait/compiler/translate.py:444
    442     return translate_preceding(boundary.value.value)  # type: ignore
    443 else:
--> 444     return translate_following(boundary.value.value)

AttributeError: 'Cast' object has no attribute 'value'
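The traceback suggests that `boundary.value` is a `Cast` node rather than a literal, so the second `.value` lookup fails. The sketch below reproduces that shape with hypothetical stand-in classes (`Literal`, `Cast`, `Boundary` are simplified dataclasses, not the real Ibis operations) and shows one possible way to unwrap the cast first. It is an illustration of the failure mode under those assumptions, not the fix adopted in ibis-substrait.

```python
from dataclasses import dataclass


# Hypothetical, simplified stand-ins for expression nodes (not the real Ibis classes).
@dataclass
class Literal:
    value: object


@dataclass
class Cast:
    arg: object  # the wrapped node
    to: str      # target type name


@dataclass
class Boundary:
    value: object  # assumed to be a Literal or a Cast around one
    preceding: bool


def boundary_offset(boundary):
    """Unwrap any Cast nodes and return the underlying literal's value."""
    node = boundary.value
    while isinstance(node, Cast):
        node = node.arg
    if not isinstance(node, Literal):
        raise TypeError(f"unsupported boundary node: {type(node).__name__}")
    return node.value


cast_boundary = Boundary(value=Cast(arg=Literal(10), to="interval"), preceding=True)

# Naive access, mirroring `boundary.value.value` in the traceback.
try:
    cast_boundary.value.value
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# Unwrapping the cast first recovers the offset.
offset = boundary_offset(cast_boundary)
```

Whether silently dropping the cast is acceptable depends on what the cast changes (for example, interval units), so a real fix would likely need to translate the cast rather than skip it.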

@zhenzhongxu
Contributor Author

Raised ibis-project/ibis-substrait#1130
