Converter

ipyvizzu.data.converters.spark.converter

This module provides the SparkDataFrameConverter class, which converts a pyspark DataFrame into a list of dictionaries representing series.

ipyvizzu.data.converters.spark.converter.SparkDataFrameConverter

Bases: DataFrameConverter

Converts a pyspark DataFrame into a list of dictionaries representing series. Each dictionary contains information about the series name, values and type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | The pyspark DataFrame to convert. | required |
| default_measure_value | MeasureValue | Default value to use for missing measure values. Defaults to 0. | NAN_MEASURE |
| default_dimension_value | DimensionValue | Default value to use for missing dimension values. Defaults to an empty string. | NAN_DIMENSION |
| max_rows | int | The maximum number of rows to include in the converted series list. If the df contains more rows, a random sample of approximately that many rows is taken. | MAX_ROWS |
Example

Get series list from DataFrame columns:

converter = SparkDataFrameConverter(df)
series_list = converter.get_series_list()
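
For a fuller picture, here is a minimal end-to-end sketch. The SparkSession setup and the sample data are illustrative assumptions, not part of the converter's API:

from pyspark.sql import SparkSession

from ipyvizzu.data.converters.spark.converter import SparkDataFrameConverter

# A local session and a tiny DataFrame, purely for demonstration.
spark = SparkSession.builder.appName("ipyvizzu-example").getOrCreate()
df = spark.createDataFrame(
    [("A", 1), ("B", 2), ("C", None)],
    schema="category string, value int",
)

converter = SparkDataFrameConverter(df)
series_list = converter.get_series_list()
# "category" is inferred as a dimension, "value" as a measure; the null
# in "value" is replaced by the default measure value (0) and cast to float.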
Source code in src/ipyvizzu/data/converters/spark/converter.py
class SparkDataFrameConverter(DataFrameConverter):
    """
    Converts a `pyspark` `DataFrame` into a list of dictionaries representing series.
    Each dictionary contains information about the series `name`, `values` and `type`.

    Parameters:
        df: The `pyspark` `DataFrame` to convert.
        default_measure_value:
            Default value to use for missing measure values. Defaults to 0.
        default_dimension_value:
            Default value to use for missing dimension values. Defaults to an empty string.
        max_rows: The maximum number of rows to include in the converted series list.
            If the `df` contains more rows,
            a random sample of the given number of rows (approximately) will be taken.

    Example:
        Get series list from `DataFrame` columns:

            converter = SparkDataFrameConverter(df)
            series_list = converter.get_series_list()
    """

    # pylint: disable=too-few-public-methods

    def __init__(
        self,
        df: "pyspark.sql.DataFrame",  # type: ignore
        default_measure_value: MeasureValue = NAN_MEASURE,
        default_dimension_value: DimensionValue = NAN_DIMENSION,
        max_rows: int = MAX_ROWS,
        units: Optional[Dict[str, str]] = None,
    ) -> None:
        # pylint: disable=too-many-arguments

        super().__init__(
            default_measure_value, default_dimension_value, max_rows, units
        )
        self._pyspark, self._pyspark_func = self._get_pyspark()
        self._df = self._get_sampled_df(df)

    def _get_pyspark(self) -> Tuple[ModuleType, ModuleType]:
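        # Import lazily so that pyspark remains an optional dependency.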
        try:
            import pyspark  # pylint: disable=import-outside-toplevel
            from pyspark.sql import functions  # pylint: disable=import-outside-toplevel

            return pyspark, functions
        except ImportError as error:
            raise ImportError(
                "pyspark is not available. Please install pyspark to use this feature."
            ) from error

    def _get_sampled_df(
        self, df: "pyspark.sql.DataFrame"  # type: ignore
    ) -> "pyspark.sql.DataFrame":  # type: ignore
        row_number = df.count()
        if self._is_max_rows_exceeded(row_number):
            fraction = self._max_rows / row_number
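            # sample() by fraction is approximate and may return a few more
            # rows than max_rows; limit() below enforces a hard upper bound.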
            sample_df = df.sample(withReplacement=False, fraction=fraction, seed=42)
            return sample_df.limit(self._max_rows)
        return df

    def _get_columns(self) -> List[str]:
        return self._df.columns

    def _convert_to_series_values_and_type(
        self, obj: str
    ) -> Tuple[SeriesValues, InferType]:
        column_name = obj
        column = self._df.select(column_name)
        integer_type = self._pyspark.sql.types.IntegerType
        double_type = self._pyspark.sql.types.DoubleType
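        # Integer and double columns are inferred as measures;
        # all other column types are treated as dimensions.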
        if isinstance(column.schema[column_name].dataType, (integer_type, double_type)):
            return self._convert_to_measure_values(column_name), InferType.MEASURE
        return self._convert_to_dimension_values(column_name), InferType.DIMENSION

    def _convert_to_measure_values(self, obj: str) -> List[MeasureValue]:
        column_name = obj
        func = self._pyspark_func
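        # Replace nulls with the default measure value, then cast the column
        # to float and collect it as a flat Python list.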
        df = self._df.withColumn(
            column_name,
            func.when(
                func.col(column_name).isNull(), self._default_measure_value
            ).otherwise(func.col(column_name)),
        )
        df_rdd = (
            df.withColumn(column_name, func.col(column_name).cast("float"))
            .select(column_name)
            .rdd
        )
        return df_rdd.flatMap(list).collect()

    def _convert_to_dimension_values(self, obj: str) -> List[DimensionValue]:
        column_name = obj
        func = self._pyspark_func
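        # Replace nulls with the default dimension value, then cast the column
        # to string and collect it as a flat Python list.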
        df = self._df.withColumn(
            column_name,
            func.when(
                func.col(column_name).isNull(), self._default_dimension_value
            ).otherwise(func.col(column_name)),
        )
        df_rdd = (
            df.withColumn(column_name, func.col(column_name).cast("string"))
            .select(column_name)
            .rdd
        )
        return df_rdd.flatMap(list).collect()
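
Given the sketch above, the returned series list would look roughly like this (column order follows the DataFrame; the name, values and type keys are those described at the top of this page):

series_list == [
    {"name": "category", "values": ["A", "B", "C"], "type": "dimension"},
    {"name": "value", "values": [1.0, 2.0, 0.0], "type": "measure"},
]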