MRR View

Computes monthly recurring revenue balances and movements from revenue changes and starting customer balances.

Purpose

This worker populates the MRR View facts table with derived MRR balances and movement metrics.

It translates customer-level revenue changes into MRR-level metrics, producing a consistent monthly view of:

  • Net New MRR
  • Cumulative MRR movements
  • Starting MRR balance
  • Ending MRR balance

The output serves as the foundational revenue fact used by downstream transformations such as ARR, Retention, GTM Efficiency, or LTV.

Inputs

The worker consumes two dataframes provided by Omora Labs database interfaces:

External Data Source

None.

Work

The worker executes the following steps:

  1. Convert customers to MRR
  2. Compute net new MRR per period
  3. Accumulate MRR movements
  4. Derive balances
  5. Materialize outputs
import polars as pl


def generate_net_new_customers(df: pl.DataFrame) -> pl.DataFrame:
    return df.group_by(["date", "value_type_id"]).agg(
        (
            pl.col("total_mrr")
            .filter(pl.col("revenue_type_id") == 1)
            .first()
            .fill_null(0)  # New Bookings (0 if missing)
            + pl.col("total_mrr")
            .filter(pl.col("revenue_type_id") == 2)
            .first()
            .fill_null(0)  # Upsell (0 if missing)
            - pl.col("total_mrr")
            .filter(pl.col("revenue_type_id") == 3)
            .first()
            .fill_null(0)  # Downsell (0 if missing)
            - pl.col("total_mrr")
            .filter(pl.col("revenue_type_id") == 4)
            .first()
            .fill_null(0)  # Churn (0 if missing)
        ).alias("net_new_mrr")
    )


def generate_cum_net_new(df: pl.DataFrame) -> pl.DataFrame:
    return df.sort("date").with_columns(
        (pl.col("net_new_mrr").cum_sum().over("value_type_id")).alias("cum_net_new_mrr")
    )


def generate_current_and_ending_balance(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        [
            pl.col("cum_net_new_mrr")
            .shift(1)
            .over("value_type_id")
            .fill_null(0)
            .add(pl.col("total_mrr"))
            .alias("current_base"),
            pl.col("cum_net_new_mrr").add(pl.col("total_mrr")).alias("ending_balance"),
        ]
    )


def get_total_mrr(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        (pl.col("nr_of_customers") * pl.col("mrr_per_customer")).alias("total_mrr")
    )


def run_worker(
    revenue_info_df: pl.DataFrame, starting_balance_df: pl.DataFrame
) -> tuple:
    """
    Calculates mrr metrics from revenue changes and starting balance.

    Args:
        revenue_info_df: Revenue changes with columns: date, revenue_type_id, value_type_id, nr_of_customers
        starting_balance_df: Starting customer counts with columns: value_type_id, nr_of_customers

    Returns:
        DataFrame with columns: date, value_type_id, net_new_mrr, cum_net_new_mrr, current_base, ending_balance
    """

    revenue_mrr = get_total_mrr(revenue_info_df)
    revenue_changes_mrr = revenue_mrr.select(
        "date", "revenue_type_id", "value_type_id", "total_mrr"
    )

    starting_balance_with_total_mrr = get_total_mrr(starting_balance_df)
    starting_mrr = starting_balance_with_total_mrr.select(
        "date", "value_type_id", "total_mrr"
    )

    df_with_new_mrr = generate_net_new_customers(revenue_mrr)
    df_with_cum_new = generate_cum_net_new(df_with_new_mrr)

    df_with_starting = df_with_cum_new.join(
        starting_balance_with_total_mrr.select("value_type_id", "total_mrr"),
        on="value_type_id",
        how="left",
    )

    ending_df = generate_current_and_ending_balance(df_with_starting)

    mrr_view = ending_df.select(
        "date",
        "value_type_id",
        "net_new_mrr",
        "cum_net_new_mrr",
        "current_base",
        "ending_balance",
    )

    return mrr_view, revenue_changes_mrr, starting_mrr

Orchestrator

A simple orchestrator suffices for handling the inputs and the worker:

from .worker import run_worker


def orchestrator(db) -> None:
    """
    Worker orchestrator
    """
    revenue_info_df = db.get_revenue_changes_df()
    starting_balance_df = db.get_starting_balance_df()
    mrr_view, revenue_changes_mrr, starting_mrr = run_worker(
        revenue_info_df, starting_balance_df
    )

    db.insert_pl_dataframe(mrr_view, "mrr_view")
    db.update_mrr_rev_changes(revenue_changes_mrr)
    db.update_mrr_starting_balance(starting_mrr)

Output

The worker produces three datasets:

On this page

Star us on GitHub

If you enjoy this content and find it useful, give us a star on GitHub to show your support and stay updated with our latest improvements.