/** Distributed sample and radix sorts. * @author Amir Kamil * @version 1.0 */ package sort; /** A class for sorting integers distributively across multiple processors. */ public class Sort { /** The number of elements each processor generates by default. */ public static final int NUM_ELEMENTS = 10000; /** The number of samples each processor takes in sample sort, as a * fraction of the number of elements. */ public static final double SAMPLE_RATIO = 0.05; /** Number of digits to use in each iteration of local/global radix * sort. */ public static final int RADIX_DIGITS = 8; // The sort to use. private static boolean useRadixSort = false; /** Generates random numbers, then sorts them using a distributed sort. * Uses sample sort by default, or radix sort if given * a '-r' flag. */ public static void main(String[] args) { if ((boolean single) (args.length > 0 && args[0].equals("-t"))) { if (Ti.thisProc() == 0) { runTests(); } } else { int num = processArgs(args, 0); if ((boolean single) useRadixSort) { radixStart(num); } else { sampleStart(num); } } } // Processes arguments. Returns the number of elements each processor // should generate. private static int processArgs(String[] args, int pos) { if (pos >= args.length) { return NUM_ELEMENTS; } else if (args[pos].startsWith("-")) { if (args[pos].length() == 1) { error("Illegal null option: -"); } switch (args[pos].charAt(1)) { case 'h': usage(); case 'd': if (args[pos].length() == 2) { Util.DEBUGGING_LEVEL = 1; } else { try { Util.DEBUGGING_LEVEL = Integer.parseInt(args[pos].substring(2)); } catch (NumberFormatException e) { error("Illegal option: " + args[pos]); } } Util.debug("Debugging enabled"); return processArgs(args, pos + 1); case 'r': useRadixSort = true; return processArgs(args, pos + 1); default: error("Illegal option: -" + args[pos].charAt(1)); } } else { int num; try { num = Integer.parseInt(args[pos]); if (num < 0) { throw new NumberFormatException(); } } catch (NumberFormatException e) { error("Invalid number: " + args[pos]); } processArgs(args, pos + 1); return num; } return NUM_ELEMENTS; } // Prints usage information and exits. private static void usage() { String msg = "usage: Sort [opt1 ... optN] [num]\n\n" + "\tOptions (optK):\n" + "\t-r\tuses radix sort instead of default sample sort\n" + "\t-d[m]\tsets the debugging level to m, or 1 if m is " + "not given\n" + "\t-h\tprints this message\n\n" + "\tnum\tthe number of elements each processor generates"; error(msg); } // Outputs the given error message and exits. private static void error(String msg) { if (Ti.thisProc() == 0) { System.err.println(msg); } System.exit(1); } // =============== =============== =============== =============== // =============== SAMPLE SORT CODE =============== ============== // =============== =============== =============== =============== // Possible improvements: // - do explicit array copies instead of scans /** Generates random numbers, then sorts them using a sample sort. * The resulting sorted numbers are distributed across all * processors as follows: * * @param numElements the number of elements each processor generates */ public static void sampleStart(int numElements) { Util.debug("Starting sample sort..."); int[] nums = generateNums(numElements); int numSamples = (int) Math.ceil(numElements * SAMPLE_RATIO); int[] splitters; int[] local localSplitters; // Compute samples of this processor's numbers. IntVector samples = getSamples(nums, numSamples); Buckets bucks; Ti.barrier(); // Send samples to first processor. Util.debug("scanning samples"); samples = (IntVector) Scan.gen(new IntVectorCombiner(), samples); // Compute splitters. if (Ti.thisProc() == 0) { splitters = computeSplits(samples, numSamples); } Ti.barrier(); Util.debug("retrieving splitters"); // Retrieve splitters from first processor. splitters = broadcast splitters from 0; Util.debug("copying splitters"); if (Ti.thisProc() != 0) { localSplitters = new int[splitters.length]; System.arraycopy(splitters, 0, localSplitters, 0, splitters.length); } else { localSplitters = (int[] local) splitters; } // Split this processor's numbers according to the splitters. bucks = splitNums(nums, localSplitters); Ti.barrier(); // Get the numbers this processor is to sort from all other // processors. nums = exchangeKeys(bucks); // Sort the numbers. Util.debug("sorting each processor's numbers"); Util.radixSort(nums, RADIX_DIGITS); if ((boolean single) (Util.DEBUGGING_LEVEL >= 2)) { print3(nums); } if (Ti.thisProc() == 0) { System.out.println("Done sorting."); } } // =============== =============== =============== =============== // =============== SAMPLE SORT AUXILIARY FUNCTIONS =============== // =============== =============== =============== =============== // Returns num samples from the given array. Allows duplicates. private static IntVector getSamples(int[] elems, int num) { Util.debug("computing samples"); IntVector samples = new IntVector(2 * num + 1); int pos; for (int i = 0; i < num; i++) { pos = Util.random(num); samples.addElement(elems[pos]); } return samples; } // Computes the splitters for sample sort according to the given // samples. private static int[] computeSplits(IntVector samples, int numSamples) { Util.debug("computing splitters from " + samples.size() + " samples"); int[] tmp = new int[samples.size()], splitters; samples.copyElements(tmp, 0); Util.radixSort(tmp, RADIX_DIGITS); splitters = new int[Ti.numProcs() - 1]; for (int i = 1; i < Ti.numProcs(); i++) { splitters[i - 1] = tmp[numSamples * i]; } Util.debug("splitters = " + Util.toHexString(splitters)); return splitters; } // Bucketizes the given elements according to the given splitters. private static Buckets splitNums(int[] nums, int[] splitters) { Util.debug("splitting numbers according to " + Util.toHexString(splitters)); Buckets bucks = new Buckets(splitters.length + 1, new SplitBucketizer(splitters)); bucks.addAll(nums, 0, nums.length); return bucks; } // Collects keys destined for this processor from all other processors. private static int[] exchangeKeys(Buckets bucks) { Util.debug("collecting each processor's numbers"); IntVector iv = new IntVector(); Buckets [1d] single allBuckets = new Buckets[0 : Ti.numProcs() - 1]; int[] nums; allBuckets.exchange(bucks); for (int i = 0; i < Ti.numProcs(); i++) { iv.concat(allBuckets[i].getBucket(Ti.thisProc())); } nums = new int[iv.size()]; iv.copyElements(nums, 0); Util.debugAll("received " + nums.length + " numbers"); return nums; } // =============== =============== =============== =============== // =============== RADIX SORT CODE =============== =============== // =============== =============== =============== =============== // Possible improvements: // - do explicit array copies instead of scans // - do first iteration of sort before sending numbers to first // processor // - radix sort only the lower digits, then do insertion sort on // the rest /** Generates random numbers, then sorts them using radix sort. The * resulting sorted numbers reside on the first processor. * @param numElements the number of elements each processor generates */ public static void radixStart(int numElements) { Util.debug("Starting radix sort..."); int[] nums = generateNums(numElements); IntVector numv = new IntVector(nums, nums.length); Ti.barrier(); // Send all numbers to the first processor. Util.debug("scanning numbers"); Scan.gen(new IntVectorCombiner(), numv); if (Ti.thisProc() == 0) { nums = new int[numv.size()]; numv.copyElements(nums, 0); } // Retrieve the array of all numbers from the first processor. Util.debug("retrieving array"); nums = broadcast nums from 0; // Sort the numbers. radix(nums); if (Ti.thisProc() == 0 && Util.DEBUGGING_LEVEL >= 2) { print2(nums); } if (Ti.thisProc() == 0) { System.out.println("Done sorting."); } } // =============== =============== =============== =============== // =============== RADIX SORT AUXILIARY FUNCTIONS ================ // =============== =============== =============== =============== // Runs the iterations of global radix sort. private static void radix(int[] elems) { RadixBucketizer rb = new RadixBucketizer(RADIX_DIGITS); Buckets buck = new Buckets((0xffffffff >>> (32 - RADIX_DIGITS)) + 1, rb); int num, off; int single i; // Compute the number of elements to be sorted by this // processor. The elements are divided as equally as possible. Util.debug("computing offset and length"); if (Ti.thisProc() < elems.length % Ti.numProcs()) { num = elems.length / Ti.numProcs() + 1; off = num * Ti.thisProc(); } else { num = elems.length / Ti.numProcs(); off = num * Ti.thisProc() + elems.length % Ti.numProcs(); } Util.debug("running radix sort"); // Run the iterations of radix sort, retrieving unsorted keys // from the first processor and sending back the sorted keys. for (i = 0; i < (int) Math.ceil(32. / (double) RADIX_DIGITS); i++) { Util.debug("starting iteration " + i); rb.setRad(i); Util.debug(" bucketizing numbers"); buck.addAll(elems, off, num); Ti.barrier(); Util.debug(" scanning buckets"); Scan.gen(new BucketsCombiner(), buck); if (Ti.thisProc() == 0) { buck.copyElements(elems); } buck.clear(); Ti.barrier(); } } // =============== =============== =============== =============== // =============== SHARED AUXILIARY FUNCTIONS =============== ==== // =============== =============== =============== =============== // Generates num positive random numbers. private static int[] generateNums(int num) { Util.debug("generating random numbers"); int[] tmp = new int[num]; for (int i = 0; i < num; i++) { tmp[i] = Util.random(Integer.MAX_VALUE); } return tmp; } // Prints the given elements on a single line. private static void print(int[] nums) { if (nums.length == 0) { return; } System.out.print("[" + nums[0]); for (int i = 1; i < nums.length; i++) { System.out.print(", " + nums[i]); } System.out.println("]"); } // Prints the given elements, one per line. private static void print2(int[] nums) { System.out.println("=============="); for (int i = 0; i < nums.length; i++) { System.out.println(Ti.thisProc() + " " + Integer.toHexString(nums[i])); } System.out.println("=============="); } // Serialized printing of arrays. private static void print3(int[] nums) { for (int single i = 0; i < Ti.numProcs(); i++) { if (Ti.thisProc() == i) { print2(nums); } Ti.barrier(); } } // =============== =============== =============== =============== // =============== TEST CODE =============== =============== ===== // =============== =============== =============== =============== // Tests this and auxiliary classes. private static void runTests() { boolean pass = true; pass &= IntVector.test(); pass &= Buckets.test(); pass &= SplitBucketizer.test(); if (!pass) { System.out.println("Error: one or more tests failed."); System.exit(1); } else { System.out.println("All tests passed."); } } }