Median In A Stream
#include <bits/stdc++.h> // Includes common standard libraries (vector, iostream, queue for priority_queue).
using namespace std; // Use the standard namespace.

// Helper function to compare two integers and return a signum-like value.
// Used to determine the relative sizes of the two heaps.
// Returns:
//    0 if a == b
//    1 if a > b
//   -1 if a < b
int signum(int a, int b) {
    return (a == b) ? 0 : (a > b ? 1 : -1);
}

// Core function to insert an element into the appropriate heap and update the median.
// 'element': The new number from the stream.
// 'maxHeap': The Max Heap (stores smaller half of numbers).
// 'minHeap': The Min Heap (stores larger half of numbers).
// 'median': Reference to the current median, which will be updated.
void callMedian(int element, priority_queue<int> &maxHeap, priority_queue<int, vector<int>, greater<int> > &minHeap, int &median) {
    // Use a switch statement based on the size difference of the heaps.
    switch(signum(maxHeap.size(), minHeap.size())) {
        // Case 0: Both heaps have the same number of elements.
        // Total elements seen so far is EVEN. After adding 'element', total will be ODD.
        case 0 :
            if(element > median) { // 'element' is greater than current median (or is the first element and positive).
                minHeap.push(element); // Add it to the larger half.
                median = minHeap.top(); // Median is now the top of the larger half.
            } else { // 'element' is less than or equal to current median.
                maxHeap.push(element); // Add it to the smaller half.
                median = maxHeap.top(); // Median is now the top of the smaller half.
            }
            break;

        // Case 1: maxHeap has one more element than minHeap.
        // Total elements seen so far is ODD, and median was maxHeap.top().
        // After adding 'element', total will be EVEN.
        case 1 :
            if(element > median) { // 'element' belongs to the larger half.
                minHeap.push(element); // Add directly to minHeap. Heaps become balanced.
            } else { // 'element' belongs to the smaller half.
                // Move maxHeap's top (largest of smaller half) to minHeap to balance.
                minHeap.push(maxHeap.top());
                maxHeap.pop();
                // Then add 'element' to maxHeap.
                maxHeap.push(element);
            }
            // Median is the average of the tops of the now-balanced heaps (integer division truncates).
            median = (minHeap.top() + maxHeap.top()) / 2;
            break;

        // Case -1: minHeap has one more element than maxHeap.
        // Total elements seen so far is ODD, and median was minHeap.top().
        // After adding 'element', total will be EVEN.
        case -1 :
            if(element > median) { // 'element' belongs to the larger half.
                // Move minHeap's top (smallest of larger half) to maxHeap to balance.
                maxHeap.push(minHeap.top());
                minHeap.pop();
                // Then add 'element' to minHeap.
                minHeap.push(element);
            } else { // 'element' belongs to the smaller half.
                maxHeap.push(element); // Add directly to maxHeap. Heaps become balanced.
            }
            // Median is the average of the tops of the now-balanced heaps (integer division truncates).
            median = (minHeap.top() + maxHeap.top()) / 2;
            break;
    }
}

// Function to find the median of the array elements as they are processed (stream).
// 'arr': The input array/stream of numbers.
// 'N': The size of the array.
// Returns a vector containing the median after each element is processed.
// Time Complexity: O(N log N)
// Space Complexity: O(N)
vector<int> findMedian(vector<int>& arr, int N) {
    vector<int> ans; // To store the median at each step.
    // Max Heap: Stores the smaller half of numbers. std::priority_queue is max-heap by default.
    priority_queue<int> maxHeap;
    // Min Heap: Stores the larger half of numbers. 'greater<int>' makes it a min-heap.
    priority_queue<int, vector<int>, greater<int> > minHeap;
    int median = 0; // Initialize median. Will be updated with first element.

    // Process each element in the array/stream.
    for(int i = 0; i < N; i++) {
        callMedian(arr[i], maxHeap, minHeap, median); // Call helper to update heaps and median.
        ans.push_back(median); // Store the current median.
    }

    return ans;
}

int main() {
    int N;

    cout << "Enter the size of the array : ";
    cin >> N;
    if(N <= 0) { // Nothing to process; avoids indexing an empty result below.
        cout << "The stream is empty, so no median exists." << endl;
        return 0;
    }
    vector<int> arr(N);

    cout << "Enter the elements of array : ";
    for(int i = 0; i < N; i++) {
        cin >> arr[i]; // Read array elements.
    }

    vector<int> ans = findMedian(arr, N); // Get the medians after each element.

    // Output the final median (median after all elements are processed).
    // If you need all medians, iterate through 'ans' vector.
    cout << "Median of the stream (final median) : " << ans[ans.size() - 1] << endl;

    return 0;
}

/*
Example Trace

Input arr: [12, 10, 15, 20, 5]
N: 5

Initial: maxHeap={}, minHeap={}, median=0, ans=[]

1. Element = 12: signum(0,0) = 0. (heaps balanced)
   12 > median (0). Push 12 to minHeap.
   minHeap={12}, maxHeap={}
   median = minHeap.top() = 12. ans: [12]

2. Element = 10: signum(0,1) = -1. (minHeap is larger)
   10 <= median (12). Push 10 to maxHeap.
   minHeap={12}, maxHeap={10}
   median = (10+12)/2 = 11. ans: [12, 11]

3. Element = 15: signum(1,1) = 0. (heaps balanced)
   15 > median (11). Push 15 to minHeap.
   minHeap={12, 15}, maxHeap={10}
   median = minHeap.top() = 12. ans: [12, 11, 12]

4. Element = 20: signum(1,2) = -1. (minHeap is larger)
   20 > median (12). Move minHeap.top() (12) to maxHeap, then push 20 to minHeap.
   minHeap={15, 20}, maxHeap={10, 12}
   median = (12+15)/2 = 13 (13.5 truncated by integer division). ans: [12, 11, 12, 13]

5. Element = 5: signum(2,2) = 0. (heaps balanced)
   5 <= median (13). Push 5 to maxHeap.
   minHeap={15, 20}, maxHeap={5, 10, 12}
   median = maxHeap.top() = 12. ans: [12, 11, 12, 13, 12]

Final Median: 12

Note on Case -1: when `minHeap.size() > maxHeap.size()` and `element <= median`, the standard two-heap handling is to push `element` straight into `maxHeap`, which equalizes the sizes without moving anything between heaps. Step 2 of the trace follows that rule.
*/

1. Problem Statement
- Goal: Given a continuous stream of numbers, efficiently find the median of the numbers processed so far, as each new number arrives.
- Median (taken over the numbers in sorted order):
  - If the total count of numbers is odd, the median is the middle element.
  - If the total count of numbers is even, the median is the average of the two middle elements.
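The odd/even definition above can be sketched for a fixed collection as follows. `medianOf` is a hypothetical helper name, not part of the original code, and it returns a `double` (an assumption made here) so that the even case keeps its fractional part.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not from the original code): median of a whole
// collection, computed by sorting a copy of the input.
double medianOf(std::vector<int> values) {
    assert(!values.empty());  // the median of an empty collection is undefined
    std::sort(values.begin(), values.end());
    const std::size_t n = values.size();
    if (n % 2 == 1) {
        return values[n / 2];  // odd count: the single middle element
    }
    // Even count: average of the two middle elements, computed in double.
    return (static_cast<double>(values[n / 2 - 1]) + values[n / 2]) / 2.0;
}
```

For example, `medianOf({12, 10, 15})` gives 12 and `medianOf({12, 10, 15, 20})` gives 13.5. Sorting the whole collection on every query is exactly the cost the streaming approach tries to avoid.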
2. Challenge & Approach: Two Heaps
- Challenge: A naive approach would be to store all numbers in a list and sort it for each new element, which is inefficient (`O(N log N)` per insertion).
- Solution: Two Heaps
  - We divide the numbers seen so far into two halves:
    - `maxHeap` (or `left_half`): A Max Heap storing the smaller half of the numbers. Its `top()` will be the largest element in the smaller half.
    - `minHeap` (or `right_half`): A Min Heap storing the larger half of the numbers. Its `top()` will be the smallest element in the larger half.
  - Invariants (Properties to maintain):
    - All elements in `maxHeap` are less than or equal to all elements in `minHeap` (`maxHeap.top() <= minHeap.top()`).
    - The size difference between `maxHeap` and `minHeap` is at most 1:
      - `maxHeap.size() == minHeap.size()` (for even total elements)
      - `maxHeap.size() == minHeap.size() + 1` (for odd total elements, median is `maxHeap.top()`)
      - `minHeap.size() == maxHeap.size() + 1` (for odd total elements, median is `minHeap.top()`)
      - The provided code allows either odd-count setup: the extra element sits in whichever heap received the latest insertion.
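The two invariants can be turned into a small debugging check. This is a minimal sketch: `invariantsHold` and the `MaxHeap`/`MinHeap` aliases are hypothetical names introduced here, not part of the original code.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

using MaxHeap = std::priority_queue<int>;  // smaller half, largest on top
using MinHeap = std::priority_queue<int, std::vector<int>, std::greater<int>>;  // larger half, smallest on top

// Hypothetical helper: returns true when both invariants hold.
bool invariantsHold(const MaxHeap& maxHeap, const MinHeap& minHeap) {
    // Ordering: largest of the smaller half <= smallest of the larger half.
    // An empty heap cannot violate the ordering.
    const bool ordered = maxHeap.empty() || minHeap.empty() ||
                         maxHeap.top() <= minHeap.top();
    // Balance: sizes differ by at most one. Sizes are unsigned, so the
    // larger one is subtracted from to avoid wrap-around.
    const std::size_t a = maxHeap.size();
    const std::size_t b = minHeap.size();
    const bool balanced = (a > b ? a - b : b - a) <= 1;
    return ordered && balanced;
}
```

Calling such a check after every insertion while testing is one way to catch a branch that pushes into the wrong heap.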
3. `signum(int a, int b)` Helper Function
- Purpose: A utility function to compare the sizes of the two heaps.
- Returns:
  - `0` if `a == b` (heaps are balanced in size).
  - `1` if `a > b` (`maxHeap` is larger).
  - `-1` if `a < b` (`minHeap` is larger).
4. `callMedian(int element, ...)` Logic
This is the core function that handles inserting a new `element` and updating the `median`. It operates based on the `signum` of the heap sizes:
- Case 0: `maxHeap.size() == minHeap.size()` (Balanced)
  - This implies an even number of elements before the current `element` is inserted. After insertion, the total number of elements will be odd.
  - If `element > median` (or generally, if `element` belongs to the larger half): `minHeap.push(element)`. The `median` is now `minHeap.top()`.
  - Else (`element <= median`, or `element` belongs to the smaller half): `maxHeap.push(element)`. The `median` is now `maxHeap.top()`.
  - After this, one heap will have one more element than the other, and the median will be the top of the larger heap.
- Case 1: `maxHeap.size() > minHeap.size()` (`maxHeap` is larger)
  - This implies an odd number of elements before the current `element` is inserted, and the `median` was `maxHeap.top()`. After insertion, total elements will be even.
  - If `element > median`: `minHeap.push(element)`. Now `maxHeap.size() == minHeap.size()`.
  - Else (`element <= median`):
    - To rebalance and maintain `maxHeap.top() <= minHeap.top()`:
      - Move `maxHeap.top()` to `minHeap`.
      - `maxHeap.pop()`.
      - `maxHeap.push(element)`.
  - The `median` is now `(minHeap.top() + maxHeap.top()) / 2`. With `int` operands this division truncates, so an average of 13.5 is stored as 13.
- Case -1: `minHeap.size() > maxHeap.size()` (`minHeap` is larger)
  - This implies an odd number of elements before the current `element` is inserted, and the `median` was `minHeap.top()`. After insertion, total elements will be even.
  - If `element > median`:
    - To rebalance and maintain `maxHeap.top() <= minHeap.top()`:
      - Move `minHeap.top()` to `maxHeap`.
      - `minHeap.pop()`.
      - `minHeap.push(element)`.
  - Else (`element <= median`): `maxHeap.push(element)`. Now `maxHeap.size() == minHeap.size()`.
  - The `median` is now `(minHeap.top() + maxHeap.top()) / 2`.
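The three cases can also be collapsed into an "insert first, rebalance afterwards" form. The sketch below is a different formulation from the `switch` in `callMedian`, not a transcription of it: the class name `StreamingMedian`, its `add`/`median` interface, and the choice to return a `double` are all assumptions made for illustration.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical wrapper around the two heaps. It keeps lower_ (max-heap) the
// same size as upper_ (min-heap) or exactly one element larger.
class StreamingMedian {
public:
    void add(int element) {
        // Route the element by comparing it with the top of the smaller half.
        if (lower_.empty() || element <= lower_.top()) {
            lower_.push(element);
        } else {
            upper_.push(element);
        }
        // Restore the size invariant by moving at most one element.
        if (lower_.size() > upper_.size() + 1) {
            upper_.push(lower_.top());
            lower_.pop();
        } else if (upper_.size() > lower_.size()) {
            lower_.push(upper_.top());
            upper_.pop();
        }
    }

    // Median of everything added so far; requires at least one element.
    double median() const {
        assert(!lower_.empty());
        if (lower_.size() > upper_.size()) {
            return lower_.top();  // odd count: lower_ holds the middle element
        }
        // Even count: average in double, so 13.5 is not truncated to 13.
        return (static_cast<double>(lower_.top()) + upper_.top()) / 2.0;
    }

private:
    std::priority_queue<int> lower_;  // max-heap: smaller half
    std::priority_queue<int, std::vector<int>, std::greater<int>> upper_;  // min-heap: larger half
};
```

Feeding it 12, 10, 15, 20, 5 and reading `median()` after each `add` yields 12, 11, 12, 13.5, 12. Fixing the extra element to one side means the odd-count median is always `lower_.top()`, at the price of an occasional extra move between heaps.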
5. `findMedian(vector<int>& arr, int N)` Function
- Initializes the `ans` vector, `maxHeap`, `minHeap`, and `median` (set to 0; the first element overwrites it).
- Iterates through each `element` in the input `arr`.
- For each `element`, it calls `callMedian` to update the heaps and the `median`.
- Adds the calculated `median` to the `ans` vector.
- Returns the `ans` vector containing medians at each step.
6. Complexity Analysis
- Let `N` be the total number of elements in the stream.
- Time Complexity: `O(N log N)`
  - Each time a new element arrives, we perform a constant number of heap operations (`push`, `pop`, `top`).
  - On a heap of size `H`, `push` and `pop` take `O(log H)` time and `top` takes `O(1)`. Here, `H` is at most `(N + 1) / 2`, since the heap sizes differ by at most 1.
  - Therefore, each element processing takes `O(log N)` time.
  - Total time for `N` elements: `N * O(log N) = O(N log N)`.
- Space Complexity: `O(N)`
  - The two heaps together store all `N` elements from the stream.
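For contrast with the `O(log N)` per-element cost above, a sorted-vector baseline is sketched below. `runningMediansSorted` is a hypothetical name, not part of the original code. It uses the same truncating integer average as the two-heap code, so on small inputs its output could serve as a cross-check.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical baseline: keep every element in a sorted vector and read the
// median from the middle after each insertion.
std::vector<int> runningMediansSorted(const std::vector<int>& arr) {
    std::vector<int> sorted;   // always kept in ascending order
    std::vector<int> medians;  // median after each element
    medians.reserve(arr.size());
    for (int x : arr) {
        // Binary search finds the slot in O(log N), but insert() may shift
        // up to N elements, so each step costs O(N) overall.
        sorted.insert(std::upper_bound(sorted.begin(), sorted.end(), x), x);
        const std::size_t n = sorted.size();
        if (n % 2 == 1) {
            medians.push_back(sorted[n / 2]);
        } else {
            // Truncating integer average of the two middle elements.
            medians.push_back((sorted[n / 2 - 1] + sorted[n / 2]) / 2);
        }
    }
    return medians;
}
```

On `{12, 10, 15, 20, 5}` this returns `{12, 11, 12, 13, 12}`. The total cost is `O(N^2)` in the worst case, compared with `O(N log N)` for the two heaps.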