import Ti.io.*;
import java.io.*;
import java.lang.*;

/**
 * Chunked-sort I/O benchmark.
 *
 * Reads the input file in chunks of <code>buffersz</code> longs, sorts each
 * chunk independently (chunks are NOT merged), writes each sorted chunk to the
 * output file, and finally prints how much wall-clock time went to sorting,
 * reading and writing.
 *
 * Command line: dbsort infile outfile buffersz iotype sorttype
 *
 * NOTE: this file uses Ti.* calls and the local/single type qualifiers, i.e.
 * it is written in the Titanium dialect of Java rather than plain Java.
 */
public class dbsort {
  /** Wall-clock time (ms) at which the timed portion of the run started. */
  public static long starttime;
  /** Chunk size, in longs (8 bytes each). */
  public static int buffersz;
  /** Accumulated time (ms) spent inside sort(). */
  public static long sorttime = 0;
  /** Accumulated time (ms) spent in the timed read sections. */
  public static long readtime = 0;
  /** Accumulated time (ms) spent in the timed write sections. */
  public static long writetime = 0;
  /** Input file size in bytes. */
  public static long filesz = 0;
  /** Number of chunks processed (or to process). */
  public static int numchunks = 0;

  public static final int FAKESORT = 0;
  public static final int QUICKSORT = 1;
  /** Selected sort algorithm: FAKESORT or QUICKSORT. */
  public static int sorttype;

  /** When true, sort() verifies that its output is ordered. */
  public static final boolean debugsort = false;

  /** Prints s prefixed with the number of seconds elapsed since starttime. */
  public static void MSG(String s) {
    System.out.println((System.currentTimeMillis() - starttime) / 1000.0 + ": " + s);
  }

  //------------------------------------------------------------------------------------
  /**
   * Does O(n*log(n)) work comparable to a good sorting algorithm, but with a
   * uniform (data-independent) running time. The buffer is not modified.
   */
  protected static void fakesort(long [] local buf) {
    long bogusval = 0; // must be initialized before it is accumulated into
    int n = buf.length;
    long logn = (long) Math.log(n);
    for (int i = 0; i < n; i++) {
      bogusval = bogusval + buf[i];
      for (int j = 0; j < logn; j++) {
        bogusval -= 1;
      }
    }
  }

  //------------------------------------------------------------------------------------
  /** Sorts buf in place, in ascending order, using quicksort. */
  protected static void quicksort(long [] local buf) {
    qsort(buf, 0, buf.length-1);
  }

  /**
   * Partitions buf[p..r] around the value of its middle element and returns
   * the index q such that qsort() can recurse on [p..q] and [q+1..r].
   */
  private static int qpartition(long [] local buf, int p, int r) {
    long midval = buf[p+((r-p)>>1)];
    long tmp;
    int i = p-1;
    int j = r+1;
    while (true) {
      do j--; while (i < j && midval < buf[j]); // find rightmost elem that belongs in left partition
      do i++; while (i < j && midval > buf[i]); // find leftmost elem that belongs in right partition
      if (i < j) { // swap
        tmp = buf[i];
        buf[i] = buf[j];
        buf[j] = tmp;
      }
      else return j;
    }
    return 0; // unreachable - just to make tc happy
  }

  /** Recursively sorts buf[p..r] (both bounds inclusive). */
  private static void qsort(long [] local buf, int p, int r) {
    if (p < r) {
      int q = qpartition(buf, p, r);
      qsort(buf, p, q);
      qsort(buf, q+1, r);
    }
  }

  //------------------------------------------------------------------------------------
  /**
   * Runs the algorithm selected by sorttype on buf, adds the elapsed time to
   * sorttime, and prints a "." progress marker.
   *
   * @throws InternalError if debugsort is enabled and the result is not sorted
   */
  public static void sort(long [] local buf) {
    //MSG("sort() begin");
    long startsort = System.currentTimeMillis();

    if (debugsort && false) { // disabled dump of the first few input elements
      for (int i = 0; i < buf.length && i < 20; i++) {
        System.out.println("buf[" + i + "]= " + buf[i]);
      }
    }

    switch (sorttype) {
      case FAKESORT:
        fakesort(buf);
        break;
      case QUICKSORT:
        quicksort(buf);
        break;
    }

    if (debugsort) {
      for (int i = 0; i < buf.length-1; i++) {
        if (buf[i] > buf[i+1]) {
          System.out.println("bad sort detected at element " + i);
          for (int j = 0; j < buf.length && j < i+15; j++) {
            System.out.println("buf[" + j + "]= " + buf[j]);
          }
          throw new InternalError();
        }
      }
    }

    long endsort = System.currentTimeMillis();
    sorttime += (endsort - startsort);
    //MSG("sort() end");
    System.out.print("."); // still-alive indicator
    System.out.flush();
  }

  //------------------------------------------------------------------------------------
  /**
   * Checks that the selected buffer size is legal for the file and sets
   * numchunks.
   *
   * @param buffersz  chunk size in longs
   * @param filesz    file size in bytes
   * @param minchunks minimum number of chunks the caller's I/O scheme needs
   * @throws InternalError if buffersz is not positive, yields fewer than
   *         minchunks chunks, or does not evenly divide the file
   */
  private static void checkchunks(long buffersz, long filesz, long minchunks) {
    System.out.println("filesz= " + filesz);
    if (buffersz <= 0) { // would otherwise divide by zero below
      System.out.println("bad buffersz");
      throw new InternalError("goodbye");
    }
    numchunks = (int) (filesz / (buffersz * 8));
    if (numchunks < minchunks) {
      if (Ti.thisProc() == 0)
        System.out.println("buffersz too big. Max for this filesize is " + (filesz / minchunks)/8);
      throw new InternalError("goodbye");
    }
    if (numchunks * buffersz * 8 != filesz) {
      System.out.println("buffersz not an even divisor of filesz. Some possibilities are:");
      // Keep doubling the chunk count for as long as it still divides the file evenly.
      for (long i = minchunks; true; i *= 2) {
        long chunksz = (filesz / 8) / i;
        if (chunksz * i * 8 != filesz) break;
        System.out.println(" buffersz= " + chunksz + " (numchunks= " + i +")");
      }
      throw new InternalError("goodbye");
    }
    System.out.println("buffersz= " + buffersz + " numchunks= " + numchunks);
  }

  //------------------------------------------------------------------------------------
  /**
   * Entry point. Expects exactly five arguments:
   * infile, outfile, buffersz (chunk size in longs), iotype and sorttype.
   * Exits with status 1 if any argument is missing or invalid.
   */
  public static single void main (String single [] single args) {
    if (args.length != 5) {
      if (Ti.thisProc() == 0)
        System.out.println("Usage: dbsort <infile> <outfile> <buffersz> <iotype> <sorttype>");
      Ti.barrier();
      System.exit(1);
    }
    String infilename = args[0];
    String outfilename = args[1];
    String single iotype = args[3];
    String single sortstr = args[4];

    // A missing or non-positive buffersz cannot be used: it would divide by
    // zero in checkchunks() or make the psblock loop spin forever.
    try {
      buffersz = Integer.parseInt(args[2]);
    } catch (NumberFormatException exn) {
      System.out.println("bad buffersz");
      System.exit(1);
    }
    if (buffersz <= 0) {
      System.out.println("bad buffersz");
      System.exit(1);
    }

    if (sortstr.equalsIgnoreCase("quicksort")) sorttype = QUICKSORT;
    else if (sortstr.equalsIgnoreCase("fakesort")) sorttype = FAKESORT;
    else {
      if (Ti.thisProc() == 0) {
        System.out.println("Unrecognized <sorttype>. Allowed values are:");
        System.out.println(" 'quicksort' - an O(n log n) quicksort algorithm");
        System.out.println(" 'fakesort' - waste O(n log n) time in a uniform (data-independent) way");
      }
      System.exit(1); // do not silently fall back to the default sort type
    }

    boolean knowniotype =
        iotype.equalsIgnoreCase("async") ||
        iotype.equalsIgnoreCase("ablock") ||
        iotype.equalsIgnoreCase("rblock") ||
        iotype.equalsIgnoreCase("sblock") ||
        iotype.equalsIgnoreCase("sblocku") ||
        iotype.equalsIgnoreCase("psblock") ||
        iotype.equalsIgnoreCase("psblocku");
    if (!knowniotype) {
      if (Ti.thisProc() == 0) {
        System.out.println("Unrecognized <iotype>. Allowed values are:");
        System.out.println(" 'async' - Dan's non-blocking AsyncFile, with double-buffered inputs & outputs");
        System.out.println(" 'ablock' - Dan's AsyncFile, using only blocking I/O");
        System.out.println(" 'rblock' - RandomAccessFile blocking interface");
        System.out.println(" 'sblock' - Bufferred DataStream blocking interface");
        System.out.println(" 'sblocku' - UnBufferred DataStream blocking interface");
        System.out.println(" 'psblock' - Bufferred textual PrintStream blocking interface");
        System.out.println(" 'psblocku' - UnBufferred textual PrintStream blocking interface");
      }
      System.exit(1);
    }

    System.out.println("------------------------------------------------------------------------------------");
    System.out.println(iotype + " " + sortstr);
    starttime = System.currentTimeMillis();

    try {
      //------------------------------------------------------------------------------------
      if (iotype.equalsIgnoreCase("async")) {
        // Three buffers rotate so that a read, a sort and a write can overlap.
        MSG("Opening files...");
        AsyncFile local inf = new AsyncFile(infilename,"r");
        AsyncFile local outf = new AsyncFile(outfilename,"rw");
        filesz = inf.length();
        checkchunks(buffersz, filesz, 4); // do checks & setup numchunks

        long [] local buf1 = new long[buffersz];
        long [] local buf2 = new long[buffersz];
        long [] local buf3 = new long[buffersz];

        inf.read(buf1, 0, buffersz).Done(); // blocking read(buf1)
        int chunksleft = numchunks-1;

        AsyncFileRequest local buf1Req = null;
        AsyncFileRequest local buf2Req = null;
        AsyncFileRequest local buf3Req = null;

        while (chunksleft > 1) {
          {long begin = System.currentTimeMillis();
           buf2Req = inf.read(buf2, 0, buffersz); // non-blocking read(buf2)
           readtime += (System.currentTimeMillis()-begin);}
          sort(buf1);
          {long begin = System.currentTimeMillis();
           buf1Req = outf.write(buf1, 0, buffersz); // non-blocking write(buf1)
           writetime += (System.currentTimeMillis()-begin);}

          // wait( buf2, buf3 )
          AsyncFileRequest local [] local tmplist1 = { buf2Req, buf3Req };
          AsyncFile.AllDone(AsyncFile.TIME_INF, tmplist1);

          {long begin = System.currentTimeMillis();
           buf3Req = inf.read(buf3, 0, buffersz); // non-blocking read(buf3)
           readtime += (System.currentTimeMillis()-begin);}
          sort(buf2);
          {long begin = System.currentTimeMillis();
           buf2Req = outf.write(buf2, 0, buffersz); // non-blocking write(buf2)
           writetime += (System.currentTimeMillis()-begin);}

          // wait( buf1, buf3 )
          AsyncFileRequest local [] local tmplist2 = { buf1Req, buf3Req };
          AsyncFile.AllDone(AsyncFile.TIME_INF, tmplist2);

          // do some swapping: the freshly read buffer becomes buf1, the
          // already-written one becomes buf2, and the buffer whose write was
          // just issued (and not yet waited on) becomes buf3
          long [] local oldbuf1 = buf1;
          long [] local oldbuf2 = buf2;
          long [] local oldbuf3 = buf3;
          AsyncFileRequest local oldbuf1Req = buf1Req;
          AsyncFileRequest local oldbuf2Req = buf2Req;
          AsyncFileRequest local oldbuf3Req = buf3Req;
          buf1 = oldbuf3; buf1Req = oldbuf3Req;
          buf2 = oldbuf1; buf2Req = oldbuf1Req;
          buf3 = oldbuf2; buf3Req = oldbuf2Req;

          chunksleft -= 2;
        }

        if (chunksleft == 1) {
          {long begin = System.currentTimeMillis();
           buf2Req = inf.read(buf2, 0, buffersz); // non-blocking read(buf2)
           readtime += (System.currentTimeMillis()-begin);}
          sort(buf1);
          {long begin = System.currentTimeMillis();
           buf1Req = outf.write(buf1, 0, buffersz); // non-blocking write(buf1)
           writetime += (System.currentTimeMillis()-begin);}
          buf2Req.Done(); // wait(buf2)
          sort(buf2);
          {long begin = System.currentTimeMillis();
           buf2Req = outf.write(buf2, 0, buffersz); // non-blocking write(buf2)
           writetime += (System.currentTimeMillis()-begin);}
          // wait( buf1, buf2 )
          AsyncFileRequest local [] local tmplist2 = { buf1Req, buf2Req };
          AsyncFile.AllDone(AsyncFile.TIME_INF, tmplist2);
        }
        else { // chunksleft == 0
          sort(buf1);
          {long begin = System.currentTimeMillis();
           outf.write(buf1, 0, buffersz).Done(); // blocking write(buf1)
           writetime += (System.currentTimeMillis()-begin);}
        }

        // buf3Req holds the write issued at the end of the last loop
        // iteration, which nothing above has waited on; make sure it has
        // completed before the files are closed.
        if (buf3Req != null) buf3Req.Done();

        inf.close();
        outf.close();
      }
      //------------------------------------------------------------------------------------
      else if (iotype.equalsIgnoreCase("ablock")) { // AsyncFile with only blocking I/O
        MSG("Opening files...");
        AsyncFile local inf = new AsyncFile(infilename,"r");
        AsyncFile local outf = new AsyncFile(outfilename,"rw");
        filesz = inf.length();
        checkchunks(buffersz, filesz, 1); // do checks & setup numchunks
        System.out.println("buffersz= " + buffersz + " numchunks= " + numchunks);

        long [] local buf = new long[buffersz];
        for (int chunk = 0 ; chunk < numchunks; chunk++) {
          // read chunk
          {long begin = System.currentTimeMillis();
           inf.read(buf, 0, buffersz).Done();
           readtime += (System.currentTimeMillis()-begin);}
          sort(buf);
          // write chunk
          {long begin = System.currentTimeMillis();
           outf.write(buf, 0, buffersz).Done();
           writetime += (System.currentTimeMillis()-begin);}
        }
        inf.close();
        outf.close();
      }
      //------------------------------------------------------------------------------------
      else if (iotype.equalsIgnoreCase("rblock")) {
        MSG("Opening files...");
        RandomAccessFile inf = new RandomAccessFile(infilename,"r");
        RandomAccessFile outf = new RandomAccessFile(outfilename,"rw");
        filesz = inf.length();
        checkchunks(buffersz, filesz, 1); // do checks & setup numchunks
        System.out.println("buffersz= " + buffersz + " numchunks= " + numchunks);

        long [] local buf = new long[buffersz];
        for (int chunk = 0 ; chunk < numchunks; chunk++) {
          // read chunk
          {long begin = System.currentTimeMillis();
           for (int i = 0; i < buffersz; i++) {
             buf[i] = inf.readLong();
           }
           readtime += (System.currentTimeMillis()-begin);}
          sort(buf);
          // write chunk
          {long begin = System.currentTimeMillis();
           for (int i = 0; i < buffersz; i++) {
             outf.writeLong(buf[i]);
           }
           writetime += (System.currentTimeMillis()-begin);}
        }
        inf.close();
        outf.close();
      }
      //------------------------------------------------------------------------------------
      else if (iotype.equalsIgnoreCase("sblock") || iotype.equalsIgnoreCase("sblocku")) {
        MSG("Opening files...");
        // just to get the size
        RandomAccessFile inf = new RandomAccessFile(infilename,"r");
        filesz = inf.length();
        inf.close();

        // Validate before the output file is opened, so that a bad
        // configuration does not truncate an existing output file.
        checkchunks(buffersz, filesz, 1); // do checks & setup numchunks
        System.out.println("buffersz= " + buffersz + " numchunks= " + numchunks);

        DataInputStream dis;
        DataOutputStream dos;
        if (iotype.equalsIgnoreCase("sblock")) {
          FileInputStream fis = new FileInputStream(infilename);
          BufferedInputStream bis = new BufferedInputStream(fis);
          dis = new DataInputStream(bis);
          FileOutputStream fos = new FileOutputStream(outfilename);
          BufferedOutputStream bos = new BufferedOutputStream(fos);
          dos = new DataOutputStream(bos);
        } else { // "sblocku"
          FileInputStream fis = new FileInputStream(infilename);
          dis = new DataInputStream(fis);
          FileOutputStream fos = new FileOutputStream(outfilename);
          dos = new DataOutputStream(fos);
        }

        long [] local buf = new long[buffersz];
        for (int chunk = 0 ; chunk < numchunks; chunk++) {
          // read chunk
          {long begin = System.currentTimeMillis();
           for (int i = 0; i < buffersz; i++) {
             buf[i] = dis.readLong();
           }
           readtime += (System.currentTimeMillis()-begin);}
          sort(buf);
          // write chunk
          {long begin = System.currentTimeMillis();
           for (int i = 0; i < buffersz; i++) {
             dos.writeLong(buf[i]);
           }
           writetime += (System.currentTimeMillis()-begin);}
        }
        // Closing flushes any bytes still held by the buffered output stream.
        dis.close();
        dos.close();
      }
      //------------------------------------------------------------------------------------
      else if (iotype.equalsIgnoreCase("psblock") || iotype.equalsIgnoreCase("psblocku")) {
        // Textual format: one decimal long per line.
        MSG("Opening files...");
        DataInputStream dis;
        PrintStream ps;
        if (iotype.equalsIgnoreCase("psblock")) {
          FileInputStream fis = new FileInputStream(infilename);
          BufferedInputStream bis = new BufferedInputStream(fis);
          dis = new DataInputStream(bis);
          FileOutputStream fos = new FileOutputStream(outfilename);
          BufferedOutputStream bos = new BufferedOutputStream(fos);
          ps = new PrintStream(bos);
        } else { // "psblocku"
          FileInputStream fis = new FileInputStream(infilename);
          dis = new DataInputStream(fis);
          FileOutputStream fos = new FileOutputStream(outfilename);
          ps = new PrintStream(fos);
        }

        long [] local buf = new long[buffersz];
        try {
          boolean eof = false;
          while (!eof) {
            // read chunk; readLine() returning null marks end of input
            int nread = 0;
            {long begin = System.currentTimeMillis();
             while (nread < buffersz) {
               String line = dis.readLine();
               if (line == null) { eof = true; break; }
               buf[nread] = Long.parseLong(line);
               nread++;
             }
             readtime += (System.currentTimeMillis()-begin);}
            if (nread < buffersz) {
              // an incomplete trailing chunk is not sorted or written
              if (nread > 0)
                System.out.println("ignoring incomplete final chunk of " + nread + " values");
              break;
            }
            sort(buf);
            // write chunk
            {long begin = System.currentTimeMillis();
             for (int i = 0; i < buffersz; i++) {
               ps.println(buf[i]);
             }
             writetime += (System.currentTimeMillis()-begin);}
            numchunks++;
          }
        } catch (NumberFormatException exn) {
          // Stop at the first line that is not a number, but say so.
          System.out.println("malformed input line, stopping: " + exn.getMessage());
        }
        // Closing flushes any text still buffered in the PrintStream.
        ps.close();
        dis.close();
        // Widen before multiplying: the int product overflows for large files.
        filesz = (long) numchunks * buffersz * 8;
      }
      //------------------------------------------------------------------------------------
      System.out.println("");
      MSG("All done!");
      double totaltime = (System.currentTimeMillis() - starttime) / 1000.0;
      System.out.println("H\tiotype\tsorttype\tfilesz\tchunksize\tnumchunks\ttotaltime\t\tsorttime\treadtime\twritetime");
      System.out.println("D\t" + iotype + "\t" + sortstr + "\t" + filesz + "\t" + buffersz + "\t" + numchunks + "\t" +
                         totaltime + "\t" + (sorttime / 1000.0) + "\t" + (readtime / 1000.0) + "\t" + (writetime / 1000.0) + "\t");
    } catch (Throwable exn) {
      MSG("caught " + ExnType.gettype(exn) + " : " + exn.getMessage());
    }
  }
}