1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use polars_core::prelude::*;
use rayon::prelude::*;

use super::*;
use crate::physical_plan::planner::create_physical_expr;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

pub trait ExprEvalExtension: IntoExpr + Sized {
    /// Run an expression over a sliding window that increases `1` slot every iteration.
    ///
    /// # Warning
    /// this can be really slow as it can have `O(n^2)` complexity. Don't use this for operations
    /// that visit all elements.
    fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Expr {
        let this = self.into_expr();
        let expr2 = expr.clone();
        let func = move |mut s: Series| {
            let name = s.name().to_string();
            s.rename("");
            let expr = expr.clone();
            let mut arena = Arena::with_capacity(10);
            let aexpr = to_aexpr(expr, &mut arena);
            let phys_expr = create_physical_expr(aexpr, Context::Default, &arena, None)?;

            let state = ExecutionState::new();

            let finish = |out: Series| {
                if out.len() > 1 {
                    Err(PolarsError::ComputeError(
                        format!(
                            "expected single value, got a result with length: {}, {:?}",
                            out.len(),
                            out
                        )
                        .into(),
                    ))
                } else {
                    Ok(out.get(0).unwrap().into_static().unwrap())
                }
            };

            let avs = if parallel {
                (1..s.len() + 1)
                    .into_par_iter()
                    .map(|len| {
                        let s = s.slice(0, len);
                        if (len - s.null_count()) >= min_periods {
                            let df = DataFrame::new_no_checks(vec![s]);
                            let out = phys_expr.evaluate(&df, &state)?;
                            finish(out)
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?
            } else {
                let mut df_container = DataFrame::new_no_checks(vec![]);
                (1..s.len() + 1)
                    .map(|len| {
                        let s = s.slice(0, len);
                        if (len - s.null_count()) >= min_periods {
                            df_container.get_columns_mut().push(s);
                            let out = phys_expr.evaluate(&df_container, &state)?;
                            df_container.get_columns_mut().clear();
                            finish(out)
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?
            };
            Ok(Series::new(&name, avs))
        };

        this.apply(
            func,
            GetOutput::map_field(move |f| {
                // dummy df to determine output dtype
                let dtype = f
                    .data_type()
                    .inner_dtype()
                    .cloned()
                    .unwrap_or_else(|| f.data_type().clone());

                let df = Series::new_empty("", &dtype).into_frame();

                #[cfg(feature = "python")]
                let out = {
                    use pyo3::Python;
                    Python::with_gil(|py| {
                        py.allow_threads(|| df.lazy().select([expr2.clone()]).collect())
                    })
                };
                #[cfg(not(feature = "python"))]
                let out = { df.lazy().select([expr2.clone()]).collect() };

                match out {
                    Ok(out) => {
                        let dtype = out.get_columns()[0].dtype();
                        Field::new(f.name(), dtype.clone())
                    }
                    Err(_) => Field::new(f.name(), DataType::Null),
                }
            }),
        )
        .with_fmt("expanding_eval")
    }
}

/// Makes [`ExprEvalExtension::cumulative_eval`] available on every [`Expr`] via the trait's
/// default method.
impl ExprEvalExtension for Expr {}