Scan the table backwards if there is an ORDER BY ... DESC clause that can
be satisfied by an index. (CVS 795)
FossilOrigin-Name: c7a3487981de0ed5b43ea3ff4d46ab4437068dca
diff --git a/src/sqliteInt.h b/src/sqliteInt.h
index 642b76c..10095a8 100644
--- a/src/sqliteInt.h
+++ b/src/sqliteInt.h
@@ -11,7 +11,7 @@
*************************************************************************
** Internal interface definitions for SQLite.
**
-** @(#) $Id: sqliteInt.h,v 1.149 2002/11/20 11:55:19 drh Exp $
+** @(#) $Id: sqliteInt.h,v 1.150 2002/12/04 20:01:06 drh Exp $
*/
#include "config.h"
#include "sqlite.h"
@@ -614,6 +614,7 @@
int iLeftJoin; /* Memory cell used to implement LEFT OUTER JOIN */
int top; /* First instruction of interior of the loop */
int inOp, inP1, inP2;/* Opcode used to implement an IN operator */
+ int bRev; /* Do the scan in the reverse direction */
};
/*
diff --git a/src/vdbe.c b/src/vdbe.c
index 3446d69..93b4ee9 100644
--- a/src/vdbe.c
+++ b/src/vdbe.c
@@ -36,7 +36,7 @@
** in this file for details. If in doubt, do not deviate from existing
** commenting and indentation practices when changing or adding code.
**
-** $Id: vdbe.c,v 1.185 2002/12/02 04:25:21 drh Exp $
+** $Id: vdbe.c,v 1.186 2002/12/04 20:01:06 drh Exp $
*/
#include "sqliteInt.h"
#include <ctype.h>
@@ -3354,8 +3354,19 @@
** If there are no records greater than the key and P2 is not zero,
** then an immediate jump to P2 is made.
**
-** See also: Found, NotFound, Distinct
+** See also: Found, NotFound, Distinct, MoveLt
*/
+/* Opcode: MoveLt P1 P2 *
+**
+** Pop the top of the stack and use its value as a key. Reposition
+** cursor P1 so that it points to the entry with the largest key that is
+** less than the key popped from the stack.
+** If there are no records less than the key and P2
+** is not zero then an immediate jump to P2 is made.
+**
+** See also: MoveTo
+*/
+case OP_MoveLt:
case OP_MoveTo: {
int i = pOp->p1;
int tos = p->tos;
@@ -3363,7 +3374,7 @@
VERIFY( if( tos<0 ) goto not_enough_stack; )
if( i>=0 && i<p->nCursor && (pC = &p->aCsr[i])->pCursor!=0 ){
- int res;
+ int res, oc;
if( aStack[tos].flags & STK_Int ){
int iKey = intToKey(aStack[tos].i);
sqliteBtreeMoveto(pC->pCursor, (char*)&iKey, sizeof(int), &res);
@@ -3376,12 +3387,19 @@
}
pC->nullRow = 0;
sqlite_search_count++;
- if( res<0 ){
+ oc = pOp->opcode;
+ if( oc==OP_MoveTo && res<0 ){
sqliteBtreeNext(pC->pCursor, &res);
pC->recnoIsValid = 0;
if( res && pOp->p2>0 ){
pc = pOp->p2 - 1;
}
+ }else if( oc==OP_MoveLt && res>=0 ){
+ sqliteBtreePrevious(pC->pCursor, &res);
+ pC->recnoIsValid = 0;
+ if( res && pOp->p2>0 ){
+ pc = pOp->p2 - 1;
+ }
}
}
POPSTACK;
@@ -4026,7 +4044,17 @@
** table or index. If there are no more key/value pairs then fall through
** to the following instruction. But if the cursor advance was successful,
** jump immediately to P2.
+**
+** See also: Prev
*/
+/* Opcode: Prev P1 P2 *
+**
+** Back up cursor P1 so that it points to the previous key/data pair in its
+** table or index. If there is no previous key/value pair then fall through
+** to the following instruction. But if the cursor backup was successful,
+** jump immediately to P2.
+*/
+case OP_Prev:
case OP_Next: {
int i = pOp->p1;
BtCursor *pCrsr;
@@ -4036,7 +4064,8 @@
if( p->aCsr[i].nullRow ){
res = 1;
}else{
- rc = sqliteBtreeNext(pCrsr, &res);
+ rc = pOp->opcode==OP_Next ? sqliteBtreeNext(pCrsr, &res) :
+ sqliteBtreePrevious(pCrsr, &res);
p->aCsr[i].nullRow = res;
}
if( res==0 ){
@@ -4164,6 +4193,15 @@
** then jump to P2. Otherwise fall through to the next instruction.
** In either case, the stack is popped once.
*/
+/* Opcode: IdxLT P1 P2 *
+**
+** Compare the top of the stack against the key on the index entry that
+** cursor P1 is currently pointing to. Ignore the last 4 bytes of the
+** index entry. If the index entry is less than the top of the stack
+** then jump to P2. Otherwise fall through to the next instruction.
+** In either case, the stack is popped once.
+*/
+case OP_IdxLT:
case OP_IdxGT:
case OP_IdxGE: {
int i= pOp->p1;
@@ -4178,7 +4216,9 @@
if( rc!=SQLITE_OK ){
break;
}
- if( pOp->opcode==OP_IdxGE ){
+ if( pOp->opcode==OP_IdxLT ){
+ res = -res;
+ }else if( pOp->opcode==OP_IdxGE ){
res++;
}
if( res>0 ){
diff --git a/src/where.c b/src/where.c
index 1c37040..fe4758d 100644
--- a/src/where.c
+++ b/src/where.c
@@ -13,7 +13,7 @@
** the WHERE clause of SQL statements. Also found here are subroutines
** to generate VDBE code to evaluate expressions.
**
-** $Id: where.c,v 1.67 2002/12/03 02:22:52 drh Exp $
+** $Id: where.c,v 1.68 2002/12/04 20:01:06 drh Exp $
*/
#include "sqliteInt.h"
@@ -169,18 +169,22 @@
Table *pTab, /* The table to be sorted */
int base, /* Cursor number for pTab */
ExprList *pOrderBy, /* The ORDER BY clause */
- Index *pPreferredIdx /* Use this index, if possible and not NULL */
+ Index *pPreferredIdx, /* Use this index, if possible and not NULL */
+ int *pbRev /* Set to 1 if ORDER BY is DESC */
){
int i;
Index *pMatch;
Index *pIdx;
+ int sortOrder;
assert( pOrderBy!=0 );
assert( pOrderBy->nExpr>0 );
+ sortOrder = pOrderBy->a[0].sortOrder & SQLITE_SO_DIRMASK;
for(i=0; i<pOrderBy->nExpr; i++){
Expr *p;
- if( (pOrderBy->a[i].sortOrder & SQLITE_SO_DIRMASK)!=SQLITE_SO_ASC ){
- /* Indices can only be used for ascending sort order */
+ if( (pOrderBy->a[i].sortOrder & SQLITE_SO_DIRMASK)!=sortOrder ){
+ /* Indices can only be used if all ORDER BY terms are either
+ ** DESC or ASC. Indices cannot be used on a mixture. */
return 0;
}
if( (pOrderBy->a[i].sortOrder & SQLITE_SO_TYPEMASK)!=SQLITE_SO_UNK ){
@@ -194,7 +198,7 @@
return 0;
}
}
-
+
/* If we get this far, it means the ORDER BY clause consists only of
** ascending columns in the left-most table of the FROM clause. Now
** check for a matching index.
@@ -210,6 +214,9 @@
if( pIdx==pPreferredIdx ) break;
}
}
+ if( pMatch && pbRev ){
+ *pbRev = sortOrder==SQLITE_SO_DESC;
+ }
return pMatch;
}
@@ -468,17 +475,18 @@
**
** The best index is determined as follows. For each of the
** left-most terms that is fixed by an equality operator, add
- ** 4 to the score. The right-most term of the index may be
+ ** 8 to the score. The right-most term of the index may be
** constrained by an inequality. Add 1 if for an "x<..." constraint
** and add 2 for an "x>..." constraint. Chose the index that
** gives the best score.
**
** This scoring system is designed so that the score can later be
- ** used to determine how the index is used. If the score&3 is 0
+ ** used to determine how the index is used. If the score&7 is 0
** then all constraints are equalities. If score&1 is not 0 then
** there is an inequality used as a termination key. (ex: "x<...")
** If score&2 is not 0 then there is an inequality used as the
- ** start key. (ex: "x>...");
+ ** start key. (ex: "x>..."). A score of 4 is the special case
+ ** of an IN operator constraint. (ex: "x IN ...").
**
** The IN operator (as in "<expr> IN (...)") is treated the same as
** an equality comparison except that it can only be used on the
@@ -562,15 +570,19 @@
}
}
}
+
+ /* The following loop ends with nEq set to the number of columns
+ ** on the left of the index with == constraints.
+ */
for(nEq=0; nEq<pIdx->nColumn; nEq++){
m = (1<<(nEq+1))-1;
if( (m & eqMask)!=m ) break;
}
- score = nEq*4;
+ score = nEq*8; /* Base score is 8 times number of == constraints */
m = 1<<nEq;
- if( m & ltMask ) score++;
- if( m & gtMask ) score+=2;
- if( score==0 && inMask ) score = 4;
+ if( m & ltMask ) score++; /* Increase score for a < constraint */
+ if( m & gtMask ) score+=2; /* Increase score for a > constraint */
+ if( score==0 && inMask ) score = 4; /* Default score for IN constraint */
if( score>bestScore ){
pBestIdx = pIdx;
bestScore = score;
@@ -578,6 +590,7 @@
}
pWInfo->a[i].pIdx = pBestIdx;
pWInfo->a[i].score = bestScore;
+ pWInfo->a[i].bRev = 0;
loopMask |= 1<<idx;
if( pBestIdx ){
pWInfo->a[i].iCur = pParse->nTab++;
@@ -592,15 +605,14 @@
Index *pSortIdx;
Index *pIdx;
Table *pTab;
+ int bRev = 0;
pTab = pTabList->a[0].pTab;
pIdx = pWInfo->a[0].pIdx;
if( pIdx && pWInfo->a[0].score==4 ){
- /* If there is already an index on the left-most column and it is
- ** an equality index, then either sorting is not helpful, or the
- ** index is an IN operator, in which case the index does not give
- ** the correct sort order. Either way, pretend that no suitable
- ** index is found.
+ /* If there is already an IN index on the left-most table,
+ ** it will not give the correct sort order.
+ ** So, pretend that no suitable index is found.
*/
pSortIdx = 0;
}else if( iDirectEq[0]>=0 || iDirectLt[0]>=0 || iDirectGt[0]>=0 ){
@@ -609,7 +621,7 @@
*/
pSortIdx = 0;
}else{
- pSortIdx = findSortingIndex(pTab, base, *ppOrderBy, pIdx);
+ pSortIdx = findSortingIndex(pTab, base, *ppOrderBy, pIdx, &bRev);
}
if( pSortIdx && (pIdx==0 || pIdx==pSortIdx) ){
if( pIdx==0 ){
@@ -617,6 +629,7 @@
pWInfo->a[0].iCur = pParse->nTab++;
pWInfo->peakNTab = pParse->nTab;
}
+ pWInfo->a[0].bRev = bRev;
*ppOrderBy = 0;
}
}
@@ -708,7 +721,7 @@
*/
int start;
int testOp;
- int nColumn = pLevel->score/4;
+ int nColumn = (pLevel->score+4)/8;
brk = pLevel->brk = sqliteVdbeMakeLabel(v);
for(j=0; j<nColumn; j++){
for(k=0; k<nExpr; k++){
@@ -756,7 +769,7 @@
cont = pLevel->cont = sqliteVdbeMakeLabel(v);
sqliteVdbeAddOp(v, OP_MakeKey, nColumn, 0);
sqliteAddIdxKeyType(v, pIdx);
- if( nColumn==pIdx->nColumn ){
+ if( nColumn==pIdx->nColumn || pLevel->bRev ){
sqliteVdbeAddOp(v, OP_MemStore, pLevel->iMem, 0);
testOp = OP_IdxGT;
}else{
@@ -765,17 +778,28 @@
sqliteVdbeAddOp(v, OP_MemStore, pLevel->iMem, 1);
testOp = OP_IdxGE;
}
- sqliteVdbeAddOp(v, OP_MoveTo, pLevel->iCur, brk);
- start = sqliteVdbeAddOp(v, OP_MemLoad, pLevel->iMem, 0);
- sqliteVdbeAddOp(v, testOp, pLevel->iCur, brk);
- sqliteVdbeAddOp(v, OP_IdxRecno, pLevel->iCur, 0);
+ if( pLevel->bRev ){
+ /* Scan in reverse order */
+ sqliteVdbeAddOp(v, OP_IncrKey, 0, 0);
+ sqliteVdbeAddOp(v, OP_MoveLt, pLevel->iCur, brk);
+ start = sqliteVdbeAddOp(v, OP_MemLoad, pLevel->iMem, 0);
+ sqliteVdbeAddOp(v, OP_IdxLT, pLevel->iCur, brk);
+ sqliteVdbeAddOp(v, OP_IdxRecno, pLevel->iCur, 0);
+ pLevel->op = OP_Prev;
+ }else{
+ /* Scan in the forward order */
+ sqliteVdbeAddOp(v, OP_MoveTo, pLevel->iCur, brk);
+ start = sqliteVdbeAddOp(v, OP_MemLoad, pLevel->iMem, 0);
+ sqliteVdbeAddOp(v, testOp, pLevel->iCur, brk);
+ sqliteVdbeAddOp(v, OP_IdxRecno, pLevel->iCur, 0);
+ pLevel->op = OP_Next;
+ }
if( i==pTabList->nSrc-1 && pushKey ){
haveKey = 1;
}else{
sqliteVdbeAddOp(v, OP_MoveTo, base+idx, 0);
haveKey = 0;
}
- pLevel->op = OP_Next;
pLevel->p1 = pLevel->iCur;
pLevel->p2 = start;
}else if( i<ARRAYSIZE(iDirectLt) && (iDirectLt[i]>=0 || iDirectGt[i]>=0) ){
@@ -862,7 +886,7 @@
** to force the output order to conform to an ORDER BY.
*/
int score = pLevel->score;
- int nEqColumn = score/4;
+ int nEqColumn = score/8;
int start;
int leFlag, geFlag;
int testOp;
@@ -901,9 +925,17 @@
sqliteVdbeAddOp(v, OP_Dup, nEqColumn-1, 0);
}
+ /* Labels for the beginning and end of the loop
+ */
+ cont = pLevel->cont = sqliteVdbeMakeLabel(v);
+ brk = pLevel->brk = sqliteVdbeMakeLabel(v);
+
/* Generate the termination key. This is the key value that
** will end the search. There is no termination key if there
** are no equality terms and no "X<..." term.
+ **
+ ** 2002-Dec-04: On a reverse-order scan, the so-called "termination"
+ ** key computed here really ends up being the start key.
*/
if( (score & 1)!=0 ){
for(k=0; k<nExpr; k++){
@@ -942,7 +974,13 @@
if( leFlag ){
sqliteVdbeAddOp(v, OP_IncrKey, 0, 0);
}
- sqliteVdbeAddOp(v, OP_MemStore, pLevel->iMem, 1);
+ if( pLevel->bRev ){
+ sqliteVdbeAddOp(v, OP_MoveLt, pLevel->iCur, brk);
+ }else{
+ sqliteVdbeAddOp(v, OP_MemStore, pLevel->iMem, 1);
+ }
+ }else if( pLevel->bRev ){
+ sqliteVdbeAddOp(v, OP_Last, pLevel->iCur, brk);
}
/* Generate the start key. This is the key that defines the lower
@@ -950,6 +988,9 @@
** equality terms and if there is no "X>..." term. In
** that case, generate a "Rewind" instruction in place of the
** start key search.
+ **
+ ** 2002-Dec-04: In the case of a reverse-order search, the so-called
+ ** "start" key really ends up being used as the termination key.
*/
if( (score & 2)!=0 ){
for(k=0; k<nExpr; k++){
@@ -979,15 +1020,21 @@
}else{
geFlag = 1;
}
- brk = pLevel->brk = sqliteVdbeMakeLabel(v);
- cont = pLevel->cont = sqliteVdbeMakeLabel(v);
if( nEqColumn>0 || (score&2)!=0 ){
sqliteVdbeAddOp(v, OP_MakeKey, nEqColumn + ((score&2)!=0), 0);
sqliteAddIdxKeyType(v, pIdx);
if( !geFlag ){
sqliteVdbeAddOp(v, OP_IncrKey, 0, 0);
}
- sqliteVdbeAddOp(v, OP_MoveTo, pLevel->iCur, brk);
+ if( pLevel->bRev ){
+ pLevel->iMem = pParse->nMem++;
+ sqliteVdbeAddOp(v, OP_MemStore, pLevel->iMem, 1);
+ testOp = OP_IdxLT;
+ }else{
+ sqliteVdbeAddOp(v, OP_MoveTo, pLevel->iCur, brk);
+ }
+ }else if( pLevel->bRev ){
+ testOp = OP_Noop;
}else{
sqliteVdbeAddOp(v, OP_Rewind, pLevel->iCur, brk);
}
@@ -1011,7 +1058,7 @@
/* Record the instruction used to terminate the loop.
*/
- pLevel->op = OP_Next;
+ pLevel->op = pLevel->bRev ? OP_Prev : OP_Next;
pLevel->p1 = pLevel->iCur;
pLevel->p2 = start;
}