blob: 8e0099d995877955cd697759a4922346fbe4ca3b [file] [log] [blame]
drh23669402006-01-09 17:29:52 +00001/*
2** 2005 December 14
3**
4** The author disclaims copyright to this source code. In place of
5** a legal notice, here is a blessing:
6**
7** May you do good and not evil.
8** May you find forgiveness for yourself and forgive others.
9** May you share freely, never taking more than you give.
10**
11*************************************************************************
12**
13** This file contains an example implementation of an asynchronous IO
drhfe0f75b2006-01-10 20:01:18 +000014** backend for SQLite.
15**
16** WHAT IS ASYNCHRONOUS I/O?
17**
18** With asynchronous I/O, write requests are handled by a separate thread
19** running in the background. This means that the thread that initiates
20** a database write does not have to wait for (sometimes slow) disk I/O
21** to occur. The write seems to happen very quickly, though in reality
22** it is happening at its usual slow pace in the background.
23**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property. With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk. With asynchronous I/O, this is not the case. If
** your program crashes or if you take a power loss after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
32**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID: Atomic, Consistent, and Isolated. Many
** applications get along fine without Durability.
36**
37** HOW IT WORKS
38**
** Asynchronous I/O works by overloading the OS-layer disk I/O routines
** with modified versions that store the data to be written in a queue of
** pending write operations. Look at the asyncEnable() subroutine to see
** how overloading works. Six os-layer routines are overloaded:
43**
44** sqlite3OsOpenReadWrite;
45** sqlite3OsOpenReadOnly;
46** sqlite3OsOpenExclusive;
47** sqlite3OsDelete;
48** sqlite3OsFileExists;
49** sqlite3OsSyncDirectory;
50**
51** The original implementations of these routines are saved and are
52** used by the writer thread to do the real I/O. The substitute
53** implementations typically put the I/O operation on a queue
54** to be handled later by the writer thread, though read operations
55** must be handled right away, obviously.
56**
57** Asynchronous I/O is disabled by setting the os-layer interface routines
58** back to their original values.
59**
60** LIMITATIONS
61**
62** This demonstration code is deliberately kept simple in order to keep
63** the main ideas clear and easy to understand. Real applications that
64** want to do asynchronous I/O might want to add additional capabilities.
65** For example, in this demonstration if writes are happening at a steady
66** stream that exceeds the I/O capability of the background writer thread,
67** the queue of pending write operations will grow without bound until we
68** run out of memory. Users of this technique may want to keep track of
69** the quantity of pending writes and stop accepting new write requests
70** when the buffer gets to be too big.
drh23669402006-01-09 17:29:52 +000071*/
72
73#include "sqliteInt.h"
74#include "os.h"
75#include <tcl.h>
76
/* A build that does not define THREADSAFE is treated as not threadsafe. */
#if !defined(THREADSAFE)
# define THREADSAFE 0
#endif
81
82/*
83** This test uses pthreads and hence only works on unix and with
drhfe0f75b2006-01-10 20:01:18 +000084** a threadsafe build of SQLite. It also requires that the redefinable
85** I/O feature of SQLite be turned on. This feature is turned off by
86** default. If a required element is missing, almost all of the code
87** in this file is commented out.
drh23669402006-01-09 17:29:52 +000088*/
89#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
90
drhfe0f75b2006-01-10 20:01:18 +000091/*
92** This demo uses pthreads. If you do not have a pthreads implementation
93** for your operating system, you will need to recode the threading
94** logic.
95*/
drh23669402006-01-09 17:29:52 +000096#include <pthread.h>
97#include <sched.h>
98
/* Two-argument minimum and maximum.  These are plain macros: each
** argument may be evaluated twice, so never pass an expression that
** has side effects. */
#define MIN(x,y) ((x)<(y) ? (x) : (y))
#define MAX(x,y) ((x)>(y) ? (x) : (y))
102
/* Forward declarations of the two structures defined later in this file,
** so that each may hold pointers to the other. */
typedef struct AsyncFile AsyncFile;
typedef struct AsyncWrite AsyncWrite;
106
/*
** Debug tracing.  Set sqlite3async_trace to a non-zero value to have
** every TRACE(()) invocation write a message to stderr.
**
** TRACE takes one doubly-parenthesized, printf-style argument list:
**
**     TRACE(("READ %d bytes\n", n));
**
** The expansion is wrapped in do{ }while(0) so that it is always exactly
** one statement, even when used as the body of an unbraced if/else.
*/
static int sqlite3async_trace = 0;
# define TRACE(X) do{ if( sqlite3async_trace ) asyncTrace X; }while(0)

/*
** Format a message with sqlite3_vmprintf() and print it on stderr,
** prefixed by pthread_self() cast to an int.
**
** If the message cannot be formatted (sqlite3_vmprintf() returns NULL)
** nothing is printed; a NULL pointer must never be handed to "%s".
*/
static void asyncTrace(const char *zFormat, ...){
  char *z;
  va_list ap;
  va_start(ap, zFormat);
  z = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  if( z==0 ){
    return;
  }
  fprintf(stderr, "[%d] %s", (int)pthread_self(), z);
  free(z);
}
drh23669402006-01-09 17:29:52 +0000119
120/*
drh23669402006-01-09 17:29:52 +0000121** THREAD SAFETY NOTES
122**
123** Basic rules:
124**
125** * Both read and write access to the global write-op queue must be
126** protected by the async.queueMutex.
127**
128** * The file handles from the underlying system are assumed not to
129** be thread safe.
130**
drhfe0f75b2006-01-10 20:01:18 +0000131** * See the last two paragraphs under "The Writer Thread" for
drh23669402006-01-09 17:29:52 +0000132** an assumption to do with file-handle synchronization by the Os.
133**
134** File system operations (invoked by SQLite thread):
135**
136** xOpenXXX (three versions)
137** xDelete
138** xFileExists
139** xSyncDirectory
140**
141** File handle operations (invoked by SQLite thread):
142**
drh23669402006-01-09 17:29:52 +0000143** asyncWrite, asyncClose, asyncTruncate, asyncSync,
144** asyncSetFullSync, asyncOpenDirectory.
145**
drhfe0f75b2006-01-10 20:01:18 +0000146** The operations above add an entry to the global write-op list. They
147** prepare the entry, acquire the async.queueMutex momentarily while
148** list pointers are manipulated to insert the new entry, then release
149** the mutex and signal the writer thread to wake up in case it happens
150** to be asleep.
151**
drh23669402006-01-09 17:29:52 +0000152**
153** asyncRead, asyncFileSize.
drhfe0f75b2006-01-10 20:01:18 +0000154**
**     Read operations. Both of these read from the underlying file
**     first, then adjust their result based on pending writes in the
**     write-op queue. So async.queueMutex is held for the duration
**     of these operations to prevent other threads from changing the
**     queue in mid operation.
160**
161**
162** asyncLock, asyncUnlock, asyncLockState, asyncCheckReservedLock
drh23669402006-01-09 17:29:52 +0000163**
drh89ea9312006-02-13 17:03:47 +0000164** These primitives implement in-process locking using a hash table
165** on the file name. Files are locked correctly for connections coming
166** from the same process. But other processes cannot see these locks
167** and will therefore not honor them.
drhfe0f75b2006-01-10 20:01:18 +0000168**
169**
170** asyncFileHandle.
drh23669402006-01-09 17:29:52 +0000171**
172** The sqlite3OsFileHandle() function is currently only used when
173** debugging the pager module. Unless sqlite3OsClose() is called on the
174** file (shouldn't be possible for other reasons), the underlying
175** implementations are safe to call without grabbing any mutex. So we just
drhfe0f75b2006-01-10 20:01:18 +0000176** go ahead and call it no matter what any other threads are doing.
drh23669402006-01-09 17:29:52 +0000177**
drhfe0f75b2006-01-10 20:01:18 +0000178**
179** asyncSeek.
drh23669402006-01-09 17:29:52 +0000180**
181** Calling this method just manipulates the AsyncFile.iOffset variable.
182** Since this variable is never accessed by writer thread, this
183** function does not require the mutex. Actual calls to OsSeek() take
184** place just before OsWrite() or OsRead(), which are always protected by
185** the mutex.
drh23669402006-01-09 17:29:52 +0000186**
187** The writer thread:
188**
**     The async.writerMutex is used to make sure there is only
**     a single writer thread running at a time.
191**
192** Inside the writer thread is a loop that works like this:
193**
194** WHILE (write-op list is not empty)
195** Do IO operation at head of write-op list
196** Remove entry from head of write-op list
197** END WHILE
198**
199** The async.queueMutex is always held during the <write-op list is
200** not empty> test, and when the entry is removed from the head
201** of the write-op list. Sometimes it is held for the interim
drhfe0f75b2006-01-10 20:01:18 +0000202** period (while the IO is performed), and sometimes it is
drh23669402006-01-09 17:29:52 +0000203** relinquished. It is relinquished if (a) the IO op is an
204** ASYNC_CLOSE or (b) when the file handle was opened, two of
205** the underlying systems handles were opened on the same
206** file-system entry.
207**
208** If condition (b) above is true, then one file-handle
209** (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the
210** file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush()
211** threads to perform write() operations. This means that read
212** operations are not blocked by asynchronous writes (although
213** asynchronous writes may still be blocked by reads).
214**
215** This assumes that the OS keeps two handles open on the same file
216** properly in sync. That is, any read operation that starts after a
217** write operation on the same file system entry has completed returns
218** data consistent with the write. We also assume that if one thread
219** reads a file while another is writing it all bytes other than the
220** ones actually being written contain valid data.
221**
222** If the above assumptions are not true, set the preprocessor symbol
223** SQLITE_ASYNC_TWO_FILEHANDLES to 0.
224*/
225
/*
** Default to using two file handles per writable file (one for reading,
** one for writing).  Define SQLITE_ASYNC_TWO_FILEHANDLES to 0 at compile
** time to use a single handle instead:
**
**     #define SQLITE_ASYNC_TWO_FILEHANDLES 0
*/
#if !defined(SQLITE_ASYNC_TWO_FILEHANDLES)
# define SQLITE_ASYNC_TWO_FILEHANDLES 1
#endif
230
231/*
232** State information is held in the static variable "async" defined
233** as follows:
234*/
235static struct TestAsyncStaticData {
236 pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */
237 pthread_mutex_t writerMutex; /* Prevents multiple writer threads */
drh89ea9312006-02-13 17:03:47 +0000238 pthread_mutex_t lockMutex; /* For access to aLock hash table */
drh23669402006-01-09 17:29:52 +0000239 pthread_cond_t queueSignal; /* For waking up sleeping writer thread */
240 pthread_cond_t emptySignal; /* Notify when the write queue is empty */
241 AsyncWrite *pQueueFirst; /* Next write operation to be processed */
242 AsyncWrite *pQueueLast; /* Last write operation on the list */
drh89ea9312006-02-13 17:03:47 +0000243 Hash aLock; /* Files locked */
drh23669402006-01-09 17:29:52 +0000244 volatile int ioDelay; /* Extra delay between write operations */
245 volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */
246 volatile int writerHaltNow; /* Writer thread halts after next op */
danielk1977be29bfc2006-02-14 13:25:43 +0000247 int ioError; /* True if an IO error has occured */
248 int nFile; /* Number of open files (from sqlite pov) */
drh23669402006-01-09 17:29:52 +0000249} async = {
250 PTHREAD_MUTEX_INITIALIZER,
251 PTHREAD_MUTEX_INITIALIZER,
drh89ea9312006-02-13 17:03:47 +0000252 PTHREAD_MUTEX_INITIALIZER,
drh23669402006-01-09 17:29:52 +0000253 PTHREAD_COND_INITIALIZER,
254 PTHREAD_COND_INITIALIZER,
255};
256
/*
** Operation codes stored in AsyncWrite.op.  The numeric values double as
** indexes into azOpcodeName[], so the two lists must be kept in step.
*/
#define ASYNC_NOOP           0
#define ASYNC_WRITE          1
#define ASYNC_SYNC           2
#define ASYNC_TRUNCATE       3
#define ASYNC_CLOSE          4
#define ASYNC_OPENDIRECTORY  5
#define ASYNC_SETFULLSYNC    6
#define ASYNC_DELETE         7
#define ASYNC_OPENEXCLUSIVE  8
#define ASYNC_SYNCDIRECTORY  9
268
/*
** Printable opcode names, indexed by ASYNC_xxx value.  Used only by the
** TRACE() debugging output.  Entry N must name the opcode whose value
** is N.
*/
static const char *azOpcodeName[] = {
  "NOOP",         /* 0: ASYNC_NOOP */
  "WRITE",        /* 1: ASYNC_WRITE */
  "SYNC",         /* 2: ASYNC_SYNC */
  "TRUNCATE",     /* 3: ASYNC_TRUNCATE */
  "CLOSE",        /* 4: ASYNC_CLOSE */
  "OPENDIR",      /* 5: ASYNC_OPENDIRECTORY */
  "SETFULLSYNC",  /* 6: ASYNC_SETFULLSYNC */
  "DELETE",       /* 7: ASYNC_DELETE */
  "OPENEX",       /* 8: ASYNC_OPENEXCLUSIVE */
  "SYNCDIR",      /* 9: ASYNC_SYNCDIRECTORY */
};
276
drh23669402006-01-09 17:29:52 +0000277/*
drhfe0f75b2006-01-10 20:01:18 +0000278** Entries on the write-op queue are instances of the AsyncWrite
279** structure, defined here.
280**
drh23669402006-01-09 17:29:52 +0000281** The interpretation of the iOffset and nByte variables varies depending
282** on the value of AsyncWrite.op:
283**
284** ASYNC_WRITE:
285** iOffset -> Offset in file to write to.
286** nByte -> Number of bytes of data to write (pointed to by zBuf).
287**
288** ASYNC_SYNC:
289** iOffset -> Unused.
290** nByte -> Value of "fullsync" flag to pass to sqlite3OsSync().
291**
292** ASYNC_TRUNCATE:
293** iOffset -> Size to truncate file to.
294** nByte -> Unused.
295**
296** ASYNC_CLOSE:
297** iOffset -> Unused.
298** nByte -> Unused.
299**
300** ASYNC_OPENDIRECTORY:
301** iOffset -> Unused.
302** nByte -> Number of bytes of zBuf points to (directory name).
303**
304** ASYNC_SETFULLSYNC:
305** iOffset -> Unused.
306** nByte -> New value for the full-sync flag.
307**
308**
309** ASYNC_DELETE:
310** iOffset -> Unused.
311** nByte -> Number of bytes of zBuf points to (file name).
312**
313** ASYNC_OPENEXCLUSIVE:
314** iOffset -> Value of "delflag".
315** nByte -> Number of bytes of zBuf points to (file name).
316**
drh89ea9312006-02-13 17:03:47 +0000317**
drh23669402006-01-09 17:29:52 +0000318** For an ASYNC_WRITE operation, zBuf points to the data to write to the file.
319** This space is sqliteMalloc()d along with the AsyncWrite structure in a
320** single blob, so is deleted when sqliteFree() is called on the parent
321** structure.
322*/
323struct AsyncWrite {
324 AsyncFile *pFile; /* File to write data to or sync */
325 int op; /* One of ASYNC_xxx etc. */
326 i64 iOffset; /* See above */
327 int nByte; /* See above */
328 char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */
329 AsyncWrite *pNext; /* Next write operation (to any file) */
330};
331
332/*
333** The AsyncFile structure is a subclass of OsFile used for asynchronous IO.
334*/
335struct AsyncFile {
336 IoMethod *pMethod; /* Must be first */
drhfc8748a2006-02-13 14:49:38 +0000337 i64 iOffset; /* Current seek() offset in file */
drh89ea9312006-02-13 17:03:47 +0000338 char *zName; /* Underlying OS filename - used for debugging */
339 int nName; /* Number of characters in zName */
drh23669402006-01-09 17:29:52 +0000340 OsFile *pBaseRead; /* Read handle to the underlying Os file */
341 OsFile *pBaseWrite; /* Write handle to the underlying Os file */
342};
343
344/*
345** Add an entry to the end of the global write-op list. pWrite should point
drhfe0f75b2006-01-10 20:01:18 +0000346** to an AsyncWrite structure allocated using sqlite3OsMalloc(). The writer
347** thread will call sqlite3OsFree() to free the structure after the specified
348** operation has been completed.
drh23669402006-01-09 17:29:52 +0000349**
drhfe0f75b2006-01-10 20:01:18 +0000350** Once an AsyncWrite structure has been added to the list, it becomes the
351** property of the writer thread and must not be read or modified by the
352** caller.
drh23669402006-01-09 17:29:52 +0000353*/
354static void addAsyncWrite(AsyncWrite *pWrite){
drhfe0f75b2006-01-10 20:01:18 +0000355 /* We must hold the queue mutex in order to modify the queue pointers */
drh23669402006-01-09 17:29:52 +0000356 pthread_mutex_lock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000357
358 /* Add the record to the end of the write-op queue */
drh23669402006-01-09 17:29:52 +0000359 assert( !pWrite->pNext );
360 if( async.pQueueLast ){
361 assert( async.pQueueFirst );
362 async.pQueueLast->pNext = pWrite;
363 }else{
364 async.pQueueFirst = pWrite;
365 }
366 async.pQueueLast = pWrite;
danielk1977bf623f22006-02-14 13:48:33 +0000367 TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op],
368 pWrite->pFile ? pWrite->pFile->zName : "-", pWrite->iOffset));
drhfe0f75b2006-01-10 20:01:18 +0000369
danielk1977be29bfc2006-02-14 13:25:43 +0000370 if( pWrite->op==ASYNC_CLOSE ){
371 async.nFile--;
372 if( async.nFile==0 ){
373 async.ioError = SQLITE_OK;
374 }
375 }
376
drhfe0f75b2006-01-10 20:01:18 +0000377 /* Drop the queue mutex */
drh23669402006-01-09 17:29:52 +0000378 pthread_mutex_unlock(&async.queueMutex);
drhfe0f75b2006-01-10 20:01:18 +0000379
380 /* The writer thread might have been idle because there was nothing
381 ** on the write-op queue for it to do. So wake it up. */
drh23669402006-01-09 17:29:52 +0000382 pthread_cond_signal(&async.queueSignal);
383}
384
385/*
danielk1977be29bfc2006-02-14 13:25:43 +0000386** Increment async.nFile in a thread-safe manner.
387*/
388static void incrOpenFileCount(){
389 /* We must hold the queue mutex in order to modify async.nFile */
390 pthread_mutex_lock(&async.queueMutex);
391 if( async.nFile==0 ){
392 async.ioError = SQLITE_OK;
393 }
394 async.nFile++;
395 pthread_mutex_unlock(&async.queueMutex);
396}
397
398/*
drh23669402006-01-09 17:29:52 +0000399** This is a utility function to allocate and populate a new AsyncWrite
400** structure and insert it (via addAsyncWrite() ) into the global list.
401*/
402static int addNewAsyncWrite(
403 AsyncFile *pFile,
404 int op,
405 i64 iOffset,
406 int nByte,
407 const char *zByte
408){
drh4b74b262006-02-13 13:50:55 +0000409 AsyncWrite *p;
danielk1977be29bfc2006-02-14 13:25:43 +0000410 if( op!=ASYNC_CLOSE && async.ioError ){
411 return async.ioError;
drh4b74b262006-02-13 13:50:55 +0000412 }
413 p = sqlite3OsMalloc(sizeof(AsyncWrite) + (zByte?nByte:0));
drh23669402006-01-09 17:29:52 +0000414 if( !p ){
415 return SQLITE_NOMEM;
416 }
417 p->op = op;
418 p->iOffset = iOffset;
419 p->nByte = nByte;
420 p->pFile = pFile;
421 p->pNext = 0;
422 if( zByte ){
423 p->zBuf = (char *)&p[1];
424 memcpy(p->zBuf, zByte, nByte);
425 }else{
426 p->zBuf = 0;
427 }
428 addAsyncWrite(p);
429 return SQLITE_OK;
430}
431
432/*
433** Close the file. This just adds an entry to the write-op list, the file is
434** not actually closed.
435*/
436static int asyncClose(OsFile **pId){
437 return addNewAsyncWrite((AsyncFile *)*pId, ASYNC_CLOSE, 0, 0, 0);
438}
439
440/*
441** Implementation of sqlite3OsWrite() for asynchronous files. Instead of
442** writing to the underlying file, this function adds an entry to the end of
443** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be
444** returned.
445*/
446static int asyncWrite(OsFile *id, const void *pBuf, int amt){
447 AsyncFile *pFile = (AsyncFile *)id;
448 int rc = addNewAsyncWrite(pFile, ASYNC_WRITE, pFile->iOffset, amt, pBuf);
449 pFile->iOffset += (i64)amt;
450 return rc;
451}
452
453/*
454** Truncate the file to nByte bytes in length. This just adds an entry to
455** the write-op list, no IO actually takes place.
456*/
457static int asyncTruncate(OsFile *id, i64 nByte){
458 return addNewAsyncWrite((AsyncFile *)id, ASYNC_TRUNCATE, nByte, 0, 0);
459}
460
461/*
462** Open the directory identified by zName and associate it with the
463** specified file. This just adds an entry to the write-op list, the
464** directory is opened later by sqlite3_async_flush().
465*/
466static int asyncOpenDirectory(OsFile *id, const char *zName){
467 AsyncFile *pFile = (AsyncFile *)id;
468 return addNewAsyncWrite(pFile, ASYNC_OPENDIRECTORY, 0, strlen(zName)+1,zName);
469}
470
471/*
472** Sync the file. This just adds an entry to the write-op list, the
473** sync() is done later by sqlite3_async_flush().
474*/
475static int asyncSync(OsFile *id, int fullsync){
476 return addNewAsyncWrite((AsyncFile *)id, ASYNC_SYNC, 0, fullsync, 0);
477}
478
479/*
480** Set (or clear) the full-sync flag on the underlying file. This operation
481** is queued and performed later by sqlite3_async_flush().
482*/
483static void asyncSetFullSync(OsFile *id, int value){
484 addNewAsyncWrite((AsyncFile *)id, ASYNC_SETFULLSYNC, 0, value, 0);
485}
486
487/*
488** Read data from the file. First we read from the filesystem, then adjust
489** the contents of the buffer based on ASYNC_WRITE operations in the
drhfe0f75b2006-01-10 20:01:18 +0000490** write-op queue.
drh23669402006-01-09 17:29:52 +0000491**
492** This method holds the mutex from start to finish.
493*/
494static int asyncRead(OsFile *id, void *obuf, int amt){
495 int rc = SQLITE_OK;
496 i64 filesize;
497 int nRead;
498 AsyncFile *pFile = (AsyncFile *)id;
danielk1977750b03e2006-02-14 10:48:39 +0000499 OsFile *pBase = pFile->pBaseRead;
500
501 if( !pBase ){
502 pBase = pFile->pBaseWrite;
503 }
drh23669402006-01-09 17:29:52 +0000504
drh4b74b262006-02-13 13:50:55 +0000505 /* If an I/O error has previously occurred on this file, then all
506 ** subsequent operations fail.
507 */
danielk1977be29bfc2006-02-14 13:25:43 +0000508 if( async.ioError!=SQLITE_OK ){
509 return async.ioError;
drh4b74b262006-02-13 13:50:55 +0000510 }
511
drh23669402006-01-09 17:29:52 +0000512 /* Grab the write queue mutex for the duration of the call */
513 pthread_mutex_lock(&async.queueMutex);
514
danielk1977750b03e2006-02-14 10:48:39 +0000515 if( pBase ){
516 rc = sqlite3OsFileSize(pBase, &filesize);
drh23669402006-01-09 17:29:52 +0000517 if( rc!=SQLITE_OK ){
518 goto asyncread_out;
519 }
danielk1977750b03e2006-02-14 10:48:39 +0000520 rc = sqlite3OsSeek(pBase, pFile->iOffset);
drh23669402006-01-09 17:29:52 +0000521 if( rc!=SQLITE_OK ){
522 goto asyncread_out;
523 }
524 nRead = MIN(filesize - pFile->iOffset, amt);
525 if( nRead>0 ){
danielk1977750b03e2006-02-14 10:48:39 +0000526 rc = sqlite3OsRead(pBase, obuf, nRead);
drh97bbdc02006-02-13 18:35:06 +0000527 TRACE(("READ %s %d bytes at %d\n", pFile->zName, nRead, pFile->iOffset));
drh23669402006-01-09 17:29:52 +0000528 }
529 }
530
531 if( rc==SQLITE_OK ){
532 AsyncWrite *p;
533 i64 iOffset = pFile->iOffset; /* Current seek offset */
534
535 for(p=async.pQueueFirst; p; p = p->pNext){
536 if( p->pFile==pFile && p->op==ASYNC_WRITE ){
drh44528382006-02-13 13:30:19 +0000537 int iBeginOut = (p->iOffset - iOffset);
538 int iBeginIn = -iBeginOut;
drh23669402006-01-09 17:29:52 +0000539 int nCopy;
540
541 if( iBeginIn<0 ) iBeginIn = 0;
542 if( iBeginOut<0 ) iBeginOut = 0;
543 nCopy = MIN(p->nByte-iBeginIn, amt-iBeginOut);
544
545 if( nCopy>0 ){
546 memcpy(&((char *)obuf)[iBeginOut], &p->zBuf[iBeginIn], nCopy);
drh97bbdc02006-02-13 18:35:06 +0000547 TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset));
drh23669402006-01-09 17:29:52 +0000548 }
549 }
550 }
551
552 pFile->iOffset += (i64)amt;
553 }
554
555asyncread_out:
556 pthread_mutex_unlock(&async.queueMutex);
557 return rc;
558}
559
560/*
561** Seek to the specified offset. This just adjusts the AsyncFile.iOffset
562** variable - calling seek() on the underlying file is defered until the
563** next read() or write() operation.
564*/
565static int asyncSeek(OsFile *id, i64 offset){
566 AsyncFile *pFile = (AsyncFile *)id;
567 pFile->iOffset = offset;
568 return SQLITE_OK;
569}
570
571/*
572** Read the size of the file. First we read the size of the file system
573** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations
574** currently in the write-op list.
575**
576** This method holds the mutex from start to finish.
577*/
578int asyncFileSize(OsFile *id, i64 *pSize){
579 int rc = SQLITE_OK;
580 i64 s = 0;
581 OsFile *pBase;
582
583 pthread_mutex_lock(&async.queueMutex);
584
585 /* Read the filesystem size from the base file. If pBaseRead is NULL, this
586 ** means the file hasn't been opened yet. In this case all relevant data
587 ** must be in the write-op queue anyway, so we can omit reading from the
588 ** file-system.
589 */
590 pBase = ((AsyncFile *)id)->pBaseRead;
danielk1977750b03e2006-02-14 10:48:39 +0000591 if( !pBase ){
592 pBase = ((AsyncFile *)id)->pBaseWrite;
593 }
drh23669402006-01-09 17:29:52 +0000594 if( pBase ){
595 rc = sqlite3OsFileSize(pBase, &s);
596 }
597
598 if( rc==SQLITE_OK ){
599 AsyncWrite *p;
600 for(p=async.pQueueFirst; p; p = p->pNext){
601 if( p->pFile==(AsyncFile *)id ){
602 switch( p->op ){
603 case ASYNC_WRITE:
604 s = MAX(p->iOffset + (i64)(p->nByte), s);
605 break;
606 case ASYNC_TRUNCATE:
danielk1977bf623f22006-02-14 13:48:33 +0000607 s = MIN(s, p->iOffset);
drh23669402006-01-09 17:29:52 +0000608 break;
609 }
610 }
611 }
612 *pSize = s;
613 }
614 pthread_mutex_unlock(&async.queueMutex);
615 return rc;
616}
617
618/*
619** Return the operating system file handle. This is only used for debugging
620** at the moment anyway.
621*/
622static int asyncFileHandle(OsFile *id){
623 return sqlite3OsFileHandle(((AsyncFile *)id)->pBaseRead);
624}
625
drhfe0f75b2006-01-10 20:01:18 +0000626/*
drh89ea9312006-02-13 17:03:47 +0000627** No disk locking is performed. We keep track of locks locally in
628** the async.aLock hash table. Locking should appear to work the same
629** as with standard (unmodified) SQLite as long as all connections
630** come from this one process. Connections from external processes
631** cannot see our internal hash table (obviously) and will thus not
632** honor our locks.
drhfe0f75b2006-01-10 20:01:18 +0000633*/
drh23669402006-01-09 17:29:52 +0000634static int asyncLock(OsFile *id, int lockType){
drh89ea9312006-02-13 17:03:47 +0000635 AsyncFile *pFile = (AsyncFile*)id;
636 TRACE(("LOCK %d (%s)\n", lockType, pFile->zName));
637 pthread_mutex_lock(&async.lockMutex);
638 sqlite3HashInsert(&async.aLock, pFile->zName, pFile->nName, (void*)lockType);
639 pthread_mutex_unlock(&async.lockMutex);
drh23669402006-01-09 17:29:52 +0000640 return SQLITE_OK;
641}
642static int asyncUnlock(OsFile *id, int lockType){
drh89ea9312006-02-13 17:03:47 +0000643 return asyncLock(id, lockType);
drh23669402006-01-09 17:29:52 +0000644}
645
646/*
647** This function is called when the pager layer first opens a database file
648** and is checking for a hot-journal.
649*/
650static int asyncCheckReservedLock(OsFile *id){
drh89ea9312006-02-13 17:03:47 +0000651 AsyncFile *pFile = (AsyncFile*)id;
652 int rc;
653 pthread_mutex_lock(&async.lockMutex);
654 rc = (int)sqlite3HashFind(&async.aLock, pFile->zName, pFile->nName);
655 pthread_mutex_unlock(&async.lockMutex);
656 TRACE(("CHECK-LOCK %d (%s)\n", rc, pFile->zName));
drh97bbdc02006-02-13 18:35:06 +0000657 return rc>SHARED_LOCK;
drh23669402006-01-09 17:29:52 +0000658}
659
660/*
661** This is broken. But sqlite3OsLockState() is only used for testing anyway.
662*/
663static int asyncLockState(OsFile *id){
664 return SQLITE_OK;
665}
666
667/*
668** The following variables hold pointers to the original versions of
drhfe0f75b2006-01-10 20:01:18 +0000669** OS-layer interface routines that are overloaded in order to create
670** the asynchronous I/O backend.
drh23669402006-01-09 17:29:52 +0000671*/
672static int (*xOrigOpenReadWrite)(const char*, OsFile**, int*) = 0;
673static int (*xOrigOpenExclusive)(const char*, OsFile**, int) = 0;
674static int (*xOrigOpenReadOnly)(const char*, OsFile**) = 0;
675static int (*xOrigDelete)(const char*) = 0;
676static int (*xOrigFileExists)(const char*) = 0;
677static int (*xOrigSyncDirectory)(const char*) = 0;
678
drhfe0f75b2006-01-10 20:01:18 +0000679/*
680** This routine does most of the work of opening a file and building
681** the OsFile structure.
682*/
drh23669402006-01-09 17:29:52 +0000683static int asyncOpenFile(
drhfe0f75b2006-01-10 20:01:18 +0000684 const char *zName, /* The name of the file to be opened */
685 OsFile **pFile, /* Put the OsFile structure here */
686 OsFile *pBaseRead, /* The real OsFile from the real I/O routine */
687 int openForWriting /* Open a second file handle for writing if true */
drh23669402006-01-09 17:29:52 +0000688){
drh89ea9312006-02-13 17:03:47 +0000689 int rc, i, n;
drh23669402006-01-09 17:29:52 +0000690 AsyncFile *p;
691 OsFile *pBaseWrite = 0;
692
693 static IoMethod iomethod = {
694 asyncClose,
695 asyncOpenDirectory,
696 asyncRead,
697 asyncWrite,
698 asyncSeek,
699 asyncTruncate,
700 asyncSync,
701 asyncSetFullSync,
702 asyncFileHandle,
703 asyncFileSize,
704 asyncLock,
705 asyncUnlock,
706 asyncLockState,
707 asyncCheckReservedLock
708 };
709
drhfe0f75b2006-01-10 20:01:18 +0000710 if( openForWriting && SQLITE_ASYNC_TWO_FILEHANDLES ){
drh23669402006-01-09 17:29:52 +0000711 int dummy;
712 rc = xOrigOpenReadWrite(zName, &pBaseWrite, &dummy);
713 if( rc!=SQLITE_OK ){
714 goto error_out;
715 }
716 }
717
drh89ea9312006-02-13 17:03:47 +0000718 n = strlen(zName);
719 for(i=n-1; i>=0 && zName[i]!='/'; i--){}
720 p = (AsyncFile *)sqlite3OsMalloc(sizeof(AsyncFile) + n - i);
drh23669402006-01-09 17:29:52 +0000721 if( !p ){
722 rc = SQLITE_NOMEM;
723 goto error_out;
724 }
725 memset(p, 0, sizeof(AsyncFile));
drh89ea9312006-02-13 17:03:47 +0000726 p->zName = (char*)&p[1];
727 strcpy(p->zName, &zName[i+1]);
728 p->nName = n - i;
drh23669402006-01-09 17:29:52 +0000729 p->pMethod = &iomethod;
730 p->pBaseRead = pBaseRead;
731 p->pBaseWrite = pBaseWrite;
732
733 *pFile = (OsFile *)p;
734 return SQLITE_OK;
735
736error_out:
737 assert(!p);
738 sqlite3OsClose(&pBaseRead);
739 sqlite3OsClose(&pBaseWrite);
740 *pFile = 0;
741 return rc;
742}
743
744/*
745** The async-IO backends implementation of the three functions used to open
746** a file (xOpenExclusive, xOpenReadWrite and xOpenReadOnly). Most of the
747** work is done in function asyncOpenFile() - see above.
748*/
749static int asyncOpenExclusive(const char *z, OsFile **ppFile, int delFlag){
750 int rc = asyncOpenFile(z, ppFile, 0, 0);
751 if( rc==SQLITE_OK ){
752 AsyncFile *pFile = (AsyncFile *)(*ppFile);
753 int nByte = strlen(z)+1;
754 i64 i = (i64)(delFlag);
755 rc = addNewAsyncWrite(pFile, ASYNC_OPENEXCLUSIVE, i, nByte, z);
756 if( rc!=SQLITE_OK ){
757 sqlite3OsFree(pFile);
758 *ppFile = 0;
759 }
760 }
danielk1977be29bfc2006-02-14 13:25:43 +0000761 if( rc==SQLITE_OK ){
762 incrOpenFileCount();
763 }
drh23669402006-01-09 17:29:52 +0000764 return rc;
765}
766static int asyncOpenReadOnly(const char *z, OsFile **ppFile){
767 OsFile *pBase = 0;
768 int rc = xOrigOpenReadOnly(z, &pBase);
769 if( rc==SQLITE_OK ){
770 rc = asyncOpenFile(z, ppFile, pBase, 0);
771 }
danielk1977be29bfc2006-02-14 13:25:43 +0000772 if( rc==SQLITE_OK ){
773 incrOpenFileCount();
774 }
drh23669402006-01-09 17:29:52 +0000775 return rc;
776}
777static int asyncOpenReadWrite(const char *z, OsFile **ppFile, int *pReadOnly){
778 OsFile *pBase = 0;
779 int rc = xOrigOpenReadWrite(z, &pBase, pReadOnly);
780 if( rc==SQLITE_OK ){
781 rc = asyncOpenFile(z, ppFile, pBase, (*pReadOnly ? 0 : 1));
782 }
danielk1977be29bfc2006-02-14 13:25:43 +0000783 if( rc==SQLITE_OK ){
784 incrOpenFileCount();
785 }
drh23669402006-01-09 17:29:52 +0000786 return rc;
787}
788
789/*
790** Implementation of sqlite3OsDelete. Add an entry to the end of the
791** write-op queue to perform the delete.
792*/
793static int asyncDelete(const char *z){
794 return addNewAsyncWrite(0, ASYNC_DELETE, 0, strlen(z)+1, z);
795}
796
797/*
798** Implementation of sqlite3OsSyncDirectory. Add an entry to the end of the
799** write-op queue to perform the directory sync.
800*/
801static int asyncSyncDirectory(const char *z){
802 return addNewAsyncWrite(0, ASYNC_SYNCDIRECTORY, 0, strlen(z)+1, z);
803}
804
805/*
806** Implementation of sqlite3OsFileExists. Return true if file 'z' exists
807** in the file system.
808**
809** This method holds the mutex from start to finish.
810*/
811static int asyncFileExists(const char *z){
812 int ret;
813 AsyncWrite *p;
814
815 pthread_mutex_lock(&async.queueMutex);
816
817 /* See if the real file system contains the specified file. */
818 ret = xOrigFileExists(z);
819
820 for(p=async.pQueueFirst; p; p = p->pNext){
821 if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, z) ){
822 ret = 0;
823 }else if( p->op==ASYNC_OPENEXCLUSIVE && 0==strcmp(p->zBuf, z) ){
824 ret = 1;
825 }
826 }
827
drh89ea9312006-02-13 17:03:47 +0000828 TRACE(("EXISTS: %s = %d\n", z, ret));
drh23669402006-01-09 17:29:52 +0000829 pthread_mutex_unlock(&async.queueMutex);
830 return ret;
831}
832
833/*
834** Call this routine to enable or disable the
835** asynchronous IO features implemented in this file.
836**
837** This routine is not even remotely threadsafe. Do not call
838** this routine while any SQLite database connections are open.
839*/
840static void asyncEnable(int enable){
841 if( enable && xOrigOpenReadWrite==0 ){
danielk1977750b03e2006-02-14 10:48:39 +0000842 assert(sqlite3Os.xOpenReadWrite);
drh89ea9312006-02-13 17:03:47 +0000843 sqlite3HashInit(&async.aLock, SQLITE_HASH_BINARY, 1);
drh23669402006-01-09 17:29:52 +0000844 xOrigOpenReadWrite = sqlite3Os.xOpenReadWrite;
845 xOrigOpenReadOnly = sqlite3Os.xOpenReadOnly;
846 xOrigOpenExclusive = sqlite3Os.xOpenExclusive;
847 xOrigDelete = sqlite3Os.xDelete;
848 xOrigFileExists = sqlite3Os.xFileExists;
849 xOrigSyncDirectory = sqlite3Os.xSyncDirectory;
850
851 sqlite3Os.xOpenReadWrite = asyncOpenReadWrite;
852 sqlite3Os.xOpenReadOnly = asyncOpenReadOnly;
853 sqlite3Os.xOpenExclusive = asyncOpenExclusive;
854 sqlite3Os.xDelete = asyncDelete;
855 sqlite3Os.xFileExists = asyncFileExists;
856 sqlite3Os.xSyncDirectory = asyncSyncDirectory;
danielk1977750b03e2006-02-14 10:48:39 +0000857 assert(sqlite3Os.xOpenReadWrite);
drh23669402006-01-09 17:29:52 +0000858 }
859 if( !enable && xOrigOpenReadWrite!=0 ){
danielk1977750b03e2006-02-14 10:48:39 +0000860 assert(sqlite3Os.xOpenReadWrite);
drh89ea9312006-02-13 17:03:47 +0000861 sqlite3HashClear(&async.aLock);
drh23669402006-01-09 17:29:52 +0000862 sqlite3Os.xOpenReadWrite = xOrigOpenReadWrite;
863 sqlite3Os.xOpenReadOnly = xOrigOpenReadOnly;
864 sqlite3Os.xOpenExclusive = xOrigOpenExclusive;
865 sqlite3Os.xDelete = xOrigDelete;
866 sqlite3Os.xFileExists = xOrigFileExists;
867 sqlite3Os.xSyncDirectory = xOrigSyncDirectory;
868
869 xOrigOpenReadWrite = 0;
870 xOrigOpenReadOnly = 0;
871 xOrigOpenExclusive = 0;
872 xOrigDelete = 0;
873 xOrigFileExists = 0;
874 xOrigSyncDirectory = 0;
danielk1977750b03e2006-02-14 10:48:39 +0000875 assert(sqlite3Os.xOpenReadWrite);
drh23669402006-01-09 17:29:52 +0000876 }
877}
878
879/*
880** This procedure runs in a separate thread, reading messages off of the
881** write queue and processing them one by one.
882**
883** If async.writerHaltNow is true, then this procedure exits
884** after processing a single message.
885**
886** If async.writerHaltWhenIdle is true, then this procedure exits when
887** the write queue is empty.
888**
889** If both of the above variables are false, this procedure runs
890** indefinately, waiting for operations to be added to the write queue
891** and processing them in the order in which they arrive.
892**
893** An artifical delay of async.ioDelay milliseconds is inserted before
894** each write operation in order to simulate the effect of a slow disk.
895**
896** Only one instance of this procedure may be running at a time.
897*/
898static void *asyncWriterThread(void *NotUsed){
899 AsyncWrite *p = 0;
900 int rc = SQLITE_OK;
danielk1977be29bfc2006-02-14 13:25:43 +0000901 int holdingMutex = 0;
drh23669402006-01-09 17:29:52 +0000902
903 if( pthread_mutex_trylock(&async.writerMutex) ){
904 return 0;
905 }
906 while( async.writerHaltNow==0 ){
drh23669402006-01-09 17:29:52 +0000907 OsFile *pBase = 0;
908
danielk1977be29bfc2006-02-14 13:25:43 +0000909 if( !holdingMutex ){
910 pthread_mutex_lock(&async.queueMutex);
911 }
drh23669402006-01-09 17:29:52 +0000912 while( (p = async.pQueueFirst)==0 ){
913 pthread_cond_broadcast(&async.emptySignal);
914 if( async.writerHaltWhenIdle ){
915 pthread_mutex_unlock(&async.queueMutex);
916 break;
917 }else{
drhfc8748a2006-02-13 14:49:38 +0000918 TRACE(("IDLE\n"));
drh23669402006-01-09 17:29:52 +0000919 pthread_cond_wait(&async.queueSignal, &async.queueMutex);
drhfc8748a2006-02-13 14:49:38 +0000920 TRACE(("WAKEUP\n"));
drh23669402006-01-09 17:29:52 +0000921 }
922 }
923 if( p==0 ) break;
danielk1977be29bfc2006-02-14 13:25:43 +0000924 holdingMutex = 1;
drh23669402006-01-09 17:29:52 +0000925
926 /* Right now this thread is holding the mutex on the write-op queue.
927 ** Variable 'p' points to the first entry in the write-op queue. In
928 ** the general case, we hold on to the mutex for the entire body of
929 ** the loop.
930 **
931 ** However in the cases enumerated below, we relinquish the mutex,
932 ** perform the IO, and then re-request the mutex before removing 'p' from
933 ** the head of the write-op queue. The idea is to increase concurrency with
934 ** sqlite threads.
935 **
936 ** * An ASYNC_CLOSE operation.
937 ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish
938 ** the mutex, call the underlying xOpenExclusive() function, then
939 ** re-aquire the mutex before seting the AsyncFile.pBaseRead
940 ** variable.
941 ** * ASYNC_SYNC and ASYNC_WRITE operations, if
942 ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two
943 ** file-handles are open for the particular file being "synced".
944 */
danielk1977be29bfc2006-02-14 13:25:43 +0000945 if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){
946 p->op = ASYNC_NOOP;
947 }
drh23669402006-01-09 17:29:52 +0000948 if( p->pFile ){
949 pBase = p->pFile->pBaseWrite;
950 if(
951 p->op==ASYNC_CLOSE ||
952 p->op==ASYNC_OPENEXCLUSIVE ||
953 (pBase && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) )
954 ){
955 pthread_mutex_unlock(&async.queueMutex);
956 holdingMutex = 0;
957 }
958 if( !pBase ){
959 pBase = p->pFile->pBaseRead;
960 }
961 }
962
963 switch( p->op ){
drh4b74b262006-02-13 13:50:55 +0000964 case ASYNC_NOOP:
965 break;
966
drh23669402006-01-09 17:29:52 +0000967 case ASYNC_WRITE:
968 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000969 TRACE(("WRITE %s %d bytes at %d\n",
970 p->pFile->zName, p->nByte, p->iOffset));
drh23669402006-01-09 17:29:52 +0000971 rc = sqlite3OsSeek(pBase, p->iOffset);
972 if( rc==SQLITE_OK ){
973 rc = sqlite3OsWrite(pBase, (const void *)(p->zBuf), p->nByte);
974 }
975 break;
976
977 case ASYNC_SYNC:
978 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000979 TRACE(("SYNC %s\n", p->pFile->zName));
drh23669402006-01-09 17:29:52 +0000980 rc = sqlite3OsSync(pBase, p->nByte);
981 break;
982
983 case ASYNC_TRUNCATE:
984 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000985 TRACE(("TRUNCATE %s to %d bytes\n", p->pFile->zName, p->iOffset));
986 rc = sqlite3OsTruncate(pBase, p->iOffset);
drh23669402006-01-09 17:29:52 +0000987 break;
988
989 case ASYNC_CLOSE:
drh97bbdc02006-02-13 18:35:06 +0000990 TRACE(("CLOSE %s\n", p->pFile->zName));
drh23669402006-01-09 17:29:52 +0000991 sqlite3OsClose(&p->pFile->pBaseWrite);
danielk1977750b03e2006-02-14 10:48:39 +0000992 sqlite3OsClose(&p->pFile->pBaseRead);
drh23669402006-01-09 17:29:52 +0000993 sqlite3OsFree(p->pFile);
994 break;
995
996 case ASYNC_OPENDIRECTORY:
997 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +0000998 TRACE(("OPENDIR %s\n", p->zBuf));
drh23669402006-01-09 17:29:52 +0000999 sqlite3OsOpenDirectory(pBase, p->zBuf);
1000 break;
1001
1002 case ASYNC_SETFULLSYNC:
1003 assert( pBase );
drh97bbdc02006-02-13 18:35:06 +00001004 TRACE(("SETFULLSYNC %s %d\n", p->pFile->zName, p->nByte));
drh23669402006-01-09 17:29:52 +00001005 sqlite3OsSetFullSync(pBase, p->nByte);
1006 break;
1007
1008 case ASYNC_DELETE:
drh97bbdc02006-02-13 18:35:06 +00001009 TRACE(("DELETE %s\n", p->zBuf));
drh23669402006-01-09 17:29:52 +00001010 rc = xOrigDelete(p->zBuf);
1011 break;
1012
1013 case ASYNC_SYNCDIRECTORY:
drh97bbdc02006-02-13 18:35:06 +00001014 TRACE(("SYNCDIR %s\n", p->zBuf));
drh23669402006-01-09 17:29:52 +00001015 rc = xOrigSyncDirectory(p->zBuf);
1016 break;
1017
1018 case ASYNC_OPENEXCLUSIVE: {
1019 AsyncFile *pFile = p->pFile;
1020 int delFlag = ((p->iOffset)?1:0);
1021 OsFile *pBase = 0;
drh97bbdc02006-02-13 18:35:06 +00001022 TRACE(("OPEN %s delFlag=%d\n", p->zBuf, delFlag));
danielk1977750b03e2006-02-14 10:48:39 +00001023 assert(pFile->pBaseRead==0 && pFile->pBaseWrite==0);
drh23669402006-01-09 17:29:52 +00001024 rc = xOrigOpenExclusive(p->zBuf, &pBase, delFlag);
1025 assert( holdingMutex==0 );
1026 pthread_mutex_lock(&async.queueMutex);
1027 holdingMutex = 1;
1028 if( rc==SQLITE_OK ){
danielk1977750b03e2006-02-14 10:48:39 +00001029 pFile->pBaseWrite = pBase;
drh23669402006-01-09 17:29:52 +00001030 }
1031 break;
1032 }
1033
1034 default: assert(!"Illegal value for AsyncWrite.op");
1035 }
1036
1037 /* If we didn't hang on to the mutex during the IO op, obtain it now
1038 ** so that the AsyncWrite structure can be safely removed from the
1039 ** global write-op queue.
1040 */
1041 if( !holdingMutex ){
1042 pthread_mutex_lock(&async.queueMutex);
1043 holdingMutex = 1;
1044 }
drh99681db2006-02-13 15:29:32 +00001045 /* TRACE(("UNLINK %p\n", p)); */
drh4b74b262006-02-13 13:50:55 +00001046 if( p==async.pQueueLast ){
1047 async.pQueueLast = 0;
drh23669402006-01-09 17:29:52 +00001048 }
drh4b74b262006-02-13 13:50:55 +00001049 async.pQueueFirst = p->pNext;
drh5c323542006-02-13 13:23:57 +00001050 sqlite3OsFree(p);
drh23669402006-01-09 17:29:52 +00001051 assert( holdingMutex );
1052
danielk1977be29bfc2006-02-14 13:25:43 +00001053 /* An IO error has occured. We cannot report the error back to the
1054 ** connection that requested the I/O since the error happened
1055 ** asynchronously. The connection has already moved on. There
1056 ** really is nobody to report the error to.
1057 **
1058 ** The file for which the error occured may have been a database or
1059 ** journal file. Regardless, none of the currently queued operations
1060 ** associated with the same database should now be performed. Nor should
1061 ** any subsequently requested IO on either a database or journal file
1062 ** handle for the same database be accepted until the main database
1063 ** file handle has been closed and reopened.
1064 **
1065 ** Furthermore, no further IO should be queued or performed on any file
1066 ** handle associated with a database that may have been part of a
1067 ** multi-file transaction that included the database associated with
1068 ** the IO error (i.e. a database ATTACHed to the same handle at some
1069 ** point in time).
1070 */
1071 if( rc!=SQLITE_OK ){
1072 async.ioError = rc;
1073 }
1074
drh23669402006-01-09 17:29:52 +00001075 /* Drop the queue mutex before continuing to the next write operation
1076 ** in order to give other threads a chance to work with the write queue.
1077 */
danielk1977be29bfc2006-02-14 13:25:43 +00001078 if( !async.pQueueFirst || !async.ioError ){
danielk19772d9fcaa2006-02-14 14:02:08 +00001079 sqlite3ApiExit(0, 0);
danielk1977be29bfc2006-02-14 13:25:43 +00001080 pthread_mutex_unlock(&async.queueMutex);
1081 holdingMutex = 0;
1082 if( async.ioDelay>0 ){
1083 sqlite3OsSleep(async.ioDelay);
1084 }else{
1085 sched_yield();
1086 }
drh23669402006-01-09 17:29:52 +00001087 }
1088 }
danielk1977be29bfc2006-02-14 13:25:43 +00001089
drh23669402006-01-09 17:29:52 +00001090 pthread_mutex_unlock(&async.writerMutex);
1091 return 0;
1092}
1093
1094/**************************************************************************
1095** The remaining code defines a Tcl interface for testing the asynchronous
1096** IO implementation in this file.
1097**
1098** To adapt the code to a non-TCL environment, delete or comment out
1099** the code that follows.
1100*/
1101
1102/*
1103** sqlite3async_enable ?YES/NO?
1104**
1105** Enable or disable the asynchronous I/O backend. This command is
1106** not thread-safe. Do not call it while any database connections
1107** are open.
1108*/
1109static int testAsyncEnable(
1110 void * clientData,
1111 Tcl_Interp *interp,
1112 int objc,
1113 Tcl_Obj *CONST objv[]
1114){
1115 if( objc!=1 && objc!=2 ){
1116 Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?");
1117 return TCL_ERROR;
1118 }
1119 if( objc==1 ){
1120 Tcl_SetObjResult(interp, Tcl_NewBooleanObj(xOrigOpenReadWrite!=0));
1121 }else{
1122 int en;
1123 if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR;
1124 asyncEnable(en);
1125 }
1126 return TCL_OK;
1127}
1128
1129/*
1130** sqlite3async_halt "now"|"idle"|"never"
1131**
1132** Set the conditions at which the writer thread will halt.
1133*/
1134static int testAsyncHalt(
1135 void * clientData,
1136 Tcl_Interp *interp,
1137 int objc,
1138 Tcl_Obj *CONST objv[]
1139){
1140 const char *zCond;
1141 if( objc!=2 ){
1142 Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\"");
1143 return TCL_ERROR;
1144 }
1145 zCond = Tcl_GetString(objv[1]);
1146 if( strcmp(zCond, "now")==0 ){
1147 async.writerHaltNow = 1;
1148 pthread_cond_broadcast(&async.queueSignal);
1149 }else if( strcmp(zCond, "idle")==0 ){
1150 async.writerHaltWhenIdle = 1;
1151 async.writerHaltNow = 0;
1152 pthread_cond_broadcast(&async.queueSignal);
1153 }else if( strcmp(zCond, "never")==0 ){
1154 async.writerHaltWhenIdle = 0;
1155 async.writerHaltNow = 0;
1156 }else{
1157 Tcl_AppendResult(interp,
1158 "should be one of: \"now\", \"idle\", or \"never\"", (char*)0);
1159 return TCL_ERROR;
1160 }
1161 return TCL_OK;
1162}
1163
1164/*
1165** sqlite3async_delay ?MS?
1166**
1167** Query or set the number of milliseconds of delay in the writer
1168** thread after each write operation. The default is 0. By increasing
1169** the memory delay we can simulate the effect of slow disk I/O.
1170*/
1171static int testAsyncDelay(
1172 void * clientData,
1173 Tcl_Interp *interp,
1174 int objc,
1175 Tcl_Obj *CONST objv[]
1176){
1177 if( objc!=1 && objc!=2 ){
1178 Tcl_WrongNumArgs(interp, 1, objv, "?MS?");
1179 return TCL_ERROR;
1180 }
1181 if( objc==1 ){
1182 Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay));
1183 }else{
1184 int ioDelay;
1185 if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR;
1186 async.ioDelay = ioDelay;
1187 }
1188 return TCL_OK;
1189}
1190
1191/*
1192** sqlite3async_start
1193**
1194** Start a new writer thread.
1195*/
1196static int testAsyncStart(
1197 void * clientData,
1198 Tcl_Interp *interp,
1199 int objc,
1200 Tcl_Obj *CONST objv[]
1201){
1202 pthread_t x;
1203 int rc;
1204 rc = pthread_create(&x, 0, asyncWriterThread, 0);
1205 if( rc ){
1206 Tcl_AppendResult(interp, "failed to create the thread", 0);
1207 return TCL_ERROR;
1208 }
1209 pthread_detach(x);
1210 return TCL_OK;
1211}
1212
1213/*
1214** sqlite3async_wait
1215**
1216** Wait for the current writer thread to terminate.
1217**
1218** If the current writer thread is set to run forever then this
1219** command would block forever. To prevent that, an error is returned.
1220*/
1221static int testAsyncWait(
1222 void * clientData,
1223 Tcl_Interp *interp,
1224 int objc,
1225 Tcl_Obj *CONST objv[]
1226){
drh89ea9312006-02-13 17:03:47 +00001227 int cnt = 10;
drh23669402006-01-09 17:29:52 +00001228 if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){
1229 Tcl_AppendResult(interp, "would block forever", (char*)0);
1230 return TCL_ERROR;
1231 }
danielk1977750b03e2006-02-14 10:48:39 +00001232
drh89ea9312006-02-13 17:03:47 +00001233 while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){
1234 pthread_mutex_unlock(&async.writerMutex);
1235 sched_yield();
1236 }
1237 if( cnt>=0 ){
1238 TRACE(("WAIT\n"));
1239 pthread_mutex_lock(&async.queueMutex);
1240 pthread_cond_broadcast(&async.queueSignal);
1241 pthread_mutex_unlock(&async.queueMutex);
1242 pthread_mutex_lock(&async.writerMutex);
1243 pthread_mutex_unlock(&async.writerMutex);
1244 }else{
drh97bbdc02006-02-13 18:35:06 +00001245 TRACE(("NO-WAIT\n"));
drh89ea9312006-02-13 17:03:47 +00001246 }
drh23669402006-01-09 17:29:52 +00001247 return TCL_OK;
1248}
1249
1250
1251#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1252
1253/*
1254** This routine registers the custom TCL commands defined in this
1255** module. This should be the only procedure visible from outside
1256** of this module.
1257*/
1258int Sqlitetestasync_Init(Tcl_Interp *interp){
1259#if OS_UNIX && THREADSAFE && defined(SQLITE_ENABLE_REDEF_IO)
1260 Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0);
1261 Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0);
1262 Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0);
1263 Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0);
1264 Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0);
drh99681db2006-02-13 15:29:32 +00001265 Tcl_LinkVar(interp, "sqlite3async_trace",
1266 (char*)&sqlite3async_trace, TCL_LINK_INT);
drh23669402006-01-09 17:29:52 +00001267#endif /* OS_UNIX and THREADSAFE and defined(SQLITE_ENABLE_REDEF_IO) */
1268 return TCL_OK;
1269}