Initial code for incremental checkpoint in WAL mode.  This check-in compiles
on unix and runs as long as you do not engage WAL mode.  WAL mode crashes and
burns.  Consider this check-in a baseline implementation for getting the new
capability up and running.

FossilOrigin-Name: ef3ba7a17ff90674d702e5694b9e792851ab6998
diff --git a/src/os.c b/src/os.c
index 0593654..0b17a6b 100644
--- a/src/os.c
+++ b/src/os.c
@@ -110,8 +110,8 @@
 int sqlite3OsShmRelease(sqlite3_file *id){
   return id->pMethods->xShmRelease(id);
 }
-int sqlite3OsShmLock(sqlite3_file *id, int desiredLock, int *pGotLock){
-  return id->pMethods->xShmLock(id, desiredLock, pGotLock);
+int sqlite3OsShmLock(sqlite3_file *id, int offset, int n, int flags){
+  return id->pMethods->xShmLock(id, offset, n, flags);
 }
 void sqlite3OsShmBarrier(sqlite3_file *id){
   id->pMethods->xShmBarrier(id);
diff --git a/src/os.h b/src/os.h
index ba00c44..670ee43 100644
--- a/src/os.h
+++ b/src/os.h
@@ -247,7 +247,7 @@
 int sqlite3OsShmSize(sqlite3_file *id, int, int*);
 int sqlite3OsShmGet(sqlite3_file *id, int, int*, void volatile**);
 int sqlite3OsShmRelease(sqlite3_file *id);
-int sqlite3OsShmLock(sqlite3_file *id, int, int*);
+int sqlite3OsShmLock(sqlite3_file *id, int, int, int);
 void sqlite3OsShmBarrier(sqlite3_file *id);
 int sqlite3OsShmClose(sqlite3_file *id, int);
 
diff --git a/src/os_unix.c b/src/os_unix.c
index 227a3c5..cdba3d9 100644
--- a/src/os_unix.c
+++ b/src/os_unix.c
@@ -3168,30 +3168,20 @@
 struct unixShm {
   unixShmNode *pShmNode;     /* The underlying unixShmNode object */
   unixShm *pNext;            /* Next unixShm with the same unixShmNode */
-  u8 lockState;              /* Current lock state */
   u8 hasMutex;               /* True if holding the unixShmNode mutex */
   u8 hasMutexBuf;            /* True if holding pFile->mutexBuf */
-  u8 sharedMask;             /* Mask of shared locks held */
-  u8 exclMask;               /* Mask of exclusive locks held */
+  u16 sharedMask;            /* Mask of shared locks held */
+  u16 exclMask;              /* Mask of exclusive locks held */
 #ifdef SQLITE_DEBUG
   u8 id;                     /* Id of this connection within its unixShmNode */
 #endif
 };
 
 /*
-** Size increment by which shared memory grows
-*/
-#define SQLITE_UNIX_SHM_INCR  4096
-
-/*
 ** Constants used for locking
 */
 #define UNIX_SHM_BASE      80        /* Byte offset of the first lock byte */
-#define UNIX_SHM_DMS       0x01      /* Mask for Dead-Man-Switch lock */
-#define UNIX_SHM_A         0x10      /* Mask for region locks... */
-#define UNIX_SHM_B         0x20
-#define UNIX_SHM_C         0x40
-#define UNIX_SHM_D         0x80
+#define UNIX_SHM_DMS       80        /* The deadman switch lock */
 
 #ifdef SQLITE_DEBUG
 /*
@@ -3205,30 +3195,32 @@
 ** This routine is for debugging purposes only and does not appear
 ** in a production build.
 */
-static const char *unixShmLockString(u8 mask){
-  static char zBuf[48];
+static const char *unixShmLockString(u16 maskShared, u16 maskExclusive){
+  static char zBuf[52];
   static int iBuf = 0;
+  int i;
+  u16 mask;
   char *z;
 
   z = &zBuf[iBuf];
-  iBuf += 8;
+  iBuf += 16;
   if( iBuf>=sizeof(zBuf) ) iBuf = 0;
-
-  z[0] = (mask & UNIX_SHM_DMS)   ? 'S' : '.';
-  z[1] = (mask & UNIX_SHM_A)     ? 'A' : '.';
-  z[2] = (mask & UNIX_SHM_B)     ? 'B' : '.';
-  z[3] = (mask & UNIX_SHM_C)     ? 'C' : '.';
-  z[4] = (mask & UNIX_SHM_D)     ? 'D' : '.';
-  z[5] = 0;
+  for(i=0, mask=1; i<SQLITE_SHM_NLOCK; i++, mask += mask){
+    if( mask & maskShared ){
+      z[i] = 's';
+    }else if( mask & maskExclusive ){
+      z[i] = 'E';
+    }else{
+      z[i] = '.';
+    }
+  }
+  z[i] = 0;
   return z;
 }
 #endif /* SQLITE_DEBUG */
 
 /*
-** Apply posix advisory locks for all bytes identified in lockMask.
-**
-** lockMask might contain multiple bits but all bits are guaranteed
-** to be contiguous.
+** Apply posix advisory locks for all bytes from ofst through ofst+n-1.
 **
 ** Locks block if the mask is exactly UNIX_SHM_C and are non-blocking
 ** otherwise.
@@ -3236,198 +3228,69 @@
 static int unixShmSystemLock(
   unixShmNode *pShmNode, /* Apply locks to this open shared-memory segment */
   int lockType,          /* F_UNLCK, F_RDLCK, or F_WRLCK */
-  u8 lockMask            /* Which bytes to lock or unlock */
+  int ofst,              /* First byte of the locking range */
+  int n                  /* Number of bytes to lock */
 ){
   struct flock f;       /* The posix advisory locking structure */
-  int lockOp;           /* The opcode for fcntl() */
-  int i;                /* Offset into the locking byte range */
-  int rc;               /* Result code form fcntl() */
-  u8 mask;              /* Mask of bits in lockMask */
+  int rc = SQLITE_OK;   /* Result code form fcntl() */
 
   /* Access to the unixShmNode object is serialized by the caller */
   assert( sqlite3_mutex_held(pShmNode->mutex) || pShmNode->nRef==0 );
 
+  /* Shared locks never span more than one byte */
+  assert( n==1 || lockType!=F_RDLCK );
+
+  /* Locks are within range */
+  assert( n>=1 && ofst>=0 && ofst+n<SQLITE_SHM_NLOCK );
+
   /* Initialize the locking parameters */
   memset(&f, 0, sizeof(f));
   f.l_type = lockType;
   f.l_whence = SEEK_SET;
-  if( lockMask==UNIX_SHM_C && lockType!=F_UNLCK ){
-    lockOp = F_SETLKW;
-    OSTRACE(("SHM-LOCK requesting blocking lock\n"));
-  }else{
-    lockOp = F_SETLK;
-  }
+  f.l_start = ofst+UNIX_SHM_BASE;
+  f.l_len = n;
 
-  /* Find the first bit in lockMask that is set */
-  for(i=0, mask=0x01; mask!=0 && (lockMask&mask)==0; mask <<= 1, i++){}
-  assert( mask!=0 );
-  f.l_start = i+UNIX_SHM_BASE;
-  f.l_len = 1;
-
-  /* Extend the locking range for each additional bit that is set */
-  mask <<= 1;
-  while( mask!=0 && (lockMask & mask)!=0 ){
-    f.l_len++;
-    mask <<= 1;
-  }
-
-  /* Verify that all bits set in lockMask are contiguous */
-  assert( mask==0 || (lockMask & ~(mask | (mask-1)))==0 );
-
-  /* Acquire the system-level lock */
-  rc = fcntl(pShmNode->h, lockOp, &f);
+  rc = fcntl(pShmNode->h, F_SETLK, &f);
   rc = (rc!=(-1)) ? SQLITE_OK : SQLITE_BUSY;
 
   /* Update the global lock state and do debug tracing */
 #ifdef SQLITE_DEBUG
+  { u16 mask;
   OSTRACE(("SHM-LOCK "));
+  mask = (1<<(ofst+n)) - (1<<ofst);
   if( rc==SQLITE_OK ){
     if( lockType==F_UNLCK ){
-      OSTRACE(("unlock ok"));
-      pShmNode->exclMask &= ~lockMask;
-      pShmNode->sharedMask &= ~lockMask;
+      OSTRACE(("unlock %d ok", ofst));
+      pShmNode->exclMask &= ~mask;
+      pShmNode->sharedMask &= ~mask;
     }else if( lockType==F_RDLCK ){
-      OSTRACE(("read-lock ok"));
-      pShmNode->exclMask &= ~lockMask;
-      pShmNode->sharedMask |= lockMask;
+      OSTRACE(("read-lock %d ok", ofst));
+      pShmNode->exclMask &= ~mask;
+      pShmNode->sharedMask |= mask;
     }else{
       assert( lockType==F_WRLCK );
-      OSTRACE(("write-lock ok"));
-      pShmNode->exclMask |= lockMask;
-      pShmNode->sharedMask &= ~lockMask;
+      OSTRACE(("write-lock %d ok", ofst));
+      pShmNode->exclMask |= mask;
+      pShmNode->sharedMask &= ~mask;
     }
   }else{
     if( lockType==F_UNLCK ){
-      OSTRACE(("unlock failed"));
+      OSTRACE(("unlock %d failed", ofst));
     }else if( lockType==F_RDLCK ){
       OSTRACE(("read-lock failed"));
     }else{
       assert( lockType==F_WRLCK );
-      OSTRACE(("write-lock failed"));
+      OSTRACE(("write-lock %d failed", ofst));
     }
   }
-  OSTRACE((" - change requested %s - afterwards %s:%s\n",
-           unixShmLockString(lockMask),
-           unixShmLockString(pShmNode->sharedMask),
-           unixShmLockString(pShmNode->exclMask)));
+  OSTRACE((" - afterwards %s\n",
+           unixShmLockString(pShmNode->sharedMask, pShmNode->exclMask)));
+  }
 #endif
 
   return rc;        
 }
 
-/*
-** For connection p, unlock all of the locks identified by the unlockMask
-** parameter.
-*/
-static int unixShmUnlock(
-  unixShmNode *pShmNode,   /* The underlying shared-memory file */
-  unixShm *p,              /* The connection to be unlocked */
-  u8 unlockMask            /* Mask of locks to be unlocked */
-){
-  int rc;      /* Result code */
-  unixShm *pX; /* For looping over all sibling connections */
-  u8 allMask;  /* Union of locks held by connections other than "p" */
-
-  /* Access to the unixShmNode object is serialized by the caller */
-  assert( sqlite3_mutex_held(pShmNode->mutex) );
-
-  /* Compute locks held by sibling connections */
-  allMask = 0;
-  for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
-    if( pX==p ) continue;
-    assert( (pX->exclMask & (p->exclMask|p->sharedMask))==0 );
-    allMask |= pX->sharedMask;
-  }
-
-  /* Unlock the system-level locks */
-  if( (unlockMask & allMask)!=unlockMask ){
-    rc = unixShmSystemLock(pShmNode, F_UNLCK, unlockMask & ~allMask);
-  }else{
-    rc = SQLITE_OK;
-  }
-
-  /* Undo the local locks */
-  if( rc==SQLITE_OK ){
-    p->exclMask &= ~unlockMask;
-    p->sharedMask &= ~unlockMask;
-  } 
-  return rc;
-}
-
-/*
-** Get reader locks for connection p on all locks in the readMask parameter.
-*/
-static int unixShmSharedLock(
-  unixShmNode *pShmNode,   /* The underlying shared-memory file */
-  unixShm *p,              /* The connection to get the shared locks */
-  u8 readMask              /* Mask of shared locks to be acquired */
-){
-  int rc;        /* Result code */
-  unixShm *pX;   /* For looping over all sibling connections */
-  u8 allShared;  /* Union of locks held by connections other than "p" */
-
-  /* Access to the unixShmNode object is serialized by the caller */
-  assert( sqlite3_mutex_held(pShmNode->mutex) );
-
-  /* Find out which shared locks are already held by sibling connections.
-  ** If any sibling already holds an exclusive lock, go ahead and return
-  ** SQLITE_BUSY.
-  */
-  allShared = 0;
-  for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
-    if( pX==p ) continue;
-    if( (pX->exclMask & readMask)!=0 ) return SQLITE_BUSY;
-    allShared |= pX->sharedMask;
-  }
-
-  /* Get shared locks at the system level, if necessary */
-  if( (~allShared) & readMask ){
-    rc = unixShmSystemLock(pShmNode, F_RDLCK, readMask);
-  }else{
-    rc = SQLITE_OK;
-  }
-
-  /* Get the local shared locks */
-  if( rc==SQLITE_OK ){
-    p->sharedMask |= readMask;
-  }
-  return rc;
-}
-
-/*
-** For connection p, get an exclusive lock on all locks identified in
-** the writeMask parameter.
-*/
-static int unixShmExclusiveLock(
-  unixShmNode *pShmNode,    /* The underlying shared-memory file */
-  unixShm *p,               /* The connection to get the exclusive locks */
-  u8 writeMask              /* Mask of exclusive locks to be acquired */
-){
-  int rc;        /* Result code */
-  unixShm *pX;   /* For looping over all sibling connections */
-
-  /* Access to the unixShmNode object is serialized by the caller */
-  assert( sqlite3_mutex_held(pShmNode->mutex) );
-
-  /* Make sure no sibling connections hold locks that will block this
-  ** lock.  If any do, return SQLITE_BUSY right away.
-  */
-  for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
-    if( pX==p ) continue;
-    if( (pX->exclMask & writeMask)!=0 ) return SQLITE_BUSY;
-    if( (pX->sharedMask & writeMask)!=0 ) return SQLITE_BUSY;
-  }
-
-  /* Get the exclusive locks at the system level.  Then if successful
-  ** also mark the local connection as being locked.
-  */
-  rc = unixShmSystemLock(pShmNode, F_WRLCK, writeMask);
-  if( rc==SQLITE_OK ){
-    p->sharedMask &= ~writeMask;
-    p->exclMask |= writeMask;
-  }
-  return rc;
-}
 
 /*
 ** Purge the unixShmNodeList list of all entries with unixShmNode.nRef==0.
@@ -3520,13 +3383,13 @@
     ** If not, truncate the file to zero length. 
     */
     rc = SQLITE_OK;
-    if( unixShmSystemLock(pShmNode, F_WRLCK, UNIX_SHM_DMS)==SQLITE_OK ){
+    if( unixShmSystemLock(pShmNode, F_WRLCK, UNIX_SHM_DMS, 1)==SQLITE_OK ){
       if( ftruncate(pShmNode->h, 0) ){
         rc = SQLITE_IOERR;
       }
     }
     if( rc==SQLITE_OK ){
-      rc = unixShmSystemLock(pShmNode, F_RDLCK, UNIX_SHM_DMS);
+      rc = unixShmSystemLock(pShmNode, F_RDLCK, UNIX_SHM_DMS, 1);
     }
     if( rc ) goto shm_open_err;
   }
@@ -3687,7 +3550,7 @@
   assert( pShmNode==pDbFd->pInode->pShmNode );
   assert( pShmNode->pInode==pDbFd->pInode );
 
-  if( p->lockState!=SQLITE_SHM_CHECKPOINT && p->hasMutexBuf==0 ){
+  if( p->hasMutexBuf==0 ){
     assert( sqlite3_mutex_notheld(pShmNode->mutex) );
     sqlite3_mutex_enter(pShmNode->mutexBuf);
     p->hasMutexBuf = 1;
@@ -3731,7 +3594,7 @@
   unixFile *pDbFd = (unixFile*)fd;
   unixShm *p = pDbFd->pShm;
 
-  if( p->hasMutexBuf && p->lockState!=SQLITE_SHM_RECOVER ){
+  if( p->hasMutexBuf ){
     assert( sqlite3_mutex_notheld(p->pShmNode->mutex) );
     sqlite3_mutex_leave(p->pShmNode->mutexBuf);
     p->hasMutexBuf = 0;
@@ -3739,147 +3602,113 @@
   return SQLITE_OK;
 }
 
-/*
-** Symbolic names for LOCK states used for debugging.
-*/
-#ifdef SQLITE_DEBUG
-static const char *azLkName[] = {
-  "UNLOCK",
-  "READ",
-  "READ_FULL",
-  "WRITE",
-  "PENDING",
-  "CHECKPOINT",
-  "RECOVER"
-};
-#endif
-
 
 /*
 ** Change the lock state for a shared-memory segment.
 */
 static int unixShmLock(
   sqlite3_file *fd,          /* Database file holding the shared memory */
-  int desiredLock,           /* One of SQLITE_SHM_xxxxx locking states */
-  int *pGotLock              /* The lock you actually got */
+  int ofst,                  /* First lock to acquire or release */
+  int n,                     /* Number of locks to acquire or release */
+  int flags                  /* What to do with the lock */
 ){
-  unixFile *pDbFd = (unixFile*)fd;
-  unixShm *p = pDbFd->pShm;
-  unixShmNode *pShmNode = p->pShmNode;
-  int rc = SQLITE_PROTOCOL;
+  unixFile *pDbFd = (unixFile*)fd;      /* Connection holding shared memory */
+  unixShm *p = pDbFd->pShm;             /* The shared memory being locked */
+  unixShm *pX;                          /* For looping over all siblings */
+  unixShmNode *pShmNode = p->pShmNode;  /* The underlying file iNode */
+  int rc = SQLITE_OK;                   /* Result code */
+  u16 mask;                             /* Mask of locks to take or release */
 
   assert( pShmNode==pDbFd->pInode->pShmNode );
   assert( pShmNode->pInode==pDbFd->pInode );
+  assert( ofst>=0 && ofst+n<SQLITE_SHM_NLOCK );
+  assert( n>=1 );
+  assert( flags==(SQLITE_SHM_LOCK | SQLITE_SHM_SHARED)
+       || flags==(SQLITE_SHM_LOCK | SQLITE_SHM_EXCLUSIVE)
+       || flags==(SQLITE_SHM_UNLOCK | SQLITE_SHM_SHARED)
+       || flags==(SQLITE_SHM_UNLOCK | SQLITE_SHM_EXCLUSIVE) );
+  assert( n==1 || (flags & SQLITE_SHM_EXCLUSIVE)!=0 );
 
-  /* Note that SQLITE_SHM_READ_FULL and SQLITE_SHM_PENDING are never
-  ** directly requested; they are side effects from requesting
-  ** SQLITE_SHM_READ and SQLITE_SHM_CHECKPOINT, respectively.
-  */
-  assert( desiredLock==SQLITE_SHM_UNLOCK
-       || desiredLock==SQLITE_SHM_READ
-       || desiredLock==SQLITE_SHM_WRITE
-       || desiredLock==SQLITE_SHM_CHECKPOINT
-       || desiredLock==SQLITE_SHM_RECOVER );
-
-  /* Return directly if this is just a lock state query, or if
-  ** the connection is already in the desired locking state.
-  */
-  if( desiredLock==p->lockState
-   || (desiredLock==SQLITE_SHM_READ && p->lockState==SQLITE_SHM_READ_FULL)
-  ){
-    OSTRACE(("SHM-LOCK shmid-%d, pid-%d request %s and got %s\n",
-             p->id, getpid(), azLkName[desiredLock], azLkName[p->lockState]));
-    if( pGotLock ) *pGotLock = p->lockState;
-    return SQLITE_OK;
-  }
-
-  OSTRACE(("SHM-LOCK shmid-%d, pid-%d request %s->%s\n",
-            p->id, getpid(), azLkName[p->lockState], azLkName[desiredLock]));
-  
-  if( desiredLock==SQLITE_SHM_RECOVER && !p->hasMutexBuf ){
-    assert( sqlite3_mutex_notheld(pShmNode->mutex) );
-    sqlite3_mutex_enter(pShmNode->mutexBuf);
-    p->hasMutexBuf = 1;
-  }
+  mask = (1<<(ofst+n+1)) - (1<<(ofst+1));
+  assert( n>1 || mask==(1<<ofst) );
   sqlite3_mutex_enter(pShmNode->mutex);
-  switch( desiredLock ){
-    case SQLITE_SHM_UNLOCK: {
-      assert( p->lockState!=SQLITE_SHM_RECOVER );
-      unixShmUnlock(pShmNode, p, UNIX_SHM_A|UNIX_SHM_B|UNIX_SHM_C|UNIX_SHM_D);
+  if( flags & SQLITE_SHM_UNLOCK ){
+    u16 allMask = 0; /* Mask of locks held by siblings */
+
+    /* See if any siblings hold this same lock */
+    for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
+      if( pX==p ) continue;
+      assert( (pX->exclMask & (p->exclMask|p->sharedMask))==0 );
+      allMask |= pX->sharedMask;
+    }
+
+    /* Unlock the system-level locks */
+    if( (mask & allMask)==0 ){
+      rc = unixShmSystemLock(pShmNode, F_UNLCK, ofst+1, n);
+    }else{
       rc = SQLITE_OK;
-      p->lockState = SQLITE_SHM_UNLOCK;
-      break;
     }
-    case SQLITE_SHM_READ: {
-      if( p->lockState==SQLITE_SHM_UNLOCK ){
-        int nAttempt;
+
+    /* Undo the local locks */
+    if( rc==SQLITE_OK ){
+      p->exclMask &= ~mask;
+      p->sharedMask &= ~mask;
+    } 
+  }else if( flags & SQLITE_SHM_SHARED ){
+    u16 allShared = 0;  /* Union of locks held by connections other than "p" */
+
+    /* Find out which shared locks are already held by sibling connections.
+    ** If any sibling already holds an exclusive lock, go ahead and return
+    ** SQLITE_BUSY.
+    */
+    for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
+      if( pX==p ) continue;
+      if( (pX->exclMask & mask)!=0 ){
         rc = SQLITE_BUSY;
-        assert( p->lockState==SQLITE_SHM_UNLOCK );
-        for(nAttempt=0; nAttempt<5 && rc==SQLITE_BUSY; nAttempt++){
-          rc = unixShmSharedLock(pShmNode, p, UNIX_SHM_A|UNIX_SHM_B);
-          if( rc==SQLITE_BUSY ){
-            rc = unixShmSharedLock(pShmNode, p, UNIX_SHM_D);
-            if( rc==SQLITE_OK ){
-              p->lockState = SQLITE_SHM_READ_FULL;
-            }
-          }else{
-            unixShmUnlock(pShmNode, p, UNIX_SHM_B);
-            p->lockState = SQLITE_SHM_READ;
-          }
-        }
+        break;
+      }
+      allShared |= pX->sharedMask;
+    }
+
+    /* Get shared locks at the system level, if necessary */
+    if( rc==SQLITE_OK ){
+      if( (allShared & mask)==0 ){
+        rc = unixShmSystemLock(pShmNode, F_RDLCK, ofst+1, n);
       }else{
-       assert( p->lockState==SQLITE_SHM_WRITE
-               || p->lockState==SQLITE_SHM_RECOVER );
-        rc = unixShmSharedLock(pShmNode, p, UNIX_SHM_A);
-        unixShmUnlock(pShmNode, p, UNIX_SHM_C|UNIX_SHM_D);
-        p->lockState = SQLITE_SHM_READ;
+        rc = SQLITE_OK;
       }
-      break;
     }
-    case SQLITE_SHM_WRITE: {
-      assert( p->lockState==SQLITE_SHM_READ 
-              || p->lockState==SQLITE_SHM_READ_FULL );
-      rc = unixShmExclusiveLock(pShmNode, p, UNIX_SHM_C|UNIX_SHM_D);
+
+    /* Get the local shared locks */
+    if( rc==SQLITE_OK ){
+      p->sharedMask |= mask;
+    }
+  }else{
+    /* Make sure no sibling connections hold locks that will block this
+    ** lock.  If any do, return SQLITE_BUSY right away.
+    */
+    for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
+      if( pX==p ) continue;
+      if( (pX->exclMask & mask)!=0 || (pX->sharedMask & mask)!=0 ){
+        rc = SQLITE_BUSY;
+        break;
+      }
+    }
+  
+    /* Get the exclusive locks at the system level.  Then if successful
+    ** also mark the local connection as being locked.
+    */
+    if( rc==SQLITE_OK ){
+      rc = unixShmSystemLock(pShmNode, F_WRLCK, ofst+1, n);
       if( rc==SQLITE_OK ){
-        p->lockState = SQLITE_SHM_WRITE;
+        p->sharedMask &= ~mask;
+        p->exclMask |= mask;
       }
-      break;
-    }
-    case SQLITE_SHM_CHECKPOINT: {
-      assert( p->lockState==SQLITE_SHM_UNLOCK
-           || p->lockState==SQLITE_SHM_PENDING
-      );
-      if( p->lockState==SQLITE_SHM_UNLOCK ){
-        rc = unixShmExclusiveLock(pShmNode, p, UNIX_SHM_B|UNIX_SHM_C);
-        if( rc==SQLITE_OK ){
-          p->lockState = SQLITE_SHM_PENDING;
-        }
-      }
-      if( p->lockState==SQLITE_SHM_PENDING ){
-        rc = unixShmExclusiveLock(pShmNode, p, UNIX_SHM_A);
-        if( rc==SQLITE_OK ){
-          p->lockState = SQLITE_SHM_CHECKPOINT;
-        }
-      }
-      break;
-    }
-    default: {
-      assert( desiredLock==SQLITE_SHM_RECOVER );
-      assert( p->lockState==SQLITE_SHM_READ
-           || p->lockState==SQLITE_SHM_READ_FULL
-      );
-      assert( sqlite3_mutex_held(pShmNode->mutexBuf) );
-      rc = unixShmExclusiveLock(pShmNode, p, UNIX_SHM_C);
-      if( rc==SQLITE_OK ){
-        p->lockState = SQLITE_SHM_RECOVER;
-      }
-      break;
     }
   }
   sqlite3_mutex_leave(pShmNode->mutex);
   OSTRACE(("SHM-LOCK shmid-%d, pid-%d got %s\n",
-           p->id, getpid(), azLkName[p->lockState]));
-  if( pGotLock ) *pGotLock = p->lockState;
+           p->id, getpid(), unixShmLockString(p->sharedMask, p->exclMask)));
   return rc;
 }
 
diff --git a/src/pager.c b/src/pager.c
index 5a3d35f..a88ec5c 100644
--- a/src/pager.c
+++ b/src/pager.c
@@ -1203,7 +1203,7 @@
 # define pagerRollbackWal(x) 0
 # define pagerWalFrames(v,w,x,y,z) 0
 # define pagerOpenWalIfPresent(z) SQLITE_OK
-# define pagerOpenSnapshot(z) SQLITE_OK
+# define pagerBeginReadTransaction(z) SQLITE_OK
 #endif
 
 /*
@@ -1238,7 +1238,7 @@
     pPager->dbSizeValid = 0;
 
     if( pagerUseWal(pPager) ){
-      sqlite3WalCloseSnapshot(pPager->pWal);
+      sqlite3WalEndReadTransaction(pPager->pWal);
     }else{
       rc = osUnlock(pPager->fd, NO_LOCK);
     }
@@ -1437,7 +1437,7 @@
   sqlite3PcacheCleanAll(pPager->pPCache);
 
   if( pagerUseWal(pPager) ){
-    rc2 = sqlite3WalWriteLock(pPager->pWal, 0);
+    rc2 = sqlite3WalEndWriteTransaction(pPager->pWal);
     pPager->state = PAGER_SHARED;
 
     /* If the connection was in locking_mode=exclusive mode but is no longer,
@@ -2362,15 +2362,20 @@
 }
 
 /*
-** Open a WAL snapshot on the log file this pager is connected to.
+** Begin a read transaction on the WAL.
+**
+** This routine used to be called "pagerOpenSnapshot()" because it essentially
+** makes a snapshot of the database at the current point in time and preserves
+** that snapshot for use by the reader in spite of concurrent changes by
+** other writers or checkpointers.
 */
-static int pagerOpenSnapshot(Pager *pPager){
+static int pagerBeginReadTransaction(Pager *pPager){
   int rc;                         /* Return code */
   int changed = 0;                /* True if cache must be reset */
 
   assert( pagerUseWal(pPager) );
 
-  rc = sqlite3WalOpenSnapshot(pPager->pWal, &changed);
+  rc = sqlite3WalBeginReadTransaction(pPager->pWal, &changed);
   if( rc==SQLITE_OK ){
     int dummy;
     if( changed ){
@@ -2428,7 +2433,7 @@
         pager_reset(pPager);
         rc = sqlite3PagerOpenWal(pPager, 0);
         if( rc==SQLITE_OK ){
-          rc = pagerOpenSnapshot(pPager);
+          rc = pagerBeginReadTransaction(pPager);
         }
       }else if( pPager->journalMode==PAGER_JOURNALMODE_WAL ){
         pPager->journalMode = PAGER_JOURNALMODE_DELETE;
@@ -4002,7 +4007,7 @@
   }
 
   if( pagerUseWal(pPager) ){
-    rc = pagerOpenSnapshot(pPager);
+    rc = pagerBeginReadTransaction(pPager);
   }else if( pPager->state==PAGER_UNLOCK || isErrorReset ){
     sqlite3_vfs * const pVfs = pPager->pVfs;
     int isHotJournal = 0;
@@ -4561,7 +4566,7 @@
       ** may copy data from the sub-journal into the database file as well
       ** as into the page cache. Which would be incorrect in WAL mode.
       */
-      rc = sqlite3WalWriteLock(pPager->pWal, 1);
+      rc = sqlite3WalBeginWriteTransaction(pPager->pWal);
       if( rc==SQLITE_OK ){
         pPager->dbOrigSize = pPager->dbSize;
         pPager->state = PAGER_RESERVED;
@@ -5892,8 +5897,7 @@
     u8 *zBuf = (u8 *)pPager->pTmpSpace;
     rc = sqlite3WalCheckpoint(pPager->pWal,
         (pPager->noSync ? 0 : pPager->sync_flags),
-        pPager->pageSize, zBuf, 
-        pPager->xBusyHandler, pPager->pBusyHandlerArg
+        pPager->pageSize, zBuf
     );
   }
   return rc;
diff --git a/src/sqlite.h.in b/src/sqlite.h.in
index 0256399..0b931dc 100644
--- a/src/sqlite.h.in
+++ b/src/sqlite.h.in
@@ -444,7 +444,8 @@
 #define SQLITE_IOERR_LOCK              (SQLITE_IOERR | (15<<8))
 #define SQLITE_IOERR_CLOSE             (SQLITE_IOERR | (16<<8))
 #define SQLITE_IOERR_DIR_CLOSE         (SQLITE_IOERR | (17<<8))
-#define SQLITE_LOCKED_SHAREDCACHE      (SQLITE_LOCKED | (1<<8) )
+#define SQLITE_LOCKED_SHAREDCACHE      (SQLITE_LOCKED |  (1<<8))
+#define SQLITE_BUSY_RECOVERY           (SQLITE_BUSY   |  (1<<8))
 
 /*
 ** CAPI3REF: Flags For File Open Operations
@@ -658,7 +659,7 @@
   int (*xShmSize)(sqlite3_file*, int reqSize, int *pNewSize);
   int (*xShmGet)(sqlite3_file*, int reqSize, int *pSize, void volatile**);
   int (*xShmRelease)(sqlite3_file*);
-  int (*xShmLock)(sqlite3_file*, int desiredLock, int *gotLock);
+  int (*xShmLock)(sqlite3_file*, int offset, int n, int flags);
   void (*xShmBarrier)(sqlite3_file*);
   int (*xShmClose)(sqlite3_file*, int deleteFlag);
   /* Methods above are valid for version 2 */
@@ -888,16 +889,40 @@
 /*
 ** CAPI3REF: Flags for the xShmLock VFS method
 **
-** These integer constants define the various locking states that
-** an sqlite3_shm object can be in.
+** These integer constants define the various locking operations
+** allowed by the xShmLock method of [sqlite3_io_methods].  The
+** following are the only legal combinations of flags to the
+** xShmLock method:
+**
+** <ul>
+** <li>  SQLITE_SHM_LOCK | SQLITE_SHM_SHARED
+** <li>  SQLITE_SHM_LOCK | SQLITE_SHM_EXCLUSIVE
+** <li>  SQLITE_SHM_UNLOCK | SQLITE_SHM_SHARED
+** <li>  SQLITE_SHM_UNLOCK | SQLITE_SHM_EXCLUSIVE
+** </ul>
+**
+** When unlocking, the same SHARED or EXCLUSIVE flag must be supplied as
+** was given on the corresponding lock.
+**
+** The xShmLock method can transition between unlocked and SHARED or
+** between unlocked and EXCLUSIVE.  It cannot transition between SHARED
+** and EXCLUSIVE.
 */
-#define SQLITE_SHM_UNLOCK       0
-#define SQLITE_SHM_READ         1
-#define SQLITE_SHM_READ_FULL    2
-#define SQLITE_SHM_WRITE        3
-#define SQLITE_SHM_PENDING      4
-#define SQLITE_SHM_CHECKPOINT   5
-#define SQLITE_SHM_RECOVER      6
+#define SQLITE_SHM_UNLOCK       1
+#define SQLITE_SHM_LOCK         2
+#define SQLITE_SHM_SHARED       4
+#define SQLITE_SHM_EXCLUSIVE    8
+
+/*
+** CAPI3REF: Maximum xShmLock index
+**
+** The xShmLock method on [sqlite3_io_methods] may use values
+** between 0 and this upper bound as its "offset" argument.
+** The SQLite core will never attempt to acquire or release a
+** lock outside of this range
+*/
+#define SQLITE_SHM_NLOCK        8
+
 
 /*
 ** CAPI3REF: Initialize The SQLite Library
diff --git a/src/test6.c b/src/test6.c
index 743e7ca..1dded82 100644
--- a/src/test6.c
+++ b/src/test6.c
@@ -540,8 +540,8 @@
 static int cfShmRelease(sqlite3_file *pFile){
   return sqlite3OsShmRelease(((CrashFile*)pFile)->pRealFile);
 }
-static int cfShmLock(sqlite3_file *pFile, int desired, int *pGot){
-  return sqlite3OsShmLock(((CrashFile*)pFile)->pRealFile, desired, pGot);
+static int cfShmLock(sqlite3_file *pFile, int ofst, int n, int flags){
+  return sqlite3OsShmLock(((CrashFile*)pFile)->pRealFile, ofst, n, flags);
 }
 static void cfShmBarrier(sqlite3_file *pFile){
   sqlite3OsShmBarrier(((CrashFile*)pFile)->pRealFile);
diff --git a/src/test_devsym.c b/src/test_devsym.c
index 196dccf..0464804 100644
--- a/src/test_devsym.c
+++ b/src/test_devsym.c
@@ -54,7 +54,7 @@
 static int devsymShmSize(sqlite3_file*,int,int*);
 static int devsymShmGet(sqlite3_file*,int,int*,volatile void**);
 static int devsymShmRelease(sqlite3_file*);
-static int devsymShmLock(sqlite3_file*,int,int*);
+static int devsymShmLock(sqlite3_file*,int,int,int);
 static void devsymShmBarrier(sqlite3_file*);
 static int devsymShmClose(sqlite3_file*,int);
 
@@ -263,9 +263,9 @@
   devsym_file *p = (devsym_file *)pFile;
   return sqlite3OsShmRelease(p->pReal);
 }
-static int devsymShmLock(sqlite3_file *pFile, int desired, int *pGot){
+static int devsymShmLock(sqlite3_file *pFile, int ofst, int n, int flags){
   devsym_file *p = (devsym_file *)pFile;
-  return sqlite3OsShmLock(p->pReal, desired, pGot);
+  return sqlite3OsShmLock(p->pReal, ofst, n, flags);
 }
 static void devsymShmBarrier(sqlite3_file *pFile){
   devsym_file *p = (devsym_file *)pFile;
diff --git a/src/test_osinst.c b/src/test_osinst.c
index 6697fa1..52e04fb 100644
--- a/src/test_osinst.c
+++ b/src/test_osinst.c
@@ -155,7 +155,7 @@
 static int vfslogShmSize(sqlite3_file *pFile, int reqSize, int *pNewSize);
 static int vfslogShmGet(sqlite3_file *pFile, int,int*,volatile void **);
 static int vfslogShmRelease(sqlite3_file *pFile);
-static int vfslogShmLock(sqlite3_file *pFile, int desiredLock, int *gotLock);
+static int vfslogShmLock(sqlite3_file *pFile, int ofst, int n, int flags);
 static void vfslogShmBarrier(sqlite3_file*);
 static int vfslogShmClose(sqlite3_file *pFile, int deleteFlag);
 
@@ -460,12 +460,12 @@
   vfslog_call(p->pVfslog, OS_SHMRELEASE, p->iFileId, t, rc, 0, 0);
   return rc;
 }
-static int vfslogShmLock(sqlite3_file *pFile, int desiredLock, int *gotLock){
+static int vfslogShmLock(sqlite3_file *pFile, int ofst, int n, int flags){
   int rc;
   sqlite3_uint64 t;
   VfslogFile *p = (VfslogFile *)pFile;
   t = vfslog_time();
-  rc = p->pReal->pMethods->xShmLock(p->pReal, desiredLock, gotLock);
+  rc = p->pReal->pMethods->xShmLock(p->pReal, ofst, n, flags);
   t = vfslog_time() - t;
   vfslog_call(p->pVfslog, OS_SHMLOCK, p->iFileId, t, rc, 0, 0);
   return rc;
diff --git a/src/test_vfs.c b/src/test_vfs.c
index f955879..d5e8ea1 100644
--- a/src/test_vfs.c
+++ b/src/test_vfs.c
@@ -102,7 +102,7 @@
 static int tvfsShmSize(sqlite3_file*, int , int *);
 static int tvfsShmGet(sqlite3_file*, int , int *, volatile void **);
 static int tvfsShmRelease(sqlite3_file*);
-static int tvfsShmLock(sqlite3_file*, int , int *);
+static int tvfsShmLock(sqlite3_file*, int , int, int);
 static void tvfsShmBarrier(sqlite3_file*);
 static int tvfsShmClose(sqlite3_file*, int);
 
@@ -544,31 +544,34 @@
 
 static int tvfsShmLock(
   sqlite3_file *pFile,
-  int desiredLock,
-  int *gotLock
+  int ofst,
+  int n,
+  int flags
 ){
   int rc = SQLITE_OK;
   TestvfsFile *pFd = (TestvfsFile *)pFile;
   Testvfs *p = (Testvfs *)(pFd->pVfs->pAppData);
-  char *zLock = "";
+  int nLock;
+  char zLock[80];
 
-  switch( desiredLock ){
-    case SQLITE_SHM_READ:         zLock = "READ";       break;
-    case SQLITE_SHM_WRITE:        zLock = "WRITE";      break;
-    case SQLITE_SHM_CHECKPOINT:   zLock = "CHECKPOINT"; break;
-    case SQLITE_SHM_RECOVER:      zLock = "RECOVER";    break;
-    case SQLITE_SHM_PENDING:      zLock = "PENDING";    break;
-    case SQLITE_SHM_UNLOCK:       zLock = "UNLOCK";     break;
+  sqlite3_snprintf(sizeof(zLock), zLock, "%d %d", ofst, n);
+  nLock = strlen(zLock);
+  if( flags & SQLITE_SHM_LOCK ){
+    strcpy(&zLock[nLock], " lock");
+  }else{
+    strcpy(&zLock[nLock], " unlock");
+  }
+  nLock += strlen(&zLock[nLock]);
+  if( flags & SQLITE_SHM_SHARED ){
+    strcpy(&zLock[nLock], " shared");
+  }else{
+    strcpy(&zLock[nLock], " exclusive");
   }
   tvfsExecTcl(p, "xShmLock", 
       Tcl_NewStringObj(pFd->pShm->zFile, -1), pFd->pShmId,
       Tcl_NewStringObj(zLock, -1)
   );
   tvfsResultCode(p, &rc);
-  if( rc==SQLITE_OK ){
-    *gotLock = desiredLock;
-  }
-
   return rc;
 }
 
@@ -716,9 +719,7 @@
 **
 **   SCRIPT xShmLock    FILENAME ID LOCK
 **
-** where LOCK is one of "UNLOCK", "READ", "READ_FULL", "WRITE", "PENDING",
-** "CHECKPOINT" or "RECOVER". The script should return an SQLite error
-** code.
+** where LOCK is of the form "OFFSET NBYTE lock/unlock shared/exclusive"
 */
 static int testvfs_cmd(
   ClientData cd,
diff --git a/src/wal.c b/src/wal.c
index 2bbbdcd..0bdd4ad 100644
--- a/src/wal.c
+++ b/src/wal.c
@@ -93,12 +93,22 @@
 **
 ** To read a page from the database (call it page number P), a reader
 ** first checks the WAL to see if it contains page P.  If so, then the
-** last valid instance of page P that is or is followed by a commit frame
-** become the value read.  If the WAL contains no copies of page P that
-** are valid and which are or are followed by a commit frame, then page
-** P is read from the database file.
+** last valid instance of page P that is followed by a commit frame
+** or is a commit frame itself becomes the value read.  If the WAL
+** contains no copies of page P that are valid and which are a commit
+** frame or are followed by a commit frame, then page P is read from
+** the database file.
 **
-** The reader algorithm in the previous paragraph works correctly, but 
+** To start a read transaction, the reader records the index of the last
+** valid frame in the WAL.  The reader uses this recorded "mxFrame" value
+** for all subsequent read operations.  New transactions can be appended
+** to the WAL, but as long as the reader uses its original mxFrame value
+** and ignores the newly appended content, it will see a consistent snapshot
+** of the database from a single point in time.  This technique allows
+** multiple concurrent readers to view different versions of the database
+** content simultaneously.
+**
+** The reader algorithm in the previous paragraphs works correctly, but 
 ** because frames for page P can appear anywhere within the WAL, the
 ** reader has to scan the entire WAL looking for page P frames.  If the
 ** WAL is large (multiple megabytes is typical) that scan can be slow,
@@ -161,8 +171,7 @@
 ** the mapping section.  (For index blocks other than the last, K will
 ** always be exactly HASHTABLE_NPAGE (4096) and for the last index block
 ** K will be (mxFrame%HASHTABLE_NPAGE).)  Unused slots of the hash table
-** contain a value greater than K.  Note that no hash table slot ever
-** contains a zero value.
+** contain a value of 0.
 **
 ** To look for page P in the hash table, first compute a hash iKey on
 ** P as follows:
@@ -214,10 +223,22 @@
 
 #include "wal.h"
 
+/*
+** Indices of various locking bytes.   WAL_NREADER is the number
+** of available reader locks and should be at least 3.
+*/
+#define WAL_WRITE_LOCK         0
+#define WAL_ALL_BUT_WRITE      1
+#define WAL_CKPT_LOCK          1
+#define WAL_RECOVER_LOCK       2
+#define WAL_READ_LOCK(I)       (3+(I))
+#define WAL_NREADER            (SQLITE_SHM_NLOCK-3)
+
 
 /* Object declarations */
 typedef struct WalIndexHdr WalIndexHdr;
 typedef struct WalIterator WalIterator;
+typedef struct WalCkptInfo WalCkptInfo;
 
 
 /*
@@ -237,13 +258,65 @@
   u32 aCksum[2];                  /* Checksum over all prior fields */
 };
 
+/*
+** A copy of the following object occurs in the wal-index immediately
+** following the second copy of the WalIndexHdr.  This object stores
+** information used by checkpoint.
+**
+** nBackfill is the number of frames in the WAL that have been written
+** back into the database. (We call the act of moving content from WAL to
+** database "backfilling".)  The nBackfill number is never greater than
+** WalIndexHdr.mxFrame.  nBackfill can only be increased by threads
+** holding the WAL_CKPT_LOCK lock (which includes a recovery thread).
+** However, a WAL_WRITE_LOCK thread can move the value of nBackfill from
+** mxFrame back to zero when the WAL is reset.
+**
+** There is one entry in aReadMark[] for each reader lock.  If a reader
+** holds read-lock K, then the value in aReadMark[K] is no greater than
+** the mxFrame for that reader.  aReadMark[0] is a special case.  It
+** always holds zero.  Readers holding WAL_READ_LOCK(0) always ignore 
+** the entire WAL and read all content directly from the database.
+**
+** The value of aReadMark[K] may only be changed by a thread that
+** is holding an exclusive lock on WAL_READ_LOCK(K).  Thus, the value of
+** aReadMark[K] cannot change while a reader is using that mark
+** since the reader will be holding a shared lock on WAL_READ_LOCK(K).
+**
+** The checkpointer may only transfer frames from WAL to database where
+** the frame numbers are less than or equal to every aReadMark[] that is
+** in use (that is, every aReadMark[j] for which there is a corresponding
+** WAL_READ_LOCK(j)).  New readers (usually) pick the aReadMark[] with the
+** largest value and will increase an unused aReadMark[] to mxFrame if there
+** is not already an aReadMark[] equal to mxFrame.  The exception to the
+** previous sentence is when nBackfill equals mxFrame (meaning that everything
+** in the WAL has been backfilled into the database) then new readers
+** will choose aReadMark[0] which has value 0 and hence such readers will
+** get all their content directly from the database file and ignore
+** the WAL.
+**
+** Writers normally append new frames to the end of the WAL.  However,
+** if nBackfill equals mxFrame (meaning that all WAL content has been
+** written back into the database) and if no readers are using the WAL
+** (in other words, if there are no WAL_READ_LOCK(i) where i>0) then
+** the writer will first "reset" the WAL back to the beginning and start
+** writing new content beginning at frame 1.
+**
+** We assume that 32-bit loads are atomic and so no locks are needed in
+** order to read from any aReadMark[] entries.
+*/
+struct WalCkptInfo {
+  u32 nBackfill;                  /* Number of WAL frames backfilled into DB */
+  u32 aReadMark[WAL_NREADER];     /* Reader marks */
+};
+
+
 /* A block of WALINDEX_LOCK_RESERVED bytes beginning at
 ** WALINDEX_LOCK_OFFSET is reserved for locks. Since some systems
 ** only support mandatory file-locks, we do not read or write data
 ** from the region of the file on which locks are applied.
 */
-#define WALINDEX_LOCK_OFFSET   (sizeof(WalIndexHdr)*2)
-#define WALINDEX_LOCK_RESERVED 8
+#define WALINDEX_LOCK_OFFSET   (sizeof(WalIndexHdr)*2 + sizeof(WalCkptInfo))
+#define WALINDEX_LOCK_RESERVED 16
 #define WALINDEX_HDR_SIZE      (WALINDEX_LOCK_OFFSET+WALINDEX_LOCK_RESERVED)
 
 /* Size of header before each frame in wal */
@@ -277,22 +350,31 @@
 ** following object.
 */
 struct Wal {
-  sqlite3_vfs *pVfs;         /* The VFS used to create pFd */
+  sqlite3_vfs *pVfs;         /* The VFS used to create pDbFd */
   sqlite3_file *pDbFd;       /* File handle for the database file */
   sqlite3_file *pWalFd;      /* File handle for WAL file */
   u32 iCallback;             /* Value to pass to log callback (or 0) */
   int szWIndex;              /* Size of the wal-index that is mapped in mem */
   volatile u32 *pWiData;     /* Pointer to wal-index content in memory */
-  u8 lockState;              /* SQLITE_SHM_xxxx constant showing lock state */
-  u8 readerType;             /* SQLITE_SHM_READ or SQLITE_SHM_READ_FULL */
+  u16 szPage;                /* Database page size */
+  i16 readLock;              /* Which read lock is being held.  -1 for none */
   u8 exclusiveMode;          /* Non-zero if connection is in exclusive mode */
-  u8 isWindexOpen;           /* True if ShmOpen() called on pDbFd */
-  WalIndexHdr hdr;           /* Wal-index for current snapshot */
+  u8 isWIndexOpen;           /* True if ShmOpen() called on pDbFd */
+  u8 writeLock;              /* True if in a write transaction */
+  u8 ckptLock;               /* True if holding a checkpoint lock */
+  WalIndexHdr hdr;           /* Wal-index header for current transaction */
   char *zWalName;            /* Name of WAL file */
-  int szPage;                /* Database page size */
   u32 nCkpt;                 /* Checkpoint sequence counter in the wal-header */
 };
 
+/*
+** Return a pointer to the WalCkptInfo structure in the wal-index.
+*/
+static volatile WalCkptInfo *walCkptInfo(Wal *pWal){
+  assert( pWal->pWiData!=0 );
+  return (volatile WalCkptInfo*)&pWal->pWiData[sizeof(WalIndexHdr)/2];
+}
+
 
 /*
 ** This structure is used to implement an iterator that loops through
@@ -380,42 +462,20 @@
 }
 
 /*
-** Attempt to change the lock status.
-**
-** When changing the lock status to SQLITE_SHM_READ, store the
-** type of reader lock (either SQLITE_SHM_READ or SQLITE_SHM_READ_FULL)
-** in pWal->readerType.
-*/
-static int walSetLock(Wal *pWal, int desiredStatus){
-  int rc = SQLITE_OK;             /* Return code */
-  if( pWal->exclusiveMode || pWal->lockState==desiredStatus ){
-    pWal->lockState = desiredStatus;
-  }else{
-    int got = pWal->lockState;
-    rc = sqlite3OsShmLock(pWal->pDbFd, desiredStatus, &got);
-    pWal->lockState = got;
-    if( got==SQLITE_SHM_READ_FULL || got==SQLITE_SHM_READ ){
-      pWal->readerType = got;
-      pWal->lockState = SQLITE_SHM_READ;
-    }
-  }
-  return rc;
-}
-
-/*
 ** Write the header information in pWal->hdr into the wal-index.
 **
 ** The checksum on pWal->hdr is updated before it is written.
 */
 static void walIndexWriteHdr(Wal *pWal){
   WalIndexHdr *aHdr;
-  walChecksumBytes(1, (u8*)&pWal->hdr,
-                   sizeof(pWal->hdr) - sizeof(pWal->hdr.aCksum),
+
+  assert( pWal->writeLock );
+  walChecksumBytes(1, (u8*)&pWal->hdr, offsetof(WalIndexHdr, aCksum),
                    0, pWal->hdr.aCksum);
   aHdr = (WalIndexHdr*)pWal->pWiData;
-  memcpy(&aHdr[1], &pWal->hdr, sizeof(pWal->hdr));
+  memcpy(&aHdr[1], &pWal->hdr, sizeof(WalIndexHdr));
   sqlite3OsShmBarrier(pWal->pDbFd);
-  memcpy(&aHdr[0], &pWal->hdr, sizeof(pWal->hdr));
+  memcpy(&aHdr[0], &pWal->hdr, sizeof(WalIndexHdr));
 }
 
 /*
@@ -521,6 +581,32 @@
 #define HASHTABLE_NBYTE      (sizeof(HASHTABLE_DATATYPE)*HASHTABLE_NSLOT)
 
 /*
+** Set or release locks.
+**
+** In locking_mode=EXCLUSIVE, all of these routines become no-ops.
+*/
+static int walLockShared(Wal *pWal, int lockIdx){
+  if( pWal->exclusiveMode ) return SQLITE_OK;
+  return sqlite3OsShmLock(pWal->pDbFd, lockIdx, 1,
+                          SQLITE_SHM_LOCK | SQLITE_SHM_SHARED);
+}
+static void walUnlockShared(Wal *pWal, int lockIdx){
+  if( pWal->exclusiveMode ) return;
+  (void)sqlite3OsShmLock(pWal->pDbFd, lockIdx, 1,
+                         SQLITE_SHM_UNLOCK | SQLITE_SHM_SHARED);
+}
+static int walLockExclusive(Wal *pWal, int lockIdx, int n){
+  if( pWal->exclusiveMode ) return SQLITE_OK;
+  return sqlite3OsShmLock(pWal->pDbFd, lockIdx, n,
+                          SQLITE_SHM_LOCK | SQLITE_SHM_EXCLUSIVE);
+}
+static void walUnlockExclusive(Wal *pWal, int lockIdx, int n){
+  if( pWal->exclusiveMode ) return;
+  (void)sqlite3OsShmLock(pWal->pDbFd, lockIdx, n,
+                         SQLITE_SHM_UNLOCK | SQLITE_SHM_EXCLUSIVE);
+}
+
+/*
 ** Return the index in the Wal.pWiData array that corresponds to 
 ** frame iFrame.
 **
@@ -600,7 +686,7 @@
 static int walIndexRemap(Wal *pWal, int enlargeTo){
   int rc;
   int sz;
-  assert( pWal->lockState>=SQLITE_SHM_WRITE );
+  assert( pWal->writeLock );
   rc = sqlite3OsShmSize(pWal->pDbFd, enlargeTo, &sz);
   if( rc==SQLITE_OK && sz>pWal->szWIndex ){
     walIndexUnmap(pWal);
@@ -685,7 +771,7 @@
   u32 iZero;                           /* frame == (aHash[x]+iZero) */
   int iLimit;                          /* Zero values greater than this */
 
-  assert( pWal->lockState==SQLITE_SHM_WRITE );
+  assert( pWal->writeLock );
   walHashFind(pWal, pWal->hdr.mxFrame+1, &aHash, &aPgno, &iZero);
   iLimit = pWal->hdr.mxFrame - iZero;
   if( iLimit>0 ){
@@ -810,19 +896,29 @@
 
 /*
 ** Recover the wal-index by reading the write-ahead log file. 
-** The caller must hold RECOVER lock on the wal-index file.
+**
+** This routine first tries to establish an exclusive lock on the
+** wal-index to prevent other threads/processes from doing anything
+** with the WAL or wal-index while recovery is running.  The
+** WAL_RECOVER_LOCK is also held so that other threads will know
+** that this thread is running recovery.  If unable to establish
+** the necessary locks, this routine returns SQLITE_BUSY.
 */
 static int walIndexRecover(Wal *pWal){
   int rc;                         /* Return Code */
   i64 nSize;                      /* Size of log file */
   u32 aFrameCksum[2] = {0, 0};
 
-  assert( pWal->lockState>SQLITE_SHM_READ );
+  rc = walLockExclusive(pWal, WAL_ALL_BUT_WRITE, SQLITE_SHM_NLOCK-1);
+  if( rc ){
+    return rc;
+  }
+
   memset(&pWal->hdr, 0, sizeof(WalIndexHdr));
 
   rc = sqlite3OsFileSize(pWal->pWalFd, &nSize);
   if( rc!=SQLITE_OK ){
-    return rc;
+    goto recovery_error;
   }
 
   if( nSize>WAL_HDRSIZE ){
@@ -838,7 +934,7 @@
     /* Read in the WAL header. */
     rc = sqlite3OsRead(pWal->pWalFd, aBuf, WAL_HDRSIZE, 0);
     if( rc!=SQLITE_OK ){
-      return rc;
+      goto recovery_error;
     }
 
     /* If the database page size is not a power of two, or is greater than
@@ -867,7 +963,8 @@
     szFrame = szPage + WAL_FRAME_HDRSIZE;
     aFrame = (u8 *)sqlite3_malloc(szFrame);
     if( !aFrame ){
-      return SQLITE_NOMEM;
+      rc = SQLITE_NOMEM;
+      goto recovery_error;
     }
     aData = &aFrame[WAL_FRAME_HDRSIZE];
 
@@ -908,6 +1005,9 @@
     pWal->hdr.aFrameCksum[1] = aFrameCksum[1];
     walIndexWriteHdr(pWal);
   }
+
+recovery_error:
+  walUnlockExclusive(pWal, WAL_ALL_BUT_WRITE, SQLITE_SHM_NLOCK-1);
   return rc;
 }
 
@@ -915,11 +1015,9 @@
 ** Close an open wal-index.
 */
 static void walIndexClose(Wal *pWal, int isDelete){
-  if( pWal->isWindexOpen ){
-    int notUsed;
-    sqlite3OsShmLock(pWal->pDbFd, SQLITE_SHM_UNLOCK, &notUsed);
+  if( pWal->isWIndexOpen ){
     sqlite3OsShmClose(pWal->pDbFd, isDelete);
-    pWal->isWindexOpen = 0;
+    pWal->isWIndexOpen = 0;
   }
 }
 
@@ -978,6 +1076,7 @@
   pRet->pWalFd = (sqlite3_file *)&pRet[1];
   pRet->pDbFd = pDbFd;
   pRet->szWIndex = -1;
+  pRet->readLock = -1;
   sqlite3_randomness(8, &pRet->hdr.aSalt);
   pRet->zWalName = zWal = pVfs->szOsFile + (char*)pRet->pWalFd;
   sqlite3_snprintf(nWal, zWal, "%s-wal", zDbName);
@@ -985,7 +1084,7 @@
 
   /* Open file handle on the write-ahead log file. */
   if( rc==SQLITE_OK ){
-    pRet->isWindexOpen = 1;
+    pRet->isWIndexOpen = 1;
     flags = (SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_MAIN_JOURNAL);
     rc = sqlite3OsOpen(pVfs, zWal, pRet->pWalFd, flags, &flags);
   }
@@ -1133,7 +1232,7 @@
   ** running (or, indeed, while the WalIterator object exists).  Hence,
   ** we can cast off the volatile qualifacation from shared memory
   */
-  assert( pWal->lockState==SQLITE_SHM_CHECKPOINT );
+  assert( pWal->ckptLock );
   aData = (u32*)pWal->pWiData;
 
   /* Allocate space for the WalIterator object */
@@ -1180,8 +1279,37 @@
   sqlite3_free(p);
 }
 
+
 /*
-** Checkpoint the contents of the log file.
+** Copy as much content as we can from the WAL back into the database file
+** in response to an sqlite3_wal_checkpoint() request or the equivalent.
+**
+** The amount of information copied from WAL to database might be limited
+** by active readers.  This routine will never overwrite a database page
+** that a concurrent reader might be using.
+**
+** All I/O barrier operations (a.k.a fsyncs) occur in this routine when
+** SQLite is in WAL-mode in synchronous=NORMAL.  That means that if 
+** checkpoints are always run by a background thread or background 
+** process, foreground threads will never block on a lengthy fsync call.
+**
+** Fsync is called on the WAL before writing content out of the WAL and
+** into the database.  This ensures that the new content is persistent
+** in the WAL and can be recovered following a power-loss or hard reset.
+**
+** Fsync is also called on the database file if (and only if) the entire
+** WAL content is copied into the database file.  This second fsync makes
+** it safe to delete the WAL since the new content will persist in the
+** database file.
+**
+** This routine uses and updates the nBackfill field of the wal-index header.
+** This is the only routine that will increase the value of nBackfill.
+** (A WAL reset or recovery will revert nBackfill to zero, but not increase
+** its value.)
+**
+** The caller must be holding sufficient locks to ensure that no other
+** checkpoint is running (in any other thread or process) at the same
+** time.
 */
 static int walCheckpoint(
   Wal *pWal,                      /* Wal connection */
@@ -1194,51 +1322,83 @@
   WalIterator *pIter = 0;         /* Wal iterator context */
   u32 iDbpage = 0;                /* Next database page to write */
   u32 iFrame = 0;                 /* Wal frame containing data for iDbpage */
+  u32 mxSafeFrame;                /* Max frame that can be backfilled */
+  int i;                          /* Loop counter */
+  volatile WalIndexHdr *pHdr;     /* The actual wal-index header in SHM */
+  volatile WalCkptInfo *pInfo;    /* The checkpoint status information */
 
   /* Allocate the iterator */
   rc = walIteratorInit(pWal, &pIter);
   if( rc!=SQLITE_OK || pWal->hdr.mxFrame==0 ){
-    goto out;
+    walIteratorFree(pIter);
+    return rc;
   }
 
+  /*** TODO:  Move this test out to the caller.  Make it an assert() here ***/
   if( pWal->hdr.szPage!=nBuf ){
-    rc = SQLITE_CORRUPT_BKPT;
-    goto out;
+    walIteratorFree(pIter);
+    return SQLITE_CORRUPT_BKPT;
   }
 
-  /* Sync the log file to disk */
-  if( sync_flags ){
-    rc = sqlite3OsSync(pWal->pWalFd, sync_flags);
-    if( rc!=SQLITE_OK ) goto out;
+  /* Compute in mxSafeFrame the index of the last frame of the WAL that is
+  ** safe to write into the database.  Frames beyond mxSafeFrame might
+  ** overwrite database pages that are in use by active readers and thus
+  ** cannot be backfilled from the WAL.
+  */
+  mxSafeFrame = 0;
+  pHdr = (volatile WalIndexHdr*)pWal->pWiData;
+  pInfo = (volatile WalCkptInfo*)&pHdr[2];
+  assert( pInfo==walCkptInfo(pWal) );
+  for(i=1; i<WAL_NREADER; i++){
+    u32 y = pInfo->aReadMark[i];
+    if( y>0 && (mxSafeFrame==0 || mxSafeFrame<y) ){
+      if( y<pWal->hdr.mxFrame
+       && (rc = walLockExclusive(pWal, WAL_READ_LOCK(i), 1))==SQLITE_OK
+      ){
+        pInfo->aReadMark[i] = 0;
+        walUnlockExclusive(pWal, WAL_READ_LOCK(i), 1);
+      }else{
+        mxSafeFrame = y;
+      }
+    }
   }
 
-  /* Iterate through the contents of the log, copying data to the db file. */
-  while( 0==walIteratorNext(pIter, &iDbpage, &iFrame) ){
-    rc = sqlite3OsRead(pWal->pWalFd, zBuf, szPage, 
-        walFrameOffset(iFrame, szPage) + WAL_FRAME_HDRSIZE
-    );
-    if( rc!=SQLITE_OK ) goto out;
-    rc = sqlite3OsWrite(pWal->pDbFd, zBuf, szPage, (iDbpage-1)*szPage);
-    if( rc!=SQLITE_OK ) goto out;
+  if( pInfo->nBackfill<mxSafeFrame
+   && (rc = walLockExclusive(pWal, WAL_READ_LOCK(0), 1))==SQLITE_OK
+  ){
+    u32 nBackfill = pInfo->nBackfill;
+
+    /* Sync the WAL to disk */
+    if( sync_flags ){
+      rc = sqlite3OsSync(pWal->pWalFd, sync_flags);
+    }
+
+    /* Iterate through the contents of the WAL, copying data to the db file. */
+    while( rc==SQLITE_OK && 0==walIteratorNext(pIter, &iDbpage, &iFrame) ){
+      if( iFrame<=nBackfill || iFrame>mxSafeFrame ) continue;
+      rc = sqlite3OsRead(pWal->pWalFd, zBuf, szPage, 
+          walFrameOffset(iFrame, szPage) + WAL_FRAME_HDRSIZE
+      );
+      if( rc!=SQLITE_OK ) break;
+      rc = sqlite3OsWrite(pWal->pDbFd, zBuf, szPage, (iDbpage-1)*szPage);
+      if( rc!=SQLITE_OK ) break;
+    }
+
+    /* If work was actually accomplished... */
+    if( rc==SQLITE_OK && pInfo->nBackfill<mxSafeFrame ){
+      pInfo->nBackfill = mxSafeFrame;
+      if( mxSafeFrame==pHdr[0].mxFrame && sync_flags ){
+        rc = sqlite3OsTruncate(pWal->pDbFd, ((i64)pWal->hdr.nPage*(i64)szPage));
+        if( rc==SQLITE_OK && sync_flags ){
+          rc = sqlite3OsSync(pWal->pDbFd, sync_flags);
+        }
+      }
+    }
+
+    /* Release the reader lock held while backfilling */
+    walUnlockExclusive(pWal, WAL_READ_LOCK(0), 1);
   }
 
-  /* Truncate the database file */
-  rc = sqlite3OsTruncate(pWal->pDbFd, ((i64)pWal->hdr.nPage*(i64)szPage));
-  if( rc!=SQLITE_OK ) goto out;
-
-  /* Sync the database file. If successful, update the wal-index. */
-  if( sync_flags ){
-    rc = sqlite3OsSync(pWal->pDbFd, sync_flags);
-    if( rc!=SQLITE_OK ) goto out;
-  }
-  pWal->hdr.mxFrame = 0;
-  pWal->nCkpt++;
-  sqlite3Put4byte((u8*)pWal->hdr.aSalt,
-                   1 + sqlite3Get4byte((u8*)pWal->hdr.aSalt));
-  sqlite3_randomness(4, &pWal->hdr.aSalt[1]);
-  walIndexWriteHdr(pWal);
-
- out:
   walIteratorFree(pIter);
   return rc;
 }
@@ -1266,7 +1426,8 @@
     */
     rc = sqlite3OsLock(pWal->pDbFd, SQLITE_LOCK_EXCLUSIVE);
     if( rc==SQLITE_OK ){
-      rc = sqlite3WalCheckpoint(pWal, sync_flags, nBuf, zBuf, 0, 0);
+      pWal->exclusiveMode = 1;
+      rc = walCheckpoint(pWal, sync_flags, nBuf, zBuf);
       if( rc==SQLITE_OK ){
         isDelete = 1;
       }
@@ -1290,7 +1451,8 @@
 ** The wal-index is in shared memory.  Another thread or process might
 ** be writing the header at the same time this procedure is trying to
 ** read it, which might result in inconsistency.  A dirty read is detected
-** by verifying a checksum on the header.
+** by verifying that both copies of the header are the same and also by
+** a checksum on the header.
 **
 ** If and only if the read is consistent and the header is different from
 ** pWal->hdr, then pWal->hdr is updated to the content of the new header
@@ -1311,9 +1473,9 @@
   }
   assert( pWal->pWiData );
 
-  /* Read the header. The caller may or may not have an exclusive 
-  ** (WRITE, PENDING, CHECKPOINT or RECOVER) lock on the wal-index
-  ** file, meaning it is possible that an inconsistent snapshot is read
+  /* Read the header. This might happen concurrently with a write to the
+  ** same area of shared memory on a different CPU in a SMP,
+  ** meaning it is possible that an inconsistent snapshot is read
   ** from the file. If this happens, return non-zero.
   **
   ** There are two copies of the header at the beginning of the wal-index.
@@ -1367,52 +1529,39 @@
 */
 static int walIndexReadHdr(Wal *pWal, int *pChanged){
   int rc;                         /* Return code */
-  int lockState;                  /* pWal->lockState before running recovery */
+  int badHdr;                     /* True if a header read failed */
 
-  assert( pWal->lockState>=SQLITE_SHM_READ );
   assert( pChanged );
   rc = walIndexMap(pWal, walMappingSize(1));
   if( rc!=SQLITE_OK ){
     return rc;
   }
 
-  /* First attempt to read the wal-index header. This may fail for one
-  ** of two reasons: (a) the wal-index does not yet exist or has been
-  ** corrupted and needs to be constructed by running recovery, or (b)
-  ** the caller is only holding a READ lock and made a dirty read of
-  ** the wal-index header.
-  **
-  ** A dirty read of the wal-index header occurs if another thread or
-  ** process happens to be writing to the wal-index header at roughly
-  ** the same time as this thread is reading it. In this case it is 
-  ** possible that an inconsistent header is read (which is detected
-  ** using the header checksum mechanism).
+  /* Try once to read the header straight out.  This works most of the
+  ** time.
   */
-  if( walIndexTryHdr(pWal, pChanged)!=0 ){
+  badHdr = walIndexTryHdr(pWal, pChanged);
 
-    /* If the first attempt to read the header failed, lock the wal-index
-    ** file with an exclusive lock and try again. If the header checksum 
-    ** verification fails again, we can be sure that it is not simply a
-    ** dirty read, but that the wal-index really does need to be 
-    ** reconstructed by running log recovery.
-    **
-    ** In the paragraph above, an "exclusive lock" may be any of WRITE,
-    ** PENDING, CHECKPOINT or RECOVER. If any of these are already held,
-    ** no locking operations are required. If the caller currently holds
-    ** a READ lock, then upgrade to a RECOVER lock before re-reading the
-    ** wal-index header and revert to a READ lock before returning.
-    */
-    lockState = pWal->lockState;
-    if( lockState>SQLITE_SHM_READ
-     || SQLITE_OK==(rc = walSetLock(pWal, SQLITE_SHM_RECOVER)) 
-    ){
-      if( walIndexTryHdr(pWal, pChanged) ){
-        *pChanged = 1;
+  /* If the first attempt failed, it might have been due to a race
+  ** with a writer.  So get a WRITE lock and try again.
+  */
+  assert( pWal->writeLock==0 );
+  if( badHdr ){
+    rc = walLockExclusive(pWal, WAL_WRITE_LOCK, 1);
+    if( rc==SQLITE_OK ){
+      pWal->writeLock = 1;
+      badHdr = walIndexTryHdr(pWal, pChanged);
+      if( badHdr ){
+        /* If the wal-index header is still malformed even while holding
+        ** a WRITE lock, it can only mean that the header is corrupted and
+        ** needs to be reconstructed.  So run recovery to do exactly that.
+        */
         rc = walIndexRecover(pWal);
       }
-      if( lockState==SQLITE_SHM_READ ){
-        walSetLock(pWal, SQLITE_SHM_READ);
-      }
+      walUnlockExclusive(pWal, WAL_WRITE_LOCK, 1);
+      pWal->writeLock = 0;
+    }else if( rc!=SQLITE_BUSY ){
+      return rc;
     }
   }
 
@@ -1428,53 +1577,193 @@
 }
 
 /*
-** Take a snapshot of the state of the WAL and wal-index for the current
-** instant in time.  The current thread will continue to use this snapshot.
-** Other threads might containing appending to the WAL and wal-index but
-** the extra content appended will be ignored by the current thread.
-**
-** A snapshot is like a read transaction.
-**
-** No other threads are allowed to run a checkpoint while this thread is
-** holding the snapshot since a checkpoint would remove data out from under
-** this thread.
-**
-** If this call obtains a new read-lock and the database contents have been
-** modified since the most recent call to WalCloseSnapshot() on this Wal
-** connection, then *pChanged is set to 1 before returning. Otherwise, it 
-** is left unmodified. This is used by the pager layer to determine whether 
-** or not any cached pages may be safely reused.
+** This is the value that walTryBeginRead returns when it needs to
+** be retried.
 */
-int sqlite3WalOpenSnapshot(Wal *pWal, int *pChanged){
-  int rc;                         /* Return code */
+#define WAL_RETRY  (-1)
 
-  rc = walSetLock(pWal, SQLITE_SHM_READ);
-  assert( rc!=SQLITE_OK || pWal->lockState==SQLITE_SHM_READ );
+/*
+** Attempt to start a read transaction.  This might fail due to a race or
+** other transient condition.  When that happens, it returns WAL_RETRY to
+** indicate to the caller that it is safe to retry immediately.
+**
+** On success return SQLITE_OK.  On a permanent failure (such as an
+** I/O error or an SQLITE_BUSY because another process is running
+** recovery) return a positive error code.
+**
+** On success, this routine obtains a read lock on 
+** WAL_READ_LOCK(pWal->readLock).  The pWal->readLock integer is
+** in the range 0 <= pWal->readLock < WAL_NREADER.  If pWal->readLock==(-1)
+** that means the Wal does not hold any read lock.  The reader must not
+** access any database page that is modified by a WAL frame up to and
+** including frame number aReadMark[pWal->readLock].  The reader will
+** use WAL frames up to and including pWal->hdr.mxFrame if pWal->readLock>0.
+** Or if pWal->readLock==0, then the reader will ignore the WAL
+** completely and get all content directly from the database file.
+** When the read transaction is completed, the caller must release the
+** lock on WAL_READ_LOCK(pWal->readLock) and set pWal->readLock to -1.
+**
+** This routine uses the nBackfill and aReadMark[] fields of the header
+** to select a particular WAL_READ_LOCK() that strives to let the
+** checkpoint process do as much work as possible.  This routine might
+** update values of the aReadMark[] array in the header, but if it does
+** so it takes care to hold an exclusive lock on the corresponding
+** WAL_READ_LOCK() while changing values.
+*/
+static int walTryBeginRead(Wal *pWal, int *pChanged, int useWal){
+  volatile WalIndexHdr *pHdr;     /* Header of the wal-index */
+  volatile WalCkptInfo *pInfo;    /* Checkpoint information in wal-index */
+  u32 mxReadMark;                 /* Largest aReadMark[] value */
+  int mxI;                        /* Index of largest aReadMark[] value */
+  int i;                          /* Loop counter */
+  int rc;                         /* Return code  */
 
-  if( rc==SQLITE_OK ){
+  assert( pWal->readLock<0 );  /* No read lock held on entry */
+
+  if( !useWal ){
     rc = walIndexReadHdr(pWal, pChanged);
-    if( rc!=SQLITE_OK ){
-      /* An error occured while attempting log recovery. */
-      sqlite3WalCloseSnapshot(pWal);
+    if( rc==SQLITE_BUSY ){
+      /* If there is not a recovery running in another thread or process
+      ** then convert BUSY errors to WAL_RETRY.  If recovery is known to
+      ** be running, convert BUSY to BUSY_RECOVERY.  There is a race here
+      ** which might cause WAL_RETRY to be returned even if BUSY_RECOVERY
+      ** would be technically correct.  But the race is benign since with
+      ** WAL_RETRY this routine will be called again and will probably be
+      ** right on the second iteration.
+      */
+      rc = walLockShared(pWal, WAL_RECOVER_LOCK);
+      if( rc==SQLITE_OK ){
+        walUnlockShared(pWal, WAL_RECOVER_LOCK);
+        rc = WAL_RETRY;
+      }else if( rc==SQLITE_BUSY ){
+        rc = SQLITE_BUSY_RECOVERY;
+      }
+    }
+  }else{
+    rc = walIndexMap(pWal, pWal->hdr.mxFrame);
+  }
+  if( rc!=SQLITE_OK ){
+    return rc;
+  }
+
+  pHdr = (volatile WalIndexHdr*)pWal->pWiData;
+  pInfo = (volatile WalCkptInfo*)&pHdr[2];
+  assert( pInfo==walCkptInfo(pWal) );
+  if( !useWal && pInfo->nBackfill==pWal->hdr.mxFrame ){
+    /* The WAL has been completely backfilled (or it is empty)
+    ** and can be safely ignored.
+    */
+    rc = walLockShared(pWal, WAL_READ_LOCK(0));
+    if( rc==SQLITE_OK ){
+      if( pHdr->mxFrame!=pWal->hdr.mxFrame ){
+        walUnlockShared(pWal, WAL_READ_LOCK(0));
+        return WAL_RETRY;
+      }
+      pWal->readLock = 0;
+      return SQLITE_OK;
+    }else if( rc!=SQLITE_BUSY ){
+      return rc;
     }
   }
 
+  /* If we get this far, it means that the reader will want to use
+  ** the WAL to get at content from recent commits.  The job now is
+  ** to select one of the aReadMark[] entries that is closest to
+  ** but not exceeding pWal->hdr.mxFrame and lock that entry.
+  */
+  mxReadMark = 0;
+  mxI = 0;
+  for(i=1; i<WAL_NREADER; i++){
+    u32 thisMark = pInfo->aReadMark[i];
+    if( mxReadMark<thisMark ){
+      mxReadMark = thisMark;
+      mxI = i;
+    }
+  }
+  if( mxI==0 ){
+    /* If we get here, it means that all of the aReadMark[] entries between
+    ** 1 and WAL_NREADER-1 are zero.  Try to initialize aReadMark[1] to
+    ** be mxFrame, then retry.
+    */
+    rc = walLockExclusive(pWal, WAL_READ_LOCK(1), 1);
+    if( rc==SQLITE_OK ){
+      pInfo->aReadMark[1] = pWal->hdr.mxFrame;
+      walUnlockExclusive(pWal, WAL_READ_LOCK(1), 1);
+    }
+    return WAL_RETRY;
+  }else{
+    if( mxReadMark < pWal->hdr.mxFrame ){
+      for(i=0; i<WAL_NREADER; i++){
+        rc = walLockExclusive(pWal, WAL_READ_LOCK(i), 1);
+        if( rc==SQLITE_OK ){
+          pInfo->aReadMark[i] = pWal->hdr.mxFrame;
+          mxReadMark = pWal->hdr.mxFrame;
+          mxI = i;
+          walUnlockExclusive(pWal, WAL_READ_LOCK(i), 1);
+          break;
+        }
+      }
+    }
+
+    rc = walLockShared(pWal, WAL_READ_LOCK(mxI));
+    if( rc ){
+      return rc==SQLITE_BUSY ? WAL_RETRY : rc;
+    }
+    if( pInfo->aReadMark[mxI]!=mxReadMark
+     || pHdr[0].mxFrame!=pWal->hdr.mxFrame
+     || (sqlite3OsShmBarrier(pWal->pDbFd), pHdr[1].mxFrame!=pWal->hdr.mxFrame)
+    ){
+      walUnlockShared(pWal, WAL_READ_LOCK(mxI));
+      return WAL_RETRY;
+    }else{
+      pWal->readLock = mxI;
+    }
+  }
+  return rc;
+}
+
+/*
+** Begin a read transaction on the database.
+**
+** This routine used to be called sqlite3WalOpenSnapshot() and with good reason:
+** it takes a snapshot of the state of the WAL and wal-index for the current
+** instant in time.  The current thread will continue to use this snapshot.
+** Other threads might append new content to the WAL and wal-index but
+** that extra content is ignored by the current thread.
+**
+** If the database contents have changed since the previous read
+** transaction, then *pChanged is set to 1 before returning.  The
+** Pager layer will use this to know that its cache is stale and
+** needs to be flushed.
+*/
+int sqlite3WalBeginReadTransaction(Wal *pWal, int *pChanged){
+  int rc;                         /* Return code */
+
+  do{
+    rc = walTryBeginRead(pWal, pChanged, 0);
+  }while( rc==WAL_RETRY );
   walIndexUnmap(pWal);
   return rc;
 }
 
 /*
-** Unlock the current snapshot.
+** Finish with a read transaction.  All this does is release the
+** read-lock.
 */
-void sqlite3WalCloseSnapshot(Wal *pWal){
-  assert( pWal->lockState==SQLITE_SHM_READ
-       || pWal->lockState==SQLITE_SHM_UNLOCK
-  );
-  walSetLock(pWal, SQLITE_SHM_UNLOCK);
+void sqlite3WalEndReadTransaction(Wal *pWal){
+  if( pWal->readLock>=0 ){
+    walUnlockShared(pWal, WAL_READ_LOCK(pWal->readLock));
+    pWal->readLock = -1;
+  }
 }
 
 /*
-** Read a page from the log, if it is present. 
+** Read a page from the WAL, if it is present in the WAL and if the 
+** current read transaction is configured to use the WAL.  
+**
+** The *pInWal is set to 1 if the requested page is in the WAL and
+** has been loaded.  Or *pInWal is set to 0 if the page was not in 
+** the WAL and needs to be read out of the database.
 */
 int sqlite3WalRead(
   Wal *pWal,                      /* WAL handle */
@@ -1488,17 +1777,21 @@
   u32 iLast = pWal->hdr.mxFrame;  /* Last page in WAL for this reader */
   int iHash;                      /* Used to loop through N hash tables */
 
+  /* This routine is only called from within a read transaction */
+  assert( pWal->readLock>=0 );
+
   /* If the "last page" field of the wal-index header snapshot is 0, then
   ** no data will be read from the wal under any circumstances. Return early
-  ** in this case to avoid the walIndexMap/Unmap overhead.
+  ** in this case to avoid the walIndexMap/Unmap overhead.  Likewise, if
+  ** pWal->readLock==0, then the WAL is ignored by the reader so
+  ** return early, as if the WAL were empty.
   */
-  if( iLast==0 ){
+  if( iLast==0 || pWal->readLock==0 ){
     *pInWal = 0;
     return SQLITE_OK;
   }
 
   /* Ensure the wal-index is mapped. */
-  assert( pWal->lockState==SQLITE_SHM_READ||pWal->lockState==SQLITE_SHM_WRITE );
   rc = walIndexMap(pWal, walMappingSize(iLast));
   if( rc!=SQLITE_OK ){
     return rc;
@@ -1607,51 +1900,97 @@
 ** Set *pPgno to the size of the database file (or zero, if unknown).
 */
 void sqlite3WalDbsize(Wal *pWal, Pgno *pPgno){
-  assert( pWal->lockState==SQLITE_SHM_READ
-       || pWal->lockState==SQLITE_SHM_WRITE );
+  assert( pWal->readLock>=0 );
   *pPgno = pWal->hdr.nPage;
 }
 
-/* 
-** This function returns SQLITE_OK if the caller may write to the database.
-** Otherwise, if the caller is operating on a snapshot that has already
-** been overwritten by another writer, SQLITE_BUSY is returned.
-*/
-int sqlite3WalWriteLock(Wal *pWal, int op){
-  int rc = SQLITE_OK;
-  if( op ){
-    assert( pWal->lockState==SQLITE_SHM_READ );
-    rc = walSetLock(pWal, SQLITE_SHM_WRITE);
 
-    /* If this connection is not reading the most recent database snapshot,
-    ** it is not possible to write to the database. In this case release
-    ** the write locks and return SQLITE_BUSY.
-    */
+/* 
+** This function starts a write transaction on the WAL.
+**
+** A read transaction must have already been started by a prior call
+** to sqlite3WalBeginReadTransaction().
+**
+** If another thread or process has written into the database since
+** the read transaction was started, then it is not possible for this
+** thread to write as doing so would cause a fork.  So this routine
+** returns SQLITE_BUSY in that case and no write transaction is started.
+**
+** There can only be a single writer active at a time.
+*/
+int sqlite3WalBeginWriteTransaction(Wal *pWal){
+  int rc;
+  volatile WalCkptInfo *pInfo;
+
+  /* Cannot start a write transaction without first holding a read
+  ** transaction. */
+  assert( pWal->readLock>=0 );
+
+  /* Only one writer allowed at a time.  Get the write lock.  Return
+  ** SQLITE_BUSY if unable.
+  */
+  rc = walLockExclusive(pWal, WAL_WRITE_LOCK, 1);
+  if( rc ){
+    return rc;
+  }
+
+  /* If another connection has written to the database file since the
+  ** time the read transaction on this connection was started, then
+  ** the write is disallowed.
+  */
+  rc = walIndexMap(pWal, walMappingSize(pWal->hdr.mxFrame));
+  if( rc ){
+    walUnlockExclusive(pWal, WAL_WRITE_LOCK, 1);
+    return rc;
+  }
+  if( memcmp(&pWal->hdr, (void*)pWal->pWiData, sizeof(WalIndexHdr))!=0 ){
+    walUnlockExclusive(pWal, WAL_WRITE_LOCK, 1);
+    walIndexUnmap(pWal);
+    return SQLITE_BUSY;
+  }
+
+  pInfo = walCkptInfo(pWal);
+  if( pWal->readLock==0 && pInfo->nBackfill==pWal->hdr.mxFrame ){
+    rc = walLockExclusive(pWal, WAL_READ_LOCK(1), WAL_NREADER-1);
     if( rc==SQLITE_OK ){
-      rc = walIndexMap(pWal, walMappingSize(1));
-      assert( pWal->szWIndex>=WALINDEX_HDR_SIZE || rc!=SQLITE_OK );
-      if( rc==SQLITE_OK
-       && memcmp(&pWal->hdr, (void*)pWal->pWiData, sizeof(WalIndexHdr))
-      ){
-        rc = SQLITE_BUSY;
-      }
-      walIndexUnmap(pWal);
-      if( rc!=SQLITE_OK ){
-        walSetLock(pWal, SQLITE_SHM_READ);
-      }
+      /* All readers are using WAL_READ_LOCK(0) (in other words, no readers
+      ** are currently using the WAL), so restart the WAL from the beginning. */
+      pWal->nCkpt++;
+      pWal->hdr.mxFrame = 0;
+      sqlite3Put4byte((u8*)pWal->hdr.aSalt,
+                       1 + sqlite3Get4byte((u8*)pWal->hdr.aSalt));
+      sqlite3_randomness(4, &pWal->hdr.aSalt[1]);
+      walIndexWriteHdr(pWal);
+      pInfo->nBackfill = 0;
+      memset(&pInfo->aReadMark[1], 0, sizeof(pInfo->aReadMark)-sizeof(u32));
+      rc = sqlite3OsTruncate(pWal->pDbFd, 
+                             ((i64)pWal->hdr.nPage*(i64)pWal->szPage));
+      walUnlockExclusive(pWal, WAL_READ_LOCK(1), WAL_NREADER-1);
     }
-  }else if( pWal->lockState==SQLITE_SHM_WRITE ){
-    rc = walSetLock(pWal, SQLITE_SHM_READ);
+    walUnlockShared(pWal, WAL_READ_LOCK(0));
+    do{
+      int notUsed;
+      rc = walTryBeginRead(pWal, &notUsed, 1);
+    }while( rc==WAL_RETRY );
   }
   return rc;
 }
 
 /*
+** End a write transaction.  The commit has already been done.  This
+** routine merely releases the lock.
+*/
+int sqlite3WalEndWriteTransaction(Wal *pWal){
+  walUnlockExclusive(pWal, WAL_WRITE_LOCK, 1);
+  return SQLITE_OK;
+}
+
+/*
 ** If any data has been written (but not committed) to the log file, this
 ** function moves the write-pointer back to the start of the transaction.
 **
 ** Additionally, the callback function is invoked for each frame written
-** to the log since the start of the transaction. If the callback returns
+** to the WAL since the start of the transaction. If the callback returns
 ** other than SQLITE_OK, it is not invoked again and the error code is
 ** returned to the caller.
 **
@@ -1660,7 +1999,7 @@
 */
 int sqlite3WalUndo(Wal *pWal, int (*xUndo)(void *, Pgno), void *pUndoCtx){
   int rc = SQLITE_OK;
-  if( pWal->lockState==SQLITE_SHM_WRITE ){
+  if( pWal->writeLock ){
     int unused;
     Pgno iMax = pWal->hdr.mxFrame;
     Pgno iFrame;
@@ -1672,7 +2011,7 @@
     }
     if( rc==SQLITE_OK ){
       for(iFrame=pWal->hdr.mxFrame+1; rc==SQLITE_OK && iFrame<=iMax; iFrame++){
-        assert( pWal->lockState==SQLITE_SHM_WRITE );
+        assert( pWal->writeLock );
         rc = xUndo(pUndoCtx, pWal->pWiData[walIndexEntry(iFrame)]);
       }
       walCleanupHash(pWal);
@@ -1689,7 +2028,7 @@
 ** point in the event of a savepoint rollback (via WalSavepointUndo()).
 */
 void sqlite3WalSavepoint(Wal *pWal, u32 *aWalData){
-  assert( pWal->lockState==SQLITE_SHM_WRITE );
+  assert( pWal->writeLock );
   aWalData[0] = pWal->hdr.mxFrame;
   aWalData[1] = pWal->hdr.aFrameCksum[0];
   aWalData[2] = pWal->hdr.aFrameCksum[1];
@@ -1703,7 +2042,7 @@
 */
 int sqlite3WalSavepointUndo(Wal *pWal, u32 *aWalData){
   int rc = SQLITE_OK;
-  assert( pWal->lockState==SQLITE_SHM_WRITE );
+  assert( pWal->writeLock );
 
   assert( aWalData[0]<=pWal->hdr.mxFrame );
   if( aWalData[0]<pWal->hdr.mxFrame ){
@@ -1739,7 +2078,7 @@
   int nLast = 0;                  /* Number of extra copies of last page */
 
   assert( pList );
-  assert( pWal->lockState==SQLITE_SHM_WRITE );
+  assert( pWal->writeLock );
   assert( pWal->pWiData==0 );
 
   /* If this is the first frame written into the log, write the WAL
@@ -1852,42 +2191,28 @@
 }
 
 /* 
-** Checkpoint the database:
+** This routine is called to implement sqlite3_wal_checkpoint() and
+** related interfaces.
 **
-**   1. Acquire a CHECKPOINT lock
-**   2. Copy the contents of the log into the database file.
-**   3. Zero the wal-index header (so new readers will ignore the log).
-**   4. Drop the CHECKPOINT lock.
+** Obtain a CHECKPOINT lock and then backfill as much information as
+** we can from WAL into the database.
 */
 int sqlite3WalCheckpoint(
   Wal *pWal,                      /* Wal connection */
   int sync_flags,                 /* Flags to sync db file with (or 0) */
   int nBuf,                       /* Size of temporary buffer */
-  u8 *zBuf,                       /* Temporary buffer to use */
-  int (*xBusyHandler)(void *),    /* Pointer to busy-handler function */
-  void *pBusyHandlerArg           /* Argument to pass to xBusyHandler */
+  u8 *zBuf                        /* Temporary buffer to use */
 ){
   int rc;                         /* Return code */
   int isChanged = 0;              /* True if a new wal-index header is loaded */
 
   assert( pWal->pWiData==0 );
 
-  /* Get the CHECKPOINT lock. 
-  **
-  ** Normally, the connection will be in UNLOCK state at this point. But
-  ** if the connection is in exclusive-mode it may still be in READ state
-  ** even though the upper layer has no active read-transaction (because
-  ** WalCloseSnapshot() is not called in exclusive mode). The state will
-  ** be set to UNLOCK when this function returns. This is Ok.
-  */
-  assert( (pWal->lockState==SQLITE_SHM_UNLOCK)
-       || (pWal->lockState==SQLITE_SHM_READ) );
-  walSetLock(pWal, SQLITE_SHM_UNLOCK);
-  do {
-    rc = walSetLock(pWal, SQLITE_SHM_CHECKPOINT);
-  }while( rc==SQLITE_BUSY && xBusyHandler(pBusyHandlerArg) );
-  if( rc!=SQLITE_OK ){
-    walSetLock(pWal, SQLITE_SHM_UNLOCK);
+  rc = walLockExclusive(pWal, WAL_CKPT_LOCK, 1);
+  if( rc ){
+    /* Usually this is SQLITE_BUSY meaning that another thread or process
+    ** is already running a checkpoint, or maybe a recovery.  But it might
+    ** also be SQLITE_IOERR. */
     return rc;
   }
 
@@ -1908,7 +2233,7 @@
 
   /* Release the locks. */
   walIndexUnmap(pWal);
-  walSetLock(pWal, SQLITE_SHM_UNLOCK);
+  walUnlockExclusive(pWal, WAL_CKPT_LOCK, 1);
   return rc;
 }
 
@@ -1934,9 +2259,6 @@
 ** locking_mode=exclusive mode). If the EXCLUSIVE lock is to be dropped,
 ** the flag set by this function should be cleared before doing so.
 **
-** The value of the exclusive-mode flag may only be modified when
-** the WAL connection is in READ state.
-**
 ** When the flag is set, this module does not call the VFS xShmLock()
 ** method to obtain any locks on the wal-index (as it assumes it
 ** has exclusive access to the wal and wal-index files anyhow). It
@@ -1950,7 +2272,6 @@
 */
 int sqlite3WalExclusiveMode(Wal *pWal, int op){
   if( op>=0 ){
-    assert( pWal->lockState==SQLITE_SHM_READ );
     pWal->exclusiveMode = (u8)op;
   }
   return pWal->exclusiveMode;
diff --git a/src/wal.h b/src/wal.h
index bf40c3b..32aade1 100644
--- a/src/wal.h
+++ b/src/wal.h
@@ -20,19 +20,20 @@
 #include "sqliteInt.h"
 
 #ifdef SQLITE_OMIT_WAL
-# define sqlite3WalOpen(x,y,z)             0
-# define sqlite3WalClose(w,x,y,z)          0
-# define sqlite3WalOpenSnapshot(y,z)       0
-# define sqlite3WalCloseSnapshot(z) 
-# define sqlite3WalRead(v,w,x,y,z)         0
+# define sqlite3WalOpen(x,y,z)                 0
+# define sqlite3WalClose(w,x,y,z)              0
+# define sqlite3WalBeginReadTransaction(y,z)   0
+# define sqlite3WalEndReadTransaction(z)
+# define sqlite3WalRead(v,w,x,y,z)             0
 # define sqlite3WalDbsize(y,z)
-# define sqlite3WalWriteLock(y,z)          0
-# define sqlite3WalUndo(x,y,z)             0
+# define sqlite3WalBeginWriteTransaction(y)    0
+# define sqlite3WalEndWriteTransaction(x)      0
+# define sqlite3WalUndo(x,y,z)                 0
 # define sqlite3WalSavepoint(y,z)
-# define sqlite3WalSavepointUndo(y,z)      0
-# define sqlite3WalFrames(u,v,w,x,y,z)     0
-# define sqlite3WalCheckpoint(u,v,w,x,y,z) 0
-# define sqlite3WalCallback(z)             0
+# define sqlite3WalSavepointUndo(y,z)          0
+# define sqlite3WalFrames(u,v,w,x,y,z)         0
+# define sqlite3WalCheckpoint(u,v,w,x)         0
+# define sqlite3WalCallback(z)                 0
 #else
 
 #define WAL_SAVEPOINT_NDATA 3
@@ -53,8 +54,8 @@
 ** write to or checkpoint the WAL.  sqlite3WalCloseSnapshot() closes the
 ** transaction and releases the lock.
 */
-int sqlite3WalOpenSnapshot(Wal *pWal, int *);
-void sqlite3WalCloseSnapshot(Wal *pWal);
+int sqlite3WalBeginReadTransaction(Wal *pWal, int *);
+void sqlite3WalEndReadTransaction(Wal *pWal);
 
 /* Read a page from the write-ahead log, if it is present. */
 int sqlite3WalRead(Wal *pWal, Pgno pgno, int *pInWal, int nOut, u8 *pOut);
@@ -64,7 +65,8 @@
 void sqlite3WalDbsize(Wal *pWal, Pgno *pPgno);
 
 /* Obtain or release the WRITER lock. */
-int sqlite3WalWriteLock(Wal *pWal, int op);
+int sqlite3WalBeginWriteTransaction(Wal *pWal);
+int sqlite3WalEndWriteTransaction(Wal *pWal);
 
 /* Undo any frames written (but not committed) to the log */
 int sqlite3WalUndo(Wal *pWal, int (*xUndo)(void *, Pgno), void *pUndoCtx);
@@ -85,9 +87,7 @@
   Wal *pWal,                      /* Write-ahead log connection */
   int sync_flags,                 /* Flags to sync db file with (or 0) */
   int nBuf,                       /* Size of buffer nBuf */
-  u8 *zBuf,                       /* Temporary buffer to use */
-  int (*xBusyHandler)(void *),    /* Pointer to busy-handler function */
-  void *pBusyHandlerArg           /* Argument to pass to xBusyHandler */
+  u8 *zBuf                        /* Temporary buffer to use */
 );
 
 /* Return the value to pass to a sqlite3_wal_hook callback, the