blob: a6348611b092f73fb08df6ff8844ccc9e5d9ed3d [file] [log] [blame]
drh23669402006-01-09 17:29:52 +00001/*
2** 2005 December 14
3**
4** The author disclaims copyright to this source code. In place of
5** a legal notice, here is a blessing:
6**
7** May you do good and not evil.
8** May you find forgiveness for yourself and forgive others.
9** May you share freely, never taking more than you give.
10**
11*************************************************************************
12**
13** This file contains an example implementation of an asynchronous IO
drhfe0f75b2006-01-10 20:01:18 +000014** backend for SQLite.
15**
16** WHAT IS ASYNCHRONOUS I/O?
17**
18** With asynchronous I/O, write requests are handled by a separate thread
19** running in the background. This means that the thread that initiates
20** a database write does not have to wait for (sometimes slow) disk I/O
21** to occur. The write seems to happen very quickly, though in reality
22** it is happening at its usual slow pace in the background.
23**
24** Asynchronous I/O appears to give better responsiveness, but at a price.
25** You lose the Durable property. With the default I/O backend of SQLite,
26** once a write completes, you know that the information you wrote is
** safely on disk. With asynchronous I/O, this is not the case. If
** your program crashes or if you suffer a power loss after the database
29** write but before the asynchronous write thread has completed, then the
30** database change might never make it to disk and the next user of the
31** database might not see your change.
32**
33** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID: Atomic, Consistent, and Isolated. Many
** applications get along fine without Durability.
36**
37** HOW IT WORKS
38**
39** Asynchronous I/O works by overloading the OS-layer disk I/O routines
** with modified versions that store the data to be written in a queue of
41** pending write operations. Look at the asyncEnable() subroutine to see
42** how overloading works. Six os-layer routines are overloaded:
43**
44** sqlite3OsOpenReadWrite;
45** sqlite3OsOpenReadOnly;
46** sqlite3OsOpenExclusive;
47** sqlite3OsDelete;
48** sqlite3OsFileExists;
49** sqlite3OsSyncDirectory;
50**
51** The original implementations of these routines are saved and are
52** used by the writer thread to do the real I/O. The substitute
53** implementations typically put the I/O operation on a queue
54** to be handled later by the writer thread, though read operations
55** must be handled right away, obviously.
56**
57** Asynchronous I/O is disabled by setting the os-layer interface routines
58** back to their original values.
59**
60** LIMITATIONS
61**
62** This demonstration code is deliberately kept simple in order to keep
63** the main ideas clear and easy to understand. Real applications that
64** want to do asynchronous I/O might want to add additional capabilities.
65** For example, in this demonstration if writes are happening at a steady
66** stream that exceeds the I/O capability of the background writer thread,
67** the queue of pending write operations will grow without bound until we
68** run out of memory. Users of this technique may want to keep track of
69** the quantity of pending writes and stop accepting new write requests
70** when the buffer gets to be too big.
drh23669402006-01-09 17:29:52 +000071*/
72
73#include "sqliteInt.h"
74#include "os.h"
75#include <tcl.h>
76
/*
** THREADSAFE is normally supplied by the build. When it has not been
** defined, treat the library as single-threaded (THREADSAFE==0). The
** #if further down then compiles out the asynchronous I/O code.
*/
#if !defined(THREADSAFE)
# define THREADSAFE 0
#endif
81
82/*
83** This test uses pthreads and hence only works on unix and with
drhfe0f75b2006-01-10 20:01:18 +000084** a threadsafe build of SQLite. It also requires that the redefinable
85** I/O feature of SQLite be turned on. This feature is turned off by
86** default. If a required element is missing, almost all of the code
** in this file is excluded by the preprocessor #if below.
drh23669402006-01-09 17:29:52 +000088*/
89#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
90
drhfe0f75b2006-01-10 20:01:18 +000091/*
92** This demo uses pthreads. If you do not have a pthreads implementation
93** for your operating system, you will need to recode the threading
94** logic.
95*/
drh23669402006-01-09 17:29:52 +000096#include <pthread.h>
97#include <sched.h>
98
/*
** MIN/MAX yield the smaller/larger of two values. Arguments may be
** evaluated twice, so never pass expressions with side effects.
*/
#define MIN(x,y) ((x)<(y)?(x):(y))
#define MAX(x,y) ((x)>(y)?(x):(y))

/*
** Forward declarations: the two structures below refer to each other
** before either is fully defined.
*/
typedef struct AsyncWrite AsyncWrite;
typedef struct AsyncFile AsyncFile;
106
/*
** Debug tracing. Set sqlite3async_trace to a non-zero value to make every
** TRACE(( zFormat, ... )) statement print to stderr. Each message is
** prefixed with the id of the calling thread.
**
** TRACE() expands to a single do/while(0) statement, so it is safe as the
** body of an unbraced if/else. The arguments must be wrapped in a second
** pair of parentheses.
*/
static int sqlite3async_trace = 0;
#define TRACE(X) do{ if( sqlite3async_trace ){ asyncTrace X; } }while(0)

/*
** Format the message with sqlite3_vmprintf() and write it to stderr.
** pthread_t is an opaque type, so it is widened to unsigned long for
** printing rather than truncated to int.
*/
static void asyncTrace(const char *zFormat, ...){
  char *z;
  va_list ap;
  va_start(ap, zFormat);
  z = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  if( z==0 ){
    /* Out of memory: sqlite3_vmprintf() returned NULL. Handing NULL to
    ** "%s" is undefined behavior, so silently drop this trace line. */
    return;
  }
  fprintf(stderr, "[%lu] %s", (unsigned long)pthread_self(), z);
  sqlite3_free(z);
}
drh23669402006-01-09 17:29:52 +0000119
120/*
drh23669402006-01-09 17:29:52 +0000121** THREAD SAFETY NOTES
122**
123** Basic rules:
124**
125** * Both read and write access to the global write-op queue must be
126** protected by the async.queueMutex.
127**
128** * The file handles from the underlying system are assumed not to
129** be thread safe.
130**
drhfe0f75b2006-01-10 20:01:18 +0000131** * See the last two paragraphs under "The Writer Thread" for
drh23669402006-01-09 17:29:52 +0000132** an assumption to do with file-handle synchronization by the Os.
133**
134** File system operations (invoked by SQLite thread):
135**
136** xOpenXXX (three versions)
137** xDelete
138** xFileExists
139** xSyncDirectory
140**
141** File handle operations (invoked by SQLite thread):
142**
drh23669402006-01-09 17:29:52 +0000143** asyncWrite, asyncClose, asyncTruncate, asyncSync,
144** asyncSetFullSync, asyncOpenDirectory.
145**
drhfe0f75b2006-01-10 20:01:18 +0000146** The operations above add an entry to the global write-op list. They
147** prepare the entry, acquire the async.queueMutex momentarily while
148** list pointers are manipulated to insert the new entry, then release
149** the mutex and signal the writer thread to wake up in case it happens
150** to be asleep.
151**
drh23669402006-01-09 17:29:52 +0000152**
153** asyncRead, asyncFileSize.
drhfe0f75b2006-01-10 20:01:18 +0000154**
** Read operations. Both of these read from the underlying file
156** first then adjust their result based on pending writes in the
157** write-op queue. So async.queueMutex is held for the duration
158** of these operations to prevent other threads from changing the
159** queue in mid operation.
160**
161**
162** asyncLock, asyncUnlock, asyncLockState, asyncCheckReservedLock
drh23669402006-01-09 17:29:52 +0000163**
drh89ea9312006-02-13 17:03:47 +0000164** These primitives implement in-process locking using a hash table
165** on the file name. Files are locked correctly for connections coming
166** from the same process. But other processes cannot see these locks
167** and will therefore not honor them.
drhfe0f75b2006-01-10 20:01:18 +0000168**
169**
170** asyncFileHandle.
drh23669402006-01-09 17:29:52 +0000171**
172** The sqlite3OsFileHandle() function is currently only used when
173** debugging the pager module. Unless sqlite3OsClose() is called on the
174** file (shouldn't be possible for other reasons), the underlying
175** implementations are safe to call without grabbing any mutex. So we just
drhfe0f75b2006-01-10 20:01:18 +0000176** go ahead and call it no matter what any other threads are doing.
drh23669402006-01-09 17:29:52 +0000177**
drhfe0f75b2006-01-10 20:01:18 +0000178**
179** asyncSeek.
drh23669402006-01-09 17:29:52 +0000180**
181** Calling this method just manipulates the AsyncFile.iOffset variable.
182** Since this variable is never accessed by writer thread, this
183** function does not require the mutex. Actual calls to OsSeek() take
184** place just before OsWrite() or OsRead(), which are always protected by
185** the mutex.
drh23669402006-01-09 17:29:52 +0000186**
187** The writer thread:
188**
** The async.writerMutex is used to make sure there is only
** a single writer thread running at a time.
191**
192** Inside the writer thread is a loop that works like this:
193**
194** WHILE (write-op list is not empty)
195** Do IO operation at head of write-op list
196** Remove entry from head of write-op list
197** END WHILE
198**
199** The async.queueMutex is always held during the <write-op list is
200** not empty> test, and when the entry is removed from the head
201** of the write-op list. Sometimes it is held for the interim
drhfe0f75b2006-01-10 20:01:18 +0000202** period (while the IO is performed), and sometimes it is
drh23669402006-01-09 17:29:52 +0000203** relinquished. It is relinquished if (a) the IO op is an
204** ASYNC_CLOSE or (b) when the file handle was opened, two of
** the underlying system's handles were opened on the same
206** file-system entry.
207**
208** If condition (b) above is true, then one file-handle
209** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the
210** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush()
211** threads to perform write() operations. This means that read
212** operations are not blocked by asynchronous writes (although
213** asynchronous writes may still be blocked by reads).
214**
215** This assumes that the OS keeps two handles open on the same file
216** properly in sync. That is, any read operation that starts after a
217** write operation on the same file system entry has completed returns
218** data consistent with the write. We also assume that if one thread
219** reads a file while another is writing it all bytes other than the
220** ones actually being written contain valid data.
221**
222** If the above assumptions are not true, set the preprocessor symbol
223** SQLITE_ASYNC_TWO_FILEHANDLES to 0.
224*/
225
/*
** When non-zero, asyncOpenFile() opens a second, read/write handle on
** every file opened for writing. Writes then go through that handle and
** reads through the first. Define SQLITE_ASYNC_TWO_FILEHANDLES as 0 at
** compile time if the OS does not keep two handles on one file in sync.
*/
#if !defined(SQLITE_ASYNC_TWO_FILEHANDLES)
# define SQLITE_ASYNC_TWO_FILEHANDLES 1
#endif
230
231/*
232** State information is held in the static variable "async" defined
233** as follows:
234*/
235static struct TestAsyncStaticData {
236 pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */
237 pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
drh89ea9312006-02-13 17:03:47 +0000238 pthread_mutex_t lockMutex; /* For access to aLock hash table */
drh23669402006-01-09 17:29:52 +0000239 pthread_cond_t queueSignal; /* For waking up sleeping writer thread */
240 pthread_cond_t emptySignal; /* Notify when the write queue is empty */
241 AsyncWrite *pQueueFirst; /* Next write operation to be processed */
242 AsyncWrite *pQueueLast; /* Last write operation on the list */
drh89ea9312006-02-13 17:03:47 +0000243 Hash aLock; /* Files locked */
drh23669402006-01-09 17:29:52 +0000244 volatile int ioDelay; /* Extra delay between write operations */
245 volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */
246 volatile int writerHaltNow; /* Writer thread halts after next op */
danielk1977be29bfc2006-02-14 13:25:43 +0000247 int ioError; /* True if an IO error has occured */
248 int nFile; /* Number of open files (from sqlite pov) */
drh23669402006-01-09 17:29:52 +0000249} async = {
250 PTHREAD_MUTEX_INITIALIZER,
251 PTHREAD_MUTEX_INITIALIZER,
drh89ea9312006-02-13 17:03:47 +0000252 PTHREAD_MUTEX_INITIALIZER,
drh23669402006-01-09 17:29:52 +0000253 PTHREAD_COND_INITIALIZER,
254 PTHREAD_COND_INITIALIZER,
255};
256
/* Possible values of AsyncWrite.op */
#define ASYNC_NOOP          0
#define ASYNC_WRITE         1
#define ASYNC_SYNC          2
#define ASYNC_TRUNCATE      3
#define ASYNC_CLOSE         4
#define ASYNC_OPENDIRECTORY 5
#define ASYNC_SETFULLSYNC   6
#define ASYNC_DELETE        7
#define ASYNC_OPENEXCLUSIVE 8
#define ASYNC_SYNCDIRECTORY 9

/*
** Printable names of the opcodes above, indexed by opcode value. Used for
** debugging output only.
**
** The typedef that follows turns "keep these in sync" into a compile-time
** check. If the table does not hold exactly one name per opcode (0 through
** ASYNC_SYNCDIRECTORY), the array size becomes -1 and compilation fails.
*/
static const char *const azOpcodeName[] = {
  "NOOP", "WRITE", "SYNC", "TRUNCATE", "CLOSE",
  "OPENDIR", "SETFULLSYNC", "DELETE", "OPENEX", "SYNCDIR",
};
typedef char azOpcodeNameSizeCheck[
  (sizeof(azOpcodeName)/sizeof(azOpcodeName[0])==ASYNC_SYNCDIRECTORY+1) ? 1 : -1
];
276
drh23669402006-01-09 17:29:52 +0000277/*
drhfe0f75b2006-01-10 20:01:18 +0000278** Entries on the write-op queue are instances of the AsyncWrite
279** structure, defined here.
280**
drh23669402006-01-09 17:29:52 +0000281** The interpretation of the iOffset and nByte variables varies depending
282** on the value of AsyncWrite.op:
283**
284** ASYNC_WRITE:
285** iOffset -> Offset in file to write to.
286** nByte -> Number of bytes of data to write (pointed to by zBuf).
287**
288** ASYNC_SYNC:
289** iOffset -> Unused.
290** nByte -> Value of "fullsync" flag to pass to sqlite3OsSync().
291**
292** ASYNC_TRUNCATE:
293** iOffset -> Size to truncate file to.
294** nByte -> Unused.
295**
296** ASYNC_CLOSE:
297** iOffset -> Unused.
298** nByte -> Unused.
299**
300** ASYNC_OPENDIRECTORY:
301** iOffset -> Unused.
302** nByte -> Number of bytes of zBuf points to (directory name).
303**
304** ASYNC_SETFULLSYNC:
305** iOffset -> Unused.
306** nByte -> New value for the full-sync flag.
307**
308**
309** ASYNC_DELETE:
310** iOffset -> Unused.
311** nByte -> Number of bytes of zBuf points to (file name).
312**
313** ASYNC_OPENEXCLUSIVE:
314** iOffset -> Value of "delflag".
315** nByte -> Number of bytes of zBuf points to (file name).
316**
drh89ea9312006-02-13 17:03:47 +0000317**
drh23669402006-01-09 17:29:52 +0000318** For an ASYNC_WRITE operation, zBuf points to the data to write to the file.
319** This space is sqliteMalloc()d along with the AsyncWrite structure in a
320** single blob, so is deleted when sqliteFree() is called on the parent
321** structure.
322*/
323struct AsyncWrite {
324 AsyncFile *pFile; /* File to write data to or sync */
325 int op; /* One of ASYNC_xxx etc. */
326 i64 iOffset; /* See above */
327 int nByte; /* See above */
328 char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */
329 AsyncWrite *pNext; /* Next write operation (to any file) */
330};
331
332/*
333** The AsyncFile structure is a subclass of OsFile used for asynchronous IO.
334*/
335struct AsyncFile {
336 IoMethod *pMethod; /* Must be first */
drhfc8748a2006-02-13 14:49:38 +0000337 i64 iOffset; /* Current seek() offset in file */
drh89ea9312006-02-13 17:03:47 +0000338 char *zName; /* Underlying OS filename - used for debugging */
339 int nName; /* Number of characters in zName */
drh23669402006-01-09 17:29:52 +0000340 OsFile *pBaseRead; /* Read handle to the underlying Os file */
341 OsFile *pBaseWrite; /* Write handle to the underlying Os file */
342};
343
344/*
345** Add an entry to the end of the global write-op list. pWrite should point
drhfe0f75b2006-01-10 20:01:18 +0000346** to an AsyncWrite structure allocated using sqlite3OsMalloc(). The writer
347** thread will call sqlite3OsFree() to free the structure after the specified
348** operation has been completed.
drh23669402006-01-09 17:29:52 +0000349**
drhfe0f75b2006-01-10 20:01:18 +0000350** Once an AsyncWrite structure has been added to the list, it becomes the
351** property of the writer thread and must not be read or modified by the
352** caller.
drh23669402006-01-09 17:29:52 +0000353*/
354static void addAsyncWrite(AsyncWrite *pWrite){
drhfe0f75b2006-01-10 20:01:18 +0000355 /* We must hold the queue mutex in order to modify the queue pointers */
drh23669402006-01-09 17:29:52 +0000356 pthread_mutex_lock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000357
358 /* Add the record to the end of the write-op queue */
drh23669402006-01-09 17:29:52 +0000359 assert( !pWrite->pNext );
360 if( async.pQueueLast ){
361 assert( async.pQueueFirst );
362 async.pQueueLast->pNext = pWrite;
363 }else{
364 async.pQueueFirst = pWrite;
365 }
366 async.pQueueLast = pWrite;
danielk1977bf623f22006-02-14 13:48:33 +0000367 TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op],
368 pWrite->pFile ? pWrite->pFile->zName : "-", pWrite->iOffset));
drhfe0f75b2006-01-10 20:01:18 +0000369
danielk1977be29bfc2006-02-14 13:25:43 +0000370 if( pWrite->op==ASYNC_CLOSE ){
371 async.nFile--;
372 if( async.nFile==0 ){
373 async.ioError = SQLITE_OK;
374 }
375 }
376
drhfe0f75b2006-01-10 20:01:18 +0000377 /* Drop the queue mutex */
drh23669402006-01-09 17:29:52 +0000378 pthread_mutex_unlock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000379
380 /* The writer thread might have been idle because there was nothing
381 ** on the write-op queue for it to do. So wake it up. */
drh23669402006-01-09 17:29:52 +0000382 pthread_cond_signal(&async.queueSignal);
383}
384
385/*
danielk1977be29bfc2006-02-14 13:25:43 +0000386** Increment async.nFile in a thread-safe manner.
387*/
388static void incrOpenFileCount(){
389 /* We must hold the queue mutex in order to modify async.nFile */
390 pthread_mutex_lock(&async.queueMutex);
391 if( async.nFile==0 ){
392 async.ioError = SQLITE_OK;
393 }
394 async.nFile++;
395 pthread_mutex_unlock(&async.queueMutex);
396}
397
398/*
drh23669402006-01-09 17:29:52 +0000399** This is a utility function to allocate and populate a new AsyncWrite
400** structure and insert it (via addAsyncWrite() ) into the global list.
401*/
402static int addNewAsyncWrite(
403 AsyncFile *pFile,
404 int op,
405 i64 iOffset,
406 int nByte,
407 const char *zByte
408){
drh4b74b262006-02-13 13:50:55 +0000409 AsyncWrite *p;
danielk1977be29bfc2006-02-14 13:25:43 +0000410 if( op!=ASYNC_CLOSE && async.ioError ){
411 return async.ioError;
drh4b74b262006-02-13 13:50:55 +0000412 }
413 p = sqlite3OsMalloc(sizeof(AsyncWrite) + (zByte?nByte:0));
drh23669402006-01-09 17:29:52 +0000414 if( !p ){
415 return SQLITE_NOMEM;
416 }
417 p->op = op;
418 p->iOffset = iOffset;
419 p->nByte = nByte;
420 p->pFile = pFile;
421 p->pNext = 0;
422 if( zByte ){
423 p->zBuf = (char *)&p[1];
424 memcpy(p->zBuf, zByte, nByte);
425 }else{
426 p->zBuf = 0;
427 }
428 addAsyncWrite(p);
429 return SQLITE_OK;
430}
431
432/*
433** Close the file. This just adds an entry to the write-op list, the file is
434** not actually closed.
435*/
436static int asyncClose(OsFile **pId){
437 return addNewAsyncWrite((AsyncFile *)*pId, ASYNC_CLOSE, 0, 0, 0);
438}
439
440/*
441** Implementation of sqlite3OsWrite() for asynchronous files. Instead of
442** writing to the underlying file, this function adds an entry to the end of
443** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be
444** returned.
445*/
446static int asyncWrite(OsFile *id, const void *pBuf, int amt){
447 AsyncFile *pFile = (AsyncFile *)id;
448 int rc = addNewAsyncWrite(pFile, ASYNC_WRITE, pFile->iOffset, amt, pBuf);
449 pFile->iOffset += (i64)amt;
450 return rc;
451}
452
453/*
454** Truncate the file to nByte bytes in length. This just adds an entry to
455** the write-op list, no IO actually takes place.
456*/
457static int asyncTruncate(OsFile *id, i64 nByte){
458 return addNewAsyncWrite((AsyncFile *)id, ASYNC_TRUNCATE, nByte, 0, 0);
459}
460
461/*
462** Open the directory identified by zName and associate it with the
463** specified file. This just adds an entry to the write-op list, the
464** directory is opened later by sqlite3_async_flush().
465*/
466static int asyncOpenDirectory(OsFile *id, const char *zName){
467 AsyncFile *pFile = (AsyncFile *)id;
468 return addNewAsyncWrite(pFile, ASYNC_OPENDIRECTORY, 0, strlen(zName)+1,zName);
469}
470
471/*
472** Sync the file. This just adds an entry to the write-op list, the
473** sync() is done later by sqlite3_async_flush().
474*/
475static int asyncSync(OsFile *id, int fullsync){
476 return addNewAsyncWrite((AsyncFile *)id, ASYNC_SYNC, 0, fullsync, 0);
477}
478
479/*
480** Set (or clear) the full-sync flag on the underlying file. This operation
481** is queued and performed later by sqlite3_async_flush().
482*/
483static void asyncSetFullSync(OsFile *id, int value){
484 addNewAsyncWrite((AsyncFile *)id, ASYNC_SETFULLSYNC, 0, value, 0);
485}
486
487/*
488** Read data from the file. First we read from the filesystem, then adjust
489** the contents of the buffer based on ASYNC_WRITE operations in the
drhfe0f75b2006-01-10 20:01:18 +0000490** write-op queue.
drh23669402006-01-09 17:29:52 +0000491**
492** This method holds the mutex from start to finish.
493*/
494static int asyncRead(OsFile *id, void *obuf, int amt){
495 int rc = SQLITE_OK;
496 i64 filesize;
497 int nRead;
498 AsyncFile *pFile = (AsyncFile *)id;
danielk1977750b03e2006-02-14 10:48:39 +0000499 OsFile *pBase = pFile->pBaseRead;
500
drh4b74b262006-02-13 13:50:55 +0000501 /* If an I/O error has previously occurred on this file, then all
502 ** subsequent operations fail.
503 */
danielk1977be29bfc2006-02-14 13:25:43 +0000504 if( async.ioError!=SQLITE_OK ){
505 return async.ioError;
drh4b74b262006-02-13 13:50:55 +0000506 }
507
drh23669402006-01-09 17:29:52 +0000508 /* Grab the write queue mutex for the duration of the call */
509 pthread_mutex_lock(&async.queueMutex);
510
danielk1977750b03e2006-02-14 10:48:39 +0000511 if( pBase ){
512 rc = sqlite3OsFileSize(pBase, &filesize);
drh23669402006-01-09 17:29:52 +0000513 if( rc!=SQLITE_OK ){
514 goto asyncread_out;
515 }
danielk1977750b03e2006-02-14 10:48:39 +0000516 rc = sqlite3OsSeek(pBase, pFile->iOffset);
drh23669402006-01-09 17:29:52 +0000517 if( rc!=SQLITE_OK ){
518 goto asyncread_out;
519 }
520 nRead = MIN(filesize - pFile->iOffset, amt);
521 if( nRead>0 ){
danielk1977750b03e2006-02-14 10:48:39 +0000522 rc = sqlite3OsRead(pBase, obuf, nRead);
drh97bbdc02006-02-13 18:35:06 +0000523 TRACE(("READ %s %d bytes at %d\n", pFile->zName, nRead, pFile->iOffset));
drh23669402006-01-09 17:29:52 +0000524 }
525 }
526
527 if( rc==SQLITE_OK ){
528 AsyncWrite *p;
529 i64 iOffset = pFile->iOffset; /* Current seek offset */
530
531 for(p=async.pQueueFirst; p; p = p->pNext){
532 if( p->pFile==pFile && p->op==ASYNC_WRITE ){
drh44528382006-02-13 13:30:19 +0000533 int iBeginOut = (p->iOffset - iOffset);
534 int iBeginIn = -iBeginOut;
drh23669402006-01-09 17:29:52 +0000535 int nCopy;
536
537 if( iBeginIn<0 ) iBeginIn = 0;
538 if( iBeginOut<0 ) iBeginOut = 0;
539 nCopy = MIN(p->nByte-iBeginIn, amt-iBeginOut);
540
541 if( nCopy>0 ){
542 memcpy(&((char *)obuf)[iBeginOut], &p->zBuf[iBeginIn], nCopy);
drh97bbdc02006-02-13 18:35:06 +0000543 TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset));
drh23669402006-01-09 17:29:52 +0000544 }
545 }
546 }
547
548 pFile->iOffset += (i64)amt;
549 }
550
551asyncread_out:
552 pthread_mutex_unlock(&async.queueMutex);
553 return rc;
554}
555
556/*
557** Seek to the specified offset. This just adjusts the AsyncFile.iOffset
558** variable - calling seek() on the underlying file is defered until the
559** next read() or write() operation.
560*/
561static int asyncSeek(OsFile *id, i64 offset){
562 AsyncFile *pFile = (AsyncFile *)id;
563 pFile->iOffset = offset;
564 return SQLITE_OK;
565}
566
567/*
568** Read the size of the file. First we read the size of the file system
569** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations
570** currently in the write-op list.
571**
572** This method holds the mutex from start to finish.
573*/
574int asyncFileSize(OsFile *id, i64 *pSize){
575 int rc = SQLITE_OK;
576 i64 s = 0;
577 OsFile *pBase;
578
579 pthread_mutex_lock(&async.queueMutex);
580
581 /* Read the filesystem size from the base file. If pBaseRead is NULL, this
582 ** means the file hasn't been opened yet. In this case all relevant data
583 ** must be in the write-op queue anyway, so we can omit reading from the
584 ** file-system.
585 */
586 pBase = ((AsyncFile *)id)->pBaseRead;
587 if( pBase ){
588 rc = sqlite3OsFileSize(pBase, &s);
589 }
590
591 if( rc==SQLITE_OK ){
592 AsyncWrite *p;
593 for(p=async.pQueueFirst; p; p = p->pNext){
594 if( p->pFile==(AsyncFile *)id ){
595 switch( p->op ){
596 case ASYNC_WRITE:
597 s = MAX(p->iOffset + (i64)(p->nByte), s);
598 break;
599 case ASYNC_TRUNCATE:
danielk1977bf623f22006-02-14 13:48:33 +0000600 s = MIN(s, p->iOffset);
drh23669402006-01-09 17:29:52 +0000601 break;
602 }
603 }
604 }
605 *pSize = s;
606 }
607 pthread_mutex_unlock(&async.queueMutex);
608 return rc;
609}
610
611/*
612** Return the operating system file handle. This is only used for debugging
613** at the moment anyway.
614*/
615static int asyncFileHandle(OsFile *id){
616 return sqlite3OsFileHandle(((AsyncFile *)id)->pBaseRead);
617}
618
drhfe0f75b2006-01-10 20:01:18 +0000619/*
drh89ea9312006-02-13 17:03:47 +0000620** No disk locking is performed. We keep track of locks locally in
621** the async.aLock hash table. Locking should appear to work the same
622** as with standard (unmodified) SQLite as long as all connections
623** come from this one process. Connections from external processes
624** cannot see our internal hash table (obviously) and will thus not
625** honor our locks.
drhfe0f75b2006-01-10 20:01:18 +0000626*/
drh23669402006-01-09 17:29:52 +0000627static int asyncLock(OsFile *id, int lockType){
drh89ea9312006-02-13 17:03:47 +0000628 AsyncFile *pFile = (AsyncFile*)id;
629 TRACE(("LOCK %d (%s)\n", lockType, pFile->zName));
630 pthread_mutex_lock(&async.lockMutex);
631 sqlite3HashInsert(&async.aLock, pFile->zName, pFile->nName, (void*)lockType);
632 pthread_mutex_unlock(&async.lockMutex);
drh23669402006-01-09 17:29:52 +0000633 return SQLITE_OK;
634}
635static int asyncUnlock(OsFile *id, int lockType){
drh89ea9312006-02-13 17:03:47 +0000636 return asyncLock(id, lockType);
drh23669402006-01-09 17:29:52 +0000637}
638
639/*
640** This function is called when the pager layer first opens a database file
641** and is checking for a hot-journal.
642*/
643static int asyncCheckReservedLock(OsFile *id){
drh89ea9312006-02-13 17:03:47 +0000644 AsyncFile *pFile = (AsyncFile*)id;
645 int rc;
646 pthread_mutex_lock(&async.lockMutex);
647 rc = (int)sqlite3HashFind(&async.aLock, pFile->zName, pFile->nName);
648 pthread_mutex_unlock(&async.lockMutex);
649 TRACE(("CHECK-LOCK %d (%s)\n", rc, pFile->zName));
drh97bbdc02006-02-13 18:35:06 +0000650 return rc>SHARED_LOCK;
drh23669402006-01-09 17:29:52 +0000651}
652
danielk1977b4721172007-03-19 05:54:48 +0000653static int asyncSectorSize(OsFile *id){
654 /* TODO: This is tricky to implement, as this backend might not have
655 ** an open file handle at this point.
656 */
657 return 512;
658}
659
drh23669402006-01-09 17:29:52 +0000660/*
661** This is broken. But sqlite3OsLockState() is only used for testing anyway.
662*/
663static int asyncLockState(OsFile *id){
664 return SQLITE_OK;
665}
666
667/*
668** The following variables hold pointers to the original versions of
drhfe0f75b2006-01-10 20:01:18 +0000669** OS-layer interface routines that are overloaded in order to create
670** the asynchronous I/O backend.
drh23669402006-01-09 17:29:52 +0000671*/
672static int (*xOrigOpenReadWrite)(const char*, OsFile**, int*) = 0;
673static int (*xOrigOpenExclusive)(const char*, OsFile**, int) = 0;
674static int (*xOrigOpenReadOnly)(const char*, OsFile**) = 0;
675static int (*xOrigDelete)(const char*) = 0;
676static int (*xOrigFileExists)(const char*) = 0;
677static int (*xOrigSyncDirectory)(const char*) = 0;
678
drhfe0f75b2006-01-10 20:01:18 +0000679/*
680** This routine does most of the work of opening a file and building
681** the OsFile structure.
682*/
drh23669402006-01-09 17:29:52 +0000683static int asyncOpenFile(
drhfe0f75b2006-01-10 20:01:18 +0000684 const char *zName, /* The name of the file to be opened */
685 OsFile **pFile, /* Put the OsFile structure here */
686 OsFile *pBaseRead, /* The real OsFile from the real I/O routine */
687 int openForWriting /* Open a second file handle for writing if true */
drh23669402006-01-09 17:29:52 +0000688){
drh89ea9312006-02-13 17:03:47 +0000689 int rc, i, n;
drh23669402006-01-09 17:29:52 +0000690 AsyncFile *p;
691 OsFile *pBaseWrite = 0;
692
693 static IoMethod iomethod = {
694 asyncClose,
695 asyncOpenDirectory,
696 asyncRead,
697 asyncWrite,
698 asyncSeek,
699 asyncTruncate,
700 asyncSync,
701 asyncSetFullSync,
702 asyncFileHandle,
703 asyncFileSize,
704 asyncLock,
705 asyncUnlock,
706 asyncLockState,
danielk1977b4721172007-03-19 05:54:48 +0000707 asyncCheckReservedLock,
708 asyncSectorSize,
drh23669402006-01-09 17:29:52 +0000709 };
710
drhfe0f75b2006-01-10 20:01:18 +0000711 if( openForWriting && SQLITE_ASYNC_TWO_FILEHANDLES ){
drh23669402006-01-09 17:29:52 +0000712 int dummy;
713 rc = xOrigOpenReadWrite(zName, &pBaseWrite, &dummy);
714 if( rc!=SQLITE_OK ){
715 goto error_out;
716 }
717 }
718
drh89ea9312006-02-13 17:03:47 +0000719 n = strlen(zName);
720 for(i=n-1; i>=0 && zName[i]!='/'; i--){}
721 p = (AsyncFile *)sqlite3OsMalloc(sizeof(AsyncFile) + n - i);
drh23669402006-01-09 17:29:52 +0000722 if( !p ){
723 rc = SQLITE_NOMEM;
724 goto error_out;
725 }
726 memset(p, 0, sizeof(AsyncFile));
drh89ea9312006-02-13 17:03:47 +0000727 p->zName = (char*)&p[1];
728 strcpy(p->zName, &zName[i+1]);
729 p->nName = n - i;
drh23669402006-01-09 17:29:52 +0000730 p->pMethod = &iomethod;
731 p->pBaseRead = pBaseRead;
732 p->pBaseWrite = pBaseWrite;
733
734 *pFile = (OsFile *)p;
735 return SQLITE_OK;
736
737error_out:
738 assert(!p);
739 sqlite3OsClose(&pBaseRead);
740 sqlite3OsClose(&pBaseWrite);
741 *pFile = 0;
742 return rc;
743}
744
745/*
746** The async-IO backends implementation of the three functions used to open
747** a file (xOpenExclusive, xOpenReadWrite and xOpenReadOnly). Most of the
748** work is done in function asyncOpenFile() - see above.
749*/
750static int asyncOpenExclusive(const char *z, OsFile **ppFile, int delFlag){
751 int rc = asyncOpenFile(z, ppFile, 0, 0);
752 if( rc==SQLITE_OK ){
753 AsyncFile *pFile = (AsyncFile *)(*ppFile);
754 int nByte = strlen(z)+1;
755 i64 i = (i64)(delFlag);
756 rc = addNewAsyncWrite(pFile, ASYNC_OPENEXCLUSIVE, i, nByte, z);
757 if( rc!=SQLITE_OK ){
758 sqlite3OsFree(pFile);
759 *ppFile = 0;
760 }
761 }
danielk1977be29bfc2006-02-14 13:25:43 +0000762 if( rc==SQLITE_OK ){
763 incrOpenFileCount();
764 }
drh23669402006-01-09 17:29:52 +0000765 return rc;
766}
767static int asyncOpenReadOnly(const char *z, OsFile **ppFile){
768 OsFile *pBase = 0;
769 int rc = xOrigOpenReadOnly(z, &pBase);
770 if( rc==SQLITE_OK ){
771 rc = asyncOpenFile(z, ppFile, pBase, 0);
772 }
danielk1977be29bfc2006-02-14 13:25:43 +0000773 if( rc==SQLITE_OK ){
774 incrOpenFileCount();
775 }
drh23669402006-01-09 17:29:52 +0000776 return rc;
777}
778static int asyncOpenReadWrite(const char *z, OsFile **ppFile, int *pReadOnly){
779 OsFile *pBase = 0;
780 int rc = xOrigOpenReadWrite(z, &pBase, pReadOnly);
781 if( rc==SQLITE_OK ){
782 rc = asyncOpenFile(z, ppFile, pBase, (*pReadOnly ? 0 : 1));
783 }
danielk1977be29bfc2006-02-14 13:25:43 +0000784 if( rc==SQLITE_OK ){
785 incrOpenFileCount();
786 }
drh23669402006-01-09 17:29:52 +0000787 return rc;
788}
789
790/*
791** Implementation of sqlite3OsDelete. Add an entry to the end of the
792** write-op queue to perform the delete.
793*/
794static int asyncDelete(const char *z){
795 return addNewAsyncWrite(0, ASYNC_DELETE, 0, strlen(z)+1, z);
796}
797
798/*
799** Implementation of sqlite3OsSyncDirectory. Add an entry to the end of the
800** write-op queue to perform the directory sync.
801*/
802static int asyncSyncDirectory(const char *z){
803 return addNewAsyncWrite(0, ASYNC_SYNCDIRECTORY, 0, strlen(z)+1, z);
804}
805
806/*
807** Implementation of sqlite3OsFileExists. Return true if file 'z' exists
808** in the file system.
809**
810** This method holds the mutex from start to finish.
811*/
812static int asyncFileExists(const char *z){
813 int ret;
814 AsyncWrite *p;
815
816 pthread_mutex_lock(&async.queueMutex);
817
818 /* See if the real file system contains the specified file. */
819 ret = xOrigFileExists(z);
820
821 for(p=async.pQueueFirst; p; p = p->pNext){
822 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, z) ){
823 ret = 0;
824 }else if( p->op==ASYNC_OPENEXCLUSIVE && 0==strcmp(p->zBuf, z) ){
825 ret = 1;
826 }
827 }
828
drh89ea9312006-02-13 17:03:47 +0000829 TRACE(("EXISTS: %s = %d\n", z, ret));
drh23669402006-01-09 17:29:52 +0000830 pthread_mutex_unlock(&async.queueMutex);
831 return ret;
832}
833
834/*
835** Call this routine to enable or disable the
836** asynchronous IO features implemented in this file.
837**
838** This routine is not even remotely threadsafe. Do not call
839** this routine while any SQLite database connections are open.
840*/
841static void asyncEnable(int enable){
842 if( enable && xOrigOpenReadWrite==0 ){
danielk1977750b03e2006-02-14 10:48:39 +0000843 assert(sqlite3Os.xOpenReadWrite);
drh89ea9312006-02-13 17:03:47 +0000844 sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1);
drh23669402006-01-09 17:29:52 +0000845 xOrigOpenReadWrite = sqlite3Os.xOpenReadWrite;
846 xOrigOpenReadOnly = sqlite3Os.xOpenReadOnly;
847 xOrigOpenExclusive = sqlite3Os.xOpenExclusive;
848 xOrigDelete = sqlite3Os.xDelete;
849 xOrigFileExists = sqlite3Os.xFileExists;
850 xOrigSyncDirectory = sqlite3Os.xSyncDirectory;
851
852 sqlite3Os.xOpenReadWrite = asyncOpenReadWrite;
853 sqlite3Os.xOpenReadOnly = asyncOpenReadOnly;
854 sqlite3Os.xOpenExclusive = asyncOpenExclusive;
855 sqlite3Os.xDelete = asyncDelete;
856 sqlite3Os.xFileExists = asyncFileExists;
857 sqlite3Os.xSyncDirectory = asyncSyncDirectory;
danielk1977750b03e2006-02-14 10:48:39 +0000858 assert(sqlite3Os.xOpenReadWrite);
drh23669402006-01-09 17:29:52 +0000859 }
860 if( !enable && xOrigOpenReadWrite!=0 ){
danielk1977750b03e2006-02-14 10:48:39 +0000861 assert(sqlite3Os.xOpenReadWrite);
drh89ea9312006-02-13 17:03:47 +0000862 sqlite3HashClear(&async.aLock);
drh23669402006-01-09 17:29:52 +0000863 sqlite3Os.xOpenReadWrite = xOrigOpenReadWrite;
864 sqlite3Os.xOpenReadOnly = xOrigOpenReadOnly;
865 sqlite3Os.xOpenExclusive = xOrigOpenExclusive;
866 sqlite3Os.xDelete = xOrigDelete;
867 sqlite3Os.xFileExists = xOrigFileExists;
868 sqlite3Os.xSyncDirectory = xOrigSyncDirectory;
869
870 xOrigOpenReadWrite = 0;
871 xOrigOpenReadOnly = 0;
872 xOrigOpenExclusive = 0;
873 xOrigDelete = 0;
874 xOrigFileExists = 0;
875 xOrigSyncDirectory = 0;
danielk1977750b03e2006-02-14 10:48:39 +0000876 assert(sqlite3Os.xOpenReadWrite);
drh23669402006-01-09 17:29:52 +0000877 }
878}
879
880/*
881** This procedure runs in a separate thread, reading messages off of the
882** write queue and processing them one by one.
883**
884** If async.writerHaltNow is true, then this procedure exits
885** after processing a single message.
886**
887** If async.writerHaltWhenIdle is true, then this procedure exits when
888** the write queue is empty.
889**
890** If both of the above variables are false, this procedure runs
891** indefinately, waiting for operations to be added to the write queue
892** and processing them in the order in which they arrive.
893**
894** An artifical delay of async.ioDelay milliseconds is inserted before
895** each write operation in order to simulate the effect of a slow disk.
896**
897** Only one instance of this procedure may be running at a time.
898*/
899static void *asyncWriterThread(void *NotUsed){
900 AsyncWrite *p = 0;
901 int rc = SQLITE_OK;
danielk1977be29bfc2006-02-14 13:25:43 +0000902 int holdingMutex = 0;
drh23669402006-01-09 17:29:52 +0000903
904 if( pthread_mutex_trylock(&async.writerMutex) ){
905 return 0;
906 }
907 while( async.writerHaltNow==0 ){
drh23669402006-01-09 17:29:52 +0000908 OsFile *pBase = 0;
909
danielk1977be29bfc2006-02-14 13:25:43 +0000910 if( !holdingMutex ){
911 pthread_mutex_lock(&async.queueMutex);
912 }
drh23669402006-01-09 17:29:52 +0000913 while( (p = async.pQueueFirst)==0 ){
914 pthread_cond_broadcast(&async.emptySignal);
915 if( async.writerHaltWhenIdle ){
916 pthread_mutex_unlock(&async.queueMutex);
917 break;
918 }else{
drhfc8748a2006-02-13 14:49:38 +0000919 TRACE(("IDLE\n"));
drh23669402006-01-09 17:29:52 +0000920 pthread_cond_wait(&async.queueSignal, &async.queueMutex);
drhfc8748a2006-02-13 14:49:38 +0000921 TRACE(("WAKEUP\n"));
drh23669402006-01-09 17:29:52 +0000922 }
923 }
924 if( p==0 ) break;
danielk1977be29bfc2006-02-14 13:25:43 +0000925 holdingMutex = 1;
drh23669402006-01-09 17:29:52 +0000926
927 /* Right now this thread is holding the mutex on the write-op queue.
928 ** Variable 'p' points to the first entry in the write-op queue. In
929 ** the general case, we hold on to the mutex for the entire body of
930 ** the loop.
931 **
932 ** However in the cases enumerated below, we relinquish the mutex,
933 ** perform the IO, and then re-request the mutex before removing 'p' from
934 ** the head of the write-op queue. The idea is to increase concurrency with
935 ** sqlite threads.
936 **
937 ** * An ASYNC_CLOSE operation.
938 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish
939 ** the mutex, call the underlying xOpenExclusive() function, then
940 ** re-aquire the mutex before seting the AsyncFile.pBaseRead
941 ** variable.
942 ** * ASYNC_SYNC and ASYNC_WRITE operations, if
943 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two
944 ** file-handles are open for the particular file being "synced".
945 */
danielk1977be29bfc2006-02-14 13:25:43 +0000946 if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){
947 p->op = ASYNC_NOOP;
948 }
drh23669402006-01-09 17:29:52 +0000949 if( p->pFile ){
950 pBase = p->pFile->pBaseWrite;
951 if(
952 p->op==ASYNC_CLOSE ||
953 p->op==ASYNC_OPENEXCLUSIVE ||
954 (pBase && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) )
955 ){
956 pthread_mutex_unlock(&async.queueMutex);
957 holdingMutex = 0;
958 }
959 if( !pBase ){
960 pBase = p->pFile->pBaseRead;
961 }
962 }
963
964 switch( p->op ){
drh4b74b262006-02-13 13:50:55 +0000965 case ASYNC_NOOP:
966 break;
967
drh23669402006-01-09 17:29:52 +0000968 case ASYNC_WRITE:
969 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000970 TRACE(("WRITE %s %d bytes at %d\n",
971 p->pFile->zName, p->nByte, p->iOffset));
drh23669402006-01-09 17:29:52 +0000972 rc = sqlite3OsSeek(pBase, p->iOffset);
973 if( rc==SQLITE_OK ){
974 rc = sqlite3OsWrite(pBase, (const void *)(p->zBuf), p->nByte);
975 }
976 break;
977
978 case ASYNC_SYNC:
979 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000980 TRACE(("SYNC %s\n", p->pFile->zName));
drh23669402006-01-09 17:29:52 +0000981 rc = sqlite3OsSync(pBase, p->nByte);
982 break;
983
984 case ASYNC_TRUNCATE:
985 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000986 TRACE(("TRUNCATE %s to %d bytes\n", p->pFile->zName, p->iOffset));
987 rc = sqlite3OsTruncate(pBase, p->iOffset);
drh23669402006-01-09 17:29:52 +0000988 break;
989
990 case ASYNC_CLOSE:
drh97bbdc02006-02-13 18:35:06 +0000991 TRACE(("CLOSE %s\n", p->pFile->zName));
drh23669402006-01-09 17:29:52 +0000992 sqlite3OsClose(&p->pFile->pBaseWrite);
danielk1977750b03e2006-02-14 10:48:39 +0000993 sqlite3OsClose(&p->pFile->pBaseRead);
drh23669402006-01-09 17:29:52 +0000994 sqlite3OsFree(p->pFile);
995 break;
996
997 case ASYNC_OPENDIRECTORY:
998 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000999 TRACE(("OPENDIR %s\n", p->zBuf));
drh23669402006-01-09 17:29:52 +00001000 sqlite3OsOpenDirectory(pBase, p->zBuf);
1001 break;
1002
1003 case ASYNC_SETFULLSYNC:
1004 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +00001005 TRACE(("SETFULLSYNC %s %d\n", p->pFile->zName, p->nByte));
drh23669402006-01-09 17:29:52 +00001006 sqlite3OsSetFullSync(pBase, p->nByte);
1007 break;
1008
1009 case ASYNC_DELETE:
drh97bbdc02006-02-13 18:35:06 +00001010 TRACE(("DELETE %s\n", p->zBuf));
drh23669402006-01-09 17:29:52 +00001011 rc = xOrigDelete(p->zBuf);
1012 break;
1013
1014 case ASYNC_SYNCDIRECTORY:
drh97bbdc02006-02-13 18:35:06 +00001015 TRACE(("SYNCDIR %s\n", p->zBuf));
drh23669402006-01-09 17:29:52 +00001016 rc = xOrigSyncDirectory(p->zBuf);
1017 break;
1018
1019 case ASYNC_OPENEXCLUSIVE: {
1020 AsyncFile *pFile = p->pFile;
1021 int delFlag = ((p->iOffset)?1:0);
1022 OsFile *pBase = 0;
drh97bbdc02006-02-13 18:35:06 +00001023 TRACE(("OPEN %s delFlag=%d\n", p->zBuf, delFlag));
danielk1977750b03e2006-02-14 10:48:39 +00001024 assert(pFile->pBaseRead==0 && pFile->pBaseWrite==0);
drh23669402006-01-09 17:29:52 +00001025 rc = xOrigOpenExclusive(p->zBuf, &pBase, delFlag);
1026 assert( holdingMutex==0 );
1027 pthread_mutex_lock(&async.queueMutex);
1028 holdingMutex = 1;
1029 if( rc==SQLITE_OK ){
danielk197716825cb2006-02-14 14:46:41 +00001030 pFile->pBaseRead = pBase;
drh23669402006-01-09 17:29:52 +00001031 }
1032 break;
1033 }
1034
1035 default: assert(!"Illegal value for AsyncWrite.op");
1036 }
1037
1038 /* If we didn't hang on to the mutex during the IO op, obtain it now
1039 ** so that the AsyncWrite structure can be safely removed from the
1040 ** global write-op queue.
1041 */
1042 if( !holdingMutex ){
1043 pthread_mutex_lock(&async.queueMutex);
1044 holdingMutex = 1;
1045 }
drh99681db2006-02-13 15:29:32 +00001046 /* TRACE(("UNLINK %p\n", p)); */
drh4b74b262006-02-13 13:50:55 +00001047 if( p==async.pQueueLast ){
1048 async.pQueueLast = 0;
drh23669402006-01-09 17:29:52 +00001049 }
drh4b74b262006-02-13 13:50:55 +00001050 async.pQueueFirst = p->pNext;
drh5c323542006-02-13 13:23:57 +00001051 sqlite3OsFree(p);
drh23669402006-01-09 17:29:52 +00001052 assert( holdingMutex );
1053
danielk1977be29bfc2006-02-14 13:25:43 +00001054 /* An IO error has occured. We cannot report the error back to the
1055 ** connection that requested the I/O since the error happened
1056 ** asynchronously. The connection has already moved on. There
1057 ** really is nobody to report the error to.
1058 **
1059 ** The file for which the error occured may have been a database or
1060 ** journal file. Regardless, none of the currently queued operations
1061 ** associated with the same database should now be performed. Nor should
1062 ** any subsequently requested IO on either a database or journal file
1063 ** handle for the same database be accepted until the main database
1064 ** file handle has been closed and reopened.
1065 **
1066 ** Furthermore, no further IO should be queued or performed on any file
1067 ** handle associated with a database that may have been part of a
1068 ** multi-file transaction that included the database associated with
1069 ** the IO error (i.e. a database ATTACHed to the same handle at some
1070 ** point in time).
1071 */
1072 if( rc!=SQLITE_OK ){
1073 async.ioError = rc;
1074 }
1075
drh23669402006-01-09 17:29:52 +00001076 /* Drop the queue mutex before continuing to the next write operation
1077 ** in order to give other threads a chance to work with the write queue.
1078 */
danielk1977be29bfc2006-02-14 13:25:43 +00001079 if( !async.pQueueFirst || !async.ioError ){
danielk19772d9fcaa2006-02-14 14:02:08 +00001080 sqlite3ApiExit(0, 0);
danielk1977be29bfc2006-02-14 13:25:43 +00001081 pthread_mutex_unlock(&async.queueMutex);
1082 holdingMutex = 0;
1083 if( async.ioDelay>0 ){
1084 sqlite3OsSleep(async.ioDelay);
1085 }else{
1086 sched_yield();
1087 }
drh23669402006-01-09 17:29:52 +00001088 }
1089 }
danielk1977be29bfc2006-02-14 13:25:43 +00001090
drh23669402006-01-09 17:29:52 +00001091 pthread_mutex_unlock(&async.writerMutex);
1092 return 0;
1093}
1094
1095/**************************************************************************
1096** The remaining code defines a Tcl interface for testing the asynchronous
1097** IO implementation in this file.
1098**
1099** To adapt the code to a non-TCL environment, delete or comment out
1100** the code that follows.
1101*/
1102
1103/*
1104** sqlite3async_enable ?YES/NO?
1105**
1106** Enable or disable the asynchronous I/O backend. This command is
1107** not thread-safe. Do not call it while any database connections
1108** are open.
1109*/
1110static int testAsyncEnable(
1111 void * clientData,
1112 Tcl_Interp *interp,
1113 int objc,
1114 Tcl_Obj *CONST objv[]
1115){
1116 if( objc!=1 && objc!=2 ){
1117 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?");
1118 return TCL_ERROR;
1119 }
1120 if( objc==1 ){
1121 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(xOrigOpenReadWrite!=0));
1122 }else{
1123 int en;
1124 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR;
1125 asyncEnable(en);
1126 }
1127 return TCL_OK;
1128}
1129
1130/*
1131** sqlite3async_halt "now"|"idle"|"never"
1132**
1133** Set the conditions at which the writer thread will halt.
1134*/
1135static int testAsyncHalt(
1136 void * clientData,
1137 Tcl_Interp *interp,
1138 int objc,
1139 Tcl_Obj *CONST objv[]
1140){
1141 const char *zCond;
1142 if( objc!=2 ){
1143 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\"");
1144 return TCL_ERROR;
1145 }
1146 zCond = Tcl_GetString(objv[1]);
1147 if( strcmp(zCond, "now")==0 ){
1148 async.writerHaltNow = 1;
1149 pthread_cond_broadcast(&async.queueSignal);
1150 }else if( strcmp(zCond, "idle")==0 ){
1151 async.writerHaltWhenIdle = 1;
1152 async.writerHaltNow = 0;
1153 pthread_cond_broadcast(&async.queueSignal);
1154 }else if( strcmp(zCond, "never")==0 ){
1155 async.writerHaltWhenIdle = 0;
1156 async.writerHaltNow = 0;
1157 }else{
1158 Tcl_AppendResult(interp,
1159 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0);
1160 return TCL_ERROR;
1161 }
1162 return TCL_OK;
1163}
1164
1165/*
1166** sqlite3async_delay ?MS?
1167**
1168** Query or set the number of milliseconds of delay in the writer
1169** thread after each write operation. The default is 0. By increasing
1170** the memory delay we can simulate the effect of slow disk I/O.
1171*/
1172static int testAsyncDelay(
1173 void * clientData,
1174 Tcl_Interp *interp,
1175 int objc,
1176 Tcl_Obj *CONST objv[]
1177){
1178 if( objc!=1 && objc!=2 ){
1179 Tcl_WrongNumArgs(interp, 1, objv, "?MS?");
1180 return TCL_ERROR;
1181 }
1182 if( objc==1 ){
1183 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay));
1184 }else{
1185 int ioDelay;
1186 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR;
1187 async.ioDelay = ioDelay;
1188 }
1189 return TCL_OK;
1190}
1191
1192/*
1193** sqlite3async_start
1194**
1195** Start a new writer thread.
1196*/
1197static int testAsyncStart(
1198 void * clientData,
1199 Tcl_Interp *interp,
1200 int objc,
1201 Tcl_Obj *CONST objv[]
1202){
1203 pthread_t x;
1204 int rc;
1205 rc = pthread_create(&x, 0, asyncWriterThread, 0);
1206 if( rc ){
1207 Tcl_AppendResult(interp, "failed to create the thread", 0);
1208 return TCL_ERROR;
1209 }
1210 pthread_detach(x);
1211 return TCL_OK;
1212}
1213
1214/*
1215** sqlite3async_wait
1216**
1217** Wait for the current writer thread to terminate.
1218**
1219** If the current writer thread is set to run forever then this
1220** command would block forever. To prevent that, an error is returned.
1221*/
1222static int testAsyncWait(
1223 void * clientData,
1224 Tcl_Interp *interp,
1225 int objc,
1226 Tcl_Obj *CONST objv[]
1227){
drh89ea9312006-02-13 17:03:47 +00001228 int cnt = 10;
drh23669402006-01-09 17:29:52 +00001229 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){
1230 Tcl_AppendResult(interp, "would block forever", (char*)0);
1231 return TCL_ERROR;
1232 }
danielk1977750b03e2006-02-14 10:48:39 +00001233
drh89ea9312006-02-13 17:03:47 +00001234 while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){
1235 pthread_mutex_unlock(&async.writerMutex);
1236 sched_yield();
1237 }
1238 if( cnt>=0 ){
1239 TRACE(("WAIT\n"));
1240 pthread_mutex_lock(&async.queueMutex);
1241 pthread_cond_broadcast(&async.queueSignal);
1242 pthread_mutex_unlock(&async.queueMutex);
1243 pthread_mutex_lock(&async.writerMutex);
1244 pthread_mutex_unlock(&async.writerMutex);
1245 }else{
drh97bbdc02006-02-13 18:35:06 +00001246 TRACE(("NO-WAIT\n"));
drh89ea9312006-02-13 17:03:47 +00001247 }
drh23669402006-01-09 17:29:52 +00001248 return TCL_OK;
1249}
1250
1251
1252#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1253
1254/*
1255** This routine registers the custom TCL commands defined in this
1256** module. This should be the only procedure visible from outside
1257** of this module.
1258*/
1259int Sqlitetestasync_Init(Tcl_Interp *interp){
1260#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
1261 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0);
1262 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0);
1263 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0);
1264 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0);
1265 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0);
drh99681db2006-02-13 15:29:32 +00001266 Tcl_LinkVar(interp, "sqlite3async_trace",
1267 (char*)&sqlite3async_trace, TCL_LINK_INT);
drh23669402006-01-09 17:29:52 +00001268#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1269 return TCL_OK;
1270}