Reference count the KeyInfo object.  Cache a copy of an appropriate KeyInfo
for each index in the Index object, and reuse that one copy as much as possible.

FossilOrigin-Name: defd5205a7cc3543cdd18f906f568e943b8b3a2c
diff --git a/src/analyze.c b/src/analyze.c
index 5ed9078..f1094f7 100644
--- a/src/analyze.c
+++ b/src/analyze.c
@@ -988,7 +988,6 @@
 
   for(pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext){
     int nCol;                     /* Number of columns indexed by pIdx */
-    KeyInfo *pKey;                /* KeyInfo structure for pIdx */
     int *aGotoChng;               /* Array of jump instruction addresses */
     int addrRewind;               /* Address of "OP_Rewind iIdxCur" */
     int addrGotoChng0;            /* Address of "Goto addr_chng_0" */
@@ -1001,7 +1000,6 @@
     nCol = pIdx->nKeyCol;
     aGotoChng = sqlite3DbMallocRaw(db, sizeof(int)*(nCol+1));
     if( aGotoChng==0 ) continue;
-    pKey = sqlite3IndexKeyinfo(pParse, pIdx);
 
     /* Populate the register containing the index name. */
     if( pIdx->autoIndex==2 && !HasRowid(pTab) ){
@@ -1052,7 +1050,7 @@
     /* Open a read-only cursor on the index being analyzed. */
     assert( iDb==sqlite3SchemaToIndex(db, pIdx->pSchema) );
     sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum, iDb);
-    sqlite3VdbeChangeP4(v, -1, (char*)pKey, P4_KEYINFO_HANDOFF); 
+    sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
     VdbeComment((v, "%s", pIdx->zName));
 
     /* Invoke the stat_init() function. The arguments are:
diff --git a/src/build.c b/src/build.c
index f97ca30..11de95e 100644
--- a/src/build.c
+++ b/src/build.c
@@ -382,6 +382,7 @@
 #ifndef SQLITE_OMIT_ANALYZE
   sqlite3DeleteIndexSamples(db, p);
 #endif
+  if( db==0 || db->pnBytesFreed==0 ) sqlite3KeyInfoUnref(p->pKeyInfo);
   sqlite3ExprDelete(db, p->pPartIdxWhere);
   sqlite3DbFree(db, p->zColAff);
   if( p->isResized ) sqlite3DbFree(db, p->azColl);
@@ -2655,11 +2656,12 @@
   }else{
     tnum = pIndex->tnum;
   }
-  pKey = sqlite3IndexKeyinfo(pParse, pIndex);
+  pKey = sqlite3KeyInfoOfIndex(pParse, pIndex);
 
   /* Open the sorter cursor if we are to use one. */
   iSorter = pParse->nTab++;
-  sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, 0, (char*)pKey, P4_KEYINFO);
+  sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, 0, (char*)
+                    sqlite3KeyInfoRef(pKey), P4_KEYINFO);
 
   /* Open the table. Loop through all rows of the table, inserting index
   ** records into the sorter. */
@@ -2674,7 +2676,7 @@
   sqlite3VdbeJumpHere(v, addr1);
   if( memRootPage<0 ) sqlite3VdbeAddOp2(v, OP_Clear, tnum, iDb);
   sqlite3VdbeAddOp4(v, OP_OpenWrite, iIdx, tnum, iDb, 
-                    (char *)pKey, P4_KEYINFO_HANDOFF);
+                    (char *)pKey, P4_KEYINFO);
   sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR|((memRootPage>=0)?OPFLAG_P2ISREG:0));
 
   addr1 = sqlite3VdbeAddOp2(v, OP_SorterSort, iSorter, 0);
@@ -4142,38 +4144,42 @@
 #endif
 
 /*
-** Return a dynamicly allocated KeyInfo structure that can be used
-** with OP_OpenRead or OP_OpenWrite to access database index pIdx.
+** Return a KeyInfo structure that is appropriate for the given Index.
 **
-** If successful, a pointer to the new structure is returned. In this case
-** the caller is responsible for calling sqlite3DbFree(db, ) on the returned 
-** pointer. If an error occurs (out of memory or missing collation 
-** sequence), NULL is returned and the state of pParse updated to reflect
-** the error.
+** The KeyInfo structure for an index is cached in the Index object.
+** So there might be multiple references to the returned pointer.  The
+** caller should not try to modify the KeyInfo object.
+**
+** The caller should invoke sqlite3KeyInfoUnref() on the returned object
+** when it has finished using it.
 */
-KeyInfo *sqlite3IndexKeyinfo(Parse *pParse, Index *pIdx){
+KeyInfo *sqlite3KeyInfoOfIndex(Parse *pParse, Index *pIdx){
   int i;
   int nCol = pIdx->nColumn;
   int nKey = pIdx->nKeyCol;
   KeyInfo *pKey;
 
-  if( pIdx->uniqNotNull ){
-    pKey = sqlite3KeyInfoAlloc(pParse->db, nKey, nCol-nKey);
-  }else{
-    pKey = sqlite3KeyInfoAlloc(pParse->db, nCol, 0);
-  }
-  if( pKey ){
-    for(i=0; i<nCol; i++){
-      char *zColl = pIdx->azColl[i];
-      if( zColl==0 ) zColl = "BINARY";
-      pKey->aColl[i] = sqlite3LocateCollSeq(pParse, zColl);
-      pKey->aSortOrder[i] = pIdx->aSortOrder[i];
+  if( pParse->nErr ) return 0;
+  if( pIdx->pKeyInfo==0 ){
+    if( pIdx->uniqNotNull ){
+      pKey = sqlite3KeyInfoAlloc(pParse->db, nKey, nCol-nKey);
+    }else{
+      pKey = sqlite3KeyInfoAlloc(pParse->db, nCol, 0);
+    }
+    if( pKey ){
+      assert( sqlite3KeyInfoIsWriteable(pKey) );
+      for(i=0; i<nCol; i++){
+        char *zColl = pIdx->azColl[i];
+        if( zColl==0 ) zColl = "BINARY";
+        pKey->aColl[i] = sqlite3LocateCollSeq(pParse, zColl);
+        pKey->aSortOrder[i] = pIdx->aSortOrder[i];
+      }
+      if( pParse->nErr ){
+        sqlite3KeyInfoUnref(pKey);
+      }else{
+        pIdx->pKeyInfo = pKey;
+      }
     }
   }
-
-  if( pParse->nErr ){
-    sqlite3DbFree(pParse->db, pKey);
-    pKey = 0;
-  }
-  return pKey;
+  return sqlite3KeyInfoRef(pIdx->pKeyInfo);
 }
diff --git a/src/delete.c b/src/delete.c
index 21bbb02..a1b626e 100644
--- a/src/delete.c
+++ b/src/delete.c
@@ -382,9 +382,8 @@
     iKey = ++pParse->nMem;
     iEph = pParse->nTab++;
 
-    sqlite3VdbeAddOp4(v, OP_OpenEphemeral, iEph, nPk, 0, 
-                      (char*)sqlite3IndexKeyinfo(pParse, pPk),
-                      P4_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp2(v, OP_OpenEphemeral, iEph, nPk);
+    sqlite3VdbeSetP4KeyInfo(pParse, pPk);
     pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0, 0, 0);
     if( pWInfo==0 ) goto delete_from_cleanup;
     for(i=0; i<nPk; i++){
diff --git a/src/expr.c b/src/expr.c
index 045ef15..7e72b44 100644
--- a/src/expr.c
+++ b/src/expr.c
@@ -1581,14 +1581,9 @@
          && sqlite3FindCollSeq(db, ENC(db), pIdx->azColl[0], 0)==pReq
          && (!mustBeUnique || (pIdx->nKeyCol==1 && pIdx->onError!=OE_None))
         ){
-          int iAddr;
-          char *pKey;
-  
-          pKey = (char *)sqlite3IndexKeyinfo(pParse, pIdx);
-          iAddr = sqlite3CodeOnce(pParse);
-  
-          sqlite3VdbeAddOp4(v, OP_OpenRead, iTab, pIdx->tnum, iDb,
-                               pKey,P4_KEYINFO_HANDOFF);
+          int iAddr = sqlite3CodeOnce(pParse);
+          sqlite3VdbeAddOp3(v, OP_OpenRead, iTab, pIdx->tnum, iDb);
+          sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
           VdbeComment((v, "%s", pIdx->zName));
           assert( IN_INDEX_INDEX_DESC == IN_INDEX_INDEX_ASC+1 );
           eType = IN_INDEX_INDEX_ASC + pIdx->aSortOrder[0];
@@ -1746,13 +1741,14 @@
         pExpr->x.pSelect->iLimit = 0;
         testcase( pKeyInfo==0 ); /* Caused by OOM in sqlite3KeyInfoAlloc() */
         if( sqlite3Select(pParse, pExpr->x.pSelect, &dest) ){
-          sqlite3DbFree(pParse->db, pKeyInfo);
+          sqlite3KeyInfoUnref(pKeyInfo);
           return 0;
         }
         pEList = pExpr->x.pSelect->pEList;
         assert( pKeyInfo!=0 ); /* OOM will cause exit after sqlite3Select() */
         assert( pEList!=0 );
         assert( pEList->nExpr>0 );
+        assert( sqlite3KeyInfoIsWriteable(pKeyInfo) );
         pKeyInfo->aColl[0] = sqlite3BinaryCompareCollSeq(pParse, pExpr->pLeft,
                                                          pEList->a[0].pExpr);
       }else if( ALWAYS(pExpr->x.pList!=0) ){
@@ -1772,6 +1768,7 @@
           affinity = SQLITE_AFF_NONE;
         }
         if( pKeyInfo ){
+          assert( sqlite3KeyInfoIsWriteable(pKeyInfo) );
           pKeyInfo->aColl[0] = sqlite3ExprCollSeq(pParse, pExpr->pLeft);
         }
 
@@ -1813,7 +1810,7 @@
         sqlite3ReleaseTempReg(pParse, r2);
       }
       if( pKeyInfo ){
-        sqlite3VdbeChangeP4(v, addr, (void *)pKeyInfo, P4_KEYINFO_HANDOFF);
+        sqlite3VdbeChangeP4(v, addr, (void *)pKeyInfo, P4_KEYINFO);
       }
       break;
     }
diff --git a/src/fkey.c b/src/fkey.c
index b0ff0f0..552abc4 100644
--- a/src/fkey.c
+++ b/src/fkey.c
@@ -379,10 +379,9 @@
       int nCol = pFKey->nCol;
       int regTemp = sqlite3GetTempRange(pParse, nCol);
       int regRec = sqlite3GetTempReg(pParse);
-      KeyInfo *pKey = sqlite3IndexKeyinfo(pParse, pIdx);
   
       sqlite3VdbeAddOp3(v, OP_OpenRead, iCur, pIdx->tnum, iDb);
-      sqlite3VdbeChangeP4(v, -1, (char*)pKey, P4_KEYINFO_HANDOFF);
+      sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
       for(i=0; i<nCol; i++){
         sqlite3VdbeAddOp2(v, OP_Copy, aiCol[i]+1+regData, regTemp+i);
       }
diff --git a/src/insert.c b/src/insert.c
index 857585f..e380d1b 100644
--- a/src/insert.c
+++ b/src/insert.c
@@ -24,7 +24,7 @@
 ** for that table that is actually opened.
 */
 void sqlite3OpenTable(
-  Parse *p,       /* Generate code into this VDBE */
+  Parse *pParse,  /* Generate code into this VDBE */
   int iCur,       /* The cursor number of the table */
   int iDb,        /* The database index in sqlite3.aDb[] */
   Table *pTab,    /* The table to be opened */
@@ -32,9 +32,10 @@
 ){
   Vdbe *v;
   assert( !IsVirtual(pTab) );
-  v = sqlite3GetVdbe(p);
+  v = sqlite3GetVdbe(pParse);
   assert( opcode==OP_OpenWrite || opcode==OP_OpenRead );
-  sqlite3TableLock(p, iDb, pTab->tnum, (opcode==OP_OpenWrite)?1:0, pTab->zName);
+  sqlite3TableLock(pParse, iDb, pTab->tnum, 
+                   (opcode==OP_OpenWrite)?1:0, pTab->zName);
   if( HasRowid(pTab) ){
     sqlite3VdbeAddOp4Int(v, opcode, iCur, pTab->tnum, iDb, pTab->nCol);
     VdbeComment((v, "%s", pTab->zName));
@@ -42,8 +43,8 @@
     Index *pPk = sqlite3PrimaryKeyIndex(pTab);
     assert( pPk!=0 );
     assert( pPk->tnum=pTab->tnum );
-    sqlite3VdbeAddOp4(v, opcode, iCur, pPk->tnum, iDb,
-                      (char*)sqlite3IndexKeyinfo(p, pPk), P4_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp3(v, opcode, iCur, pPk->tnum, iDb);
+    sqlite3VdbeSetP4KeyInfo(pParse, pPk);
     VdbeComment((v, "%s", pTab->zName));
   }
 }
@@ -1706,12 +1707,11 @@
   }
   *piIdxCur = iBase;
   for(i=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, i++){
-    KeyInfo *pKey = sqlite3IndexKeyinfo(pParse, pIdx);
     int iIdxCur = iBase++;
     assert( pIdx->pSchema==pTab->pSchema );
     if( pIdx->autoIndex==2 && !HasRowid(pTab) ) *piDataCur = iIdxCur;
-    sqlite3VdbeAddOp4(v, op, iIdxCur, pIdx->tnum, iDb,
-                      (char*)pKey, P4_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp3(v, op, iIdxCur, pIdx->tnum, iDb);
+    sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
     VdbeComment((v, "%s", pIdx->zName));
   }
   if( iBase>pParse->nTab ) pParse->nTab = iBase;
@@ -1828,7 +1828,6 @@
   int emptyDestTest = 0;           /* Address of test for empty pDest */
   int emptySrcTest = 0;            /* Address of test for empty pSrc */
   Vdbe *v;                         /* The VDBE we are building */
-  KeyInfo *pKey;                   /* Key information for an index */
   int regAutoinc;                  /* Memory register used by AUTOINC */
   int destHasUniqueIdx = 0;        /* True if pDest has a UNIQUE index */
   int regData, regRowid;           /* Registers holding data and rowid */
@@ -2028,13 +2027,11 @@
       if( xferCompatibleIndex(pDestIdx, pSrcIdx) ) break;
     }
     assert( pSrcIdx );
-    pKey = sqlite3IndexKeyinfo(pParse, pSrcIdx);
-    sqlite3VdbeAddOp4(v, OP_OpenRead, iSrc, pSrcIdx->tnum, iDbSrc,
-                      (char*)pKey, P4_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp3(v, OP_OpenRead, iSrc, pSrcIdx->tnum, iDbSrc);
+    sqlite3VdbeSetP4KeyInfo(pParse, pSrcIdx);
     VdbeComment((v, "%s", pSrcIdx->zName));
-    pKey = sqlite3IndexKeyinfo(pParse, pDestIdx);
-    sqlite3VdbeAddOp4(v, OP_OpenWrite, iDest, pDestIdx->tnum, iDbDest,
-                      (char*)pKey, P4_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp3(v, OP_OpenWrite, iDest, pDestIdx->tnum, iDbDest);
+    sqlite3VdbeSetP4KeyInfo(pParse, pDestIdx);
     sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR);
     VdbeComment((v, "%s", pDestIdx->zName));
     addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
diff --git a/src/pragma.c b/src/pragma.c
index 948d960..9211a2c 100644
--- a/src/pragma.c
+++ b/src/pragma.c
@@ -1680,9 +1680,8 @@
           if( pIdx==0 ){
             sqlite3OpenTable(pParse, i, iDb, pParent, OP_OpenRead);
           }else{
-            KeyInfo *pKey = sqlite3IndexKeyinfo(pParse, pIdx);
             sqlite3VdbeAddOp3(v, OP_OpenRead, i, pIdx->tnum, iDb);
-            sqlite3VdbeChangeP4(v, -1, (char*)pKey, P4_KEYINFO_HANDOFF);
+            sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
           }
         }else{
           k = 0;
diff --git a/src/select.c b/src/select.c
index c54837b..2d985d6 100644
--- a/src/select.c
+++ b/src/select.c
@@ -805,13 +805,9 @@
 /*
 ** Allocate a KeyInfo object sufficient for an index of N key columns and
 ** X extra columns.
-**
-** Actually, always allocate one extra column for the rowid at the end
-** of the index.  So the KeyInfo returned will have space sufficient for
-** N+1 columns.
 */
 KeyInfo *sqlite3KeyInfoAlloc(sqlite3 *db, int N, int X){
-  KeyInfo *p = sqlite3DbMallocZero(db, 
+  KeyInfo *p = sqlite3DbMallocZero(0, 
                    sizeof(KeyInfo) + (N+X)*(sizeof(CollSeq*)+1));
   if( p ){
     p->aSortOrder = (u8*)&p->aColl[N+X];
@@ -819,11 +815,47 @@
     p->nXField = (u16)X;
     p->enc = ENC(db);
     p->db = db;
+    p->nRef = 1;
+  }else{
+    db->mallocFailed = 1;
   }
   return p;
 }
 
 /*
+** Deallocate a KeyInfo object
+*/
+void sqlite3KeyInfoUnref(KeyInfo *p){
+  if( p ){
+    assert( p->nRef>0 );
+    assert( p->db==0 || p->db->pnBytesFreed==0 );
+    p->nRef--;
+    if( p->nRef==0 ) sqlite3DbFree(p->db, p);
+  }
+}
+
+/*
+** Add one reference to KeyInfo p and return p.  A NULL p is returned as-is.
+*/
+KeyInfo *sqlite3KeyInfoRef(KeyInfo *p){
+  if( p ){
+    assert( p->nRef>0 );
+    p->nRef++;
+  }
+  return p;
+}
+
+#ifdef SQLITE_DEBUG
+/*
+** Return TRUE if a KeyInfo object can be changed.  The KeyInfo object
+** can only be changed if there is just a single reference to the object.
+**
+** This routine is used only inside of assert() statements.
+*/
+int sqlite3KeyInfoIsWriteable(KeyInfo *p){ return p->nRef==1; }
+#endif /* SQLITE_DEBUG */
+
+/*
 ** Given an expression list, generate a KeyInfo structure that records
 ** the collating sequence for each expression in that expression list.
 **
@@ -835,8 +867,7 @@
 **
 ** Space to hold the KeyInfo structure is obtain from malloc.  The calling
 ** function is responsible for seeing that this structure is eventually
-** freed.  Add the KeyInfo structure to the P4 field of an opcode using
-** P4_KEYINFO_HANDOFF is the usual way of dealing with this.
+** freed.
 */
 static KeyInfo *keyInfoFromExprList(Parse *pParse, ExprList *pList){
   int nExpr;
@@ -848,6 +879,7 @@
   nExpr = pList->nExpr;
   pInfo = sqlite3KeyInfoAlloc(db, nExpr, 1);
   if( pInfo ){
+    assert( sqlite3KeyInfoIsWriteable(pInfo) );
     for(i=0, pItem=pList->a; i<nExpr; i++, pItem++){
       CollSeq *pColl;
       pColl = sqlite3ExprCollSeq(pParse, pItem->pExpr);
@@ -2012,11 +2044,12 @@
           break;
         }
         sqlite3VdbeChangeP2(v, addr, nCol);
-        sqlite3VdbeChangeP4(v, addr, (char*)pKeyInfo, P4_KEYINFO);
+        sqlite3VdbeChangeP4(v, addr, (char*)sqlite3KeyInfoRef(pKeyInfo),
+                            P4_KEYINFO);
         pLoop->addrOpenEphm[i] = -1;
       }
     }
-    sqlite3DbFree(db, pKeyInfo);
+    sqlite3KeyInfoUnref(pKeyInfo);
   }
 
 multi_select_end:
@@ -2055,7 +2088,6 @@
   int regReturn,          /* The return address register */
   int regPrev,            /* Previous result register.  No uniqueness if 0 */
   KeyInfo *pKeyInfo,      /* For comparing with previous entry */
-  int p4type,             /* The p4 type for pKeyInfo */
   int iBreak              /* Jump here if we hit the LIMIT */
 ){
   Vdbe *v = pParse->pVdbe;
@@ -2071,7 +2103,7 @@
     int j1, j2;
     j1 = sqlite3VdbeAddOp1(v, OP_IfNot, regPrev);
     j2 = sqlite3VdbeAddOp4(v, OP_Compare, pIn->iSdst, regPrev+1, pIn->nSdst,
-                              (char*)pKeyInfo, p4type);
+                              (char*)sqlite3KeyInfoRef(pKeyInfo), P4_KEYINFO);
     sqlite3VdbeAddOp3(v, OP_Jump, j2+2, iContinue, j2+2);
     sqlite3VdbeJumpHere(v, j1);
     sqlite3VdbeAddOp3(v, OP_Copy, pIn->iSdst, regPrev+1, pIn->nSdst-1);
@@ -2382,6 +2414,7 @@
           pOrderBy->a[i].pExpr =
              sqlite3ExprAddCollateString(pParse, pTerm, pColl->zName);
         }
+        assert( sqlite3KeyInfoIsWriteable(pKeyMerge) );
         pKeyMerge->aColl[i] = pColl;
         pKeyMerge->aSortOrder[i] = pOrderBy->a[i].sortOrder;
       }
@@ -2409,6 +2442,7 @@
     sqlite3VdbeAddOp2(v, OP_Integer, 0, regPrev);
     pKeyDup = sqlite3KeyInfoAlloc(db, nExpr, 1);
     if( pKeyDup ){
+      assert( sqlite3KeyInfoIsWriteable(pKeyDup) );
       for(i=0; i<nExpr; i++){
         pKeyDup->aColl[i] = multiSelectCollSeq(pParse, p, i);
         pKeyDup->aSortOrder[i] = 0;
@@ -2490,7 +2524,7 @@
   VdbeNoopComment((v, "Output routine for A"));
   addrOutA = generateOutputSubroutine(pParse,
                  p, &destA, pDest, regOutA,
-                 regPrev, pKeyDup, P4_KEYINFO_HANDOFF, labelEnd);
+                 regPrev, pKeyDup, labelEnd);
   
   /* Generate a subroutine that outputs the current row of the B
   ** select as the next output row of the compound select.
@@ -2499,8 +2533,9 @@
     VdbeNoopComment((v, "Output routine for B"));
     addrOutB = generateOutputSubroutine(pParse,
                  p, &destB, pDest, regOutB,
-                 regPrev, pKeyDup, P4_KEYINFO_STATIC, labelEnd);
+                 regPrev, pKeyDup, labelEnd);
   }
+  sqlite3KeyInfoUnref(pKeyDup);
 
   /* Generate a subroutine to run when the results from select A
   ** are exhausted and only data in select B remains.
@@ -2579,7 +2614,7 @@
   sqlite3VdbeResolveLabel(v, labelCmpr);
   sqlite3VdbeAddOp4(v, OP_Permutation, 0, 0, 0, (char*)aPermute, P4_INTARRAY);
   sqlite3VdbeAddOp4(v, OP_Compare, destA.iSdst, destB.iSdst, nOrderBy,
-                         (char*)pKeyMerge, P4_KEYINFO_HANDOFF);
+                         (char*)pKeyMerge, P4_KEYINFO);
   sqlite3VdbeChangeP5(v, OPFLAG_PERMUTE);
   sqlite3VdbeAddOp3(v, OP_Jump, addrAltB, addrAeqB, addrAgtB);
 
@@ -3805,7 +3840,7 @@
       }else{
         KeyInfo *pKeyInfo = keyInfoFromExprList(pParse, pE->x.pList);
         sqlite3VdbeAddOp4(v, OP_OpenEphemeral, pFunc->iDistinct, 0, 0,
-                          (char*)pKeyInfo, P4_KEYINFO_HANDOFF);
+                          (char*)pKeyInfo, P4_KEYINFO);
       }
     }
   }
@@ -4260,7 +4295,7 @@
     p->addrOpenEphm[2] = addrSortIndex =
       sqlite3VdbeAddOp4(v, OP_OpenEphemeral,
                            pOrderBy->iECursor, pOrderBy->nExpr+2, 0,
-                           (char*)pKeyInfo, P4_KEYINFO_HANDOFF);
+                           (char*)pKeyInfo, P4_KEYINFO);
   }else{
     addrSortIndex = -1;
   }
@@ -4288,7 +4323,7 @@
     sDistinct.addrTnct = sqlite3VdbeAddOp4(v, OP_OpenEphemeral,
                                 sDistinct.tabTnct, 0, 0,
                                 (char*)keyInfoFromExprList(pParse, p->pEList),
-                                P4_KEYINFO_HANDOFF);
+                                P4_KEYINFO);
     sqlite3VdbeChangeP5(v, BTREE_UNORDERED);
     sDistinct.eTnctType = WHERE_DISTINCT_UNORDERED;
   }else{
@@ -4412,7 +4447,7 @@
       pKeyInfo = keyInfoFromExprList(pParse, pGroupBy);
       addrSortingIdx = sqlite3VdbeAddOp4(v, OP_SorterOpen, 
           sAggInfo.sortingIdx, sAggInfo.nSortingColumn, 
-          0, (char*)pKeyInfo, P4_KEYINFO_HANDOFF);
+          0, (char*)pKeyInfo, P4_KEYINFO);
 
       /* Initialize memory locations used by GROUP BY aggregate processing
       */
@@ -4526,7 +4561,7 @@
         }
       }
       sqlite3VdbeAddOp4(v, OP_Compare, iAMem, iBMem, pGroupBy->nExpr,
-                          (char*)pKeyInfo, P4_KEYINFO);
+                          (char*)sqlite3KeyInfoRef(pKeyInfo), P4_KEYINFO);
       j1 = sqlite3VdbeCurrentAddr(v);
       sqlite3VdbeAddOp3(v, OP_Jump, j1+1, 0, j1+1);
 
@@ -4652,13 +4687,13 @@
         }
         if( pBest ){
           iRoot = pBest->tnum;
-          pKeyInfo = sqlite3IndexKeyinfo(pParse, pBest);
+          pKeyInfo = sqlite3KeyInfoOfIndex(pParse, pBest);
         }
 
         /* Open a read-only cursor, execute the OP_Count, close the cursor. */
         sqlite3VdbeAddOp4Int(v, OP_OpenRead, iCsr, iRoot, iDb, 1);
         if( pKeyInfo ){
-          sqlite3VdbeChangeP4(v, -1, (char *)pKeyInfo, P4_KEYINFO_HANDOFF);
+          sqlite3VdbeChangeP4(v, -1, (char *)pKeyInfo, P4_KEYINFO);
         }
         sqlite3VdbeAddOp2(v, OP_Count, iCsr, sAggInfo.aFunc[0].iMem);
         sqlite3VdbeAddOp1(v, OP_Close, iCsr);
diff --git a/src/sqliteInt.h b/src/sqliteInt.h
index b0b9761..ea0cde7 100644
--- a/src/sqliteInt.h
+++ b/src/sqliteInt.h
@@ -1532,10 +1532,11 @@
 ** for the rowid at the end.
 */
 struct KeyInfo {
-  sqlite3 *db;        /* The database connection */
+  u32 nRef;           /* Number of references to this KeyInfo object */
   u8 enc;             /* Text encoding - one of the SQLITE_UTF* values */
   u16 nField;         /* Number of key columns in the index */
   u16 nXField;        /* Number of columns beyond the key columns */
+  sqlite3 *db;        /* The database connection */
   u8 *aSortOrder;     /* Sort order for each column. */
   CollSeq *aColl[1];  /* Collating sequence for each term of the key */
 };
@@ -1604,6 +1605,7 @@
   u8 *aSortOrder;          /* for each column: True==DESC, False==ASC */
   char **azColl;           /* Array of collation sequence names for index */
   Expr *pPartIdxWhere;     /* WHERE clause for partial indices */
+  KeyInfo *pKeyInfo;       /* A KeyInfo object suitable for this index */
   int tnum;                /* DB Page containing root of this index */
   LogEst szIdxRow;         /* Estimated average row size in bytes */
   u16 nKeyCol;             /* Number of columns forming the key */
@@ -3159,7 +3161,12 @@
 Schema *sqlite3SchemaGet(sqlite3 *, Btree *);
 int sqlite3SchemaToIndex(sqlite3 *db, Schema *);
 KeyInfo *sqlite3KeyInfoAlloc(sqlite3*,int,int);
-KeyInfo *sqlite3IndexKeyinfo(Parse *, Index *);
+void sqlite3KeyInfoUnref(KeyInfo*);
+KeyInfo *sqlite3KeyInfoRef(KeyInfo*);
+KeyInfo *sqlite3KeyInfoOfIndex(Parse*, Index*);
+#ifdef SQLITE_DEBUG
+int sqlite3KeyInfoIsWriteable(KeyInfo*);
+#endif
 int sqlite3CreateFunc(sqlite3 *, const char *, int, int, void *, 
   void (*)(sqlite3_context*,int,sqlite3_value **),
   void (*)(sqlite3_context*,int,sqlite3_value **), void (*)(sqlite3_context*),
diff --git a/src/update.c b/src/update.c
index f7e4ec3..065e5d6 100644
--- a/src/update.c
+++ b/src/update.c
@@ -360,9 +360,8 @@
     pParse->nMem += nPk;
     regKey = ++pParse->nMem;
     iEph = pParse->nTab++;
-    sqlite3VdbeAddOp4(v, OP_OpenEphemeral, iEph, nPk, 0, 
-                      (char*)sqlite3IndexKeyinfo(pParse, pPk),
-                      P4_KEYINFO_HANDOFF);
+    sqlite3VdbeAddOp2(v, OP_OpenEphemeral, iEph, nPk);
+    sqlite3VdbeSetP4KeyInfo(pParse, pPk);
     pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0, 0, 0);
     if( pWInfo==0 ) goto update_cleanup;
     for(i=0; i<nPk; i++){
@@ -408,9 +407,8 @@
     for(i=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, i++){
       assert( aRegIdx );
       if( openAll || aRegIdx[i]>0 ){
-        KeyInfo *pKey = sqlite3IndexKeyinfo(pParse, pIdx);
-        sqlite3VdbeAddOp4(v, OP_OpenWrite, iIdxCur+i, pIdx->tnum, iDb,
-                       (char*)pKey, P4_KEYINFO_HANDOFF);
+        sqlite3VdbeAddOp3(v, OP_OpenWrite, iIdxCur+i, pIdx->tnum, iDb);
+        sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
         assert( pParse->nTab>iIdxCur+i );
         VdbeComment((v, "%s", pIdx->zName));
       }
diff --git a/src/vdbe.h b/src/vdbe.h
index 0f52ba6..91d6a0f 100644
--- a/src/vdbe.h
+++ b/src/vdbe.h
@@ -117,16 +117,6 @@
 #define P4_SUBPROGRAM  (-18) /* P4 is a pointer to a SubProgram structure */
 #define P4_ADVANCE  (-19) /* P4 is a pointer to BtreeNext() or BtreePrev() */
 
-/* When adding a P4 argument using P4_KEYINFO, a copy of the KeyInfo structure
-** is made.  That copy is freed when the Vdbe is finalized.  But if the
-** argument is P4_KEYINFO_HANDOFF, the passed in pointer is used.  It still
-** gets freed when the Vdbe is finalized so it still should be obtained
-** from a single sqliteMalloc().  But no copy is made and the calling
-** function should *not* try to free the KeyInfo.
-*/
-#define P4_KEYINFO_HANDOFF (-16)
-#define P4_KEYINFO_STATIC  (-17)
-
 /* Error message codes for OP_Halt */
 #define P5_ConstraintNotNull 1
 #define P5_ConstraintUnique  2
@@ -186,6 +176,7 @@
 void sqlite3VdbeJumpHere(Vdbe*, int addr);
 void sqlite3VdbeChangeToNoop(Vdbe*, int addr);
 void sqlite3VdbeChangeP4(Vdbe*, int addr, const char *zP4, int N);
+void sqlite3VdbeSetP4KeyInfo(Parse*, Index*);
 void sqlite3VdbeUsesBtree(Vdbe*, int);
 VdbeOp *sqlite3VdbeGetOp(Vdbe*, int);
 int sqlite3VdbeMakeLabel(Vdbe*);
diff --git a/src/vdbeaux.c b/src/vdbeaux.c
index 5a0e434..1f11f66 100644
--- a/src/vdbeaux.c
+++ b/src/vdbeaux.c
@@ -637,12 +637,14 @@
       case P4_REAL:
       case P4_INT64:
       case P4_DYNAMIC:
-      case P4_KEYINFO:
-      case P4_INTARRAY:
-      case P4_KEYINFO_HANDOFF: {
+      case P4_INTARRAY: {
         sqlite3DbFree(db, p4);
         break;
       }
+      case P4_KEYINFO: {
+        if( db->pnBytesFreed==0 ) sqlite3KeyInfoUnref((KeyInfo*)p4);
+        break;
+      }
       case P4_MPRINTF: {
         if( db->pnBytesFreed==0 ) sqlite3_free(p4);
         break;
@@ -721,14 +723,6 @@
 ** the string is made into memory obtained from sqlite3_malloc().
 ** A value of n==0 means copy bytes of zP4 up to and including the
 ** first null byte.  If n>0 then copy n+1 bytes of zP4.
-**
-** If n==P4_KEYINFO it means that zP4 is a pointer to a KeyInfo structure.
-** A copy is made of the KeyInfo structure into memory obtained from
-** sqlite3_malloc, to be freed when the Vdbe is finalized.
-** n==P4_KEYINFO_HANDOFF indicates that zP4 points to a KeyInfo structure
-** stored in memory that the caller has obtained from sqlite3_malloc. The 
-** caller should not free the allocation, it will be freed when the Vdbe is
-** finalized.
 ** 
 ** Other values of n (P4_STATIC, P4_COLLSEQ etc.) indicate that zP4 points
 ** to a string or structure that is guaranteed to exist for the lifetime of
@@ -743,7 +737,7 @@
   db = p->db;
   assert( p->magic==VDBE_MAGIC_INIT );
   if( p->aOp==0 || db->mallocFailed ){
-    if ( n!=P4_KEYINFO && n!=P4_VTAB ) {
+    if( n!=P4_VTAB ){
       freeP4(db, n, (void*)*(char**)&zP4);
     }
     return;
@@ -766,21 +760,6 @@
     pOp->p4.p = 0;
     pOp->p4type = P4_NOTUSED;
   }else if( n==P4_KEYINFO ){
-    KeyInfo *pOrig, *pNew;
-
-    pOrig = (KeyInfo*)zP4;
-    pNew = sqlite3KeyInfoAlloc(db, pOrig->nField, pOrig->nXField);
-    pOp->p4.pKeyInfo = pNew;
-    if( pNew ){
-      int n = pOrig->nField+pOrig->nXField;
-      memcpy(pNew->aColl, pOrig->aColl, n*sizeof(pNew->aColl[0]));
-      memcpy(pNew->aSortOrder, pOrig->aSortOrder, n);
-      pOp->p4type = P4_KEYINFO;
-    }else{
-      p->db->mallocFailed = 1;
-      pOp->p4type = P4_NOTUSED;
-    }
-  }else if( n==P4_KEYINFO_HANDOFF ){
     pOp->p4.p = (void*)zP4;
     pOp->p4type = P4_KEYINFO;
   }else if( n==P4_VTAB ){
@@ -798,6 +777,18 @@
   }
 }
 
+/*
+** Set the P4 operand of the most recently added opcode to a counted
+** reference on the KeyInfo of index pIdx (see sqlite3KeyInfoOfIndex()).
+*/
+void sqlite3VdbeSetP4KeyInfo(Parse *pParse, Index *pIdx){
+  Vdbe *v = pParse->pVdbe;
+  assert( v!=0 );
+  assert( pIdx!=0 );
+  sqlite3VdbeChangeP4(v, -1, (char*)sqlite3KeyInfoOfIndex(pParse, pIdx),
+                      P4_KEYINFO);
+}
+
 #ifdef SQLITE_ENABLE_EXPLAIN_COMMENTS
 /*
 ** Change the comment on the most recently coded instruction.  Or
@@ -958,7 +949,6 @@
   char *zP4 = zTemp;
   assert( nTemp>=20 );
   switch( pOp->p4type ){
-    case P4_KEYINFO_STATIC:
     case P4_KEYINFO: {
       int i, j;
       KeyInfo *pKeyInfo = pOp->p4.pKeyInfo;
diff --git a/src/vdbemem.c b/src/vdbemem.c
index ab1addf..c4bae54 100644
--- a/src/vdbemem.c
+++ b/src/vdbemem.c
@@ -1036,10 +1036,10 @@
       nByte = sizeof(Mem) * nCol + sizeof(UnpackedRecord);
       pRec = (UnpackedRecord*)sqlite3DbMallocZero(db, nByte);
       if( pRec ){
-        pRec->pKeyInfo = sqlite3IndexKeyinfo(p->pParse, pIdx);
+        pRec->pKeyInfo = sqlite3KeyInfoOfIndex(p->pParse, pIdx);
         if( pRec->pKeyInfo ){
           assert( pRec->pKeyInfo->nField+pRec->pKeyInfo->nXField==nCol );
-          pRec->pKeyInfo->enc = ENC(db);
+          assert( pRec->pKeyInfo->enc==ENC(db) );
           pRec->flags = UNPACKED_PREFIX_MATCH;
           pRec->aMem = (Mem *)&pRec[1];
           for(i=0; i<nCol; i++){
@@ -1368,7 +1368,7 @@
     for(i=0; i<nCol; i++){
       sqlite3DbFree(db, aMem[i].zMalloc);
     }
-    sqlite3DbFree(db, pRec->pKeyInfo);
+    sqlite3KeyInfoUnref(pRec->pKeyInfo);
     sqlite3DbFree(db, pRec);
   }
 }
diff --git a/src/where.c b/src/where.c
index 966aa87..02bec95 100644
--- a/src/where.c
+++ b/src/where.c
@@ -2015,7 +2015,6 @@
   Vdbe *v;                    /* Prepared statement under construction */
   int addrInit;               /* Address of the initialization bypass jump */
   Table *pTable;              /* The table being indexed */
-  KeyInfo *pKeyinfo;          /* Key information for the index */   
   int addrTop;                /* Top of the index fill loop */
   int regRecord;              /* Register holding an index record */
   int n;                      /* Column counter */
@@ -2132,11 +2131,10 @@
   pIdx->azColl[n] = "BINARY";
 
   /* Create the automatic index */
-  pKeyinfo = sqlite3IndexKeyinfo(pParse, pIdx);
   assert( pLevel->iIdxCur>=0 );
   pLevel->iIdxCur = pParse->nTab++;
-  sqlite3VdbeAddOp4(v, OP_OpenAutoindex, pLevel->iIdxCur, nKeyCol+1, 0,
-                    (char*)pKeyinfo, P4_KEYINFO_HANDOFF);
+  sqlite3VdbeAddOp2(v, OP_OpenAutoindex, pLevel->iIdxCur, nKeyCol+1);
+  sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
   VdbeComment((v, "for %s", pTable->zName));
 
   /* Fill the automatic index with content */
@@ -3996,6 +3994,7 @@
       p->u.vtab.idxStr = 0;
     }else if( (p->wsFlags & WHERE_AUTO_INDEX)!=0 && p->u.btree.pIndex!=0 ){
       sqlite3DbFree(db, p->u.btree.pIndex->zColAff);
+      sqlite3KeyInfoUnref(p->u.btree.pIndex->pKeyInfo);
       sqlite3DbFree(db, p->u.btree.pIndex);
       p->u.btree.pIndex = 0;
     }
@@ -6038,13 +6037,12 @@
     }
     if( pLoop->wsFlags & WHERE_INDEXED ){
       Index *pIx = pLoop->u.btree.pIndex;
-      KeyInfo *pKey = sqlite3IndexKeyinfo(pParse, pIx);
       /* FIXME:  As an optimization use pTabItem->iCursor if WHERE_IDX_ONLY */
       int iIndexCur = pLevel->iIdxCur = iIdxCur ? iIdxCur : pParse->nTab++;
       assert( pIx->pSchema==pTab->pSchema );
       assert( iIndexCur>=0 );
-      sqlite3VdbeAddOp4(v, OP_OpenRead, iIndexCur, pIx->tnum, iDb,
-                        (char*)pKey, P4_KEYINFO_HANDOFF);
+      sqlite3VdbeAddOp3(v, OP_OpenRead, iIndexCur, pIx->tnum, iDb);
+      sqlite3VdbeSetP4KeyInfo(pParse, pIx);
       VdbeComment((v, "%s", pIx->zName));
     }
     sqlite3CodeVerifySchema(pParse, iDb);