Stop future writes if a log file Close() fails.

See https://github.com/google/leveldb/issues/1081

PiperOrigin-RevId: 499519182
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 1a4e459..1ec2afb 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1368,8 +1368,22 @@
         versions_->ReuseFileNumber(new_log_number);
         break;
       }
+
       delete log_;
+
+      s = logfile_->Close();
+      if (!s.ok()) {
+        // We may have lost some data written to the previous log file.
+        // Switch to the new log file anyway, but record as a background
+        // error so we do not attempt any more writes.
+        //
+        // We could perhaps attempt to save the memtable corresponding
+        // to log file and suppress the error if that works, but that
+        // would add more complexity in a critical code path.
+        RecordBackgroundError(s);
+      }
       delete logfile_;
+
       logfile_ = lfile;
       logfile_number_ = new_log_number;
       log_ = new log::Writer(lfile);
diff --git a/db/db_test.cc b/db/db_test.cc
index 9bd6e14..472258b 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -65,6 +65,19 @@
 void DelayMilliseconds(int millis) {
   Env::Default()->SleepForMicroseconds(millis * 1000);
 }
+
+bool IsLdbFile(const std::string& f) {
+  return strstr(f.c_str(), ".ldb") != nullptr;
+}
+
+bool IsLogFile(const std::string& f) {
+  return strstr(f.c_str(), ".log") != nullptr;
+}
+
+bool IsManifestFile(const std::string& f) {
+  return strstr(f.c_str(), "MANIFEST") != nullptr;
+}
+
 }  // namespace
 
 // Test Env to override default Env behavior for testing.
@@ -100,6 +113,10 @@
 // Special Env used to delay background operations.
 class SpecialEnv : public EnvWrapper {
  public:
+  // For historical reasons, the std::atomic<> fields below are currently
+  // accessed via acquired loads and release stores. We should switch
+  // to plain load(), store() calls that provide sequential consistency.
+
   // sstable/log Sync() calls are blocked while this pointer is non-null.
   std::atomic<bool> delay_data_sync_;
 
@@ -118,6 +135,9 @@
   // Force write to manifest files to fail while this pointer is non-null.
   std::atomic<bool> manifest_write_error_;
 
+  // Force log file close to fail while this bool is true.
+  std::atomic<bool> log_file_close_;
+
   bool count_random_reads_;
   AtomicCounter random_read_counter_;
 
@@ -129,6 +149,7 @@
         non_writable_(false),
         manifest_sync_error_(false),
         manifest_write_error_(false),
+        log_file_close_(false),
         count_random_reads_(false) {}
 
   Status NewWritableFile(const std::string& f, WritableFile** r) {
@@ -136,9 +157,12 @@
      private:
       SpecialEnv* const env_;
       WritableFile* const base_;
+      const std::string fname_;
 
      public:
-      DataFile(SpecialEnv* env, WritableFile* base) : env_(env), base_(base) {}
+      DataFile(SpecialEnv* env, WritableFile* base, const std::string& fname)
+          : env_(env), base_(base), fname_(fname) {}
+
       ~DataFile() { delete base_; }
       Status Append(const Slice& data) {
         if (env_->no_space_.load(std::memory_order_acquire)) {
@@ -148,7 +172,14 @@
           return base_->Append(data);
         }
       }
-      Status Close() { return base_->Close(); }
+      Status Close() {
+        Status s = base_->Close();
+        if (s.ok() && IsLogFile(fname_) &&
+            env_->log_file_close_.load(std::memory_order_acquire)) {
+          s = Status::IOError("simulated log file Close error");
+        }
+        return s;
+      }
       Status Flush() { return base_->Flush(); }
       Status Sync() {
         if (env_->data_sync_error_.load(std::memory_order_acquire)) {
@@ -192,10 +223,9 @@
 
     Status s = target()->NewWritableFile(f, r);
     if (s.ok()) {
-      if (strstr(f.c_str(), ".ldb") != nullptr ||
-          strstr(f.c_str(), ".log") != nullptr) {
-        *r = new DataFile(this, *r);
-      } else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
+      if (IsLdbFile(f) || IsLogFile(f)) {
+        *r = new DataFile(this, *r, f);
+      } else if (IsManifestFile(f)) {
         *r = new ManifestFile(this, *r);
       }
     }
@@ -1941,6 +1971,33 @@
   delete options.filter_policy;
 }
 
+TEST_F(DBTest, LogCloseError) {
+  // Regression test for bug where we could ignore log file
+  // Close() error when switching to a new log file.
+  const int kValueSize = 20000;
+  const int kWriteCount = 10;
+  const int kWriteBufferSize = (kValueSize * kWriteCount) / 2;
+
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = kWriteBufferSize;  // Small write buffer
+  Reopen(&options);
+  env_->log_file_close_.store(true, std::memory_order_release);
+
+  std::string value(kValueSize, 'x');
+  Status s;
+  for (int i = 0; i < kWriteCount && s.ok(); i++) {
+    s = Put(Key(i), value);
+  }
+  ASSERT_TRUE(!s.ok()) << "succeeded even after log file Close failure";
+
+  // Future writes should also fail after an earlier error.
+  s = Put("hello", "world");
+  ASSERT_TRUE(!s.ok()) << "write succeeded after log file Close failure";
+
+  env_->log_file_close_.store(false, std::memory_order_release);
+}
+
 // Multi-threaded test:
 namespace {