Cache the location of overflow pages in cursors used for incremental blob IO. (CVS 3899)

FossilOrigin-Name: 349f1ea7895f06c40affc985a13aa6686dfdea07
diff --git a/manifest b/manifest
index 87983fa..6a016ca 100644
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Fix\san\sinvalid\sUTF8\sencoding\sin\sthe\stests\sfor\sthe\strim\sfunction.\s(CVS\s3898)
-D 2007-05-02T15:36:02
+C Cache\sthe\slocation\sof\soverflow\spages\sin\scursors\sused\sfor\sincremental\sblob\sIO.\s(CVS\s3899)
+D 2007-05-02T16:48:37
 F Makefile.in 8cab54f7c9f5af8f22fd97ddf1ecfd1e1860de62
 F Makefile.linux-gcc 2d8574d1ba75f129aba2019f0b959db380a90935
 F README 9c4e2d6706bdcc3efdd773ce752a8cdab4f90028
@@ -59,8 +59,8 @@
 F src/analyze.c 4bbf5ddf9680587c6d4917e02e378b6037be3651
 F src/attach.c a16ada4a4654a0d126b8223ec9494ebb81bc5c3c
 F src/auth.c 902f4722661c796b97f007d9606bd7529c02597f
-F src/btree.c 0b2c181ea3ee23b5daef6f89d07a8a60d0f6370f
-F src/btree.h b2ef1ccc337fd37c58c8c17189a237aea341fb48
+F src/btree.c 4efb50fa388aa1678eb9cce5e5fc6fa8247406b2
+F src/btree.h 2c187d60cf76d74c2b4767294d6b5fa267037ff0
 F src/build.c 02e01ec7907c7d947ab3041fda0e81eaed05db42
 F src/callback.c 6414ed32d55859d0f65067aa5b88d2da27b3af9e
 F src/complete.c 7d1a44be8f37de125fcafd3d3a018690b3799675
@@ -130,7 +130,7 @@
 F src/vdbeInt.h cb02cbbceddf3b40d49012e9f41576f17bcbec97
 F src/vdbeapi.c 37d793559390bec8a00c556f651f21b5f9e589af
 F src/vdbeaux.c 8c7f22e22d1ea578971f5a3fcd3a56a6882ced64
-F src/vdbeblob.c 6d3128c71d5a6b8db627ea3052ed5aaaaf26e672
+F src/vdbeblob.c 396148feb5917e666edfb643f259f185051351b4
 F src/vdbefifo.c 3ca8049c561d5d67cbcb94dc909ae9bb68c0bf8f
 F src/vdbemem.c ba98f8572ec4609846b368fa7580db178022f1bb
 F src/vtab.c 89a0d5f39c1beba65a77fdb4d507b831fc5e6baf
@@ -471,7 +471,7 @@
 F www/vdbe.tcl 87a31ace769f20d3627a64fa1fade7fed47b90d0
 F www/version3.tcl 890248cf7b70e60c383b0e84d77d5132b3ead42b
 F www/whentouse.tcl fc46eae081251c3c181bd79c5faef8195d7991a5
-P e6d560ddeeb48fb0cbd9f5a10612280b055baef7
-R 15848809be73f4e08f83f1fd795bb746
-U drh
-Z f28dc40a5ade2da68f7833936614ab80
+P 4dbbfff4a7d4be197aac19c80400dafe10dd5e58
+R c0b16184c9579452ff42a74fabe3dca1
+U danielk1977
+Z 7dadee4977fc90359d0189698014dc76
diff --git a/manifest.uuid b/manifest.uuid
index 9c1a931..0ed0535 100644
--- a/manifest.uuid
+++ b/manifest.uuid
@@ -1 +1 @@
-4dbbfff4a7d4be197aac19c80400dafe10dd5e58
\ No newline at end of file
+349f1ea7895f06c40affc985a13aa6686dfdea07
\ No newline at end of file
diff --git a/src/btree.c b/src/btree.c
index 484b5287..c572c1b 100644
--- a/src/btree.c
+++ b/src/btree.c
@@ -9,7 +9,7 @@
 **    May you share freely, never taking more than you give.
 **
 *************************************************************************
-** $Id: btree.c,v 1.365 2007/05/02 13:16:30 danielk1977 Exp $
+** $Id: btree.c,v 1.366 2007/05/02 16:48:37 danielk1977 Exp $
 **
 ** This file implements a external (disk-based) database using BTrees.
 ** For a detailed discussion of BTrees, refer to
@@ -394,6 +394,10 @@
   void *pKey;      /* Saved key that was cursor's last known position */
   i64 nKey;        /* Size of pKey, or last integer key */
   int skip;        /* (skip<0) -> Prev() is a no-op. (skip>0) -> Next() is */
+#ifndef SQLITE_OMIT_INCRBLOB
+  u8 cacheOverflow;         /* True to use aOverflow */
+  Pgno *aOverflow;          /* Cache of overflow page locations */
+#endif
 };
 
 /*
@@ -691,6 +695,12 @@
     pCur->eState = CURSOR_REQUIRESEEK;
   }
 
+#ifndef SQLITE_OMIT_INCRBLOB
+  /* Delete the cache of overflow page numbers. */
+  sqliteFree(pCur->aOverflow);
+  pCur->aOverflow = 0;
+#endif
+
   return rc;
 }
 
@@ -2413,13 +2423,20 @@
 */
 int sqlite3BtreeIncrVacuum(Btree *p){
   BtShared *pBt = p->pBt;
+  BtCursor *pCur;
 
   assert( pBt->inTransaction==TRANS_WRITE && p->inTrans==TRANS_WRITE );
   if( !pBt->autoVacuum ){
     return SQLITE_DONE;
   }
-
-  return incrVacuumStep(p->pBt, 0);
+#ifndef SQLITE_OMIT_INCRBLOB
+  for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
+    /* Delete the cache of overflow page numbers. */
+    sqliteFree(pCur->aOverflow);
+    pCur->aOverflow = 0;
+  }
+#endif
+  return incrVacuumStep(pBt, 0);
 }
 
 /*
@@ -2438,6 +2455,15 @@
   int nRef = sqlite3PagerRefcount(pPager);
 #endif
 
+#ifndef SQLITE_OMIT_INCRBLOB
+  BtCursor *pCur;
+  for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
+    /* Delete the cache of overflow page numbers. */
+    sqliteFree(pCur->aOverflow);
+    pCur->aOverflow = 0;
+  }
+#endif
+
   assert(pBt->autoVacuum);
   if( !pBt->incrVacuum ){
     Pgno nFin = 0;
@@ -2933,6 +2959,9 @@
   }
   releasePage(pCur->pPage);
   unlockBtreeIfUnused(pBt);
+#ifndef SQLITE_OMIT_INCRBLOB
+  sqliteFree(pCur->aOverflow);
+#endif
   sqliteFree(pCur);
   return SQLITE_OK;
 }
@@ -3132,6 +3161,7 @@
   BtShared *pBt;
   int ovflSize;
   u32 nKey;
+  int iIdx = 0;
 
   assert( pCur!=0 && pCur->pPage!=0 );
   assert( pCur->eState==CURSOR_VALID );
@@ -3170,7 +3200,21 @@
   ovflSize = pBt->usableSize - 4;
   if( amt>0 ){
     nextPage = get4byte(&aPayload[pCur->info.nLocal]);
-    while( amt>0 && nextPage ){
+#ifndef SQLITE_OMIT_INCRBLOB
+    if( pCur->cacheOverflow && !pCur->aOverflow ){
+      int nOvfl = (pCur->info.nPayload-pCur->info.nLocal+ovflSize-1)/ovflSize;
+      pCur->aOverflow = (Pgno *)sqliteMalloc(sizeof(Pgno)*nOvfl);
+      if( nOvfl && !pCur->aOverflow ){
+        return SQLITE_NOMEM;
+      }
+    }
+    if( pCur->aOverflow && pCur->aOverflow[offset/ovflSize] ){
+      iIdx = (offset/ovflSize);
+      nextPage = pCur->aOverflow[iIdx];
+      offset = (offset%ovflSize);
+    }
+#endif
+    for(iIdx++; amt>0 && nextPage; iIdx++){
       if( offset>=ovflSize ){
         /* The only reason to read this page is to obtain the page
         ** number for the next page in the overflow chain. So try
@@ -3181,6 +3225,12 @@
           return rc;
         }
         offset -= ovflSize;
+#ifndef SQLITE_OMIT_INCRBLOB
+        if( pCur->aOverflow ){
+          assert(nextPage);
+          pCur->aOverflow[iIdx] = nextPage;
+        }
+#endif
       }else{
         /* Need to read this page properly, to obtain data to copy into
         ** the caller's buffer.
@@ -3201,6 +3251,11 @@
         amt -= a;
         pBuf += a;
         sqlite3PagerUnref(pDbPage);
+#ifndef SQLITE_OMIT_INCRBLOB
+        if( pCur->aOverflow && nextPage ){
+          pCur->aOverflow[iIdx] = nextPage;
+        }
+#endif
       }
     }
   }
@@ -6874,6 +6929,7 @@
   u8 *zRem = (u8 *)z;    /* Pointer to data not yet written */
   u32 iOffset = offset;  /* Offset from traversal point to start of write */
 
+  Pgno iIdx = 0;         /* Index of overflow page in pCsr->aOverflow */
   Pgno iOvfl;            /* Page number for next overflow page */
   int ovflSize;          /* Bytes of data per overflow page. */
 
@@ -6908,6 +6964,16 @@
   if( pInfo->nData<(offset+amt) ){
     return SQLITE_ERROR;
   }
+  ovflSize = pBt->usableSize - 4;
+
+  assert(pCsr->cacheOverflow);
+  if( !pCsr->aOverflow ){
+    int nOverflow = (pInfo->nPayload - pInfo->nLocal + ovflSize - 1)/ovflSize;
+    pCsr->aOverflow = (Pgno *)sqliteMalloc(sizeof(Pgno)*nOverflow);
+    if( nOverflow && !pCsr->aOverflow ){
+      return SQLITE_NOMEM;
+    }
+  }
 
   if( pInfo->nLocal>iOffset ){
     /* In this case data must be written to the b-tree page. */
@@ -6926,50 +6992,69 @@
   }
   iOffset = ((iOffset<pInfo->nLocal)?0:(iOffset-pInfo->nLocal));
 
-  ovflSize = pBt->usableSize - 4;
   assert(pInfo->iOverflow>0 || iRem==0);
-  iOvfl = get4byte(&pInfo->pCell[pInfo->iOverflow]);
-  while( iRem>0 ){
-    if( iOffset>ovflSize ){
-      /* The only reason to read this page is to obtain the page
-      ** number for the next page in the overflow chain. So try
-      ** the getOverflowPage() shortcut.  */
-      rc = getOverflowPage(pBt, iOvfl, 0, &iOvfl);
-      if( rc!=SQLITE_OK ){
-        return rc;
-      }
-      iOffset -= ovflSize;
+  if( iRem>0 ){
+    if( pCsr->aOverflow[iOffset/ovflSize] ){
+      iIdx = iOffset/ovflSize;
+      iOvfl = pCsr->aOverflow[iIdx];
+      iOffset = iOffset%ovflSize;
     }else{
-      int iWrite = ovflSize - iOffset;
-      DbPage *pOvfl;          /* The overflow page. */
-      u8 *aData;              /* Page data */
-
-      rc = sqlite3PagerGet(pBt->pPager, iOvfl, &pOvfl);
-      if( rc!=SQLITE_OK ){
-        return rc;
-      }
-      rc = sqlite3PagerWrite(pOvfl);
-      if( rc!=SQLITE_OK ){
+      iOvfl = get4byte(&pInfo->pCell[pInfo->iOverflow]);
+    }
+    for(iIdx++; iRem>0; iIdx++){
+      if( iOffset>ovflSize ){
+        /* The only reason to read this page is to obtain the page
+        ** number for the next page in the overflow chain. So try
+        ** the getOverflowPage() shortcut.  */
+        rc = getOverflowPage(pBt, iOvfl, 0, &iOvfl);
+        if( rc!=SQLITE_OK ){
+          return rc;
+        }
+        iOffset -= ovflSize;
+        pCsr->aOverflow[iIdx] = iOvfl;
+      }else{
+        int iWrite = ovflSize - iOffset;
+        DbPage *pOvfl;          /* The overflow page. */
+        u8 *aData;              /* Page data */
+  
+        rc = sqlite3PagerGet(pBt->pPager, iOvfl, &pOvfl);
+        if( rc!=SQLITE_OK ){
+          return rc;
+        }
+        rc = sqlite3PagerWrite(pOvfl);
+        if( rc!=SQLITE_OK ){
+          sqlite3PagerUnref(pOvfl);
+          return rc;
+        }
+  
+        aData = sqlite3PagerGetData(pOvfl);
+        iOvfl = get4byte(aData);
+        pCsr->aOverflow[iIdx] = iOvfl;
+        if( iWrite>iRem ){
+          iWrite = iRem;
+        }
+        memcpy(&aData[iOffset+4], zRem, iWrite);
         sqlite3PagerUnref(pOvfl);
-        return rc;
+  
+        zRem += iWrite;
+        iRem -= iWrite;
+        iOffset = ((iOffset<ovflSize)?0:(iOffset-ovflSize));
       }
-
-      aData = sqlite3PagerGetData(pOvfl);
-      iOvfl = get4byte(aData);
-      if( iWrite>iRem ){
-        iWrite = iRem;
-      }
-      memcpy(&aData[iOffset+4], zRem, iWrite);
-      sqlite3PagerUnref(pOvfl);
-
-      zRem += iWrite;
-      iRem -= iWrite;
-      iOffset = ((iOffset<ovflSize)?0:(iOffset-ovflSize));
     }
   }
 
   return SQLITE_OK;
 }
+
+/* 
+** Set a flag on this cursor to cache the locations of pages from the 
+** overflow list for the current row.
+*/
+void sqlite3BtreeCacheOverflow(BtCursor *pCur){
+  assert(!pCur->cacheOverflow);
+  assert(!pCur->aOverflow);
+  pCur->cacheOverflow = 1;
+}
 #endif
 
 /*
diff --git a/src/btree.h b/src/btree.h
index b3cd585..69e72e3 100644
--- a/src/btree.h
+++ b/src/btree.h
@@ -13,7 +13,7 @@
 ** subsystem.  See comments in the source code for a detailed description
 ** of what each interface routine does.
 **
-** @(#) $Id: btree.h,v 1.77 2007/05/02 01:34:31 drh Exp $
+** @(#) $Id: btree.h,v 1.78 2007/05/02 16:48:37 danielk1977 Exp $
 */
 #ifndef _BTREE_H_
 #define _BTREE_H_
@@ -143,6 +143,7 @@
 struct Pager *sqlite3BtreePager(Btree*);
 
 int sqlite3BtreePutData(BtCursor*, u32 offset, u32 amt, const void*);
+void sqlite3BtreeCacheOverflow(BtCursor *);
 
 #ifdef SQLITE_TEST
 int sqlite3BtreeCursorInfo(BtCursor*, int*, int);
diff --git a/src/vdbeblob.c b/src/vdbeblob.c
index ddc81b6..711802b 100644
--- a/src/vdbeblob.c
+++ b/src/vdbeblob.c
@@ -10,7 +10,7 @@
 **
 *************************************************************************
 **
-** $Id: vdbeblob.c,v 1.1 2007/05/01 17:49:49 danielk1977 Exp $
+** $Id: vdbeblob.c,v 1.2 2007/05/02 16:48:37 danielk1977 Exp $
 */
 
 #include "sqliteInt.h"
@@ -173,6 +173,7 @@
     }
     pBlob->flags = flags;
     pBlob->pCsr =  v->apCsr[0]->pCursor;
+    sqlite3BtreeCacheOverflow(pBlob->pCsr);
     pBlob->pStmt = (sqlite3_stmt *)v;
     pBlob->iOffset = v->apCsr[0]->aOffset[iCol];
     pBlob->nByte = sqlite3VdbeSerialTypeLen(type);