Change the table record format to support manifest typing. (CVS 1361)

FossilOrigin-Name: 0242c9e4f7c85e9c911cf30d90b0cdb1015f3d7d
diff --git a/src/printf.c b/src/printf.c
index bce1770..5db2ca3 100644
--- a/src/printf.c
+++ b/src/printf.c
@@ -358,12 +358,21 @@
         if( longvalue==0 && infop->base==8 ) flag_alternateform = 0;
 #endif
         if( infop->flags & FLAG_SIGNED ){
-          if( *(long*)&longvalue<0 ){
-            longvalue = -*(long*)&longvalue;
-            prefix = '-';
-          }else if( flag_plussign )  prefix = '+';
-          else if( flag_blanksign )  prefix = ' ';
-          else                       prefix = 0;
+          if( flag_longlong ){
+            if( *(i64*)&longvalue<0 ){
+              longvalue = -*(i64*)&longvalue;
+              prefix = '-';
+            }else if( flag_plussign )  prefix = '+';
+            else if( flag_blanksign )  prefix = ' ';
+            else                       prefix = 0;
+          }else{
+            if( *(long*)&longvalue<0 ){
+              longvalue = -*(long*)&longvalue;
+              prefix = '-';
+            }else if( flag_plussign )  prefix = '+';
+            else if( flag_blanksign )  prefix = ' ';
+            else                       prefix = 0;
+          }
         }else                        prefix = 0;
         if( flag_zeropad && precision<width-(prefix!=0) ){
           precision = width-(prefix!=0);
diff --git a/src/sqliteInt.h b/src/sqliteInt.h
index 1176887..fcfe381 100644
--- a/src/sqliteInt.h
+++ b/src/sqliteInt.h
@@ -11,7 +11,7 @@
 *************************************************************************
 ** Internal interface definitions for SQLite.
 **
-** @(#) $Id: sqliteInt.h,v 1.229 2004/05/10 23:29:50 drh Exp $
+** @(#) $Id: sqliteInt.h,v 1.230 2004/05/12 07:33:33 danielk1977 Exp $
 */
 #include "config.h"
 #include "sqlite.h"
@@ -1288,3 +1288,5 @@
 int sqlite3PutVarint(unsigned char *, u64);
 int sqlite3GetVarint(const unsigned char *, u64 *);
 int sqlite3VarintLen(u64 v);
+
+
diff --git a/src/vdbe.c b/src/vdbe.c
index cbe9f80..cdef6a7 100644
--- a/src/vdbe.c
+++ b/src/vdbe.c
@@ -43,7 +43,7 @@
 ** in this file for details.  If in doubt, do not deviate from existing
 ** commenting and indentation practices when changing or adding code.
 **
-** $Id: vdbe.c,v 1.282 2004/05/11 09:57:35 drh Exp $
+** $Id: vdbe.c,v 1.283 2004/05/12 07:33:33 danielk1977 Exp $
 */
 #include "sqliteInt.h"
 #include "os.h"
@@ -199,7 +199,7 @@
   if( fg & MEM_Real ){
     sqlite3_snprintf(sizeof(pStack->zShort),pStack->zShort,"%.15g",pStack->r);
   }else if( fg & MEM_Int ){
-    sqlite3_snprintf(sizeof(pStack->zShort),pStack->zShort,"%d",pStack->i);
+    sqlite3_snprintf(sizeof(pStack->zShort),pStack->zShort,"%lld",pStack->i);
   }else{
     pStack->zShort[0] = 0;
   }
@@ -1014,7 +1014,7 @@
     Release(pTos);
     pTos->flags = MEM_Null;
   }else if( (pTos->flags & pNos->flags & MEM_Int)==MEM_Int ){
-    int a, b;
+    i64 a, b;
     a = pTos->i;
     b = pNos->i;
     switch( pOp->opcode ){
@@ -1811,45 +1811,48 @@
 
 /* Opcode: Column3 P1 P2 *
 **
-** This opcode (not yet in use) is a replacement for the current
-** OP_Column3 that supports the SQLite3 manifest typing feature.
+** This opcode (not yet in use) is a replacement for the current OP_Column3
+** that supports the SQLite3 manifest typing feature.
 **
-** Interpret the data that cursor P1 points to as
-** a structure built using the MakeRecord instruction.
-** (See the MakeRecord opcode for additional information about
-** the format of the data.)
-** Push onto the stack the value of the P2-th column contained
-** in the data.
+** Interpret the data that cursor P1 points to as a structure built using
+** the MakeRecord instruction.  (See the MakeRecord opcode for additional
+** information about the format of the data.) Push onto the stack the value
+** of the P2-th column contained in the data.
 **
-** If the KeyAsData opcode has previously executed on this cursor,
-** then the field might be extracted from the key rather than the
-** data.
+** If the KeyAsData opcode has previously executed on this cursor, then the
+** field might be extracted from the key rather than the data.
 **
-** If P1 is negative, then the record is stored on the stack rather
-** than in a table.  For P1==-1, the top of the stack is used.
-** For P1==-2, the next on the stack is used.  And so forth.  The
-** value pushed is always just a pointer into the record which is
-** stored further down on the stack.  The column value is not copied.
+** If P1 is negative, then the record is stored on the stack rather than in
+** a table.  For P1==-1, the top of the stack is used.  For P1==-2, the
+** next on the stack is used.  And so forth.  The value pushed is always
+** just a pointer into the record which is stored further down on the
+** stack.  The column value is not copied.
 */
-case OP_Column3: {
-  int payloadSize;
+case OP_Column: {
+  int payloadSize;   /* Number of bytes in the record */
   int i = pOp->p1;
-  int p2 = pOp->p2;
+  int p2 = pOp->p2;  /* column number to retrieve */
   Cursor *pC;
-  char *zRec;
+  char *zRec;        /* Pointer to record-data from stack or pseudo-table. */
   BtCursor *pCrsr;
 
-  char *zHdr = 0;
-  int freeZHdr = 0;
-  int dataOffsetLen;
-  u64 dataOffset;
-  char *zIdx = 0;
-  int cnt;
-  u64 idxN;
-  u64 idxN1;
+  char *zData;       
+  int freeZdata = 0; /* zData requires sqliteFree() */
+
+  u64 nFields;       /* number of fields in the record */
+  u64 *aTypes;       /* An array of serial types (size nFields) */
+
+  int len;           /* The length of the serialized data for the column */
+  int offset;
+  int nn;
 
   assert( i<p->nCursor );
   pTos++;
+
+  /* This block sets the variable payloadSize and, if the data is coming
+  ** from the stack or from a pseudo-table, zRec. If the data is coming
+  ** from a real cursor, then zRec is left as NULL.
+  */
   if( i<0 ){
     assert( &pTos[i]>=p->aStack );
     assert( pTos[i].flags & MEM_Str );
@@ -1882,89 +1885,140 @@
     break;
   }
 
-  /* Read the data-offset for this record */
-  if( zRec ){
-    dataOffsetLen = sqlite3GetVarint(zRec, &dataOffset);
-  }else{
-    unsigned char zDataOffset[9];
-    if( pC->keyAsData ){
-      sqlite3BtreeKey(pCrsr, 0, 9, zDataOffset);
-    }else{
-      sqlite3BtreeData(pCrsr, 0, 9, zDataOffset);
-    }
-    dataOffsetLen = sqlite3GetVarint(zDataOffset, &dataOffset);
-  }
-
-  /* Set zHdr to point at the start of the Idx() fields of the
-  ** record. Set freeZHdr to 1 if we need to sqliteFree(zHdr) later.
+  /* Read the number of fields for the record.
+  ** FIX ME: The Cursor object should cache this data and the array of
+  ** field types for subsequent OP_Column instructions.
   */
   if( zRec ){
-    zHdr = zRec + dataOffsetLen;
+    zData = zRec;
   }else{
-    zHdr = sqliteMalloc(dataOffset);
-    if( !zHdr ){
-      rc = SQLITE_NOMEM;
-      goto abort_due_to_error;
-    }
-    freeZHdr = 1;
+    /* We can assume that 9 bytes (maximum length of a varint) fits
+    ** on the main page in all cases.
+    */
     if( pC->keyAsData ){
-      sqlite3BtreeKey(pCrsr, dataOffsetLen, dataOffset, zHdr);
+      zData = (char *)sqlite3BtreeKeyFetch(pCrsr, 9>payloadSize?payloadSize:9);
     }else{
-      sqlite3BtreeData(pCrsr, dataOffsetLen, dataOffset, zHdr);
+      zData = (char *)sqlite3BtreeDataFetch(pCrsr, 9>payloadSize?payloadSize:9);
     }
+    assert( zData );
   }
+  offset = sqlite3GetVarint(zData, &nFields);
 
-  /* Find the Nth byte of zHdr that does not have the 0x80
-  ** bit set. The byte after this one is the start of the Idx(N)
-  ** varint. Then read Idx(N) and Idx(N+1)
-  */
-  cnt = p2;
-  zIdx = zHdr;
-  while( cnt>0 ){
-    assert( (zIdx-zHdr)<dataOffset );
-    if( !(*zIdx & 0x80) ) cnt--;
-    zIdx++;
-  }
-  zIdx += sqlite3GetVarint(zIdx, &idxN);
-  sqlite3GetVarint(zIdx, &idxN1);
-
-  /* Set zHdr to point at the field data */
-  if( freeZHdr ){
-    sqliteFree(zHdr);
-    freeZHdr = 0;
-  }
-  if( zRec ){
-    zHdr = zRec + (dataOffsetLen + dataOffset + idxN);
-  }else{
-    cnt = idxN1 - idxN;
-    assert( cnt>0 );
-    zHdr = sqliteMalloc(cnt);
-    if( !zHdr ){
-      rc = SQLITE_NOMEM;
-      goto abort_due_to_error;
+  if( !zRec ){
+    /* If the record is stored in a table, see if enough of it is on
+    ** the main page to use sqlite3BtreeDataFetch() to get the data
+    ** containing the nFields serial types (varints). This will almost
+    ** always work, but if it doesn't, sqliteMalloc() space and use
+    ** sqlite3BtreeData().
+    **
+    ** Estimate the maximum space required by the nFields varints by
+    ** assuming the maximum space for each is the length required to store:
+    **
+    **     (<record length> * 2) + 13
+    **
+    ** This is the serial-type for a text object as long as the record
+    ** itself. In all cases the length required to store this is three
+    ** bytes or less. 
+    */
+    int max_space = sqlite3VarintLen((((u64)payloadSize)<<1)+13)*nFields;
+    max_space += offset;
+    if( max_space>payloadSize ){
+      max_space = payloadSize;
     }
-    freeZHdr = 1;
+
     if( pC->keyAsData ){
-      sqlite3BtreeKey(pCrsr, dataOffsetLen+dataOffset+idxN, cnt, zHdr);
+      zData = (char *)sqlite3BtreeKeyFetch(pCrsr, max_space);
     }else{
-      sqlite3BtreeData(pCrsr, dataOffsetLen+dataOffset+idxN, cnt, zHdr);
+      zData = (char *)sqlite3BtreeDataFetch(pCrsr, max_space);
+    }
+    if( !zData ){
+      /* This code will run very infrequently (e.g. tables with several
+      ** hundred columns).
+      */
+      zData = (char *)sqliteMalloc(offset+max_space);
+      if( !zData ){
+        rc = SQLITE_NOMEM;
+        goto abort_due_to_error;
+      }
+      if( pC->keyAsData ){
+        rc = sqlite3BtreeKey(pCrsr, 0, max_space, zData);
+      }else{
+        rc = sqlite3BtreeData(pCrsr, 0, max_space, zData);
+      }
+      if( rc!=SQLITE_OK ){
+        sqliteFree(zData);
+        goto abort_due_to_error;
+      }
+      freeZdata = 1;
     }
   }
 
-  /* Deserialize the field value directory into the top of the
-  ** stack. If the deserialized length does not match the expected
-  ** length, this indicates corruption.
+  /* Dynamically allocate space for the aTypes array and read all
+  ** the serial types for the record. At the end of this block variable
+  ** offset is set to the offset to the start of Data0 in the record.
   */
-  if( (idxN1-idxN)!=sqlite3VdbeDeserialize(pTos, zHdr) ){
-    if( freeZHdr ){
-      sqliteFree(zHdr);
+  aTypes = (u64 *)sqliteMalloc(sizeof(u64)*nFields);
+  if( !aTypes ){
+    if( freeZdata ){
+      sqliteFree(zData);
+      freeZdata = 0;
     }
-    rc = SQLITE_CORRUPT;
+    rc = SQLITE_NOMEM;
     goto abort_due_to_error;
   }
+  for(nn=0; nn<nFields; nn++){
+    offset += sqlite3GetVarint(&zData[offset], &aTypes[nn]);
+  }
+ 
+  if( freeZdata ){
+    freeZdata = 0;
+    sqliteFree(zData);
+  }
 
-  if( freeZHdr ){
-    sqliteFree(zHdr);
+  for(nn=0; nn<p2; nn++){
+    offset += sqlite3VdbeSerialTypeLen(aTypes[nn]);
+  }
+  len = sqlite3VdbeSerialTypeLen(aTypes[p2]);
+
+  if( !zRec ){
+    /* If the record is stored in a table, see if enough of it
+    ** is on the main page to read our column using
+    ** sqlite3BtreeDataFetch(). If not, sqliteMalloc() space and read data
+    ** with sqlite3BtreeData().
+    */
+    if( pC->keyAsData ){
+      zData = (char *)sqlite3BtreeKeyFetch(pCrsr, offset+len);
+    }else{
+      zData = (char *)sqlite3BtreeDataFetch(pCrsr, offset+len);
+    }
+    if( !zData && len>0 ){
+      zData = (char *)sqliteMalloc(len);
+      if( !zData ){
+        sqliteFree(aTypes);
+        rc = SQLITE_NOMEM;
+        goto abort_due_to_error;
+      }
+      if( pC->keyAsData ){
+        rc = sqlite3BtreeKey(pCrsr, offset, len, zData);
+      }else{
+        rc = sqlite3BtreeData(pCrsr, offset, len, zData);
+      }
+      if( rc!=SQLITE_OK ){
+        sqliteFree( aTypes );
+        sqliteFree( zData );
+        goto abort_due_to_error;
+      }
+      freeZdata = 1;
+      offset = 0;
+    }
+  }
+
+  /* Deserialize the value directly into the top of the stack */
+  sqlite3VdbeSerialGet(&zData[offset], aTypes[p2], pTos);
+
+  sqliteFree(aTypes);
+  if( freeZdata ){
+    sqliteFree(zData);
   }
   break;
 }
@@ -1981,223 +2035,80 @@
 ** opcode can decode the record later.  Refer to source code
 ** comments for the details of the record format.
 */
-case OP_MakeRecord3: {
+case OP_MakeRecord: {
   /* Assuming the record contains N fields, the record format looks
   ** like this:
   **
   ** --------------------------------------------------------------------------
-  ** | data-offset | idx1 | ... | idx(N-1) | idx(N) | data0 | ... | data(N-1) |
+  ** | num-fields | type 0 | type 1 | ... | type N-1 | data0 | ... | data N-1 | 
   ** --------------------------------------------------------------------------
   **
   ** Data(0) is taken from the lowest element of the stack and data(N-1) is
   ** the top of the stack.
   **
-  ** The data-offset and each of the idx() entries is stored as a 1-9 
-  ** byte variable-length integer (see comments in btree.c). The
-  ** data-offset contains the offset from the end of itself to the start 
-  ** of data(0).
+  ** Each type field is a varint representing the serial type of the 
+  ** corresponding data element (see sqlite3VdbeSerialType()). The
+  ** num-fields field is also a varint storing N.
   ** 
-  ** Idx(k) contains the offset from the start of data(0) to the first 
-  ** byte of data(k). Idx(0) is implicitly 0. Hence:
-  ** 
-  **    sizeof(data-offset) + data-offset + Idx(N) 
-  **
-  ** is the number of bytes in the record. The offset to start of data(X)
-  ** is sizeof(data-offset) + data-offset + Idx(X
-  **
   ** TODO: Even when the record is short enough for Mem::zShort, this opcode
   **   allocates it dynamically.
   */
-  int nDataLen = 0;
-  int nHdrLen = 0;
-  int data_offset = 0;
   int nField = pOp->p1;
   unsigned char *zNewRecord;
-  unsigned char *zHdr;
+  unsigned char *zCsr;
   Mem *pRec;
+  int nBytes;    /* Space required for this record */
 
   Mem *pData0 = &pTos[1-nField];
   assert( pData0>=p->aStack );
 
-  /* Loop through the elements that will make up the record, determining
-  ** the aggregate length of the Data() segments and the data_offset.
+  /* Loop through the elements that will make up the record to figure
+  ** out how much space is required for the new record.
   */
-  for(pRec=pData0; pRec!=pTos; pRec++){
-    nDataLen += sqlite3VdbeSerialLen(pRec);
-    data_offset += sqlite3VarintLen(nDataLen);
-  }
- 
-  /* The size of the header is the data-offset + the size of the
-  ** data-offset as a varint. If the size of the header combined with
-  ** the size of the Data() segments is greater than MAX_BYTES_PER_ROW, 
-  ** report an error.
-  */
-  nHdrLen = data_offset + sqlite3VarintLen(data_offset);
-  if( (nHdrLen+nDataLen)>MAX_BYTES_PER_ROW ){
-    rc = SQLITE_TOOBIG;
-    goto abort_due_to_error;
+  nBytes = sqlite3VarintLen(nField);
+  for(pRec=pData0; pRec<=pTos; pRec++){
+    u64 serial_type = sqlite3VdbeSerialType(pRec);
+    nBytes += sqlite3VdbeSerialTypeLen(serial_type);
+    nBytes += sqlite3VarintLen(serial_type);
   }
 
-  /* Allocate space for the new row. */
-  zNewRecord = sqliteMalloc(nHdrLen+nDataLen);
+  /* Allocate space for the new record. */
+  zNewRecord = sqliteMalloc(nBytes);
   if( !zNewRecord ){
     rc = SQLITE_NOMEM;
     goto abort_due_to_error;
   }
 
-  /* Write the data offset */
-  zHdr = zNewRecord;
-  zHdr += sqlite3PutVarint(zHdr, data_offset);
+  /* Write the record */
+  zCsr = zNewRecord;
+  zCsr += sqlite3PutVarint(zCsr, nField);             /* number of fields */
+  for(pRec=pData0; pRec<=pTos; pRec++){
+    u64 serial_type = sqlite3VdbeSerialType(pRec);
+    zCsr += sqlite3PutVarint(zCsr, serial_type);      /* serial type */
+  }
+  for(pRec=pData0; pRec<=pTos; pRec++){
+    zCsr += sqlite3VdbeSerialPut(zCsr, pRec);  /* serial data */
+  }
 
-  /* Loop through the values on the stack writing both the serialized value
-  ** and the the Idx() offset for each.
+  /* If zCsr has not been advanced exactly nBytes bytes, then one
+  ** of the sqlite3PutVarint() or sqlite3VdbeSerialPut() calls above
+  ** failed. This indicates a corrupted memory cell or code bug.
   */
-  nDataLen = 0;
-  for(pRec=pData0; pRec!=pTos; pRec++){
-    nDataLen += sqlite3VdbeSerialize(pRec, &zNewRecord[nDataLen]);
-    zHdr += sqlite3PutVarint(zHdr, nDataLen);
+  if( zCsr!=(zNewRecord+nBytes) ){
+    rc = SQLITE_INTERNAL;
+    goto abort_due_to_error;
   }
 
   /* Pop nField entries from the stack and push the new entry on */
   popStack(&pTos, nField);
   pTos++;
-  pTos->n = nDataLen+nHdrLen;
+  pTos->n = nBytes;
   pTos->z = zNewRecord;
   pTos->flags = MEM_Str | MEM_Dyn;
 
   break;
 }
 
-/* Opcode: MakeRecord P1 P2 *
-**
-** Convert the top P1 entries of the stack into a single entry
-** suitable for use as a data record in a database table.  The
-** details of the format are irrelavant as long as the OP_Column
-** opcode can decode the record later.  Refer to source code
-** comments for the details of the record format.
-**
-** If P2 is true (non-zero) and one or more of the P1 entries
-** that go into building the record is NULL, then add some extra
-** bytes to the record to make it distinct for other entries created
-** during the same run of the VDBE.  The extra bytes added are a
-** counter that is reset with each run of the VDBE, so records
-** created this way will not necessarily be distinct across runs.
-** But they should be distinct for transient tables (created using
-** OP_OpenTemp) which is what they are intended for.
-**
-** (Later:) The P2==1 option was intended to make NULLs distinct
-** for the UNION operator.  But I have since discovered that NULLs
-** are indistinct for UNION.  So this option is never used.
-*/
-case OP_MakeRecord: {
-  char *zNewRecord;
-  int nByte;
-  int nField;
-  int i, j;
-  int idxWidth;
-  u32 addr;
-  Mem *pRec;
-  int addUnique = 0;   /* True to cause bytes to be added to make the
-                       ** generated record distinct */
-  char zTemp[NBFS];    /* Temp space for small records */
-
-  /* Assuming the record contains N fields, the record format looks
-  ** like this:
-  **
-  **   -------------------------------------------------------------------
-  **   | idx0 | idx1 | ... | idx(N-1) | idx(N) | data0 | ... | data(N-1) |
-  **   -------------------------------------------------------------------
-  **
-  ** All data fields are converted to strings before being stored and
-  ** are stored with their null terminators.  NULL entries omit the
-  ** null terminator.  Thus an empty string uses 1 byte and a NULL uses
-  ** zero bytes.  Data(0) is taken from the lowest element of the stack
-  ** and data(N-1) is the top of the stack.
-  **
-  ** Each of the idx() entries is either 1, 2, or 3 bytes depending on
-  ** how big the total record is.  Idx(0) contains the offset to the start
-  ** of data(0).  Idx(k) contains the offset to the start of data(k).
-  ** Idx(N) contains the total number of bytes in the record.
-  */
-  nField = pOp->p1;
-  pRec = &pTos[1-nField];
-  assert( pRec>=p->aStack );
-  nByte = 0;
-  for(i=0; i<nField; i++, pRec++){
-    if( pRec->flags & MEM_Null ){
-      addUnique = pOp->p2;
-    }else{
-      Stringify(pRec);
-      nByte += pRec->n;
-    }
-  }
-  if( addUnique ) nByte += sizeof(p->uniqueCnt);
-  if( nByte + nField + 1 < 256 ){
-    idxWidth = 1;
-  }else if( nByte + 2*nField + 2 < 65536 ){
-    idxWidth = 2;
-  }else{
-    idxWidth = 3;
-  }
-  nByte += idxWidth*(nField + 1);
-  if( nByte>MAX_BYTES_PER_ROW ){
-    rc = SQLITE_TOOBIG;
-    goto abort_due_to_error;
-  }
-  if( nByte<=NBFS ){
-    zNewRecord = zTemp;
-  }else{
-    zNewRecord = sqliteMallocRaw( nByte );
-    if( zNewRecord==0 ) goto no_mem;
-  }
-  j = 0;
-  addr = idxWidth*(nField+1) + addUnique*sizeof(p->uniqueCnt);
-  for(i=0, pRec=&pTos[1-nField]; i<nField; i++, pRec++){
-    zNewRecord[j++] = addr & 0xff;
-    if( idxWidth>1 ){
-      zNewRecord[j++] = (addr>>8)&0xff;
-      if( idxWidth>2 ){
-        zNewRecord[j++] = (addr>>16)&0xff;
-      }
-    }
-    if( (pRec->flags & MEM_Null)==0 ){
-      addr += pRec->n;
-    }
-  }
-  zNewRecord[j++] = addr & 0xff;
-  if( idxWidth>1 ){
-    zNewRecord[j++] = (addr>>8)&0xff;
-    if( idxWidth>2 ){
-      zNewRecord[j++] = (addr>>16)&0xff;
-    }
-  }
-  if( addUnique ){
-    memcpy(&zNewRecord[j], &p->uniqueCnt, sizeof(p->uniqueCnt));
-    p->uniqueCnt++;
-    j += sizeof(p->uniqueCnt);
-  }
-  for(i=0, pRec=&pTos[1-nField]; i<nField; i++, pRec++){
-    if( (pRec->flags & MEM_Null)==0 ){
-      memcpy(&zNewRecord[j], pRec->z, pRec->n);
-      j += pRec->n;
-    }
-  }
-  popStack(&pTos, nField);
-  pTos++;
-  pTos->n = nByte;
-  if( nByte<=NBFS ){
-    assert( zNewRecord==zTemp );
-    memcpy(pTos->zShort, zTemp, nByte);
-    pTos->z = pTos->zShort;
-    pTos->flags = MEM_Str | MEM_Short;
-  }else{
-    assert( zNewRecord!=zTemp );
-    pTos->z = zNewRecord;
-    pTos->flags = MEM_Str | MEM_Dyn;
-  }
-  break;
-}
-
 /* Opcode: MakeKey P1 P2 P3
 **
 ** Convert the top P1 entries of the stack into a single entry suitable
@@ -3402,135 +3313,6 @@
   break;
 }
 
-/* Opcode: Column P1 P2 *
-**
-** Interpret the data that cursor P1 points to as
-** a structure built using the MakeRecord instruction.
-** (See the MakeRecord opcode for additional information about
-** the format of the data.)
-** Push onto the stack the value of the P2-th column contained
-** in the data.
-**
-** If the KeyAsData opcode has previously executed on this cursor,
-** then the field might be extracted from the key rather than the
-** data.
-**
-** If P1 is negative, then the record is stored on the stack rather
-** than in a table.  For P1==-1, the top of the stack is used.
-** For P1==-2, the next on the stack is used.  And so forth.  The
-** value pushed is always just a pointer into the record which is
-** stored further down on the stack.  The column value is not copied.
-*/
-case OP_Column: {
-  int amt, offset, end, payloadSize;
-  int i = pOp->p1;
-  int p2 = pOp->p2;
-  Cursor *pC;
-  char *zRec;
-  BtCursor *pCrsr;
-  int idxWidth;
-  unsigned char aHdr[10];
-
-  assert( i<p->nCursor );
-  pTos++;
-  if( i<0 ){
-    assert( &pTos[i]>=p->aStack );
-    assert( pTos[i].flags & MEM_Str );
-    zRec = pTos[i].z;
-    payloadSize = pTos[i].n;
-  }else if( (pC = &p->aCsr[i])->pCursor!=0 ){
-    sqlite3VdbeCursorMoveto(pC);
-    zRec = 0;
-    pCrsr = pC->pCursor;
-    if( pC->nullRow ){
-      payloadSize = 0;
-    }else if( pC->keyAsData ){
-      u64 pl64;
-      assert( !pC->intKey );
-      sqlite3BtreeKeySize(pCrsr, &pl64);
-      payloadSize = pl64;
-    }else{
-      sqlite3BtreeDataSize(pCrsr, &payloadSize);
-    }
-  }else if( pC->pseudoTable ){
-    payloadSize = pC->nData;
-    zRec = pC->pData;
-    assert( payloadSize==0 || zRec!=0 );
-  }else{
-    payloadSize = 0;
-  }
-
-  /* Figure out how many bytes in the column data and where the column
-  ** data begins.
-  */
-  if( payloadSize==0 ){
-    pTos->flags = MEM_Null;
-    break;
-  }else if( payloadSize<256 ){
-    idxWidth = 1;
-  }else if( payloadSize<65536 ){
-    idxWidth = 2;
-  }else{
-    idxWidth = 3;
-  }
-
-  /* Figure out where the requested column is stored and how big it is.
-  */
-  if( payloadSize < idxWidth*(p2+1) ){
-    rc = SQLITE_CORRUPT;
-    goto abort_due_to_error;
-  }
-  if( zRec ){
-    memcpy(aHdr, &zRec[idxWidth*p2], idxWidth*2);
-  }else if( pC->keyAsData ){
-    sqlite3BtreeKey(pCrsr, idxWidth*p2, idxWidth*2, (char*)aHdr);
-  }else{
-    sqlite3BtreeData(pCrsr, idxWidth*p2, idxWidth*2, (char*)aHdr);
-  }
-  offset = aHdr[0];
-  end = aHdr[idxWidth];
-  if( idxWidth>1 ){
-    offset |= aHdr[1]<<8;
-    end |= aHdr[idxWidth+1]<<8;
-    if( idxWidth>2 ){
-      offset |= aHdr[2]<<16;
-      end |= aHdr[idxWidth+2]<<16;
-    }
-  }
-  amt = end - offset;
-  if( amt<0 || offset<0 || end>payloadSize ){
-    rc = SQLITE_CORRUPT;
-    goto abort_due_to_error;
-  }
-
-  /* amt and offset now hold the offset to the start of data and the
-  ** amount of data.  Go get the data and put it on the stack.
-  */
-  pTos->n = amt;
-  if( amt==0 ){
-    pTos->flags = MEM_Null;
-  }else if( zRec ){
-    pTos->flags = MEM_Str | MEM_Ephem;
-    pTos->z = &zRec[offset];
-  }else{
-    if( amt<=NBFS ){
-      pTos->flags = MEM_Str | MEM_Short;
-      pTos->z = pTos->zShort;
-    }else{
-      char *z = sqliteMallocRaw( amt );
-      if( z==0 ) goto no_mem;
-      pTos->flags = MEM_Str | MEM_Dyn;
-      pTos->z = z;
-    }
-    if( pC->keyAsData ){
-      sqlite3BtreeKey(pCrsr, offset, amt, pTos->z);
-    }else{
-      sqlite3BtreeData(pCrsr, offset, amt, pTos->z);
-    }
-  }
-  break;
-}
-
 /* Opcode: Recno P1 * *
 **
 ** Push onto the stack an integer which is the first 4 bytes of the
diff --git a/src/vdbeInt.h b/src/vdbeInt.h
index f250a1a..6e91494 100644
--- a/src/vdbeInt.h
+++ b/src/vdbeInt.h
@@ -317,8 +317,9 @@
 #if !defined(NDEBUG) || defined(VDBE_PROFILE)
 void sqlite3VdbePrintOp(FILE*, int, Op*);
 #endif
-int sqlite3VdbeSerialize(const Mem *, unsigned char *);
-int sqlite3VdbeSerialLen(const Mem *);
-int sqlite3VdbeDeserialize(Mem *, const unsigned char *);
+int sqlite3VdbeSerialTypeLen(u64);
+u64 sqlite3VdbeSerialType(const Mem *);
+int sqlite3VdbeSerialPut(unsigned char *, const Mem *);
+int sqlite3VdbeSerialGet(const unsigned char *, u64, Mem *);
 
 int sqlite2BtreeKeyCompare(BtCursor *, const void *, int, int, int *);
diff --git a/src/vdbeaux.c b/src/vdbeaux.c
index 6d2a4f6..ad4ba57 100644
--- a/src/vdbeaux.c
+++ b/src/vdbeaux.c
@@ -1107,17 +1107,28 @@
 }
 
 /*
-** The following three functions:
+** The following functions:
 **
-** sqlite3VdbeSerialize()
+** sqlite3VdbeSerialType()
+** sqlite3VdbeSerialTypeLen()
+** sqlite3VdbeSerialGet()
 ** sqlite3VdbeSerialLen()
-** sqlite3VdbeDeserialize()
+** sqlite3VdbeSerialPut()
 **
 ** encapsulate the code that serializes values for storage in SQLite
-** databases. Each serialized value consists of a variable length integer
-** followed by type specific storage.
+** data and index records. Each serialized value consists of a
+** 'serial-type' and a blob of data. The serial type is an 8-byte unsigned
+** integer, stored as a varint.
 **
-**   initial varint     bytes to follow    type
+** In an SQLite index record, the serial type is stored directly before
+** the blob of data that it corresponds to. In a table record, all serial
+** types are stored at the start of the record, and the blobs of data at
+** the end. Hence these functions allow the caller to handle the
+** serial-type and data blob separately.
+**
+** The following table describes the various storage classes for data:
+**
+**   serial type        bytes of data      type
 **   --------------     ---------------    ---------------
 **      0                     0            NULL
 **      1                     1            signed integer
@@ -1132,149 +1143,145 @@
 */
 
 /*
-** Write the serialized form of the value held by pMem into zBuf. Return
-** the number of bytes written.
+** Return the serial-type for the value stored in pMem.
 */
-int sqlite3VdbeSerialize(
-  const Mem *pMem,      /* Pointer to vdbe value to serialize */
-  unsigned char *zBuf   /* Buffer to write to */
-){
-  if( pMem->flags&MEM_Null ){
-    return sqlite3PutVarint(zBuf, 0);
+u64 sqlite3VdbeSerialType(const Mem *pMem){
+  int flags = pMem->flags;
+
+  if( flags&MEM_Null ){
+    return 0;
   }
-
-  if( pMem->flags&MEM_Real ){
-    assert(!"TODO: float");
+  if( flags&MEM_Int ){
+    /* Figure out whether to use 1, 2, 4 or 8 bytes. */
+    i64 i = pMem->i;
+    if( i>=-127 && i<=127 ) return 1;
+    if( i>=-32767 && i<=32767 ) return 2;
+    if( i>=-2147483647 && i<=2147483647 ) return 3;
+    return 4;
   }
-
-  if( pMem->flags&MEM_Str ){
-    int data_type_len;
-    u64 data_type = (pMem->n*2+31);
-
-    data_type_len = sqlite3PutVarint(zBuf, data_type); 
-    memcpy(&zBuf[data_type_len], pMem->z, pMem->n);
-    return pMem->n + data_type_len;
+  if( flags&MEM_Real ){
+    return 5;
   }
-
-  if( pMem->flags& MEM_Int ){
-    u64 absval;
-    int size = 8;
-    int ii;
-
-    if( pMem->i<0 ){
-      absval = pMem->i * -1;
-    }else{
-      absval = pMem->i;
-    }
-    if( absval<=127 ){
-      size = 1;
-      sqlite3PutVarint(zBuf, 1);
-    }else if( absval<=32767 ){
-      size = 2;
-      sqlite3PutVarint(zBuf, 2);
-    }else if( absval<=2147483647 ){
-      size = 4;
-      sqlite3PutVarint(zBuf, 3);
-    }else{
-      size = 8;
-      sqlite3PutVarint(zBuf, 4);
-    }
-
-    for(ii=0; ii<size; ii++){
-      zBuf[ii+1] = (pMem->i >> (8*ii)) & 0xFF;
-    }
-    if( pMem->i<0 ){
-      zBuf[size] = zBuf[size] & 0x80;
-    }
-
-    return size+1;
+  if( flags&MEM_Str ){
+    return (pMem->n*2 + 13);
   }
-
-  return -1;
+  if( flags&MEM_Blob ){
+    return (pMem->n*2 + 12);
+  }
+  return 0;
 }
 
 /*
-** Return the number of bytes that would be consumed by the serialized
-** form of the value held by pMem. Return negative if an error occurs.
+** Return the length of the data corresponding to the supplied serial-type.
 */
-int sqlite3VdbeSerialLen(const Mem *pMem){
-  if( pMem->flags&MEM_Null ){
-    return 1; /* Varint 0 is 1 byte */
+int sqlite3VdbeSerialTypeLen(u64 serial_type){
+  switch(serial_type){
+    case 0: return 0;                  /* NULL */
+    case 1: return 1;                  /* 1 byte integer */
+    case 2: return 2;                  /* 2 byte integer */
+    case 3: return 4;                  /* 4 byte integer */
+    case 4: return 8;                  /* 8 byte integer */
+    case 5: return 8;                  /* 8 byte float */
   }
-  if( pMem->flags&MEM_Real ){
-    return 9; /* Varing 5 (1 byte) + 8 bytes IEEE float */    
-  }
-  if( pMem->flags&MEM_Str ){
-    return pMem->n + sqlite3VarintLen((pMem->n*2)+13);
-  }
-  if( pMem->flags& MEM_Int ){
-    u64 absval;
-    if( pMem->i<0 ){
-      absval = pMem->i * -1;
-    }else{
-      absval = pMem->i;
-    }
-    if( absval<=127 ) return 2;        /* 1 byte integer */
-    if( absval<=32767 ) return 3;      /* 2 byte integer */
-    if( absval<=2147483647 ) return 5; /* 4 byte integer */
-    return 9;                         /* 8 byte integer */
-  }
-
-  return -1;
+  assert( serial_type>=12 );
+  return ((serial_type-12)>>1);        /* text or blob */
 }
 
 /*
-** Deserialize a value from zBuf and store it in *pMem. Return the number
-** of bytes written, or negative if an error occurs.
-*/
-int sqlite3VdbeDeserialize(
-  Mem *pMem,                   /* structure to write new value to */
-  const unsigned char *zBuf    /* Buffer to read from */
-){
-  u64 data_type;
-  int ret;
+** Write the serialized data blob for the value stored in pMem into 
+** buf. It is assumed that the caller has allocated sufficient space.
+** Return the number of bytes written.
+*/ 
+int sqlite3VdbeSerialPut(unsigned char *buf, const Mem *pMem){
+  u64 serial_type = sqlite3VdbeSerialType(pMem);
+  int len;
+ 
+  /* NULL */
+  if( serial_type==0 ){
+    return 0;
+  }
+ 
+  /* Integer */
+  if( serial_type<5 ){
+    i64 i = pMem->i;
+    len = sqlite3VdbeSerialTypeLen(serial_type);
+    while( len-- ){
+      buf[len] = (i&0xFF);
+      i = i >> 8;
+    }
+    return sqlite3VdbeSerialTypeLen(serial_type);
+  }
+
+  /* Float */
+  if( serial_type==5 ){
+    /* TODO: byte ordering? */
+    assert( sizeof(double)==8 );
+    memcpy(buf, &pMem->r, 8);
+    return 8;
+  }
+  
+  /* String or blob */
+  assert( serial_type>=12 );
+  len = sqlite3VdbeSerialTypeLen(serial_type);
+  memcpy(buf, pMem->z, len);
+  return len;
+}
+
+/*
+** Deserialize the data blob pointed to by buf as serial type serial_type
+** and store the result in pMem.  Return the number of bytes read.
+*/ 
+int sqlite3VdbeSerialGet(const unsigned char *buf, u64 serial_type, Mem *pMem){
   int len;
 
-  memset(pMem, 0, sizeof(Mem));
-  ret = sqlite3GetVarint(zBuf, &data_type);
+  /* memset(pMem, 0, sizeof(pMem)); */
+  pMem->flags = 0;
+  pMem->z = 0;
 
-  if( data_type==0 ){  /* NULL */
+  /* NULL */
+  if( serial_type==0 ){
     pMem->flags = MEM_Null;
-    return ret;
+    return 0;
   }
+ 
+  /* Integer */
+  if( serial_type<5 ){
+    i64 i = 0;
+    int n;
+    len = sqlite3VdbeSerialTypeLen(serial_type);
 
-  /* FIX ME: update for 8-byte integers */
-  if( data_type>0 && data_type<5 ){  /* 1, 2, 4 or 8 byte integer */
-    int ii;
-    int bytes = 1 << (data_type-1);
-
+    if( buf[0]&0x80 ){
+      for(n=0; n<(8-len); n++){
+        i = (i<<8)+0xFF;
+      }
+    }
+    for(n=0; n<len; n++){
+      i = i << 8;
+      i = i + buf[n];
+    }
     pMem->flags = MEM_Int;
-    pMem->i = 0;
-
-    for(ii=0; ii<bytes; ii++){
-      pMem->i = (pMem->i<<8) + zBuf[ii+ret];
-    }
-
-    /* If this is a 1, 2 or 4 byte integer, extend the sign-bit if need be. */
-    if( bytes<8 && pMem->i & (1<<(bytes*8-1)) ){
-      pMem->i = pMem->i - (1<<(bytes*8));
-    }
-
-    return ret+bytes;
+    pMem->i = i;
+    return sqlite3VdbeSerialTypeLen(serial_type);
   }
 
-  if( data_type==5 ){ /* IEEE float */
-    assert(!"TODO: float");
+  /* Float */
+  if( serial_type==5 ){
+    /* TODO: byte ordering? */
+    assert( sizeof(double)==8 );
+    memcpy(&pMem->r, buf, 8);
+    pMem->flags = MEM_Real;
+    return 8;
   }
-
-  /* Must be text or a blob */
-  assert( data_type>=12 );
-  len = (data_type-12)/2;
-  pMem->flags = MEM_Str;  /* FIX ME: there should be a MEM_Blob or similar */
-
-  /* If the length of the text or blob is greater than NBFS, use space
-  ** dynamically allocated. Otherwise, store the value in Mem::zShort.
-  */
+  
+  /* String or blob */
+  assert( serial_type>=12 );
+  if( serial_type&0x01 ){
+    pMem->flags = MEM_Str;
+  }else{
+    pMem->flags = MEM_Blob;
+  }
+  len = sqlite3VdbeSerialTypeLen(serial_type);
+  pMem->n = len;
   if( len>NBFS ){
     pMem->z = sqliteMalloc( len );
     if( !pMem->z ){
@@ -1285,10 +1292,9 @@
     pMem->z = pMem->zShort;
     pMem->flags |= MEM_Short;
   }
-  memcpy(pMem->z, &zBuf[ret], len); 
-  ret += len;
+  memcpy(pMem->z, buf, len); 
 
-  return ret;
+  return len;
 }
 
 /*