# Problem 8.5

## Average sorting

Suppose that instead of sorting an array, we just require that the elements increase on average. More precisely, we call an $n$-element array k-sorted if, for all $i = 1, 2, \ldots, n - k$, the following holds:

$$\frac{\sum_{j=i}^{i+k-1}A[j]}{k} \le \frac{\sum_{j=i + 1}^{i+k}A[j]}{k}$$

1. What does it mean for an array to be 1-sorted?
2. Give a permutation of the numbers $1, 2, \ldots, 10$ that is 2-sorted, but not sorted
3. Prove that an $n$-element array is k-sorted if and only if $A[i] \le A[i+k]$ for all $i = 1, 2, \ldots, n-k$.
4. Give an algorithm that $k$-sorts an $n$-element array in $\O(n\lg(n/k))$ time.

We can also show a lower bound on the time to produce a $k$-sorted array, when $k$ is a constant.

1. Show that we can sort a $k$-sorted array of length $n$ in $\O(n\lg{k})$ time. (Hint: Use the solution to [exercise 6.5-9](/06/05/09.html).)
2. Show that when $k$ is a constant, $k$-sorting an $n$-element array requires $\Omega(n\lg{n})$ time. (Hint: Use the solution to the previous part along with the lower bound on comparison sorts.)

### Intuitive description

I'm going to state an unintuitive description of what does it mean for an array to be $k$-sorted. It will be presented without proof. It will become evident later in the text.

If we write out $k$-sorted array in a grid with $k$ columns, each column will be sorted from top to bottom. This is essentially what (c) means.

### Various notions

For an array to be 1-sorted it means that it is sorted in the traditional sense of the word, that is, $A[i] \le A[i+1]$ for each index $i$.

Here's a 2-sorted permutation that is not sorted: $2, 1, 4, 3, 6, 5, 8, 7, 10, 9$.

### Alternative condition

Let's assume that an array is $k$-sorted. Then:

$$\frac{\sum_{j=i}^{i+k-1}A[j]}{k} \le \frac{\sum_{j=i + 1}^{i+k}A[j]}{k} \\ \Updownarrow \\ \frac{A[i] + \sum_{j=i+1}^{i+k-1}A[j]}{k} \le \frac{\sum_{j=i+1}^{i+k-1}A[j] + A[i+k]}{k} \\ \Updownarrow \\ \frac{A[i]}{k} \le \frac{A[i+k]}{k} \\ \Updownarrow \\ A[i] \le A[i+k]$$

Note that this derivation works backwards for proving the if part.

### The algorithms

To $k$-sort the array, we need to sort the $k$ columns, that is, $k$ arrays of $n/k$ elements. This is a $(n/k)\lg(n/k)$ algorithm, performed $k$ times. We can use merge-sort. We don't even need to copy the array - we can calculate the indices on the fly, although that turned less fun than expected.

To sort a $k$-sorted array, we just do what we suggested in exercise 6.5-9 - we build a min heap and every time we take an element from it, we pick another element from the column the minimal element was in. Keeping track of that in C is a bit hairy, so the implementation is gruesome. There is a new operation, MIN-HEAP-PUSH-POP, that is an improvement over first extracting the element and then inserting another one.

### Lower bound on comparisons

The problem can be reduced to $k$ problems of size $(n/k)$, each with minimal number of comparisons $\Omega((n/k)\lg(n/k))$. The total is $\Omega(n\lg(n/k))$ and if $k$ is a constant, we get a very familiar $\Omega(n\lg{n})$. Really not that surprising.

### C code

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>

typedef struct {
int value;
int s;
} item;

typedef struct {
item *elements;
int length;
int heap_size;
} heap_t;

typedef struct {
int size;
int k;
int exhausted;
int *next_indices;
} sort_state_t;

void merge_sort(int A[], int p, int r, int k, int s);
void min_heap_insert(heap_t *heap, item key);
int state_took_column(sort_state_t *state, int index);
item min_heap_push_pop(heap_t *heap, item new);
item heap_minimum(heap_t *heap);
item heap_extract_min(heap_t *heap);

/*
* Average soting is performed by just merge-sorting each column. That was
* easy. Modifying merge sort was hard.
*/

void k_sort(int *numbers, int size, int k) {
for (int i = 0; i < k; i++) {
merge_sort(numbers, 0, size, k, i);
}
}

/*
* Sorting a k-sorted array. We need to keep track of which column produced
* the minumum element in the heap and this resulted in quite the tricky C
* code. I don't think this is a good practice, but still, that's the best I'm
* willing to make it right now.
*/

void merge_k_sorted(int *numbers, int size, int k) {
int copy[size];

item heap_elements[k];
heap_t heap = {heap_elements, k, 0};

int next_indices[k];
sort_state_t state = {size, k, 0, next_indices};

memcpy(copy, numbers, size * sizeof(int));

for (int i = 0; i < k; i++) {
item new = {copy[i], i};
min_heap_insert(&heap, new);
next_indices[i] = i + k;
}

for (int i = 0; i < size; i++) {
item min = heap_minimum(&heap);
numbers[i] = min.value;

int next = state_took_column(&state, min.s);

if (next != -1) {
min_heap_push_pop(&heap, (item) {copy[next], next % k});
} else {
heap_extract_min(&heap);
}
}
}

int state_took_column(sort_state_t *state, int index) {
int size = state->size,
k = state->k,
s = index,
*next_indices = state->next_indices;

if (next_indices[s] >= size) {
while (state->exhausted < k && next_indices[state->exhausted] >= state->size) {
state->exhausted++;
}

if (state->exhausted == k) {
return -1;
}

int next = next_indices[state->exhausted];
next_indices[state->exhausted] += k;
return next;
} else {
next_indices[s] += k;
return s;
}
}

/*
* This is the merge sort from Chapter 2, modified to look only at indices
* congruent to k modulo s. There are two very ugly and long macroses that
* perform this unpleasant job. There's probably a nicer way to do the
* calculation, but modular arithmetic has always been my Achilles' heel.
*/

#define FIRST(index, k, s) ((index) + (s) - (index) % (k) + ((index) % (k) <= (s) ? 0 : (k)))
#define COUNT(a, b, k, s) (((b) - (a)) / (k) + ((((s) - (a) % (k)) + (k)) % (k) < ((b) - (a)) % (k) ? 1 : 0))

void merge(int A[], int p, int q, int r, int k, int s) {
int i, j, l;

int n1 = COUNT(p, q, k, s);
int n2 = COUNT(q, r, k, s);

int L[n1];
int R[n2];

for (i = FIRST(p, k, s), j = 0; i < q; j++, i += k) L[j] = A[i];
for (i = FIRST(q, k, s), j = 0; i < r; j++, i += k) R[j] = A[i];

for(i = 0, j = 0, l = FIRST(p, k, s); l < r; l += k) {
if (i == n1) {
A[l] = R[j++];
} else if (j == n2) {
A[l] = L[i++];
} else if (L[i] <= R[j]) {
A[l] = L[i++];
} else {
A[l] = R[j++];
}
}
}

void merge_sort(int A[], int p, int r, int k, int s) {
if (COUNT(p, r, k, s) > 1) {
int q = (p + r) / 2;
merge_sort(A, p, q, k, s);
merge_sort(A, q, r, k, s);
merge(A, p, q, r, k, s);
}
}

/*
* Finally, the min heap from exercise 6.5-3, modified to store items instead
* of ints. When I first wrote it, I made an error in the implementation and
* that sent me in a hour-long debugging session. C is fun.
*
* Also, there is a new heap operation (min_heap_push_pop) that is a faster
* than heap_extract_min and then min_heap_insert.
*/

#define PARENT(i) ((i - 1) / 2)
#define LEFT(i)   (2 * i + 1)
#define RIGHT(i)  (2 * i + 2)

item heap_minimum(heap_t *heap) {
return heap->elements;
}

void min_heapify(heap_t *heap, int i) {
int left  = LEFT(i),
right = RIGHT(i),
smallest;

if (left < heap->heap_size && heap->elements[left].value < heap->elements[i].value) {
smallest = left;
} else {
smallest = i;
}

if (right < heap->heap_size && heap->elements[right].value < heap->elements[smallest].value) {
smallest = right;
}

if (smallest != i) {
item tmp = heap->elements[i];
heap->elements[i] = heap->elements[smallest];
heap->elements[smallest] = tmp;

min_heapify(heap, smallest);
}
}

item heap_extract_min(heap_t *heap) {
if (heap->heap_size == 0) {
fprintf(stderr, "heap underflow");
exit(0);
}

item min = heap->elements;
heap->elements = heap->elements[heap->heap_size - 1];
heap->heap_size--;
min_heapify(heap, 0);

return min;
}

void heap_decrease_key(heap_t *heap, int i, item key) {
if (key.value > heap->elements[i].value) {
fprintf(stderr, "new key is larger than current key");
exit(0);
}

heap->elements[i].value = key.value;
while (i > 0 && heap->elements[PARENT(i)].value > heap->elements[i].value) {
item tmp = heap->elements[PARENT(i)];
heap->elements[PARENT(i)] = heap->elements[i];
heap->elements[i] = tmp;
i = PARENT(i);
}
}

void min_heap_insert(heap_t *heap, item key) {
if (heap->length == heap->heap_size) {
fprintf(stderr, "heap overflow");
exit(0);
}

heap->elements[heap->heap_size].value = INT_MAX;
heap->elements[heap->heap_size].s = key.s;
heap->heap_size++;
heap_decrease_key(heap, heap->heap_size - 1, key);
}

item min_heap_push_pop(heap_t *heap, item new) {
item result = heap->elements;
heap->elements = new;
min_heapify(heap, 0);
return result;
}