Median In A Stream

#include <bits/stdc++.h> // Includes common standard libraries (vector, iostream, queue for priority_queue).
using namespace std; // Use the standard namespace.
// Helper function to compare two integers and return a signum-like value.
// Used to determine the relative sizes of the two heaps.
// Returns:
// 0 if a == b
// 1 if a > b
// -1 if a < b
int signum(int a, int b) {
return (a == b) ? 0 : (a > b ? 1 : -1);
}
// Core function to insert an element into the appropriate heap and update the median.
// 'element': The new number from the stream.
// 'maxHeap': The Max Heap (stores smaller half of numbers).
// 'minHeap': The Min Heap (stores larger half of numbers).
// 'median': Reference to the current median, which will be updated.
void callMedian(int element, priority_queue<int> &maxHeap, priority_queue<int, vector<int>, greater<int> > &minHeap, int &median) {
// Use a switch statement based on the size difference of the heaps.
switch(signum(maxHeap.size(), minHeap.size())) {
// Case 0: Both heaps have the same number of elements.
// This means total elements seen so far is EVEN. After adding 'element', total will be ODD.
case 0 :
if(element > median) { // If 'element' is greater than current median (or is the first element and positive).
minHeap.push(element); // Add it to the larger half.
median = minHeap.top(); // Median is now the top of the larger half.
} else { // If 'element' is less than or equal to current median.
maxHeap.push(element); // Add it to the smaller half.
median = maxHeap.top(); // Median is now the top of the smaller half.
}
break;
// Case 1: maxHeap has one more element than minHeap.
// This means total elements seen so far is ODD, and median was maxHeap.top().
// After adding 'element', total will be EVEN.
case 1 :
if(element > median) { // If 'element' belongs to the larger half.
minHeap.push(element); // Add directly to minHeap. Heaps become balanced.
} else { // If 'element' belongs to the smaller half.
// Move maxHeap's top (largest of smaller half) to minHeap to balance.
minHeap.push(maxHeap.top());
maxHeap.pop();
// Then add 'element' to maxHeap.
maxHeap.push(element);
}
// Median is the average of the tops of the now-balanced heaps (integer division truncates, e.g. 27 / 2 gives 13).
median = (minHeap.top() + maxHeap.top()) / 2;
break;
// Case -1: minHeap has one more element than maxHeap.
// This means total elements seen so far is ODD, and median was minHeap.top().
// After adding 'element', total will be EVEN.
case -1 :
if(element > median) { // If 'element' belongs to the larger half.
// Move minHeap's top (smallest of larger half) to maxHeap to balance.
maxHeap.push(minHeap.top());
minHeap.pop();
// Then add 'element' to minHeap.
minHeap.push(element);
} else { // If 'element' belongs to the smaller half.
maxHeap.push(element); // Add directly to maxHeap (the smaller heap). Heaps become balanced.
}
// Median is the average of the tops of the now-balanced heaps.
median = (minHeap.top() + maxHeap.top()) / 2;
break;
}
}
// Function to find the median of the array elements as they are processed (stream).
// 'arr': The input array/stream of numbers.
// 'N': The size of the array.
// Returns a vector containing the median after each element is processed.
// Time Complexity: O(N log N)
// Space Complexity: O(N)
vector<int> findMedian(vector<int>& arr, int N) {
vector<int> ans; // To store the median at each step.
// Max Heap: Stores the smaller half of numbers. std::priority_queue is max-heap by default.
priority_queue<int> maxHeap;
// Min Heap: Stores the larger half of numbers. 'greater<int>' makes it a min-heap.
priority_queue<int, vector<int>, greater<int> > minHeap;
int median = 0; // Initialize median. Will be updated with first element.
// Process each element in the array/stream.
for(int i = 0; i < N; i++) {
callMedian(arr[i], maxHeap, minHeap, median); // Call helper to update heaps and median.
ans.push_back(median); // Store the current median.
}
return ans;
}
int main() {
int N;
cout << "Enter the size of the array : ";
cin >> N;
vector<int> arr(N);
cout << "Enter the elements of array : ";
for(int i = 0; i < N; i++) {
cin >> arr[i]; // Read array elements.
}
vector<int> ans = findMedian(arr, N); // Get the medians after each element.
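// Output the final median (median after all elements are processed).
// If you need all medians, iterate through 'ans' vector.
if(ans.empty()) { // Guard: N == 0 leaves 'ans' empty, so there is no median to print.
cout << "No elements were entered." << endl;
return 0;
}
cout << "Median of the stream (final median) : " << ans.back() << endl;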
return 0;
}
/*
Example Trace (Case -1 with element <= median pushes the element directly into maxHeap):
Input arr: [12, 10, 15, 20, 5]
N: 5
Initial: maxHeap={}, minHeap={}, median=0, ans=[]
1. Element = 12:
signum(0,0) = 0.
12 > median (0). Push 12 to minHeap.
minHeap={12}, maxHeap={}
median = minHeap.top() = 12.
ans: [12]
2. Element = 10:
signum(0,1) = -1. (minHeap is larger)
10 < median (12). Push 10 to maxHeap.
minHeap={12}, maxHeap={10}
median = (10+12)/2 = 11.
ans: [12, 11]
3. Element = 15:
signum(1,1) = 0. (heaps balanced)
15 > median (11). Push 15 to minHeap.
minHeap={12, 15}, maxHeap={10}
median = minHeap.top() = 12.
ans: [12, 11, 12]
4. Element = 20:
signum(1,2) = -1. (minHeap is larger)
20 > median (12).
Move minHeap.top() (12) to maxHeap, then push 20 to minHeap.
minHeap={15, 20}, maxHeap={10, 12}
median = (12+15)/2 = 13 (13.5 truncated by integer division).
ans: [12, 11, 12, 13]
5. Element = 5:
signum(2,2) = 0. (heaps balanced)
5 < median (13). Push 5 to maxHeap.
minHeap={15,20}, maxHeap={5,10,12}
median = maxHeap.top() = 12.
ans: [12, 11, 12, 13, 12]
Final Median: 12
Note on Case -1:
For `case -1` (`minHeap.size() > maxHeap.size()`) when `element <= median`, the standard two-heap approach is simply to push `element` into `maxHeap`, which balances the sizes. No element needs to move between the heaps in that branch.
*/

1. Problem Statement

  • Goal: Given a continuous stream of numbers, efficiently find the median of the numbers processed so far, as each new number arrives.
  • Median:
    • If the total count of numbers is odd, the median is the middle element.
    • If the total count of numbers is even, the median is the average of the two middle elements.
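
The definition above can be made concrete with a small sort-based helper. This is only a sketch for illustration: the name medianBySorting is hypothetical, and it returns a double so that the even-count average is not truncated.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of the original code): median of a
// non-empty list, computed by sorting a copy of the values.
double medianBySorting(std::vector<int> values) {
    assert(!values.empty());
    std::sort(values.begin(), values.end());
    const std::size_t n = values.size();
    if (n % 2 == 1) {
        return values[n / 2];  // odd count: the middle element
    }
    // even count: average of the two middle elements
    return (static_cast<double>(values[n / 2 - 1]) + values[n / 2]) / 2.0;
}
```

For {12, 10, 15} this gives 12, and for {12, 10, 15, 20} it gives 13.5. The listing above stores the median in an int, so it reports 13 for the second case.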

2. Challenge & Approach: Two Heaps

  • Challenge: A naive approach stores all numbers in a list and re-sorts it after each new element, which costs O(N log N) per insertion and O(N^2 log N) over the whole stream.
  • Solution: Two Heaps
    • We divide the numbers seen so far into two halves:
      • maxHeap (or left_half): A Max Heap storing the smaller half of the numbers. Its top() will be the largest element in the smaller half.
      • minHeap (or right_half): A Min Heap storing the larger half of the numbers. Its top() will be the smallest element in the larger half.
    • Invariants (Properties to maintain):
      1. All elements in maxHeap are less than or equal to all elements in minHeap. (maxHeap.top() <= minHeap.top()).
      2. The size difference between maxHeap and minHeap is at most 1.
        • maxHeap.size() == minHeap.size() (for even total elements)
        • maxHeap.size() == minHeap.size() + 1 (for odd total elements, median is maxHeap.top())
        • minHeap.size() == maxHeap.size() + 1 (for odd total elements, median is minHeap.top()). The provided code can end up in either odd-count arrangement, depending on which heap the new element enters in Case 0.
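
The two invariants can be checked directly in code. The sketch below is not the insertion scheme used in the listing above; it is a common alternative that always routes a new element through the max heap and keeps any extra element there. The class name RunningMedian and its members low and high are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical class: a variant of the two-heap idea in which `low`
// (the max heap) always holds the extra element for odd counts.
class RunningMedian {
public:
    void add(int x) {
        low.push(x);            // 1. tentatively place x in the smaller half
        high.push(low.top());   // 2. move the largest of the smaller half up
        low.pop();
        if (high.size() > low.size()) {  // 3. restore the size invariant
            low.push(high.top());
            high.pop();
        }
        // Invariant 2: sizes are equal, or `low` has exactly one more.
        assert(low.size() == high.size() || low.size() == high.size() + 1);
        // Invariant 1: every element of `low` <= every element of `high`.
        assert(high.empty() || low.top() <= high.top());
    }

    double median() const {
        assert(!low.empty());
        if (low.size() > high.size()) {
            return low.top();   // odd count: top of the larger heap
        }
        return (static_cast<double>(low.top()) + high.top()) / 2.0;
    }

private:
    std::priority_queue<int> low;                                        // max heap
    std::priority_queue<int, std::vector<int>, std::greater<int>> high;  // min heap
};
```

Feeding 12, 10, 15, 20, 5 one at a time yields medians 12, 11, 12, 13.5, 12. The trade-off against the three-case version is a few extra heap operations per insertion in exchange for not needing to compare against the previous median.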

3. signum(int a, int b) Helper Function

  • Purpose: A utility function to compare the sizes of the two heaps.
  • Returns:
    • 0 if a == b (heaps are balanced in size).
    • 1 if a > b (maxHeap is larger).
    • -1 if a < b (minHeap is larger).
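
Note that signum takes int arguments while size() returns an unsigned type, so the call in callMedian relies on an implicit conversion. A possible size-aware variant is sketched below; the name compareSizes is hypothetical and is not used in the listing above.

```cpp
#include <cstddef>

// Hypothetical variant of signum that takes container sizes directly,
// avoiding the implicit size_t -> int conversion.
// Returns 1 if a > b, -1 if a < b, and 0 if they are equal.
int compareSizes(std::size_t a, std::size_t b) {
    return (a > b) - (a < b);  // each comparison converts to 0 or 1
}
```

With this variant, the switch would read switch(compareSizes(maxHeap.size(), minHeap.size())) and the three case labels would stay the same.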

4. callMedian(int element, ...) Logic

This is the core function that handles inserting a new element and updating the median. It operates based on the signum of the heap sizes:

  • Case 0: maxHeap.size() == minHeap.size() (Balanced)

    • This implies an even number of elements before the current element is inserted. After insertion, the total number of elements will be odd.
    • If element > median (or generally, if element belongs to the larger half):
      • minHeap.push(element). The median is now minHeap.top().
    • Else (element <= median, or element belongs to the smaller half):
      • maxHeap.push(element). The median is now maxHeap.top().
    • After this, one heap will have one more element than the other, and the median will be the top of the larger heap.
  • Case 1: maxHeap.size() > minHeap.size() (maxHeap is larger)

    • This implies an odd number of elements before the current element is inserted, and the median was maxHeap.top(). After insertion, total elements will be even.
    • If element > median:
      • minHeap.push(element). Now maxHeap.size() == minHeap.size().
    • Else (element <= median):
      • To rebalance and maintain maxHeap.top() <= minHeap.top():
      • Move maxHeap.top() to minHeap.
      • maxHeap.pop().
      • maxHeap.push(element).
    • The median is now (minHeap.top() + maxHeap.top()) / 2.
  • Case -1: minHeap.size() > maxHeap.size() (minHeap is larger)

    • This implies an odd number of elements before the current element is inserted, and the median was minHeap.top(). After insertion, total elements will be even.
    • If element > median:
      • To rebalance and maintain maxHeap.top() <= minHeap.top():
      • Move minHeap.top() to maxHeap.
      • minHeap.pop().
      • minHeap.push(element).
    • Else (element <= median):
      • maxHeap.push(element). Now maxHeap.size() == minHeap.size().
    • The median is now (minHeap.top() + maxHeap.top()) / 2.
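
The three cases above can be condensed into a compact sketch using if/else instead of a switch. The names insertAndUpdate, runningMedians, MaxHeap, and MinHeap are hypothetical stand-ins for illustration; the median stays an int, as in the listing above, so even-count averages are truncated.

```cpp
#include <functional>
#include <queue>
#include <vector>

using MaxHeap = std::priority_queue<int>;                                        // smaller half
using MinHeap = std::priority_queue<int, std::vector<int>, std::greater<int>>;  // larger half

// Hypothetical stand-in for callMedian, following the three cases above.
void insertAndUpdate(int element, MaxHeap& maxHeap, MinHeap& minHeap, int& median) {
    if (maxHeap.size() == minHeap.size()) {
        // Case 0: the element goes to its own half, which becomes the larger heap.
        if (element > median) { minHeap.push(element); median = minHeap.top(); }
        else                  { maxHeap.push(element); median = maxHeap.top(); }
        return;
    }
    if (maxHeap.size() > minHeap.size()) {
        // Case 1: maxHeap holds the extra element.
        if (element > median) {
            minHeap.push(element);
        } else {
            minHeap.push(maxHeap.top());  // move largest of the smaller half across
            maxHeap.pop();
            maxHeap.push(element);
        }
    } else {
        // Case -1: minHeap holds the extra element.
        if (element > median) {
            maxHeap.push(minHeap.top());  // move smallest of the larger half across
            minHeap.pop();
            minHeap.push(element);
        } else {
            maxHeap.push(element);        // fills the smaller heap directly
        }
    }
    median = (minHeap.top() + maxHeap.top()) / 2;  // integer average, truncated
}

// Hypothetical driver: the median after each element of the stream.
std::vector<int> runningMedians(const std::vector<int>& stream) {
    MaxHeap maxHeap;
    MinHeap minHeap;
    int median = 0;
    std::vector<int> out;
    for (int x : stream) {
        insertAndUpdate(x, maxHeap, minHeap, median);
        out.push_back(median);
    }
    return out;
}
```

For the stream 12, 10, 15, 20, 5 this produces 12, 11, 12, 13, 12, matching the example trace.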

5. findMedian(vector<int>& arr, int N) Function

  • Initializes ans vector, maxHeap, minHeap, and median (set to 0; the first element overwrites it regardless of which heap it enters).
  • Iterates through each element in the input arr.
  • For each element, it calls callMedian to update the heaps and the median.
  • Adds the calculated median to the ans vector.
  • Returns the ans vector containing medians at each step.

6. Complexity Analysis

  • Let N be the total number of elements in the stream.
  • Time Complexity: O(N log N)
    • Each time a new element arrives, we perform a constant number of heap operations (push, pop, top).
    • Heap operations on a heap of size H take O(log H) time. Here, H is at most ⌈N/2⌉, because the heap sizes never differ by more than 1.
    • Therefore, each element processing takes O(log N) time.
    • Total time for N elements: N * O(log N) = O(N log N).
  • Space Complexity: O(N)
    • The two heaps together store all N elements from the stream.
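
For comparison, here is a sketch of a simpler baseline that keeps all elements in a sorted vector. It is useful mainly as a reference for cross-checking the heap version on small inputs. The name baselineMedians is hypothetical, and the even-count average is truncated to int so that the results are comparable with the listing above.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical baseline: running medians via a sorted vector.
std::vector<int> baselineMedians(const std::vector<int>& stream) {
    std::vector<int> sorted;  // all elements seen so far, in ascending order
    std::vector<int> out;
    for (int x : stream) {
        // The binary search is O(log N), but insert() may shift up to N
        // elements, so each step costs O(N) in the worst case.
        sorted.insert(std::upper_bound(sorted.begin(), sorted.end(), x), x);
        const std::size_t n = sorted.size();
        if (n % 2 == 1) {
            out.push_back(sorted[n / 2]);
        } else {
            out.push_back((sorted[n / 2 - 1] + sorted[n / 2]) / 2);  // truncated
        }
    }
    return out;
}
```

This baseline is O(N^2) over the whole stream in the worst case, versus O(N log N) for the two-heap approach, while both use O(N) space.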