Coverage for src/ipyvizzu/integrations/fugue.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-10 09:30 +0000

1""" 

2A module for Fugue integration. 

3 

4Example: 

5 Users do not need to use this module directly. As long as 

6 fugue and ipyvizzu are both installed, the extension is auto-registered. 

7 

8 from fugue import fsql 

9 

10 fsql(''' 

11 SELECT a, SUM(b) AS b FROM spark.table 

12 GROUP BY a ORDER BY b 

13 

14 OUTPUT USING vizzu:bar(x="a", y="b", title="title") 

15 ''').run(spark_session) 

16""" 

17 

18from typing import Any, Dict, Tuple 

19 

20import pandas as pd # type: ignore 

21from fugue import DataFrames, Outputter # type: ignore 

22from fugue.exceptions import FugueWorkflowError # type: ignore 

23from fugue.extensions import namespace_candidate # type: ignore 

24from fugue.plugins import parse_outputter # type: ignore 

25from triad import assert_or_throw # type: ignore 

26 

27from ipyvizzu import Chart, Config, Data, DisplayTarget 

28 

29_TIMELINE_DEFAULT_CONF: Dict[str, Any] = dict( # pylint: disable=use-dict-literal 

30 show={"delay": 0}, 

31 hide={"delay": 0}, 

32 title={"duration": 0, "delay": 0}, 

33 duration=0.5, 

34) 

35 

36 

class _Visualize(Outputter):
    """
    A Fugue outputter extension (mainly for Fugue SQL)

    It converts its single input dataframe to pandas and renders it with
    ipyvizzu, either as one chart (``preset``) or as a sequence of animation
    steps, one per distinct value of the ``by`` column (``timeline``).

    Args:
        func:
            A function name of [Config][ipyvizzu.animation.Config]
        category:
            Can be preset or timeline
    """

    def __init__(self, func: str, category: str) -> None:
        super().__init__()
        self._category = category
        # Resolved eagerly: a name that is not an attribute of Config raises
        # AttributeError at construction time rather than at render time.
        self._func = getattr(Config, func)

    def process(self, dfs: DataFrames) -> None:
        """
        Render the single input dataframe as a chart.

        Args:
            dfs: The input dataframes; exactly one is expected, otherwise
                a FugueWorkflowError("not single input") is passed to
                ``assert_or_throw``.
        """
        assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
        df = dfs[0].as_pandas()  # pylint: disable=invalid-name
        # Any category other than "timeline" is treated as a preset chart.
        if self._category == "timeline":
            self._process_timeline(df)
        else:
            self._process_preset(df)

    def _process_preset(self, df: pd.DataFrame) -> None:  # pylint: disable=invalid-name
        """
        Draw one chart: load ``df`` as the data, then apply the configured
        Config function called with a dict copy of ``self.params``.
        """
        data = Data()
        data.add_data_frame(df)
        chart = Chart(display=DisplayTarget.END)
        chart.animate(data)
        chart.animate(self._func(dict(self.params)))

    def _process_timeline(
        self, df: pd.DataFrame  # pylint: disable=invalid-name
    ) -> None:
        """
        Draw one animation step per distinct value of the ``by`` column.

        Parameters read from ``self.params``:

        - ``by`` (required): name of the column whose distinct values define
          the steps; a missing ``by`` raises KeyError.
        - ``config`` (optional): mapping passed to the Config function for
          every step. Its ``title`` entry (default ``"%s"``) is used as a
          title template: when it contains ``%s`` it is %-formatted with the
          current key, otherwise it is used verbatim.
        - every remaining parameter overrides the matching top-level entry
          of ``_TIMELINE_DEFAULT_CONF`` and is forwarded to ``chart.animate``
          as a keyword argument.
        """
        _p = dict(self.params)
        _pc = dict(_p.pop("config", {}))
        title = _pc.pop("title", "%s")
        key = _p.pop("by")
        conf = dict(_TIMELINE_DEFAULT_CONF)
        conf.update(_p)

        data = Data()
        chart = Chart(display=DisplayTarget.END)
        # Sorted distinct keys. `Series.unique()` followed by an in-place
        # `.sort()` only works when `unique()` returns a NumPy ndarray; for
        # extension dtypes (nullable ints, string, categorical, ...) it returns
        # an ExtensionArray, which has no `.sort()`. `drop_duplicates()` and
        # `sort_values()` are defined for every dtype.
        ordered = df[key].drop_duplicates().sort_values().reset_index(drop=True)
        # Iterate over the NumPy view so that keys of NumPy-backed columns are
        # formatted into the title exactly as before.
        keys = ordered.to_numpy()
        # Lookup table key -> position. Built from the Series so the key column
        # keeps the dtype of the input column for the merge below.
        idx = pd.DataFrame({"_idx": range(len(ordered)), key: ordered})
        df = df.sort_values(key).merge(idx)
        data.add_data_frame(df)
        chart.animate(data)

        for i, k in enumerate(keys):
            _p2 = dict(_pc)
            _p2["title"] = (title % k) if "%s" in title else title
            # Each step shows only the records that belong to the i-th key.
            chart.animate(Data.filter(f"record._idx == {i}"), self._func(_p2), **conf)

91 

92 

@parse_outputter.candidate(namespace_candidate("vizzu", lambda x: isinstance(x, str)))
def _parse_vizzu(obj: Tuple[str, str]) -> Outputter:
    """
    Build the outputter for a ``vizzu`` namespaced name.

    Only the second element of ``obj`` is read here. A name starting with
    ``timeline_`` yields a timeline visualizer for the remainder of the
    name; any other name yields a preset visualizer for the name itself.
    """
    name = obj[1]
    if not name.startswith("timeline_"):
        return _Visualize(name, "preset")
    # Everything after the first underscore is the Config function name.
    _, _, preset = name.partition("_")
    return _Visualize(preset, "timeline")

97 return _Visualize(obj[1], "preset")