/** Distributed sample and radix sorts.
* @author Amir Kamil
* @version 1.0
*/
package sort;
/** A class for sorting integers distributively across multiple processors. */
public class Sort {
/** The number of elements each processor generates by default. */
public static final int NUM_ELEMENTS = 10000;
/** The number of samples each processor takes in sample sort, as a
* fraction of the number of elements.
*/
public static final double SAMPLE_RATIO = 0.05;
/** Number of digits to use in each iteration of local/global radix
* sort.
*/
public static final int RADIX_DIGITS = 8;
// The sort to use.
private static boolean useRadixSort = false;
/** Generates random numbers, then sorts them using a distributed sort.
* Uses sample sort by default, or radix sort if given
* a '-r' flag.
*/
public static void main(String[] args) {
if ((boolean single) (args.length > 0 &&
args[0].equals("-t"))) {
if (Ti.thisProc() == 0) {
runTests();
}
} else {
int num = processArgs(args, 0);
if ((boolean single) useRadixSort) {
radixStart(num);
} else {
sampleStart(num);
}
}
}
// Processes arguments. Returns the number of elements each processor
// should generate.
private static int processArgs(String[] args, int pos) {
if (pos >= args.length) {
return NUM_ELEMENTS;
} else if (args[pos].startsWith("-")) {
if (args[pos].length() == 1) {
error("Illegal null option: -");
}
switch (args[pos].charAt(1)) {
case 'h':
usage();
case 'd':
if (args[pos].length() == 2) {
Util.DEBUGGING_LEVEL = 1;
} else {
try {
Util.DEBUGGING_LEVEL =
Integer.parseInt(args[pos].substring(2));
} catch (NumberFormatException e) {
error("Illegal option: " + args[pos]);
}
}
Util.debug("Debugging enabled");
return processArgs(args, pos + 1);
case 'r':
useRadixSort = true;
return processArgs(args, pos + 1);
default:
error("Illegal option: -" + args[pos].charAt(1));
}
} else {
int num;
try {
num = Integer.parseInt(args[pos]);
if (num < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
error("Invalid number: " + args[pos]);
}
processArgs(args, pos + 1);
return num;
}
return NUM_ELEMENTS;
}
// Prints usage information and exits.
private static void usage() {
String msg = "usage: Sort [opt1 ... optN] [num]\n\n" +
"\tOptions (optK):\n" +
"\t-r\tuses radix sort instead of default sample sort\n" +
"\t-d[m]\tsets the debugging level to m, or 1 if m is " +
"not given\n" +
"\t-h\tprints this message\n\n" +
"\tnum\tthe number of elements each processor generates";
error(msg);
}
// Outputs the given error message and exits.
private static void error(String msg) {
if (Ti.thisProc() == 0) {
System.err.println(msg);
}
System.exit(1);
}
// =============== =============== =============== ===============
// =============== SAMPLE SORT CODE =============== ==============
// =============== =============== =============== ===============
// Possible improvements:
// - do explicit array copies instead of scans
/** Generates random numbers, then sorts them using a sample sort.
* The resulting sorted numbers are distributed across all
* processors as follows:
*
* - for any two processors i and j, if i <
* j, then all keys on processor i are less than any
* key on processor j
*
- for each processor, the keys on that processor are sorted
*
* @param numElements the number of elements each processor generates
*/
public static void sampleStart(int numElements) {
Util.debug("Starting sample sort...");
int[] nums = generateNums(numElements);
int numSamples = (int) Math.ceil(numElements * SAMPLE_RATIO);
int[] splitters;
int[] local localSplitters;
// Compute samples of this processor's numbers.
IntVector samples = getSamples(nums, numSamples);
Buckets bucks;
Ti.barrier();
// Send samples to first processor.
Util.debug("scanning samples");
samples = (IntVector) Scan.gen(new IntVectorCombiner(),
samples);
// Compute splitters.
if (Ti.thisProc() == 0) {
splitters = computeSplits(samples, numSamples);
}
Ti.barrier();
Util.debug("retrieving splitters");
// Retrieve splitters from first processor.
splitters = broadcast splitters from 0;
Util.debug("copying splitters");
if (Ti.thisProc() != 0) {
localSplitters = new int[splitters.length];
System.arraycopy(splitters, 0, localSplitters, 0,
splitters.length);
} else {
localSplitters = (int[] local) splitters;
}
// Split this processor's numbers according to the splitters.
bucks = splitNums(nums, localSplitters);
Ti.barrier();
// Get the numbers this processor is to sort from all other
// processors.
nums = exchangeKeys(bucks);
// Sort the numbers.
Util.debug("sorting each processor's numbers");
Util.radixSort(nums, RADIX_DIGITS);
if ((boolean single) (Util.DEBUGGING_LEVEL >= 2)) {
print3(nums);
}
if (Ti.thisProc() == 0) {
System.out.println("Done sorting.");
}
}
// =============== =============== =============== ===============
// =============== SAMPLE SORT AUXILIARY FUNCTIONS ===============
// =============== =============== =============== ===============
// Returns num samples from the given array. Allows duplicates.
private static IntVector getSamples(int[] elems, int num) {
Util.debug("computing samples");
IntVector samples = new IntVector(2 * num + 1);
int pos;
for (int i = 0; i < num; i++) {
pos = Util.random(num);
samples.addElement(elems[pos]);
}
return samples;
}
// Computes the splitters for sample sort according to the given
// samples.
private static int[] computeSplits(IntVector samples, int numSamples) {
Util.debug("computing splitters from " + samples.size() +
" samples");
int[] tmp = new int[samples.size()], splitters;
samples.copyElements(tmp, 0);
Util.radixSort(tmp, RADIX_DIGITS);
splitters = new int[Ti.numProcs() - 1];
for (int i = 1; i < Ti.numProcs(); i++) {
splitters[i - 1] = tmp[numSamples * i];
}
Util.debug("splitters = " + Util.toHexString(splitters));
return splitters;
}
// Bucketizes the given elements according to the given splitters.
private static Buckets splitNums(int[] nums, int[] splitters) {
Util.debug("splitting numbers according to " +
Util.toHexString(splitters));
Buckets bucks = new Buckets(splitters.length + 1,
new SplitBucketizer(splitters));
bucks.addAll(nums, 0, nums.length);
return bucks;
}
// Collects keys destined for this processor from all other processors.
private static int[] exchangeKeys(Buckets bucks) {
Util.debug("collecting each processor's numbers");
IntVector iv = new IntVector();
Buckets [1d] single allBuckets = new Buckets[0 : Ti.numProcs() - 1];
int[] nums;
allBuckets.exchange(bucks);
for (int i = 0; i < Ti.numProcs(); i++) {
iv.concat(allBuckets[i].getBucket(Ti.thisProc()));
}
nums = new int[iv.size()];
iv.copyElements(nums, 0);
Util.debugAll("received " + nums.length + " numbers");
return nums;
}
// =============== =============== =============== ===============
// =============== RADIX SORT CODE =============== ===============
// =============== =============== =============== ===============
// Possible improvements:
// - do explicit array copies instead of scans
// - do first iteration of sort before sending numbers to first
// processor
// - radix sort only the lower digits, then do insertion sort on
// the rest
/** Generates random numbers, then sorts them using radix sort. The
* resulting sorted numbers reside on the first processor.
* @param numElements the number of elements each processor generates
*/
public static void radixStart(int numElements) {
Util.debug("Starting radix sort...");
int[] nums = generateNums(numElements);
IntVector numv = new IntVector(nums, nums.length);
Ti.barrier();
// Send all numbers to the first processor.
Util.debug("scanning numbers");
Scan.gen(new IntVectorCombiner(), numv);
if (Ti.thisProc() == 0) {
nums = new int[numv.size()];
numv.copyElements(nums, 0);
}
// Retrieve the array of all numbers from the first processor.
Util.debug("retrieving array");
nums = broadcast nums from 0;
// Sort the numbers.
radix(nums);
if (Ti.thisProc() == 0 && Util.DEBUGGING_LEVEL >= 2) {
print2(nums);
}
if (Ti.thisProc() == 0) {
System.out.println("Done sorting.");
}
}
// =============== =============== =============== ===============
// =============== RADIX SORT AUXILIARY FUNCTIONS ================
// =============== =============== =============== ===============
// Runs the iterations of global radix sort.
private static void radix(int[] elems) {
RadixBucketizer rb = new RadixBucketizer(RADIX_DIGITS);
Buckets buck = new Buckets((0xffffffff >>> (32 - RADIX_DIGITS)) + 1,
rb);
int num, off;
int single i;
// Compute the number of elements to be sorted by this
// processor. The elements are divided as equally as possible.
Util.debug("computing offset and length");
if (Ti.thisProc() < elems.length % Ti.numProcs()) {
num = elems.length / Ti.numProcs() + 1;
off = num * Ti.thisProc();
} else {
num = elems.length / Ti.numProcs();
off = num * Ti.thisProc() + elems.length % Ti.numProcs();
}
Util.debug("running radix sort");
// Run the iterations of radix sort, retrieving unsorted keys
// from the first processor and sending back the sorted keys.
for (i = 0; i < (int) Math.ceil(32. / (double) RADIX_DIGITS); i++) {
Util.debug("starting iteration " + i);
rb.setRad(i);
Util.debug(" bucketizing numbers");
buck.addAll(elems, off, num);
Ti.barrier();
Util.debug(" scanning buckets");
Scan.gen(new BucketsCombiner(), buck);
if (Ti.thisProc() == 0) {
buck.copyElements(elems);
}
buck.clear();
Ti.barrier();
}
}
// =============== =============== =============== ===============
// =============== SHARED AUXILIARY FUNCTIONS =============== ====
// =============== =============== =============== ===============
// Generates num positive random numbers.
private static int[] generateNums(int num) {
Util.debug("generating random numbers");
int[] tmp = new int[num];
for (int i = 0; i < num; i++) {
tmp[i] = Util.random(Integer.MAX_VALUE);
}
return tmp;
}
// Prints the given elements on a single line.
private static void print(int[] nums) {
if (nums.length == 0) {
return;
}
System.out.print("[" + nums[0]);
for (int i = 1; i < nums.length; i++) {
System.out.print(", " + nums[i]);
}
System.out.println("]");
}
// Prints the given elements, one per line.
private static void print2(int[] nums) {
System.out.println("==============");
for (int i = 0; i < nums.length; i++) {
System.out.println(Ti.thisProc() + " " +
Integer.toHexString(nums[i]));
}
System.out.println("==============");
}
// Serialized printing of arrays.
private static void print3(int[] nums) {
for (int single i = 0; i < Ti.numProcs(); i++) {
if (Ti.thisProc() == i) {
print2(nums);
}
Ti.barrier();
}
}
// =============== =============== =============== ===============
// =============== TEST CODE =============== =============== =====
// =============== =============== =============== ===============
// Tests this and auxiliary classes.
private static void runTests() {
boolean pass = true;
pass &= IntVector.test();
pass &= Buckets.test();
pass &= SplitBucketizer.test();
if (!pass) {
System.out.println("Error: one or more tests failed.");
System.exit(1);
} else {
System.out.println("All tests passed.");
}
}
}