|
47 | 47 | #include <thread> |
48 | 48 | #include <utility> |
49 | 49 | #include <vector> |
| 50 | +#if OPENTELEMETRY_HAVE_EXCEPTIONS |
| 51 | +# include <exception> |
| 52 | +#endif |
50 | 53 |
|
51 | 54 | #if !defined(__CYGWIN__) && defined(_WIN32) |
52 | 55 | # ifndef WIN32_LEAN_AND_MEAN |
@@ -1424,71 +1427,90 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender |
1424 | 1427 | return; |
1425 | 1428 | } |
1426 | 1429 |
|
1427 | | - std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock}; |
1428 | | - if (file_->background_flush_thread) |
| 1430 | +#if OPENTELEMETRY_HAVE_EXCEPTIONS |
| 1431 | + try |
1429 | 1432 | { |
1430 | | - return; |
1431 | | - } |
1432 | | - |
1433 | | - std::shared_ptr<FileStats> concurrency_file = file_; |
1434 | | - std::chrono::microseconds flush_interval = options_.flush_interval; |
1435 | | - file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() { |
1436 | | - std::chrono::system_clock::time_point last_free_job_timepoint = |
1437 | | - std::chrono::system_clock::now(); |
1438 | | - std::size_t last_record_count = 0; |
| 1433 | +#endif |
1439 | 1434 |
|
1440 | | - while (true) |
| 1435 | + std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock}; |
| 1436 | + if (file_->background_flush_thread) |
1441 | 1437 | { |
1442 | | - std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); |
1443 | | - // Exit flush thread if there is not data to flush more than one minute. |
1444 | | - if (now - last_free_job_timepoint > std::chrono::minutes{1}) |
1445 | | - { |
1446 | | - break; |
1447 | | - } |
| 1438 | + return; |
| 1439 | + } |
1448 | 1440 |
|
1449 | | - if (concurrency_file->is_shutdown.load(std::memory_order_acquire)) |
1450 | | - { |
1451 | | - break; |
1452 | | - } |
| 1441 | + std::shared_ptr<FileStats> concurrency_file = file_; |
| 1442 | + std::chrono::microseconds flush_interval = options_.flush_interval; |
| 1443 | + file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() { |
| 1444 | + std::chrono::system_clock::time_point last_free_job_timepoint = |
| 1445 | + std::chrono::system_clock::now(); |
| 1446 | + std::size_t last_record_count = 0; |
1453 | 1447 |
|
| 1448 | + while (true) |
1454 | 1449 | { |
1455 | | - std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock); |
1456 | | - concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval); |
1457 | | - } |
| 1450 | + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); |
| 1451 | + // Exit flush thread if there is not data to flush more than one minute. |
| 1452 | + if (now - last_free_job_timepoint > std::chrono::minutes{1}) |
| 1453 | + { |
| 1454 | + break; |
| 1455 | + } |
1458 | 1456 |
|
1459 | | - { |
1460 | | - std::size_t current_record_count = |
1461 | | - concurrency_file->record_count.load(std::memory_order_acquire); |
1462 | | - std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock}; |
1463 | | - if (current_record_count != last_record_count) |
| 1457 | + if (concurrency_file->is_shutdown.load(std::memory_order_acquire)) |
1464 | 1458 | { |
1465 | | - last_record_count = current_record_count; |
1466 | | - last_free_job_timepoint = std::chrono::system_clock::now(); |
| 1459 | + break; |
1467 | 1460 | } |
1468 | 1461 |
|
1469 | | - if (concurrency_file->current_file) |
1470 | 1462 | { |
1471 | | - fflush(concurrency_file->current_file.get()); |
| 1463 | + std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock); |
| 1464 | + concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval); |
1472 | 1465 | } |
1473 | 1466 |
|
1474 | | - concurrency_file->flushed_record_count.store(current_record_count, |
1475 | | - std::memory_order_release); |
1476 | | - } |
| 1467 | + { |
| 1468 | + std::size_t current_record_count = |
| 1469 | + concurrency_file->record_count.load(std::memory_order_acquire); |
| 1470 | + std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock}; |
| 1471 | + if (current_record_count != last_record_count) |
| 1472 | + { |
| 1473 | + last_record_count = current_record_count; |
| 1474 | + last_free_job_timepoint = std::chrono::system_clock::now(); |
| 1475 | + } |
1477 | 1476 |
|
1478 | | - concurrency_file->background_thread_waiter_cv.notify_all(); |
1479 | | - } |
| 1477 | + if (concurrency_file->current_file) |
| 1478 | + { |
| 1479 | + fflush(concurrency_file->current_file.get()); |
| 1480 | + } |
1480 | 1481 |
|
1481 | | - // Detach running thread because it will exit soon |
1482 | | - std::unique_ptr<std::thread> background_flush_thread; |
1483 | | - { |
1484 | | - std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock}; |
1485 | | - background_flush_thread.swap(concurrency_file->background_flush_thread); |
1486 | | - } |
1487 | | - if (background_flush_thread && background_flush_thread->joinable()) |
1488 | | - { |
1489 | | - background_flush_thread->detach(); |
1490 | | - } |
1491 | | - })); |
| 1482 | + concurrency_file->flushed_record_count.store(current_record_count, |
| 1483 | + std::memory_order_release); |
| 1484 | + } |
| 1485 | + |
| 1486 | + concurrency_file->background_thread_waiter_cv.notify_all(); |
| 1487 | + } |
| 1488 | + |
| 1489 | + // Detach running thread because it will exit soon |
| 1490 | + std::unique_ptr<std::thread> background_flush_thread; |
| 1491 | + { |
| 1492 | + std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock}; |
| 1493 | + background_flush_thread.swap(concurrency_file->background_flush_thread); |
| 1494 | + } |
| 1495 | + if (background_flush_thread && background_flush_thread->joinable()) |
| 1496 | + { |
| 1497 | + background_flush_thread->detach(); |
| 1498 | + } |
| 1499 | + })); |
| 1500 | +#if OPENTELEMETRY_HAVE_EXCEPTIONS |
| 1501 | + } |
| 1502 | + catch (std::exception &e) |
| 1503 | + { |
| 1504 | + OTEL_INTERNAL_LOG_WARN("[OTLP FILE Client] Try to spawn background but got a exception: " |
| 1505 | + << e.what() << ".Data writing may experience some delays."); |
| 1506 | + } |
| 1507 | + catch (...) |
| 1508 | + { |
| 1509 | + OTEL_INTERNAL_LOG_WARN( |
| 1510 | + "[OTLP FILE Client] Try to spawn background but got a unknown exception.Data writing may " |
| 1511 | + "experience some delays."); |
| 1512 | + } |
| 1513 | +#endif |
1492 | 1514 | } |
1493 | 1515 |
|
1494 | 1516 | private: |
|
0 commit comments