Add a new algorithm for handling INSERT which reduces fragmentation on
a VACUUM.  Ticket #2075.  More testing needed. (CVS 3643)

FossilOrigin-Name: 9f56a878cbbc715262b3a48ee696148dbd7bf1d2
diff --git a/src/insert.c b/src/insert.c
index e108673..d967806 100644
--- a/src/insert.c
+++ b/src/insert.c
@@ -12,7 +12,7 @@
 ** This file contains C code routines that are called by the parser
 ** to handle INSERT statements in SQLite.
 **
-** $Id: insert.c,v 1.172 2006/08/29 18:46:14 drh Exp $
+** $Id: insert.c,v 1.173 2007/02/13 15:01:11 drh Exp $
 */
 #include "sqliteInt.h"
 
@@ -118,6 +118,117 @@
   return 0;
 }
 
+#ifndef SQLITE_OMIT_AUTOINCREMENT
+/*
+** Write out code to initialize the autoincrement logic.  This code
+** looks up the current autoincrement value in the sqlite_sequence
+** table and stores that value in a memory cell.  Code generated by
+** autoIncStep() will keep that memory cell holding the largest
+** rowid value.  Code generated by autoIncEnd() will write the new
+** largest value of the counter back into the sqlite_sequence table.
+**
+** This routine returns the index of the mem[] cell that contains
+** the maximum rowid counter.
+**
+** Two memory cells are allocated.  The memory cell just before the
+** one returned (memId-1) holds the rowid in sqlite_sequence where we
+** will write back the revised maximum rowid.
+*/
+static int autoIncBegin(
+  Parse *pParse,      /* Parsing context */
+  int iDb,            /* Index of the database holding pTab */
+  Table *pTab         /* The table we are writing to */
+){
+  int memId = 0;
+  if( pTab->autoInc ){
+    Vdbe *v = pParse->pVdbe;
+    Db *pDb = &pParse->db->aDb[iDb];
+    int iCur = pParse->nTab;
+    int addr;
+    assert( v );
+    addr = sqlite3VdbeCurrentAddr(v);
+    memId = pParse->nMem+1;
+    pParse->nMem += 2;
+    sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenRead);
+    sqlite3VdbeAddOp(v, OP_Rewind, iCur, addr+13);
+    sqlite3VdbeAddOp(v, OP_Column, iCur, 0);
+    sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
+    sqlite3VdbeAddOp(v, OP_Ne, 0x100, addr+12);
+    sqlite3VdbeAddOp(v, OP_Rowid, iCur, 0);
+    sqlite3VdbeAddOp(v, OP_MemStore, memId-1, 1);
+    sqlite3VdbeAddOp(v, OP_Column, iCur, 1);
+    sqlite3VdbeAddOp(v, OP_MemStore, memId, 1);
+    sqlite3VdbeAddOp(v, OP_Goto, 0, addr+13);
+    sqlite3VdbeAddOp(v, OP_Next, iCur, addr+4);
+    sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
+  }
+  return memId;
+}
+
+/*
+** Update the maximum rowid for an autoincrement calculation.
+**
+** This routine should be called when the top of the stack holds a
+** new rowid that is about to be inserted.  If that new rowid is
+** larger than the maximum rowid in the memId memory cell, then the
+** memory cell is updated.  The stack is unchanged.
+*/
+static void autoIncStep(Parse *pParse, int memId){
+  if( memId>0 ){
+    sqlite3VdbeAddOp(pParse->pVdbe, OP_MemMax, memId, 0);
+  }
+}
+
+/*
+** After doing one or more inserts, the maximum rowid is stored
+** in mem[memId].  Generate code to write this value back into the
+** sqlite_sequence table.
+*/
+static void autoIncEnd(
+  Parse *pParse,     /* The parsing context */
+  int iDb,           /* Index of the database holding pTab */
+  Table *pTab,       /* Table we are inserting into */
+  int memId          /* Memory cell holding the maximum rowid */
+){
+  if( pTab->autoInc ){
+    int iCur = pParse->nTab;
+    Vdbe *v = pParse->pVdbe;
+    Db *pDb = &pParse->db->aDb[iDb];
+    int addr;
+    assert( v );
+    addr = sqlite3VdbeCurrentAddr(v);
+    sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenWrite);
+    sqlite3VdbeAddOp(v, OP_MemLoad, memId-1, 0);
+    sqlite3VdbeAddOp(v, OP_NotNull, -1, addr+7);
+    sqlite3VdbeAddOp(v, OP_Pop, 1, 0);
+    sqlite3VdbeAddOp(v, OP_NewRowid, iCur, 0);
+    sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
+    sqlite3VdbeAddOp(v, OP_MemLoad, memId, 0);
+    sqlite3VdbeAddOp(v, OP_MakeRecord, 2, 0);
+    sqlite3VdbeAddOp(v, OP_Insert, iCur, 0);
+    sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
+  }
+}
+#else
+/*
+** If SQLITE_OMIT_AUTOINCREMENT is defined, then the three routines
+** above are all no-ops
+*/
+# define autoIncBegin(A,B,C) (0)
+# define autoIncStep(A,B)
+# define autoIncEnd(A,B,C,D)
+#endif /* SQLITE_OMIT_AUTOINCREMENT */
+
+
+/* Forward declaration */
+static int xferOptimization(
+  Parse *pParse,        /* Parser context */
+  Table *pDest,         /* The table we are inserting into */
+  Select *pSelect,      /* A SELECT statement to use as the data source */
+  int onError,          /* How to handle constraint errors */
+  int iDbDest           /* The database of pDest */
+);
+
 /*
 ** This routine is call to handle SQL of the following forms:
 **
@@ -133,7 +244,7 @@
 ** NULL and pSelect is a pointer to the select statement used to generate
 ** data for the insert.
 **
-** The code generated follows one of three templates.  For a simple
+** The code generated follows one of four templates.  For a simple
 ** select with data coming from a VALUES clause, the code executes
 ** once straight down through.  The template looks like this:
 **
@@ -142,16 +253,37 @@
 **         write the resulting record into <table>
 **         cleanup
 **
-** If the statement is of the form
+** The three remaining templates assume the statement is of the form
 **
 **   INSERT INTO <table> SELECT ...
 **
-** And the SELECT clause does not read from <table> at any time, then
-** the generated code follows this template:
+** If the SELECT clause is of the restricted form "SELECT * FROM <table2>" -
+** in other words if the SELECT pulls all columns from a single table
+** and there are no WHERE, LIMIT, GROUP BY, or ORDER BY clauses, and
+** if <table2> and <table1> are distinct tables but have identical
+** schemas, including all the same indices, then a special optimization
+** is invoked that copies raw records from <table2> over to <table1>.
+** See the xferOptimization() function for the implementation of this
+** template.  This is the second template.
+**
+**         open a write cursor to <table>
+**         open read cursor on <table2>
+**         transfer all records in <table2> over to <table>
+**         close cursors
+**         foreach index on <table>
+**           open a write cursor on the <table> index
+**           open a read cursor on the corresponding <table2> index
+**           transfer all records from the read to the write cursors
+**           close cursors
+**         end foreach
+**
+** The third template is for when the second template does not apply
+** and the SELECT clause does not read from <table> at any time.
+** The generated code follows this template:
 **
 **         goto B
 **      A: setup for the SELECT
-**         loop over the tables in the SELECT
+**         loop over the rows in the SELECT
 **           gosub C
 **         end loop
 **         cleanup after the SELECT
@@ -162,7 +294,7 @@
 **         return
 **      D: cleanup
 **
-** The third template is used if the insert statement takes its
+** The fourth template is used if the insert statement takes its
 ** values from a SELECT but the data is being inserted into a table
 ** that is also read as part of the SELECT.  In the third form,
 ** we have to use a intermediate table to store the results of
@@ -221,10 +353,6 @@
   int triggers_exist = 0;     /* True if there are FOR EACH ROW triggers */
 #endif
 
-#ifndef SQLITE_OMIT_AUTOINCREMENT
-  int counterRowid = 0;  /* Memory cell holding rowid of autoinc counter */
-#endif
-
   if( pParse->nErr || sqlite3MallocFailed() ){
     goto insert_cleanup;
   }
@@ -291,31 +419,27 @@
     newIdx = pParse->nTab++;
   }
 
-#ifndef SQLITE_OMIT_AUTOINCREMENT
+#ifndef SQLITE_OMIT_XFER_OPT
+  /* If the statement is of the form
+  **
+  **       INSERT INTO <table1> SELECT * FROM <table2>;
+  **
+  ** Then special optimizations can be applied that make the transfer
+  ** very fast and which reduce fragmentation of indices.
+  */
+  if( pColumn==0 && xferOptimization(pParse, pTab, pSelect, onError, iDb) ){
+    assert( !triggers_exist );
+    assert( pList==0 );
+    goto insert_cleanup;
+  }
+#endif /* SQLITE_OMIT_XFER_OPT */
+
   /* If this is an AUTOINCREMENT table, look up the sequence number in the
   ** sqlite_sequence table and store it in memory cell counterMem.  Also
   ** remember the rowid of the sqlite_sequence table entry in memory cell
   ** counterRowid.
   */
-  if( pTab->autoInc ){
-    int iCur = pParse->nTab;
-    int addr = sqlite3VdbeCurrentAddr(v);
-    counterRowid = pParse->nMem++;
-    counterMem = pParse->nMem++;
-    sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenRead);
-    sqlite3VdbeAddOp(v, OP_Rewind, iCur, addr+13);
-    sqlite3VdbeAddOp(v, OP_Column, iCur, 0);
-    sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
-    sqlite3VdbeAddOp(v, OP_Ne, 0x100, addr+12);
-    sqlite3VdbeAddOp(v, OP_Rowid, iCur, 0);
-    sqlite3VdbeAddOp(v, OP_MemStore, counterRowid, 1);
-    sqlite3VdbeAddOp(v, OP_Column, iCur, 1);
-    sqlite3VdbeAddOp(v, OP_MemStore, counterMem, 1);
-    sqlite3VdbeAddOp(v, OP_Goto, 0, addr+13);
-    sqlite3VdbeAddOp(v, OP_Next, iCur, addr+4);
-    sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
-  }
-#endif /* SQLITE_OMIT_AUTOINCREMENT */
+  counterMem = autoIncBegin(pParse, iDb, pTab);
 
   /* Figure out how many columns of data are supplied.  If the data
   ** is coming from a SELECT statement, then this step also generates
@@ -591,11 +715,7 @@
     }else{
       sqlite3VdbeAddOp(v, OP_NewRowid, base, counterMem);
     }
-#ifndef SQLITE_OMIT_AUTOINCREMENT
-    if( pTab->autoInc ){
-      sqlite3VdbeAddOp(v, OP_MemMax, counterMem, 0);
-    }
-#endif /* SQLITE_OMIT_AUTOINCREMENT */
+    autoIncStep(pParse, counterMem);
 
     /* Push onto the stack, data for all columns of the new entry, beginning
     ** with the first column.
@@ -688,26 +808,11 @@
     }
   }
 
-#ifndef SQLITE_OMIT_AUTOINCREMENT
   /* Update the sqlite_sequence table by storing the content of the
   ** counter value in memory counterMem back into the sqlite_sequence
   ** table.
   */
-  if( pTab->autoInc ){
-    int iCur = pParse->nTab;
-    int addr = sqlite3VdbeCurrentAddr(v);
-    sqlite3OpenTable(pParse, iCur, iDb, pDb->pSchema->pSeqTab, OP_OpenWrite);
-    sqlite3VdbeAddOp(v, OP_MemLoad, counterRowid, 0);
-    sqlite3VdbeAddOp(v, OP_NotNull, -1, addr+7);
-    sqlite3VdbeAddOp(v, OP_Pop, 1, 0);
-    sqlite3VdbeAddOp(v, OP_NewRowid, iCur, 0);
-    sqlite3VdbeOp3(v, OP_String8, 0, 0, pTab->zName, 0);
-    sqlite3VdbeAddOp(v, OP_MemLoad, counterMem, 0);
-    sqlite3VdbeAddOp(v, OP_MakeRecord, 2, 0);
-    sqlite3VdbeAddOp(v, OP_Insert, iCur, 0);
-    sqlite3VdbeAddOp(v, OP_Close, iCur, 0);
-  }
-#endif
+  autoIncEnd(pParse, iDb, pTab, counterMem);
 
   /*
   ** Return the number of rows inserted. If this routine is 
@@ -1140,3 +1245,299 @@
     pParse->nTab = base+i;
   }
 }
+
+#ifndef SQLITE_OMIT_XFER_OPT
+/*
+** Check two collation names to see if they are compatible.
+*/
+static int xferCompatibleCollation(const char *z1, const char *z2){
+  if( z1==0 ){
+    return z2==0;
+  }
+  if( z2==0 ){
+    return 0;
+  }
+  return sqlite3StrICmp(z1, z2)==0;
+}
+
+
+/*
+** Check to see if index pSrc is compatible as a source of data
+** for index pDest in an insert transfer optimization.  The rules
+** for a compatible index:
+**
+**    *   The index is over the same set of columns
+**    *   The same DESC and ASC markings occur on all columns
+**    *   The same onError processing (OE_Abort, OE_Ignore, etc)
+**    *   The same collating sequence on each column
+*/
+static int xferCompatibleIndex(Index *pDest, Index *pSrc){
+  int i;
+  assert( pDest && pSrc );
+  assert( pDest->pTable!=pSrc->pTable );
+  if( pDest->nColumn!=pSrc->nColumn ){
+    return 0;   /* Different number of columns */
+  }
+  if( pDest->onError!=pSrc->onError ){
+    return 0;   /* Different conflict resolution strategies */
+  }
+  for(i=0; i<pSrc->nColumn; i++){
+    if( pSrc->aiColumn[i]!=pDest->aiColumn[i] ){
+      return 0;   /* Different columns indexed */
+    }
+    if( pSrc->aSortOrder[i]!=pDest->aSortOrder[i] ){
+      return 0;   /* Different sort orders */
+    }
+    if( !xferCompatibleCollation(pSrc->azColl[i], pDest->azColl[i]) ){
+      return 0;   /* Different collating sequences (compared by name) */
+    }
+  }
+
+  /* If no test above fails then the indices must be compatible */
+  return 1;
+}
+
+/*
+** Attempt the transfer optimization on INSERTs of the form
+**
+**     INSERT INTO tab1 SELECT * FROM tab2;
+**
+** This optimization is only attempted if
+**
+**    (1)  tab1 and tab2 have identical schemas including all the
+**         same indices
+**
+**    (2)  tab1 and tab2 are different tables
+**
+**    (3)  There must be no triggers on tab1
+**
+**    (4)  The result set of the SELECT statement is "*"
+**
+**    (5)  The SELECT statement has no WHERE, HAVING, ORDER BY, GROUP BY,
+**         or LIMIT clause.
+**
+**    (6)  The SELECT statement is a simple (not a compound) select that
+**         contains only tab2 in its FROM clause
+**
+** This method for implementing the INSERT transfers raw records from
+** tab2 over to tab1.  The columns are not decoded.  Raw records from
+** the indices of tab2 are transferred to tab1 as well.  In so doing,
+** the resulting tab1 has much less fragmentation.
+**
+** This routine returns TRUE if the transfer is coded unconditionally.
+** It returns FALSE if any condition above fails, or if the transfer only
+** applies to an empty tab1 so the caller must still code a normal insert.
+*/
+static int xferOptimization(
+  Parse *pParse,        /* Parser context */
+  Table *pDest,         /* The table we are inserting into */
+  Select *pSelect,      /* A SELECT statement to use as the data source */
+  int onError,          /* How to handle constraint errors */
+  int iDbDest           /* The database of pDest */
+){
+  ExprList *pEList;                /* The result set of the SELECT */
+  Table *pSrc;                     /* The table in the FROM clause of SELECT */
+  Index *pSrcIdx, *pDestIdx;       /* Source and destination indices */
+  struct SrcList_item *pItem;      /* An element of pSelect->pSrc */
+  int i;                           /* Loop counter */
+  int iDbSrc;                      /* The database of pSrc */
+  int iSrc, iDest;                 /* Cursors from source and destination */
+  int addr1, addr2;                /* Loop addresses */
+  int emptyDestTest;               /* Address of test for empty pDest */
+  int emptySrcTest;                /* Address of test for empty pSrc */
+  int memRowid;                    /* A memcell containing a rowid from pSrc */
+  Vdbe *v;                         /* The VDBE we are building */
+  KeyInfo *pKey;                   /* Key information for an index */
+  int counterMem;                  /* Memory register used by AUTOINC */
+
+  if( pSelect==0 ){
+    return 0;   /* Must be of the form  INSERT INTO ... SELECT ... */
+  }
+  if( pDest->pTrigger ){
+    return 0;   /* tab1 must not have triggers */
+  }
+#ifndef SQLITE_OMIT_VIRTUALTABLE
+  if( pDest->isVirtual ){
+    return 0;   /* tab1 must not be a virtual table */
+  }
+#endif
+  if( onError==OE_Default ){
+    onError = OE_Abort;
+  }
+  if( onError!=OE_Abort && onError!=OE_Rollback ){
+    return 0;   /* Cannot do OR REPLACE or OR IGNORE or OR FAIL */
+  }
+  if( pSelect->pSrc==0 ){
+    return 0;   /* SELECT must have a FROM clause */
+  }
+  if( pSelect->pSrc->nSrc!=1 ){
+    return 0;   /* FROM clause must have exactly one term */
+  }
+  if( pSelect->pSrc->a[0].pSelect ){
+    return 0;   /* FROM clause cannot contain a subquery */
+  }
+  if( pSelect->pWhere ){
+    return 0;   /* SELECT may not have a WHERE clause */
+  }
+  if( pSelect->pOrderBy ){
+    return 0;   /* SELECT may not have an ORDER BY clause */
+  }
+  if( pSelect->pHaving ){
+    return 0;   /* SELECT may not have a HAVING clause */
+  }
+  if( pSelect->pGroupBy ){
+    return 0;   /* SELECT may not have a GROUP BY clause */
+  }
+  if( pSelect->pLimit ){
+    return 0;   /* SELECT may not have a LIMIT clause */
+  }
+  if( pSelect->pOffset ){
+    return 0;   /* SELECT may not have an OFFSET clause */
+  }
+  if( pSelect->pPrior ){
+    return 0;   /* SELECT may not be a compound query */
+  }
+  if( pSelect->isDistinct ){
+    return 0;   /* SELECT may not be DISTINCT */
+  }
+  pEList = pSelect->pEList;
+  assert( pEList!=0 );
+  if( pEList->nExpr!=1 ){
+    return 0;   /* The result set must have exactly one column */
+  }
+  assert( pEList->a[0].pExpr );
+  if( pEList->a[0].pExpr->op!=TK_ALL ){
+    return 0;   /* The result set must be the special operator "*" */
+  }
+
+  /* At this point we have established that the statement is of the
+  ** correct syntactic form to participate in this optimization.  Now
+  ** we have to check the semantics.
+  */
+  pItem = pSelect->pSrc->a;
+  pSrc = sqlite3LocateTable(pParse, pItem->zName, pItem->zDatabase);
+  if( pSrc==0 ){
+    return 0;   /* FROM clause does not contain a real table */
+  }
+  if( pSrc==pDest ){
+    return 0;   /* tab1 and tab2 may not be the same table */
+  }
+#ifndef SQLITE_OMIT_VIRTUALTABLE
+  if( pSrc->isVirtual ){
+    return 0;   /* tab2 must not be a virtual table */
+  }
+#endif
+  if( pSrc->pSelect ){
+    return 0;   /* tab2 may not be a view */
+  }
+  if( pDest->nCol!=pSrc->nCol ){
+    return 0;   /* Number of columns must be the same in tab1 and tab2 */
+  }
+  if( pDest->iPKey!=pSrc->iPKey ){
+    return 0;   /* Both tables must have the same INTEGER PRIMARY KEY */
+  }
+  for(i=0; i<pDest->nCol; i++){
+    if( pDest->aCol[i].affinity!=pSrc->aCol[i].affinity ){
+      return 0;    /* Affinity must be the same on all columns */
+    }
+    if( !xferCompatibleCollation(pDest->aCol[i].zColl, pSrc->aCol[i].zColl) ){
+      return 0;    /* Collating sequence must be the same on all columns */
+    }
+    if( pDest->aCol[i].notNull && !pSrc->aCol[i].notNull ){
+      return 0;    /* tab2 must be NOT NULL if tab1 is */
+    }
+  }
+  for(pDestIdx=pDest->pIndex; pDestIdx; pDestIdx=pDestIdx->pNext){
+    for(pSrcIdx=pSrc->pIndex; pSrcIdx; pSrcIdx=pSrcIdx->pNext){
+      if( xferCompatibleIndex(pDestIdx, pSrcIdx) ) break;
+    }
+    if( pSrcIdx==0 ){
+      return 0;    /* pDestIdx has no corresponding index in pSrc */
+    }
+  }
+
+  /* If we get this far, it means either:
+  **
+  **    *   We can always do the transfer if the table contains
+  **        an integer primary key
+  **
+  **    *   We can conditionally do the transfer if the destination
+  **        table is empty.
+  */
+  iDbSrc = sqlite3SchemaToIndex(pParse->db, pSrc->pSchema);
+  v = sqlite3GetVdbe(pParse);
+  iSrc = pParse->nTab++;
+  iDest = pParse->nTab++;
+  counterMem = autoIncBegin(pParse, iDbDest, pDest);
+  sqlite3OpenTable(pParse, iDest, iDbDest, pDest, OP_OpenWrite);
+  if( pDest->iPKey<0 ){
+    /* The tables do not have an INTEGER PRIMARY KEY so that
+    ** transfer optimization is only allowed if the destination
+    ** table is initially empty
+    */
+    addr1 = sqlite3VdbeAddOp(v, OP_Rewind, iDest, 0);
+    emptyDestTest = sqlite3VdbeAddOp(v, OP_Goto, 0, 0);
+    sqlite3VdbeJumpHere(v, addr1);
+  }else{
+    emptyDestTest = 0;
+  }
+  sqlite3OpenTable(pParse, iSrc, iDbSrc, pSrc, OP_OpenRead);
+  emptySrcTest = sqlite3VdbeAddOp(v, OP_Rewind, iSrc, 0);
+  memRowid = pParse->nMem++;
+  sqlite3VdbeAddOp(v, OP_Rowid, iSrc, 0);
+  sqlite3VdbeAddOp(v, OP_MemStore, memRowid, 1);
+  addr1 = sqlite3VdbeAddOp(v, OP_Rowid, iSrc, 0);
+  sqlite3VdbeAddOp(v, OP_Dup, 0, 0);
+  addr2 = sqlite3VdbeAddOp(v, OP_NotExists, iDest, 0);
+  sqlite3VdbeOp3(v, OP_Halt, SQLITE_CONSTRAINT, onError, 
+                    "PRIMARY KEY must be unique", P3_STATIC);
+  sqlite3VdbeJumpHere(v, addr2);
+  autoIncStep(pParse, counterMem);
+  sqlite3VdbeAddOp(v, OP_RowData, iSrc, 0);
+  sqlite3VdbeOp3(v, OP_Insert, iDest, OPFLAG_NCHANGE|OPFLAG_LASTROWID,
+                    pDest->zName, 0);
+  sqlite3VdbeAddOp(v, OP_Next, iSrc, addr1);
+  autoIncEnd(pParse, iDbDest, pDest, counterMem);
+  for(pDestIdx=pDest->pIndex; pDestIdx; pDestIdx=pDestIdx->pNext){
+    for(pSrcIdx=pSrc->pIndex; pSrcIdx; pSrcIdx=pSrcIdx->pNext){
+      if( xferCompatibleIndex(pDestIdx, pSrcIdx) ) break;
+    }
+    assert( pSrcIdx );
+    sqlite3VdbeAddOp(v, OP_Close, iSrc, 0);
+    sqlite3VdbeAddOp(v, OP_Close, iDest, 0);
+    sqlite3VdbeAddOp(v, OP_Integer, iDbSrc, 0);
+    pKey = sqlite3IndexKeyinfo(pParse, pSrcIdx);
+    VdbeComment((v, "# %s", pSrcIdx->zName));
+    sqlite3VdbeOp3(v, OP_OpenRead, iSrc, pSrcIdx->tnum, 
+                   (char*)pKey, P3_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp(v, OP_Integer, iDbDest, 0);
+    pKey = sqlite3IndexKeyinfo(pParse, pDestIdx);
+    VdbeComment((v, "# %s", pDestIdx->zName));
+    sqlite3VdbeOp3(v, OP_OpenWrite, iDest, pDestIdx->tnum, 
+                   (char*)pKey, P3_KEYINFO_HANDOFF);
+    addr1 = sqlite3VdbeAddOp(v, OP_Rewind, iSrc, 0);
+    sqlite3VdbeAddOp(v, OP_RowKey, iSrc, 0);
+    if( pDestIdx->onError!=OE_None ){
+      sqlite3VdbeAddOp(v, OP_MemLoad, memRowid, 0);
+      addr2 = sqlite3VdbeAddOp(v, OP_IsUnique, iDest, 0);
+      sqlite3VdbeOp3(v, OP_Halt, SQLITE_CONSTRAINT, onError, 
+                    "UNIQUE constraint failed", P3_STATIC);
+      sqlite3VdbeJumpHere(v, addr2);
+    }
+    sqlite3VdbeAddOp(v, OP_IdxInsert, iDest, 0);
+    sqlite3VdbeAddOp(v, OP_Next, iSrc, addr1+1);
+    sqlite3VdbeJumpHere(v, addr1);
+  }
+  sqlite3VdbeJumpHere(v, emptySrcTest);
+  sqlite3VdbeAddOp(v, OP_Close, iSrc, 0);
+  sqlite3VdbeAddOp(v, OP_Close, iDest, 0);
+  if( emptyDestTest ){
+    sqlite3VdbeAddOp(v, OP_Halt, SQLITE_OK, 0);
+    sqlite3VdbeJumpHere(v, emptyDestTest);
+    sqlite3VdbeAddOp(v, OP_Close, iDest, 0);
+    return 0;
+  }else{
+    return 1;
+  }
+}
+#endif /* SQLITE_OMIT_XFER_OPT */