User-Defined Functions

Extend Delta Forge with Custom Logic

Write scalar functions, aggregate functions, table functions, and window functions in your language of choice. Native performance with Python and JavaScript flexibility.

Function Types

Complete UDF taxonomy for every analytical need

Scalar Functions

Process one row, return one value. The foundation of data transformation.

  • Row-at-a-time semantics
  • Vectorized batch execution
  • Null propagation options
  • Deterministic marking
SELECT my_transform(column) FROM table

Aggregate Functions

Combine multiple rows into a single result. Build custom aggregations.

  • State accumulation
  • Partial aggregation support
  • Merge function for parallelism
  • DISTINCT handling
SELECT my_agg(value) FROM table GROUP BY key

Window Functions

Compute over sliding windows of rows. Advanced analytics made simple.

  • Frame specification support
  • Partition-aware execution
  • Order-dependent computation
  • Peer row access
SELECT my_window(value) OVER (PARTITION BY key ORDER BY ts)
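To make "partition-aware" and "order-dependent" concrete, here is a rough sketch in plain Python. It is not the Delta Forge API: the names `run_window` and `moving_avg`, and the "N preceding rows plus the current row" frame, are assumptions chosen for illustration. Rows are grouped by key, sorted by timestamp, and the function sees one frame per output row.

```python
from collections import defaultdict

def run_window(rows, func, preceding=2):
    """Apply func over a sliding frame within each partition.

    rows: list of (key, ts, value) tuples.
    Returns (key, ts, result) tuples ordered by key, then ts.
    """
    partitions = defaultdict(list)
    for key, ts, value in rows:          # PARTITION BY key
        partitions[key].append((ts, value))

    out = []
    for key in sorted(partitions):
        ordered = sorted(partitions[key])  # ORDER BY ts
        values = [v for _, v in ordered]
        for i, (ts, _) in enumerate(ordered):
            # Frame: ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW
            frame = values[max(0, i - preceding): i + 1]
            out.append((key, ts, func(frame)))
    return out

def moving_avg(frame):
    return sum(frame) / len(frame)

rows = [("a", 2, 20.0), ("b", 1, 5.0), ("a", 1, 10.0), ("a", 3, 60.0)]
result = run_window(rows, moving_avg, preceding=1)
# [("a", 1, 10.0), ("a", 2, 15.0), ("a", 3, 40.0), ("b", 1, 5.0)]
```

Because each partition is processed independently, an engine is free to run partitions in parallel; only the ordering inside a partition matters to the function.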

Table Functions

Generate multiple rows from a single input. Explode, flatten, and transform.

  • Multi-row output
  • Schema inference
  • Lateral join support
  • Streaming output
SELECT * FROM table, LATERAL my_udtf(column)

Multi-Language Support

Write UDFs in the language that best fits your use case

🐍

Python

Ecosystem Access

Leverage the Python ecosystem. NumPy, Pandas, scikit-learn, and more are available in your UDFs.

  • Arrow-native PyArrow integration
  • Pandas UDF support
  • NumPy vectorization
  • ML library access
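from typing import List

# `model` is assumed to be a pre-loaded estimator with a scikit-learn-style predict()
@udf(return_type=Float64)
def predict_score(features: List[float]) -> float:
    return model.predict([features])[0]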

JavaScript

Lightweight

Fast iteration with JavaScript. Perfect for business logic and quick transformations.

  • V8 engine integration
  • TypeScript support
  • JSON-native operations
  • Fast startup time
function formatAddress(street, city, zip) {
    return `${street}, ${city} ${zip}`;
}

SQL

Portable

Define functions using pure SQL expressions. Maximum portability and readability.

  • No external dependencies
  • Optimizer-friendly
  • Inline expansion
  • Easy maintenance
CREATE FUNCTION calculate_tax(amount DECIMAL)
RETURNS DECIMAL AS
    amount * 0.0825

Scalar UDF Architecture

How scalar functions are compiled and executed

Function Definition

  • Name & Signature
  • Parameter Types
  • Return Type
  • Volatility

Registration

  • Schema Binding
  • Type Validation
  • Overload Resolution
  • Catalog Entry

Query Planning

  • Expression Binding
  • Type Coercion
  • Constant Folding
  • Null Handling

Execution

  • Batch Invocation
  • Arrow Array I/O
  • Error Handling
  • Result Buffering
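The four stages above can be sketched with a toy catalog in plain Python. This is a simplified illustration under stated assumptions, not Delta Forge's implementation: `ScalarRegistry`, `invoke_batch`, and the exact-match overload rule are hypothetical, and Python lists stand in for Arrow arrays.

```python
class ScalarRegistry:
    """Toy catalog: maps (name, argument types) to an implementation."""

    def __init__(self):
        self._catalog = {}

    def register(self, name, arg_types, return_type, func):
        # Registration: reject a second function with the same signature.
        key = (name, tuple(arg_types))
        if key in self._catalog:
            raise ValueError(f"duplicate signature for {name}{tuple(arg_types)}")
        self._catalog[key] = (return_type, func)

    def resolve(self, name, arg_types):
        # Planning: overload resolution by exact match on argument types.
        try:
            return self._catalog[(name, tuple(arg_types))]
        except KeyError:
            raise LookupError(f"no overload {name}{tuple(arg_types)}") from None

def invoke_batch(func, *columns):
    """Execution: evaluate a whole batch; a None in any argument yields None."""
    return [None if any(v is None for v in row) else func(*row)
            for row in zip(*columns)]

registry = ScalarRegistry()
registry.register("double_it", ["INT"], "INT", lambda x: x * 2)
registry.register("double_it", ["VARCHAR"], "VARCHAR", lambda s: s + s)

return_type, impl = registry.resolve("double_it", ["VARCHAR"])
batch_result = invoke_batch(impl, ["ab", None, "c"])
# return_type == "VARCHAR", batch_result == ["abab", None, "cc"]
```

Resolving the overload once at planning time means the per-batch path only has to call the already-chosen implementation.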

Aggregate UDF Implementation

Build stateful aggregations with full parallelism support

State Type

Define the accumulator state structure

struct WeightedAvgState { sum: f64, weight_sum: f64 }

initialize()

Create initial empty state

fn initialize() -> State { State { sum: 0.0, weight_sum: 0.0 } }

update(state, input)

Accumulate a single row into state

fn update(state: &mut State, value: f64, weight: f64) { state.sum += value * weight; state.weight_sum += weight; }

merge(state1, state2)

Combine two partial states (for parallelism)

fn merge(s1: &mut State, s2: State) { s1.sum += s2.sum; s1.weight_sum += s2.weight_sum; }

finalize(state)

Compute final result from accumulated state

fn finalize(state: State) -> f64 { state.sum / state.weight_sum }
Complete Aggregate UDF Example
use delta_forge_udf::{AggregateUDF, State};

#[derive(AggregateUDF)]
#[udf(name = "weighted_avg", return_type = "Float64")]
struct WeightedAverage;

#[derive(Default, Clone)]
struct WeightedAvgState {
    sum: f64,
    weight_sum: f64,
}

impl AggregateUDF for WeightedAverage {
    type State = WeightedAvgState;

    fn update(state: &mut Self::State, value: f64, weight: f64) {
        if weight > 0.0 {
            state.sum += value * weight;
            state.weight_sum += weight;
        }
    }

    fn merge(state: &mut Self::State, other: Self::State) {
        state.sum += other.sum;
        state.weight_sum += other.weight_sum;
    }

    fn finalize(state: Self::State) -> Option<f64> {
        if state.weight_sum > 0.0 {
            Some(state.sum / state.weight_sum)
        } else {
            None
        }
    }
}

// Usage in SQL:
// SELECT category, weighted_avg(price, quantity) FROM sales GROUP BY category
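To see why `merge` is what enables parallelism, the same lifecycle can be simulated in plain Python. This is a sketch that mirrors the Rust example's logic, not a Delta Forge binding; the `aggregate` helper and the two-way split of the rows are assumptions for illustration.

```python
from dataclasses import dataclass

@dataclass
class WeightedAvgState:
    sum: float = 0.0
    weight_sum: float = 0.0

def update(state, value, weight):
    # Same rule as the Rust example: ignore non-positive weights.
    if weight > 0.0:
        state.sum += value * weight
        state.weight_sum += weight

def merge(state, other):
    state.sum += other.sum
    state.weight_sum += other.weight_sum

def finalize(state):
    # None plays the role of SQL NULL when no weight was accumulated.
    return state.sum / state.weight_sum if state.weight_sum > 0.0 else None

def aggregate(rows):
    state = WeightedAvgState()
    for value, weight in rows:
        update(state, value, weight)
    return state

rows = [(10.0, 1.0), (20.0, 3.0), (30.0, 0.0), (40.0, 4.0)]

# Serial: one state sees every row.
serial = finalize(aggregate(rows))

# "Parallel": two workers build partial states, which are then merged.
left, right = aggregate(rows[:2]), aggregate(rows[2:])
merge(left, right)
parallel = finalize(left)
# serial == parallel == 28.75
```

The two paths agree because the state holds only sums, which can be combined in any grouping. A state that stored the running average itself could not be merged this way.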

Table-Valued Functions

Generate multiple rows from custom logic

Row Generation

  • Emit 0 to N rows per input
  • Dynamic schema based on input
  • Streaming row production
  • Memory-efficient iteration

Schema Definition

  • Static return schema
  • Dynamic schema inference
  • Polymorphic return types
  • Nested struct output

Common Patterns

  • Array/JSON explosion
  • Range generation
  • External data fetch
  • File parsing

Execution Modes

  • CROSS APPLY semantics
  • OUTER APPLY for null handling
  • Correlated invocation
  • Parallel partition execution
Table Function Example: JSON Explode
-- Table function that extracts key-value pairs from JSON
CREATE FUNCTION json_entries(json_col JSON)
RETURNS TABLE(key VARCHAR, value VARCHAR, type VARCHAR)
AS $$
    for key, value in json_col.items():
        yield (key, str(value), type(value).__name__)
$$;

-- Usage: Extract all JSON fields into rows
SELECT t.id, e.key, e.value
FROM events t
CROSS JOIN LATERAL json_entries(t.metadata) AS e
WHERE e.type = 'str';
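The row flow of that query can be traced with a standalone Python sketch. The generator body is the same as in the function above; `cross_join_lateral` and the sample `events` rows are hypothetical stand-ins for what the engine does.

```python
import json

def json_entries(json_col):
    """Yield one (key, value, type) row per top-level JSON field."""
    for key, value in json_col.items():
        yield (key, str(value), type(value).__name__)

def cross_join_lateral(rows, table_func):
    """Pair each input row with every row the table function emits for it."""
    for row_id, metadata in rows:
        for entry in table_func(json.loads(metadata)):
            yield (row_id,) + entry

events = [
    (1, '{"user": "ada", "clicks": 3}'),
    (2, '{}'),
    (3, '{"page": "/home"}'),
]

# WHERE e.type = 'str', then project id, key, value
result = [(i, k, v) for i, k, v, t in cross_join_lateral(events, json_entries)
          if t == "str"]
# [(1, "user", "ada"), (3, "page", "/home")]
```

Event 2 disappears from the output because its table function call emits zero rows, which is the CROSS (as opposed to OUTER) behavior.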

Performance Optimization

Techniques for maximum UDF throughput

🚀

Vectorized Execution

Process entire Arrow arrays instead of row-by-row. Amortize function call overhead across thousands of values.

fn eval_batch(input: &ArrayRef) -> ArrayRef
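A small Python sketch shows where the saving comes from by counting invocations. It is only an illustration: lists stand in for Arrow arrays, and `make_counted`, `run_batched`, and the batch size of 1024 are assumptions, not engine settings.

```python
def make_counted(func):
    """Wrap func so the number of engine-to-UDF calls can be observed."""
    def wrapper(*args):
        wrapper.calls += 1
        return func(*args)
    wrapper.calls = 0
    return wrapper

def run_row_at_a_time(udf, column):
    return [udf(v) for v in column]

def run_batched(batch_udf, column, batch_size=1024):
    out = []
    for start in range(0, len(column), batch_size):
        out.extend(batch_udf(column[start:start + batch_size]))
    return out

scalar_udf = make_counted(lambda x: x * 1.0825)
batch_udf = make_counted(lambda xs: [x * 1.0825 for x in xs])

column = [float(i) for i in range(10_000)]
row_result = run_row_at_a_time(scalar_udf, column)
batch_result = run_batched(batch_udf, column)
# scalar_udf.calls == 10000, batch_udf.calls == 10, identical results
```

Any fixed per-call cost (argument marshalling, crossing a language boundary) is paid 10 times instead of 10,000 times for the same output.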
♻️

State Reuse

Allocate buffers once and reuse across batches. Avoid allocation in hot paths.

fn eval_with_state(&self, state: &mut EvalState, ...)
📦

Null Optimization

Skip null values efficiently using validity bitmaps. Propagate nulls without evaluation.

propagate_nulls: true
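As a rough sketch of null propagation, the function below consults a validity bitmap and never calls the UDF for null slots. The name `eval_with_validity` and the use of a Python int as the bitmap are assumptions for illustration; bit i refers to slot i, least-significant bit first, as in Arrow's validity bitmaps.

```python
def eval_with_validity(func, values, validity):
    """Apply func only where the validity bit is set.

    values:   raw slot values (content is meaningless where the slot is null)
    validity: int used as a bitmap; bit i set means values[i] is non-null
    Returns (results, validity); the bitmap passes through unchanged.
    """
    results = [None] * len(values)
    for i, v in enumerate(values):
        if (validity >> i) & 1:
            results[i] = func(v)
    return results, validity

calls = []
def expensive(x):
    calls.append(x)   # record which inputs were actually evaluated
    return x * x

values = [3, 0, 5, 0]
validity = 0b0101  # slots 0 and 2 are valid; slots 1 and 3 are null
results, out_validity = eval_with_validity(expensive, values, validity)
# results == [9, None, 25, None]; expensive ran only for 3 and 5
```

Since the output bitmap equals the input bitmap, the engine can reuse it without computing anything per row for the null slots.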
🔄

SIMD Kernels

Write SIMD-optimized inner loops for numeric operations. AVX-512 operates on eight 64-bit values per instruction, for up to an 8x speedup over scalar code.

#[target_feature(enable = "avx512f")]
📊

Dictionary Awareness

Operate on dictionary indices for low-cardinality string columns. Avoid string comparisons.

fn eval_dict(keys: &UInt32Array, dict: &StringArray)
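The idea can be sketched in a few lines of Python: evaluate the function once per distinct dictionary entry, then gather results by integer key. Lists stand in for `UInt32Array` and `StringArray`, and `normalize` is a hypothetical string UDF.

```python
def eval_dict(keys, dictionary, func):
    """Evaluate func once per dictionary entry, then gather by key."""
    transformed = [func(s) for s in dictionary]  # len(dictionary) calls
    return [transformed[k] for k in keys]        # integer lookups only

calls = []
def normalize(s):
    calls.append(s)   # record each string the UDF actually processed
    return s.strip().upper()

dictionary = [" us", "de ", "fr"]
keys = [0, 1, 0, 0, 2, 1, 0]
result = eval_dict(keys, dictionary, normalize)
# result == ["US", "DE", "US", "US", "FR", "DE", "US"]; 3 calls for 7 rows
```

The saving grows with the ratio of rows to distinct values, which is why this pays off mainly on low-cardinality columns. It is only valid for deterministic functions, since each result is reused for every row sharing the key.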

Lazy Evaluation

Short-circuit evaluation for conditional expressions. Skip unnecessary computation.

volatility: Volatile::Stable
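One way to picture short-circuiting is a CASE expression that runs the UDF only on rows where the condition holds. The sketch below is plain Python under that assumption; `case_when` and `slow_parse` are hypothetical names, not Delta Forge functions.

```python
def case_when(condition, then_udf, else_value, column):
    """CASE WHEN condition(x) THEN then_udf(x) ELSE else_value END, per row."""
    # Build a selection vector first, so the UDF runs only on matching rows.
    selected = [i for i, v in enumerate(column) if condition(v)]
    out = [else_value] * len(column)
    for i in selected:
        out[i] = then_udf(column[i])
    return out

evaluated = []
def slow_parse(s):
    evaluated.append(s)   # record which rows reached the UDF
    return int(s)

column = ["12", "n/a", "7", ""]
result = case_when(str.isdigit, slow_parse, None, column)
# result == [12, None, 7, None]; slow_parse never saw "n/a" or ""
```

Besides saving work, skipping the unselected rows keeps the UDF from failing on inputs the condition was meant to guard against.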

Type System Integration

Full type inference and coercion support

Automatic Type Coercion

Arguments are automatically coerced to the expected types, following SQL standard rules.

my_func(INT) called with SMALLINT → automatic upcast

Polymorphic Functions

Define functions that work with multiple input types using type variables.

fn first_non_null<T>(a: T, b: T) -> T

Variadic Arguments

Accept a variable number of arguments of the same type.

fn concat_all(strings: VARIADIC VARCHAR) -> VARCHAR

Return Type Inference

The return type is computed from the input types at planning time.

fn add(a: NUMERIC, b: NUMERIC) -> max_precision(a, b)
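Coercion and return type inference both happen before any data is read, which a small planning-time sketch can show. The widening order in `NUMERIC_RANK` and the helper names are assumptions for illustration; a real engine defines its own type lattice and precision rules.

```python
# Hypothetical widening order; real engines define their own lattice.
NUMERIC_RANK = {"SMALLINT": 0, "INT": 1, "BIGINT": 2, "DOUBLE": 3}

def can_coerce(actual, expected):
    """An argument may be implicitly widened, never narrowed."""
    return NUMERIC_RANK[actual] <= NUMERIC_RANK[expected]

def infer_add_type(a, b):
    """Return type of add(a, b): the wider of the two input types."""
    return a if NUMERIC_RANK[a] >= NUMERIC_RANK[b] else b

def plan_call(param_types, arg_types):
    """Check each argument against its parameter at planning time."""
    if len(param_types) != len(arg_types):
        raise TypeError("wrong number of arguments")
    for param, arg in zip(param_types, arg_types):
        if not can_coerce(arg, param):
            raise TypeError(f"cannot implicitly cast {arg} to {param}")
    return list(param_types)  # the types the arguments are cast to

coerced = plan_call(["INT"], ["SMALLINT"])     # SMALLINT is upcast to INT
result_type = infer_add_type("INT", "BIGINT")  # "BIGINT"
```

A call such as `plan_call(["INT"], ["BIGINT"])` raises `TypeError` in this sketch, because narrowing could lose data and so would need an explicit cast.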

Extend Delta Forge with your custom logic

Native performance. Python flexibility. Infinite possibilities.