Coverage for src/ipyvizzu/integrations/fugue.py: 100%
50 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 10:12 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 10:12 +0000
1"""
2A module for Fugue integration.
4Example:
5 Users should not instantiate this module directly. As long as you
6 have installed fugue and ipyvizzu, the extension is auto-registered.
8 from fugue import fsql
10 fsql('''
11 SELECT a, SUM(b) AS b FROM spark.table
12 GROUP BY a ORDER BY b
14 OUTPUT USING vizzu:bar(x="a", y="b", title="title")
15 ''').run(spark_session)
16"""
18from typing import Any, Dict, Tuple
20import pandas as pd
21from fugue import DataFrames, Outputter # type: ignore
22from fugue.exceptions import FugueWorkflowError
23from fugue.extensions import namespace_candidate # type: ignore
24from fugue.plugins import parse_outputter # type: ignore
25from triad import assert_or_throw # type: ignore
27from ipyvizzu import Chart, Config, Data, DisplayTarget
29_TIMELINE_DEFAULT_CONF: Dict[str, Any] = dict( # pylint: disable=use-dict-literal
30 show={"delay": 0},
31 hide={"delay": 0},
32 title={"duration": 0, "delay": 0},
33 duration=0.5,
34)
class _Visualize(Outputter):
    """
    Fugue outputter that draws its single input dataframe as an ipyvizzu chart.

    Args:
        func:
            Name of an attribute of [Config][ipyvizzu.animation.Config]. It is
            looked up once at construction time and later called with the
            chart configuration dictionary.
        category:
            `"timeline"` selects the per-key animation sequence; any other
            value selects the single preset chart.
    """

    def __init__(self, func: str, category: str) -> None:
        super().__init__()
        self._func = getattr(Config, func)
        self._category = category

    def process(self, dfs: DataFrames) -> None:
        """
        Render the only dataframe in `dfs`.

        `assert_or_throw` is handed a `FugueWorkflowError("not single input")`
        for the case where `dfs` does not contain exactly one dataframe. The
        dataframe is converted to pandas and passed to the timeline or the
        preset renderer depending on the category given at construction.
        """
        assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
        frame = dfs[0].as_pandas()
        render = (
            self._process_timeline
            if self._category == "timeline"
            else self._process_preset
        )
        render(frame)

    def _process_preset(self, df: pd.DataFrame) -> None:  # pylint: disable=invalid-name
        """
        Show `df` as one chart.

        The data is animated in first, then the configuration obtained by
        calling the stored `Config` attribute with a copy of `self.params`.
        """
        data = Data()
        data.add_df(df)
        chart = Chart(display=DisplayTarget.END)
        chart.animate(data)
        preset_config = self._func(dict(self.params))
        chart.animate(preset_config)

    def _process_timeline(
        self, df: pd.DataFrame  # pylint: disable=invalid-name
    ) -> None:
        """
        Show `df` as a sequence of animation steps, one per distinct key value.

        Parameters read from `self.params`:

        - `by` (required): the column whose sorted unique values define the steps.
        - `config` (optional): mapping passed to the stored `Config` attribute
          for every step. Its `title` entry (default `"%s"`) is used as the
          step title; when it contains `%s` the current key value is
          substituted into it.
        - everything else overrides `_TIMELINE_DEFAULT_CONF` and is forwarded
          as keyword arguments to each step's `chart.animate` call.
        """
        options = dict(self.params)
        chart_config = dict(options.pop("config", {}))
        title_template = chart_config.pop("title", "%s")
        by_column = options.pop("by")
        # Whatever is left in the parameters overrides the default options.
        animate_options = {**_TIMELINE_DEFAULT_CONF, **options}

        data = Data()
        chart = Chart(display=DisplayTarget.END)
        unique_keys = df[by_column].unique()
        unique_keys.sort()
        # Give every distinct key value a running number so that each step
        # can select its records with a plain `_idx` comparison.
        index_frame = pd.DataFrame(
            {"_idx": range(len(unique_keys)), by_column: unique_keys}
        )
        indexed = df.sort_values(by_column).merge(index_frame)
        data.add_df(indexed)
        chart.animate(data)

        for position, value in enumerate(unique_keys):
            if "%s" in title_template:
                step_title = title_template % value
            else:
                step_title = title_template
            step_config = {**chart_config, "title": step_title}
            chart.animate(
                Data.filter(f"record._idx == {position}"),
                self._func(step_config),
                **animate_options,
            )
@parse_outputter.candidate(namespace_candidate("vizzu", lambda x: isinstance(x, str)))
def _parse_vizzu(obj: Tuple[str, str]) -> Outputter:
    """
    Build the `_Visualize` outputter for a `vizzu:<name>` expression.

    Only `obj[1]` is read. A name starting with `timeline_` yields a
    timeline outputter for the remainder of the name; any other name yields
    a preset outputter for the name as given.
    """
    name = obj[1]
    timeline_prefix = "timeline_"
    if not name.startswith(timeline_prefix):
        return _Visualize(name, "preset")
    return _Visualize(name[len(timeline_prefix):], "timeline")