Add initial infrastructure for cursors.  In where.c, optimize out clauses
of the form "ORDER BY rowid" if a table scan is being performed.  Do a
reverse table scan if "ORDER BY rowid DESC" is present. (CVS 2141)

FossilOrigin-Name: fc8c1393c86017a816beb52725b68af3b973f979
diff --git a/src/cursor.c b/src/cursor.c
new file mode 100644
index 0000000..061fcb6
--- /dev/null
+++ b/src/cursor.c
@@ -0,0 +1,169 @@
+/*
+** 2004 November 21
+**
+** The author disclaims copyright to this source code.  In place of
+** a legal notice, here is a blessing:
+**
+**    May you do good and not evil.
+**    May you find forgiveness for yourself and forgive others.
+**    May you share freely, never taking more than you give.
+**
+*************************************************************************
+** This file contains code used to implement the DECLARE...CURSOR syntax
+** of SQL and related processing.
+**
+** Do not confuse SQL cursors and B-tree cursors.  An SQL cursor (as
+** implemented by this file) is a user-visible cursor that is created
+** using the DECLARE...CURSOR command and deleted using CLOSE.  A
+** B-tree cursor is an abstraction of the b-tree layer.  See the btree.c
+** module for additional information.  There is also a VDBE-cursor that
+** is used by the VDBE module.  Even though all these objects are called
+** cursors, they are really very different things.  It is worth your while
+** to fully understand the difference.
+**
+** @(#) $Id: cursor.c,v 1.1 2004/11/22 19:12:20 drh Exp $
+*/
+#ifndef SQLITE_OMIT_CURSOR
+#include "sqliteInt.h"
+#include "vdbeInt.h"
+
+/*
+** Delete a cursor object.
+*/
+void sqlite3CursorDelete(SqlCursor *p){
+  if( p ){
+    int i;
+    sqlite3SelectDelete(p->pSelect);
+    for(i=0; i<p->nPtr; i++){
+      sqlite3VdbeMemRelease(&p->aPtr[i]);
+    }
+    sqliteFree(p->aPtr);
+    sqliteFree(p);
+  }
+}
+
+/*
+** Look up a cursor by name.  Return NULL if not found.
+*/
+static SqlCursor *findCursor(sqlite3 *db, Token *pName){
+  int i;
+  SqlCursor *p;
+  for(i=0; i<db->nSqlCursor; i++){
+    p = db->apSqlCursor[i];
+    if( p && sqlite3StrNICmp(p->zName, pName->z, pName->n)==0 ){
+      return p;
+    }
+  }
+  return 0;
+}  
+
+/*
+** The parser calls this routine in order to create a new cursor.
+** The arguments are the name of the new cursor and the SELECT statement
+** that the new cursor will access.
+*/
+void sqlite3CursorCreate(Parse *pParse, Token *pName, Select *pSelect){
+  SqlCursor *pNew;
+  sqlite3 *db = pParse->db;
+  int i;
+  
+
+  pNew = findCursor(db, pName);
+  if( pNew ){
+    sqlite3ErrorMsg(pParse, "another cursor named %T already exists", pName);
+    goto end_create_cursor;
+  }
+  if( pSelect==0 ){
+    /* This can only happen due to a prior malloc failure */
+    goto end_create_cursor;
+  }
+  for(i=0; i<db->nSqlCursor; i++){
+    if( db->apSqlCursor[i]==0 ) break;
+  }
+  if( i>=db->nSqlCursor ){
+    db->apSqlCursor = sqliteRealloc(db->apSqlCursor, (i+1)*sizeof(pNew));
+    db->nSqlCursor = i+1;
+  }
+  db->apSqlCursor[i] = pNew = sqliteMallocRaw( sizeof(*pNew) + pName->n + 1 );
+  if( pNew==0 ) goto end_create_cursor;
+  pNew->zName = (char*)&pNew[1];
+  memcpy(pNew->zName, pName->z, pName->n);
+  pNew->zName[pName->n] = 0;
+  pNew->pSelect = sqlite3SelectDup(pSelect);
+  pNew->nPtr = 0;
+  pNew->aPtr = 0;
+  pNew->idx = i;
+
+end_create_cursor:
+  sqlite3SelectDelete(pSelect);
+}
+
+/*
+** The parser calls this routine in response to a CLOSE command.  Delete
+** the cursor named in the argument.
+*/
+void sqlite3CursorClose(Parse *pParse, Token *pName){
+  SqlCursor *p;
+  sqlite3 *db = pParse->db;
+
+  p = findCursor(db, pName);
+  if( p==0 ){
+    sqlite3ErrorMsg(pParse, "no such cursor: %T", pName);
+    return;
+  }
+  assert( p->idx>=0 && p->idx<db->nSqlCursor );
+  assert( db->apSqlCursor[p->idx]==p );
+  db->apSqlCursor[p->idx] = 0;
+  sqlite3CursorDelete(p);
+}
+
+/*
+** The parser calls this routine when it sees a complete FETCH statement.
+** This routine generates code to implement the FETCH.
+**
+** Information about the direction of the FETCH has already been inserted
+** into the pParse structure by parser rules.  The arguments specify the
+** name of the cursor from which we are fetching and the optional INTO
+** clause.
+*/
+void sqlite3Fetch(Parse *pParse, Token *pName, IdList *pInto){
+  SqlCursor *p;
+  sqlite3 *db = pParse->db;
+  Select *pCopy;
+  Fetch sFetch;
+
+  p = findCursor(db, pName);
+  if( p==0 ){
+    sqlite3ErrorMsg(pParse, "no such cursor: %T", pName);
+    return;
+  }
+  sFetch.pCursor = p;
+  pCopy = sqlite3SelectDup(p->pSelect);
+  pCopy->pFetch = &sFetch;
+  switch( pParse->fetchDir ){
+    case TK_FIRST: {
+      break;
+    }
+    case TK_LAST: {
+      break;
+    }
+    case TK_NEXT: {
+      break;
+    }
+    case TK_PRIOR: {
+      break;
+    }
+    case TK_ABSOLUTE: {
+      break;
+    }
+    default: {
+      assert( pParse->fetchDir==TK_RELATIVE );
+      break;
+    }
+  }
+  sqlite3Select(pParse, pCopy, SRT_Callback, 0, 0, 0, 0, 0);
+end_fetch:
+  sqlite3IdListDelete(pInto);
+}
+
+#endif /* SQLITE_OMIT_CURSOR */
diff --git a/src/expr.c b/src/expr.c
index ce3337d..03fd866 100644
--- a/src/expr.c
+++ b/src/expr.c
@@ -12,7 +12,7 @@
 ** This file contains routines used for analyzing expressions and
 ** for generating VDBE code that evaluates expressions in SQLite.
 **
-** $Id: expr.c,v 1.172 2004/11/20 20:44:14 drh Exp $
+** $Id: expr.c,v 1.173 2004/11/22 19:12:20 drh Exp $
 */
 #include "sqliteInt.h"
 #include <ctype.h>
@@ -503,10 +503,10 @@
   pNew->pPrior = sqlite3SelectDup(p->pPrior);
   pNew->nLimit = p->nLimit;
   pNew->nOffset = p->nOffset;
-  pNew->zSelect = 0;
   pNew->iLimit = -1;
   pNew->iOffset = -1;
   pNew->ppOpenTemp = 0;
+  pNew->pFetch = 0;
   return pNew;
 }
 
diff --git a/src/main.c b/src/main.c
index 08470f7..9a9b96e 100644
--- a/src/main.c
+++ b/src/main.c
@@ -14,7 +14,7 @@
 ** other files are for internal use by SQLite and should not be
 ** accessed by users of the library.
 **
-** $Id: main.c,v 1.267 2004/11/22 05:26:27 danielk1977 Exp $
+** $Id: main.c,v 1.268 2004/11/22 19:12:20 drh Exp $
 */
 #include "sqliteInt.h"
 #include "os.h"
@@ -500,6 +500,13 @@
     sqlite3ValueFree(db->pErr);
   }
 
+#ifndef SQLITE_OMIT_CURSOR
+  for(j=0; j<db->nSqlCursor; j++){
+    sqlite3CursorDelete(db->apSqlCursor[j]);
+  }
+  sqliteFree(db->apSqlCursor);
+#endif
+
   db->magic = SQLITE_MAGIC_ERROR;
   sqliteFree(db);
   return SQLITE_OK;
diff --git a/src/parse.y b/src/parse.y
index e0d2701..7579361 100644
--- a/src/parse.y
+++ b/src/parse.y
@@ -14,7 +14,7 @@
 ** the parser.  Lemon will also generate a header file containing
 ** numeric codes for all of the tokens.
 **
-** @(#) $Id: parse.y,v 1.157 2004/11/17 16:41:30 danielk1977 Exp $
+** @(#) $Id: parse.y,v 1.158 2004/11/22 19:12:21 drh Exp $
 */
 %token_prefix TK_
 %token_type {Token}
@@ -598,6 +598,9 @@
 term(A) ::= FLOAT(X).        {A = sqlite3Expr(@X, 0, 0, &X);}
 term(A) ::= STRING(X).       {A = sqlite3Expr(@X, 0, 0, &X);}
 expr(A) ::= BLOB(X).         {A = sqlite3Expr(@X, 0, 0, &X);}
+%ifndef SQLITE_OMIT_CURSOR
+expr(A) ::= CURRENT OF id.
+%endif 
 expr(A) ::= REGISTER(X).     {A = sqlite3RegisterExpr(pParse, &X);}
 expr(A) ::= VARIABLE(X).     {
   Token *pToken = &X;
@@ -955,3 +958,32 @@
   sqlite3AlterRenameTable(pParse,X,&Z);
 }
 %endif
+
+////////////////////////////// CURSORS //////////////////////////////////////
+%ifndef SQLITE_OMIT_CURSOR
+cmd ::= DECLARE nm(X) CURSOR FOR select(Y).  {sqlite3CursorCreate(pParse,&X,Y);}
+cmd ::= CLOSE nm(X).                         {sqlite3CursorClose(pParse,&X);}
+
+cmd ::= FETCH direction FROM nm(N) into_opt(D).
+                                      {sqlite3Fetch(pParse,&N,D);}
+
+%type into_opt {IdList*}
+%destructor into_opt {sqlite3IdListDelete($$);}
+into_opt(A) ::= .                     {A = 0;}
+into_opt(A) ::= INTO inscollist(X).   {A = X;}
+direction ::= NEXT(X) count_opt(Y).   {pParse->fetchDir=@X; pParse->dirArg1=Y;}
+direction ::= PRIOR(X) count_opt(Y).  {pParse->fetchDir=@X; pParse->dirArg1=Y;}
+direction ::= FIRST(X) count_opt(Y).  {pParse->fetchDir=@X; pParse->dirArg1=Y;}
+direction ::= LAST(X) count_opt(Y).   {pParse->fetchDir=@X; pParse->dirArg1=Y;}
+direction ::= ABSOLUTE(X) signed(Z) comma_count_opt(Y).
+                   {pParse->fetchDir=@X; pParse->dirArg1=Y; pParse->dirArg2=Z;}
+direction ::= RELATIVE(X) signed(Z) comma_count_opt(Y).
+                   {pParse->fetchDir=@X; pParse->dirArg1=Y; pParse->dirArg2=Z;}
+
+%type count_opt {int}
+count_opt(A) ::= .          {A = 1;}
+count_opt(A) ::= signed(X). {A = X;}
+%type comma_count_opt {int}
+comma_count_opt(A) ::= .                 {A = 1;}
+comma_count_opt(A) ::= COMMA signed(X).  {A = X;}
+%endif // SQLITE_OMIT_CURSOR
diff --git a/src/pragma.c b/src/pragma.c
index 10b282c..3d094d3 100644
--- a/src/pragma.c
+++ b/src/pragma.c
@@ -11,7 +11,7 @@
 *************************************************************************
 ** This file contains code used to implement the PRAGMA command.
 **
-** $Id: pragma.c,v 1.78 2004/11/13 15:59:15 drh Exp $
+** $Id: pragma.c,v 1.79 2004/11/22 19:12:21 drh Exp $
 */
 #include "sqliteInt.h"
 #include <ctype.h>
@@ -478,6 +478,24 @@
       sqlite3VdbeAddOp(v, OP_Callback, 3, 0);
     }
   }else
+
+#ifndef SQLITE_OMIT_CURSOR
+  if( sqlite3StrICmp(zLeft, "cursor_list")==0 ){
+    int i;
+    if( sqlite3ReadSchema(pParse) ) goto pragma_out;
+    sqlite3VdbeSetNumCols(v, 2);
+    sqlite3VdbeSetColName(v, 0, "seq", P3_STATIC);
+    sqlite3VdbeSetColName(v, 1, "name", P3_STATIC);
+    for(i=0; i<db->nSqlCursor; i++){
+      SqlCursor *p = db->apSqlCursor[i];
+      if( p==0 ) continue;
+      assert( p->zName!=0 );
+      sqlite3VdbeAddOp(v, OP_Integer, i, 0);
+      sqlite3VdbeOp3(v, OP_String8, 0, 0, p->zName, 0);
+      sqlite3VdbeAddOp(v, OP_Callback, 2, 0);
+    }
+  }else
+#endif /* SQLITE_OMIT_CURSOR */
 #endif /* SQLITE_OMIT_SCHEMA_PRAGMAS */
 
 #ifndef SQLITE_OMIT_FOREIGN_KEY
diff --git a/src/select.c b/src/select.c
index 412ee8e..d17c269 100644
--- a/src/select.c
+++ b/src/select.c
@@ -12,7 +12,7 @@
 ** This file contains C code routines that are called by the parser
 ** to handle SELECT statements in SQLite.
 **
-** $Id: select.c,v 1.215 2004/11/22 10:02:11 danielk1977 Exp $
+** $Id: select.c,v 1.216 2004/11/22 19:12:21 drh Exp $
 */
 #include "sqliteInt.h"
 
@@ -298,7 +298,6 @@
   sqlite3ExprDelete(p->pHaving);
   sqlite3ExprListDelete(p->pOrderBy);
   sqlite3SelectDelete(p->pPrior);
-  sqliteFree(p->zSelect);
   sqliteFree(p);
 }
 
@@ -2276,6 +2275,12 @@
   /* If there is are a sequence of queries, do the earlier ones first.
   */
   if( p->pPrior ){
+#ifndef SQLITE_OMIT_CURSOR
+    if( p->pFetch ){
+      sqlite3ErrorMsg(pParse, "cursors cannot be used on compound queries");
+      goto select_end;
+    }
+#endif
     return multiSelect(pParse, p, eDest, iParm, aff);
   }
 #endif
@@ -2360,6 +2365,26 @@
     goto select_end;
   }
 
+  /* We cannot use a SQL cursor on a join or on a DISTINCT query
+  */
+#ifndef SQLITE_OMIT_CURSOR
+  if( p->pFetch ){
+    if( p->isDistinct ){
+      sqlite3ErrorMsg(pParse, "cursors cannot be used on DISTINCT queries");
+      goto select_end;
+    }
+    if( pTabList->nSrc>0 ){
+      sqlite3ErrorMsg(pParse, "cursors cannot be used on joins");
+      goto select_end;
+    }
+    if( pTabList->a[0].pSelect ){
+      sqlite3ErrorMsg(pParse, "cursor cannot be used with nested queries "
+          "or views");
+      goto select_end;
+    }
+  }
+#endif
+
   /* Begin generating code.
   */
   v = sqlite3GetVdbe(pParse);
@@ -2522,8 +2547,16 @@
 
   /* Begin the database scan
   */
-  pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 
-                            pGroupBy ? 0 : &pOrderBy);
+#if 0 /* ndef SQLITE_OMIT_CURSOR */
+  if( p->pFetch ){
+    pWInfo = sqlite3WhereBeginFetch(pParse, pTabList, pWhere,
+                                    pOrderby, p->pFetch);
+  }else
+#endif
+  {
+    pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0,
+                               pGroupBy ? 0 : &pOrderBy);
+  }
   if( pWInfo==0 ) goto select_end;
 
   /* Use the standard inner loop if we are not dealing with
diff --git a/src/sqliteInt.h b/src/sqliteInt.h
index 9be16bc..0616d05 100644
--- a/src/sqliteInt.h
+++ b/src/sqliteInt.h
@@ -11,7 +11,7 @@
 *************************************************************************
 ** Internal interface definitions for SQLite.
 **
-** @(#) $Id: sqliteInt.h,v 1.342 2004/11/22 10:02:11 danielk1977 Exp $
+** @(#) $Id: sqliteInt.h,v 1.343 2004/11/22 19:12:21 drh Exp $
 */
 #ifndef _SQLITEINT_H_
 #define _SQLITEINT_H_
@@ -307,6 +307,8 @@
 typedef struct KeyClass KeyClass;
 typedef struct CollSeq CollSeq;
 typedef struct KeyInfo KeyInfo;
+typedef struct SqlCursor SqlCursor;
+typedef struct Fetch Fetch;
 
 /*
 ** Each database file to be accessed by the system is an instance
@@ -420,7 +422,10 @@
   void *pProgressArg;           /* Argument to the progress callback */
   int nProgressOps;             /* Number of opcodes for progress callback */
 #endif
-
+#ifndef SQLITE_OMIT_CURSOR
+  int nSqlCursor;               /* Number of slots in apSqlCursor[] */
+  SqlCursor **apSqlCursor;      /* Pointers to all active SQL cursors */
+#endif
   int errCode;                  /* Most recent error code (SQLITE_*) */
   u8 enc;                       /* Text encoding for this database. */
   u8 autoCommit;                /* The auto-commit flag. */
@@ -429,9 +434,8 @@
   void *pCollNeededArg;
   sqlite3_value *pValue;        /* Value used for transient conversions */
   sqlite3_value *pErr;          /* Most recent error message */
-
   char *zErrMsg;                /* Most recent error message (UTF-8 encoded) */
-  char *zErrMsg16;              /* Most recent error message (UTF-8 encoded) */
+  char *zErrMsg16;              /* Most recent error message (UTF-16 encoded) */
 };
 
 /*
@@ -929,17 +933,19 @@
 };
 
 /*
+** An instance of the following structure is used to store information
+** about a single FETCH sql command.
+*/
+struct Fetch {
+  SqlCursor *pCursor;  /* Cursor used by the fetch */
+  int isBackwards;     /* Cursor moves backwards if true, forward if false */
+  int doRewind;        /* True to rewind cursor before starting */
+};
+
+/*
 ** An instance of the following structure contains all information
 ** needed to generate code for a single SELECT statement.
 **
-** The zSelect field is used when the Select structure must be persistent.
-** Normally, the expression tree points to tokens in the original input
-** string that encodes the select.  But if the Select structure must live
-** longer than its input string (for example when it is used to describe
-** a VIEW) we have to make a copy of the input string so that the nodes
-** of the expression tree will have something to point to.  zSelect is used
-** to hold that copy.
-**
 ** nLimit is set to -1 if there is no LIMIT clause.  nOffset is set to 0.
 ** If there is a LIMIT clause, the parser sets nLimit to the value of the
 ** limit and nOffset to the value of the offset (or 0 if there is not
@@ -958,8 +964,8 @@
   Select *pPrior;        /* Prior select in a compound select statement */
   int nLimit, nOffset;   /* LIMIT and OFFSET values.  -1 means not used */
   int iLimit, iOffset;   /* Memory registers holding LIMIT & OFFSET counters */
-  char *zSelect;         /* Complete text of the SELECT command */
   IdList **ppOpenTemp;   /* OP_OpenTemp addresses used by multi-selects */
+  Fetch *pFetch;         /* If this stmt is part of a FETCH command */
 };
 
 /*
@@ -1039,6 +1045,11 @@
   u8 explain;          /* True if the EXPLAIN flag is found on the query */
   u8 useAgg;           /* If true, extract field values from the aggregator
                        ** while generating expressions.  Normally false */
+#ifndef SQLITE_OMIT_CURSOR
+  u8 fetchDir;         /* The direction argument to the FETCH command */
+  int dirArg1;         /* First argument to the direction */
+  int dirArg2;         /* Second argument to the direction */
+#endif
   int nAgg;            /* Number of aggregate expressions */
   AggExpr *aAgg;       /* An array of aggregate expressions */
   Token sErrToken;     /* The token at which the error occurred */
@@ -1050,6 +1061,7 @@
   Trigger *pNewTrigger;     /* Trigger under construct by a CREATE TRIGGER */
   TriggerStack *trigStack;  /* Trigger actions being coded */
   const char *zAuthContext; /* The 6th parameter to db->xAuth callbacks */
+  
 };
 
 /*
@@ -1212,6 +1224,18 @@
   char **pzErrMsg;    /* Error message stored here */
 } InitData;
 
+/*
+** Each SQL cursor (a cursor created by the DECLARE ... CURSOR syntax)
+** is represented by an instance of the following structure.
+*/
+struct SqlCursor {
+  char *zName;           /* Name of this cursor */
+  int idx;               /* Index of this cursor in db->apSqlCursor[] */
+  Select *pSelect;       /* The SELECT statement that defines this cursor */
+  int nPtr;              /* Number of slots in aPtr[] */
+  sqlite3_value *aPtr;   /* Values that define the current cursor position */
+};
+
 
 /*
  * This global flag is set for performance testing of triggers. When it is set
@@ -1314,6 +1338,7 @@
 Table *sqlite3SrcListLookup(Parse*, SrcList*);
 int sqlite3IsReadOnly(Parse*, Table*, int);
 void sqlite3OpenTableForReading(Vdbe*, int iCur, Table*);
+void sqlite3OpenTable(Vdbe*, int iCur, Table*, int);
 void sqlite3DeleteFrom(Parse*, SrcList*, Expr*);
 void sqlite3Update(Parse*, SrcList*, ExprList*, Expr*, int);
 WhereInfo *sqlite3WhereBegin(Parse*, SrcList*, Expr*, int, ExprList**);
@@ -1463,4 +1488,11 @@
 int sqlite3GetToken(const unsigned char *, int *);
 void sqlite3NestedParse(Parse*, const char*, ...);
 
+#ifndef SQLITE_OMIT_CURSOR
+void sqlite3CursorDelete(SqlCursor*);
+void sqlite3CursorCreate(Parse*, Token*, Select*);
+void sqlite3CursorClose(Parse*, Token*);
+void sqlite3Fetch(Parse*, Token*, IdList*);
+#endif /* SQLITE_OMIT_CURSOR */
+
 #endif
diff --git a/src/test1.c b/src/test1.c
index c7ba77c..b6e8a5b 100644
--- a/src/test1.c
+++ b/src/test1.c
@@ -13,7 +13,7 @@
 ** is not included in the SQLite library.  It is used for automated
 ** testing of the SQLite library.
 **
-** $Id: test1.c,v 1.116 2004/11/20 19:18:01 drh Exp $
+** $Id: test1.c,v 1.117 2004/11/22 19:12:21 drh Exp $
 */
 #include "sqliteInt.h"
 #include "tcl.h"
@@ -2552,6 +2552,12 @@
   Tcl_SetVar2(interp, "sqlite_options", "conflict", "1", TCL_GLOBAL_ONLY);
 #endif
 
+#ifdef SQLITE_OMIT_CURSOR
+  Tcl_SetVar2(interp, "sqlite_options", "cursor", "0", TCL_GLOBAL_ONLY);
+#else
+  Tcl_SetVar2(interp, "sqlite_options", "cursor", "1", TCL_GLOBAL_ONLY);
+#endif
+
 #ifdef SQLITE_OMIT_DATETIME_FUNCS
   Tcl_SetVar2(interp, "sqlite_options", "datetime", "0", TCL_GLOBAL_ONLY);
 #else
diff --git a/src/where.c b/src/where.c
index c3f8082..97cf37d 100644
--- a/src/where.c
+++ b/src/where.c
@@ -12,7 +12,7 @@
 ** This module contains C code that generates VDBE code used to process
 ** the WHERE clause of SQL statements.
 **
-** $Id: where.c,v 1.118 2004/11/22 10:02:20 danielk1977 Exp $
+** $Id: where.c,v 1.119 2004/11/22 19:12:21 drh Exp $
 */
 #include "sqliteInt.h"
 
@@ -248,9 +248,13 @@
 ** All terms of the ORDER BY clause must be either ASC or DESC.  The
 ** *pbRev value is set to 1 if the ORDER BY clause is all DESC and it is
 ** set to 0 if the ORDER BY clause is all ASC.
+**
+** TODO:  If earlier terms of an ORDER BY clause match all terms of a
+** UNIQUE index, then subsequent terms of the ORDER BY can be ignored.
+** This optimization needs to be implemented.
 */
 static Index *findSortingIndex(
-  Parse *pParse,
+  Parse *pParse,          /* Parsing context */
   Table *pTab,            /* The table to be sorted */
   int base,               /* Cursor number for pTab */
   ExprList *pOrderBy,     /* The ORDER BY clause */
@@ -258,14 +262,15 @@
   int nEqCol,             /* Number of index columns used with == constraints */
   int *pbRev              /* Set to 1 if ORDER BY is DESC */
 ){
-  int i, j;
-  Index *pMatch;
-  Index *pIdx;
-  int sortOrder;
+  int i, j;                    /* Loop counters */
+  Index *pMatch;               /* Best matching index so far */
+  Index *pIdx;                 /* Current index */
+  int sortOrder;               /* Which direction we are sorting */
   sqlite3 *db = pParse->db;
 
   assert( pOrderBy!=0 );
   assert( pOrderBy->nExpr>0 );
+  assert( pPreferredIdx!=0 || nEqCol==0 );
   sortOrder = pOrderBy->a[0].sortOrder;
   for(i=0; i<pOrderBy->nExpr; i++){
     Expr *p;
@@ -282,9 +287,10 @@
     }
   }
 
-  /* If we get this far, it means the ORDER BY clause consists only of
-  ** ascending columns in the left-most table of the FROM clause.  Now
-  ** check for a matching index.
+  /* If we get this far, it means the ORDER BY clause consists of columns
+  ** that are all either ascending or descending and which refer only to
+  ** the left-most table of the FROM clause.  Find the index that is best
+  ** used for sorting.
   */
   pMatch = 0;
   for(pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext){
@@ -314,13 +320,34 @@
       if( pIdx==pPreferredIdx ) break;
     }
   }
-  if( pMatch && pbRev ){
-    *pbRev = sortOrder==SQLITE_SO_DESC;
-  }
+  *pbRev = sortOrder==SQLITE_SO_DESC;
   return pMatch;
 }
 
 /*
+** Check table to see if the ORDER BY clause in pOrderBy can be satisfied
+** by sorting in order of ROWID.  Return true if so and set *pbRev to be
+** true for reverse ROWID and false for forward ROWID order.
+*/
+static int sortableByRowid(
+  int base,               /* Cursor number for table to be sorted */
+  ExprList *pOrderBy,     /* The ORDER BY clause */
+  int *pbRev              /* Set to 1 if ORDER BY is DESC */
+){
+  Expr *p;
+
+  assert( pOrderBy!=0 );
+  assert( pOrderBy->nExpr>0 );
+  p = pOrderBy->a[0].pExpr;
+  if( p->op==TK_COLUMN && p->iTable==base && p->iColumn==-1 ){
+    *pbRev = pOrderBy->a[0].sortOrder;
+    return 1;
+  }
+  return 0;
+}
+
+
+/*
 ** Disable a term in the WHERE clause.  Except, do not disable the term
 ** if it controls a LEFT OUTER JOIN and it did not originate in the ON
 ** or USING clause of that join.
@@ -615,6 +642,12 @@
         }
       }
     }
+
+    /* If we found a term that tests ROWID with == or IN, that term
+    ** will be used to locate the rows in the database table.  There
+    ** is not need to continue into the code below that looks for
+    ** an index.  We will always use the ROWID over an index.
+    */
     if( iDirectEq[i]>=0 ){
       loopMask |= mask;
       pLevel->pIdx = 0;
@@ -735,35 +768,43 @@
   ** use of an index on the first table.
   */
   if( ppOrderBy && *ppOrderBy && pTabList->nSrc>0 ){
-     Index *pSortIdx;
-     Index *pIdx;
-     Table *pTab;
-     int bRev = 0;
+     Index *pSortIdx = 0;     /* Index that satisfies the ORDER BY clause */
+     Index *pIdx;             /* Index derived from the WHERE clause */
+     Table *pTab;             /* Left-most table in the FROM clause */
+     int bRev = 0;            /* True to reverse the output order */
+     int iCur;                /* Btree-cursor that will be used by pTab */
+     WhereLevel *pLevel0 = &pWInfo->a[0];
 
      pTab = pTabList->a[0].pTab;
-     pIdx = pWInfo->a[0].pIdx;
-     if( pIdx && pWInfo->a[0].score==4 ){
+     pIdx = pLevel0->pIdx;
+     iCur = pTabList->a[0].iCursor;
+     if( pIdx==0 && sortableByRowid(iCur, *ppOrderBy, &bRev) ){
+       /* The ORDER BY clause specifies ROWID order, which is what we
+       ** were going to be doing anyway...
+       */
+       *ppOrderBy = 0;
+       pLevel0->bRev = bRev;
+     }else if( pLevel0->score==4 ){
        /* If there is already an IN index on the left-most table,
        ** it will not give the correct sort order.
        ** So, pretend that no suitable index is found.
        */
-       pSortIdx = 0;
      }else if( iDirectEq[0]>=0 || iDirectLt[0]>=0 || iDirectGt[0]>=0 ){
        /* If the left-most column is accessed using its ROWID, then do
-       ** not try to sort by index.
+       ** not try to sort by index.  But do delete the ORDER BY clause
+       ** if it is redundant.
        */
-       pSortIdx = 0;
      }else{
-       int nEqCol = (pWInfo->a[0].score+4)/8;
-       pSortIdx = findSortingIndex(pParse, pTab, pTabList->a[0].iCursor, 
+       int nEqCol = (pLevel0->score+4)/8;
+       pSortIdx = findSortingIndex(pParse, pTab, iCur, 
                                    *ppOrderBy, pIdx, nEqCol, &bRev);
      }
      if( pSortIdx && (pIdx==0 || pIdx==pSortIdx) ){
        if( pIdx==0 ){
-         pWInfo->a[0].pIdx = pSortIdx;
-         pWInfo->a[0].iCur = pParse->nTab++;
+         pLevel0->pIdx = pSortIdx;
+         pLevel0->iCur = pParse->nTab++;
        }
-       pWInfo->a[0].bRev = bRev;
+       pLevel0->bRev = bRev;
        *ppOrderBy = 0;
      }
   }
@@ -828,7 +869,7 @@
       pLevel->op = OP_Noop;
     }else if( pIdx!=0 && pLevel->score>0 && pLevel->score%4==0 ){
       /* Case 2:  There is an index and all terms of the WHERE clause that
-      **          refer to the index use the "==" or "IN" operators.
+      **          refer to the index using the "==" or "IN" operators.
       */
       int start;
       int nColumn = (pLevel->score+4)/8;
@@ -892,9 +933,15 @@
       */
       int testOp = OP_Noop;
       int start;
+      int bRev = pLevel->bRev;
 
       brk = pLevel->brk = sqlite3VdbeMakeLabel(v);
       cont = pLevel->cont = sqlite3VdbeMakeLabel(v);
+      if( bRev ){
+        int t = iDirectGt[i];
+        iDirectGt[i] = iDirectLt[i];
+        iDirectLt[i] = t;
+      }
       if( iDirectGt[i]>=0 ){
         Expr *pX;
         k = iDirectGt[i];
@@ -905,10 +952,10 @@
         assert( pTerm->idxLeft==iCur );
         sqlite3ExprCode(pParse, pX->pRight);
         sqlite3VdbeAddOp(v, OP_ForceInt, pX->op==TK_LT || pX->op==TK_GT, brk);
-        sqlite3VdbeAddOp(v, OP_MoveGe, iCur, brk);
+        sqlite3VdbeAddOp(v, bRev ? OP_MoveLt : OP_MoveGe, iCur, brk);
         disableTerm(pLevel, &pTerm->p);
       }else{
-        sqlite3VdbeAddOp(v, OP_Rewind, iCur, brk);
+        sqlite3VdbeAddOp(v, bRev ? OP_Last : OP_Rewind, iCur, brk);
       }
       if( iDirectLt[i]>=0 ){
         Expr *pX;
@@ -922,14 +969,14 @@
         pLevel->iMem = pParse->nMem++;
         sqlite3VdbeAddOp(v, OP_MemStore, pLevel->iMem, 1);
         if( pX->op==TK_LT || pX->op==TK_GT ){
-          testOp = OP_Ge;
+          testOp = bRev ? OP_Le : OP_Ge;
         }else{
-          testOp = OP_Gt;
+          testOp = bRev ? OP_Lt : OP_Gt;
         }
         disableTerm(pLevel, &pTerm->p);
       }
       start = sqlite3VdbeCurrentAddr(v);
-      pLevel->op = OP_Next;
+      pLevel->op = bRev ? OP_Prev : OP_Next;
       pLevel->p1 = iCur;
       pLevel->p2 = start;
       if( testOp!=OP_Noop ){
@@ -943,12 +990,19 @@
       **          scan of the entire database table.
       */
       int start;
+      int opRewind;
 
       brk = pLevel->brk = sqlite3VdbeMakeLabel(v);
       cont = pLevel->cont = sqlite3VdbeMakeLabel(v);
-      sqlite3VdbeAddOp(v, OP_Rewind, iCur, brk);
+      if( pLevel->bRev ){
+        opRewind = OP_Last;
+        pLevel->op = OP_Prev;
+      }else{
+        opRewind = OP_Rewind;
+        pLevel->op = OP_Next;
+      }
+      sqlite3VdbeAddOp(v, opRewind, iCur, brk);
       start = sqlite3VdbeCurrentAddr(v);
-      pLevel->op = OP_Next;
       pLevel->p1 = iCur;
       pLevel->p2 = start;
       haveKey = 0;