- backport of https://github.com/apache/pulsar-client-cpp/pull/477 to unbreak the build with Boost 1.87+
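
  For context, PR 477 moves the client off the Boost.Asio interfaces that Boost 1.87
  removed after a long deprecation period (io_service, expires_from_now, the error_code
  overload of timer cancel, rfc2818_verification, posting work via the io object).
  A minimal illustrative sketch of that migration follows; it is not part of the patched
  sources and assumes a Boost release that already ships the replacement names
  (roughly 1.66+ for io_context/expires_after/post, 1.73+ for host_name_verification):

      #include <boost/asio.hpp>
      #include <chrono>

      void asio_migration_sketch() {
          boost::asio::io_context io;                    // was: boost::asio::io_service
          boost::asio::steady_timer timer(io);
          timer.expires_after(std::chrono::seconds(1));  // was: timer.expires_from_now(...)
          timer.async_wait([](const boost::system::error_code&) {});
          timer.cancel();                                // was: timer.cancel(ec) with an error_code out-param
          boost::asio::post(io, [] {});                  // was: io.post(...)
          io.run();
          // TLS hostname checks: boost::asio::ssl::host_name_verification replaces
          // the removed boost::asio::ssl::rfc2818_verification.
      }
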
diff --git CMakeLists.txt CMakeLists.txt
index b0046534..2efeec89 100644
--- CMakeLists.txt
+++ CMakeLists.txt
@@ -19,15 +19,16 @@

cmake_minimum_required(VERSION 3.13)

-option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
-
option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
if (INTEGRATE_VCPKG)
- set(USE_ASIO ON)
+ option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
if (NOT CMAKE_TOOLCHAIN_FILE)
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
endif ()
+else ()
+ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
endif ()
+message(STATUS "USE_ASIO: ${USE_ASIO}")

option(BUILD_TESTS "Build tests" ON)
message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc
index 7233b2c9..bc8da970 100644
--- lib/AckGroupingTrackerEnabled.cc
+++ lib/AckGroupingTrackerEnabled.cc
@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() {
this->flush();
std::lock_guard<std::mutex> lock(this->mutexTimer_);
if (this->timer_) {
- ASIO_ERROR ec;
- this->timer_->cancel(ec);
+ this->timer_->cancel();
}
}

@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {

std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
+ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
auto self = shared_from_this();
this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
if (!ec) {
diff --git lib/ClientConnection.cc lib/ClientConnection.cc
index 2037722f..de226a85 100644
--- lib/ClientConnection.cc
+++ lib/ClientConnection.cc
@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
- tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
+ tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
}

LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
if (keepAliveTimer_) {
- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
+ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
// If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
// Check if we have a timer still before we set the request timer to pop again.
if (consumerStatsRequestTimer_) {
- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+ consumerStatsRequestTimer_->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_kee
typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
#endif

-/*
- * TCP Connect handler
- *
- * if async_connect without any error, connected_ would be set to true
- * at this point the connection is deemed valid to be used by clients of this class
- */
-void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
- if (!err) {
- std::stringstream cnxStringStream;
- try {
- cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
- << "] ";
- cnxString_ = cnxStringStream.str();
- } catch (const ASIO_SYSTEM_ERROR& e) {
- LOG_ERROR("Failed to get endpoint: " << e.what());
- close(ResultRetryable);
- return;
- }
- if (logicalAddress_ == physicalAddress_) {
- LOG_INFO(cnxString_ << "Connected to broker");
- } else {
- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
- << ", proxy: " << proxyServiceUrl_
- << ", physical address:" << physicalAddress_);
- }
+void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) {
+ std::stringstream cnxStringStream;
+ try {
+ cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
+ cnxString_ = cnxStringStream.str();
+ } catch (const ASIO_SYSTEM_ERROR& e) {
+ LOG_ERROR("Failed to get endpoint: " << e.what());
+ close(ResultRetryable);
+ return;
+ }
+ if (logicalAddress_ == physicalAddress_) {
+ LOG_INFO(cnxString_ << "Connected to broker");
+ } else {
+ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
+ << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_);
+ }

- Lock lock(mutex_);
- if (isClosed()) {
- LOG_INFO(cnxString_ << "Connection already closed");
- return;
- }
- state_ = TcpConnected;
- lock.unlock();
+ Lock lock(mutex_);
+ if (isClosed()) {
+ LOG_INFO(cnxString_ << "Connection already closed");
+ return;
+ }
+ state_ = TcpConnected;
+ lock.unlock();

- ASIO_ERROR error;
- socket_->set_option(tcp::no_delay(true), error);
- if (error) {
- LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
- }
+ ASIO_ERROR error;
+ socket_->set_option(tcp::no_delay(true), error);
+ if (error) {
+ LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
+ }

- socket_->set_option(tcp::socket::keep_alive(true), error);
- if (error) {
- LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
- }
+ socket_->set_option(tcp::socket::keep_alive(true), error);
+ if (error) {
+ LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
+ }

- // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
- // should never happen, given that we're sending our own keep-alive probes (within the TCP
- // connection) every 30 seconds
- socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
- if (error) {
- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
- }
+ // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
+ // should never happen, given that we're sending our own keep-alive probes (within the TCP
+ // connection) every 30 seconds
+ socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
+ if (error) {
+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
+ }

- // Send up to 10 probes before declaring the connection broken
- socket_->set_option(tcp_keep_alive_count(10), error);
- if (error) {
- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
- }
+ // Send up to 10 probes before declaring the connection broken
+ socket_->set_option(tcp_keep_alive_count(10), error);
+ if (error) {
+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
+ }

- // Interval between probes: 6 seconds
- socket_->set_option(tcp_keep_alive_interval(6), error);
- if (error) {
- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
- }
+ // Interval between probes: 6 seconds
+ socket_->set_option(tcp_keep_alive_interval(6), error);
+ if (error) {
+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
+ }

- if (tlsSocket_) {
- if (!isTlsAllowInsecureConnection_) {
- ASIO_ERROR err;
- Url service_url;
- if (!Url::parse(physicalAddress_, service_url)) {
- LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
- close();
- return;
- }
- }
- auto weakSelf = weak_from_this();
- auto socket = socket_;
- auto tlsSocket = tlsSocket_;
- // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
- // fault might happen
- auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleHandshake(err);
- }
- };
- tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
- ASIO::bind_executor(strand_, callback));
- } else {
- handleHandshake(ASIO_SUCCESS);
- }
- } else if (endpointIterator != tcp::resolver::iterator()) {
- LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
- // The connection failed. Try the next endpoint in the list.
- ASIO_ERROR closeError;
- socket_->close(closeError); // ignore the error of close
- if (closeError) {
- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
- }
- connectTimeoutTask_->stop();
- ++endpointIterator;
- if (endpointIterator != tcp::resolver::iterator()) {
- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
- connectTimeoutTask_->start();
- tcp::endpoint endpoint = *endpointIterator;
- auto weakSelf = weak_from_this();
- socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleTcpConnected(err, endpointIterator);
- }
- });
- } else {
- if (err == ASIO::error::operation_aborted) {
- // TCP connect timeout, which is not retryable
+ if (tlsSocket_) {
+ if (!isTlsAllowInsecureConnection_) {
+ ASIO_ERROR err;
+ Url service_url;
+ if (!Url::parse(physicalAddress_, service_url)) {
+ LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
- } else {
- close(ResultRetryable);
+ return;
}
}
+ auto weakSelf = weak_from_this();
+ auto socket = socket_;
+ auto tlsSocket = tlsSocket_;
+ // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
+ // fault might happen
+ auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleHandshake(err);
+ }
+ };
+ tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
+ ASIO::bind_executor(strand_, callback));
} else {
- LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
- close(ResultRetryable);
+ handleHandshake(ASIO_SUCCESS);
}
}

@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() {
}

LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
- tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
+ tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port());
auto weakSelf = weak_from_this();
- resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleResolve(err, iterator);
- }
- });
+ resolver_->async_resolve(
+ endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ if (err) {
+ std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
+ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
+ close();
+ return;
+ }
+ if (results.empty()) {
+ LOG_ERROR(cnxString_ << "No IP address found");
+ close();
+ return;
+ }
+ connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
+ ClientConnectionPtr ptr = weakSelf.lock();
+ if (!ptr) {
+ // Connection was already destroyed
+ return;
+ }
+
+ if (ptr->state_ != Ready) {
+ LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
+ << ptr->connectTimeoutTask_->getPeriodMs()
+ << " ms, close the socket");
+ PeriodicTask::ErrorCode err;
+ ptr->socket_->close(err);
+ if (err) {
+ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
+ }
+ }
+ ptr->connectTimeoutTask_->stop();
+ });
+ connectTimeoutTask_->start();
+ std::vector<tcp::resolver::endpoint_type> endpoints;
+ for (const auto& result : results) {
+ endpoints.emplace_back(result.endpoint());
+ }
+ asyncConnect(endpoints, 0);
+ });
}

-void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
- if (err) {
- std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
- LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
- close();
+void ClientConnection::asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index) {
+ if (index >= endpoints.size()) {
+ close(ResultRetryable);
return;
}
-
auto weakSelf = weak_from_this();
- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
- ClientConnectionPtr ptr = weakSelf.lock();
- if (!ptr) {
- // Connection was already destroyed
+ socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) {
+ auto self = weakSelf.lock();
+ if (!self) {
return;
}
-
- if (ptr->state_ != Ready) {
- LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
- << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
- PeriodicTask::ErrorCode err;
- ptr->socket_->close(err);
- if (err) {
- LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
- }
+ if (err) {
+ LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
+ asyncConnect(endpoints, index + 1);
+ return;
}
- ptr->connectTimeoutTask_->stop();
+ completeConnect(endpoints[index]);
});
-
- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
- connectTimeoutTask_->start();
- if (endpointIterator != tcp::resolver::iterator()) {
- LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
- << " to " << endpointIterator->endpoint());
- socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleTcpConnected(err, endpointIterator);
- }
- });
- } else {
- LOG_WARN(cnxString_ << "No IP address found");
- close();
- return;
- }
}

void ClientConnection::readNextCommand() {
@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
LookupRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_from_now(operationsTimeout_);
+ requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() {
PairSharedBuffer buffer =
Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);

- // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
- // callback is called, an invalid buffer range might be passed to the underlying socket send.
+ // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before
+ // the callback is called, an invalid buffer range might be passed to the underlying socket
+ // send.
asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
handleSendPair(err);
}));
@@ -1198,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm

PendingRequestData requestData;
requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_from_now(operationsTimeout_);
+ requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() {
// be zero And we do not attempt to dereference the pointer.
Lock lock(mutex_);
if (keepAliveTimer_) {
- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
+ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
@@ -1430,7 +1400,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
LastMessageIdRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_from_now(operationsTimeout_);
+ requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1478,7 +1448,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
lock.unlock();

auto weakSelf = weak_from_this();
- timer->expires_from_now(operationsTimeout_);
+ timer->expires_after(operationsTimeout_);
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
auto it = pendingRequests_.find(requestId);
if (it != pendingRequests_.end()) {
it->second.promise.setFailed(ResultDisconnected);
- ASIO_ERROR ec;
- it->second.timer->cancel(ec);
+ it->second.timer->cancel();
pendingRequests_.erase(it);
}
}
diff --git lib/ClientConnection.h lib/ClientConnection.h
index 7646f85e..14e07652 100644
--- lib/ClientConnection.h
+++ lib/ClientConnection.h
@@ -25,13 +25,13 @@
#include <atomic>
#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
-#include <asio/io_service.hpp>
+#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ssl/stream.hpp>
#include <asio/strand.hpp>
#else
#include <boost/asio/bind_executor.hpp>
-#include <boost/asio/io_service.hpp>
+#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
DeadlineTimerPtr timer;
};

- /*
- * handler for connectAsync
- * creates a ConnectionPtr which has a valid ClientConnection object
- * although not usable at this point, since this is just tcp connection
- * Pulsar - Connect/Connected has yet to happen
- */
- void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
+ void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index);
+ void completeConnect(ASIO::ip::tcp::endpoint endpoint);

void handleHandshake(const ASIO_ERROR& err);

@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

void handlePulsarConnected(const proto::CommandConnected& cmdConnected);

- void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
-
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
void sendPendingCommands();
@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
- ASIO::strand<ASIO::io_service::executor_type> strand_;
+ ASIO::strand<ASIO::io_context::executor_type> strand_;

const std::string logicalAddress_;
/*
diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc
index 250845b3..cfdb0b2d 100644
--- lib/ConsumerImpl.cc
+++ lib/ConsumerImpl.cc
@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b
}

void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
- checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+ checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
auto self = weakSelf.lock();
@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
}
remainTime -= next;

- timer->expires_from_now(next);
+ timer->expires_after(next);

auto self = shared_from_this();
timer->async_wait([this, backoff, remainTime, timer, next, callback,
@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
}

void ConsumerImpl::cancelTimers() noexcept {
- ASIO_ERROR ec;
- batchReceiveTimer_->cancel(ec);
- checkExpiredChunkedTimer_->cancel(ec);
+ batchReceiveTimer_->cancel();
+ checkExpiredChunkedTimer_->cancel();
unAckedMessageTrackerPtr_->stop();
consumerStatsBasePtr_->stop();
}
diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc
index 098f2d5b..76d99370 100644
--- lib/ConsumerImplBase.cc
+++ lib/ConsumerImplBase.cc
@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi

void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
if (timeoutMs > 0) {
- batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
+ batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
diff --git lib/ExecutorService.cc lib/ExecutorService.cc
index 794e3619..7f2a2c14 100644
--- lib/ExecutorService.cc
+++ lib/ExecutorService.cc
@@ -18,6 +18,12 @@
*/
#include "ExecutorService.h"

+#ifdef USE_ASIO
+#include <asio/post.hpp>
+#else
+#include <boost/asio/post.hpp>
+#endif
+
#include "LogUtils.h"
#include "TimeUtils.h"
DECLARE_LOG_OBJECT()
@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); }
void ExecutorService::start() {
auto self = shared_from_this();
std::thread t{[this, self] {
- LOG_DEBUG("Run io_service in a single thread");
- ASIO_ERROR ec;
+ LOG_DEBUG("Run io_context in a single thread");
while (!closed_) {
- io_service_.restart();
- IOService::work work{getIOService()};
- io_service_.run(ec);
- }
- if (ec) {
- LOG_ERROR("Failed to run io_service: " << ec.message());
- } else {
- LOG_DEBUG("Event loop of ExecutorService exits successfully");
+ io_context_.restart();
+ auto work{ASIO::make_work_guard(io_context_)};
+ io_context_.run();
}
+ LOG_DEBUG("Event loop of ExecutorService exits successfully");
{
std::lock_guard<std::mutex> lock{mutex_};
ioServiceDone_ = true;
@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() {
}

/*
- * factory method of ASIO::ip::tcp::socket associated with io_service_ instance
+ * factory method of ASIO::ip::tcp::socket associated with io_context_ instance
* @ returns shared_ptr to this socket
*/
SocketPtr ExecutorService::createSocket() {
try {
- return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
+ return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
} catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create socket: ") + e.what();
@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont
}

/*
- * factory method of Resolver object associated with io_service_ instance
+ * factory method of Resolver object associated with io_context_ instance
* @returns shraed_ptr to resolver object
*/
TcpResolverPtr ExecutorService::createTcpResolver() {
try {
- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
+ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
} catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create resolver: ") + e.what();
@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() {

DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
try {
- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
+ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
} catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create steady_timer: ") + e.what();
@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
}
}

-void ExecutorService::restart() { io_service_.stop(); }
+void ExecutorService::restart() { io_context_.stop(); }

void ExecutorService::close(long timeoutMs) {
bool expectedState = false;
@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) {
return;
}
if (timeoutMs == 0) { // non-blocking
- io_service_.stop();
+ io_context_.stop();
return;
}

std::unique_lock<std::mutex> lock{mutex_};
- io_service_.stop();
+ io_context_.stop();
if (timeoutMs > 0) {
cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; });
} else { // < 0
@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) {
}
}

-void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); }

/////////////////////

diff --git lib/ExecutorService.h lib/ExecutorService.h
index 89d06d30..626cb203 100644
--- lib/ExecutorService.h
+++ lib/ExecutorService.h
@@ -23,11 +23,11 @@

#include <atomic>
#ifdef USE_ASIO
-#include <asio/io_service.hpp>
+#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ssl.hpp>
#else
-#include <boost/asio/io_service.hpp>
+#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#endif
@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
public:
- using IOService = ASIO::io_service;
+ using IOService = ASIO::io_context;
using SharedPtr = std::shared_ptr<ExecutorService>;

static SharedPtr create();
@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
// See TimeoutProcessor for the semantics of the parameter.
void close(long timeoutMs = 3000);

- IOService &getIOService() { return io_service_; }
+ IOService &getIOService() { return io_context_; }
bool isClosed() const noexcept { return closed_; }

private:
/*
- * io_service is our interface to os, io object schedule async ops on this object
+ * io_context is our interface to os, io object schedule async ops on this object
*/
- IOService io_service_;
+ IOService io_context_;

std::atomic_bool closed_{false};
std::mutex mutex_;
diff --git lib/HandlerBase.cc lib/HandlerBase.cc
index 65aa0db1..71902481 100644
--- lib/HandlerBase.cc
+++ lib/HandlerBase.cc
@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
redirectedClusterURI_("") {}

HandlerBase::~HandlerBase() {
- ASIO_ERROR ignored;
- timer_->cancel(ignored);
- creationTimer_->cancel(ignored);
+ timer_->cancel();
+ creationTimer_->cancel();
}

void HandlerBase::start() {
@@ -61,15 +60,14 @@ void HandlerBase::start() {
if (state_.compare_exchange_strong(state, Pending)) {
grabCnx();
}
- creationTimer_->expires_from_now(operationTimeut_);
+ creationTimer_->expires_after(operationTimeut_);
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
auto self = weakSelf.lock();
if (self && !error) {
LOG_WARN("Cancel the pending reconnection due to the start timeout");
connectionFailed(ResultTimeout);
- ASIO_ERROR ignored;
- timer_->cancel(ignored);
+ timer_->cancel();
}
});
}
@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
connectionTimeMs_ =
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
// Prevent the creationTimer_ from cancelling the timer_ in future
- ASIO_ERROR ignored;
- creationTimer_->cancel(ignored);
+ creationTimer_->cancel();
LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
} else if (isResultRetryable(result)) {
scheduleReconnection();
@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();

LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
- timer_->expires_from_now(delay);
+ timer_->expires_after(delay);
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
// so we will not run into the case where grabCnx is invoked on out of scope handler
auto name = getName();
diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc
index dddade5c..61fbf7b8 100644
--- lib/MultiTopicsConsumerImpl.cc
+++ lib/MultiTopicsConsumerImpl.cc
@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
return numberOfConnectedConsumer;
}
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
auto weakSelf = weak_from_this();
partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
// If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {

void MultiTopicsConsumerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
- ASIO_ERROR ec;
- partitionsUpdateTimer_->cancel(ec);
+ partitionsUpdateTimer_->cancel();
}
}

diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc
index e443496d..e50b4ca2 100644
--- lib/NegativeAcksTracker.cc
+++ lib/NegativeAcksTracker.cc
@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() {
return;
}
std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
- timer_->expires_from_now(timerInterval_);
+ timer_->expires_after(timerInterval_);
timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
if (auto self = weakSelf.lock()) {
self->handleTimer(ec);
@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) {

void NegativeAcksTracker::close() {
closed_ = true;
- ASIO_ERROR ec;
- timer_->cancel(ec);
+ timer_->cancel();
std::lock_guard<std::mutex> lock(mutex_);
nackedMessages_.clear();
}
diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc
index 4178096c..923c038b 100644
--- lib/PartitionedProducerImpl.cc
+++ lib/PartitionedProducerImpl.cc
@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {

void PartitionedProducerImpl::runPartitionUpdateTask() {
auto weakSelf = weak_from_this();
- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self) {
@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {

void PartitionedProducerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
- ASIO_ERROR ec;
- partitionsUpdateTimer_->cancel(ec);
+ partitionsUpdateTimer_->cancel();
}
}

diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc
index 4fc7bb61..07d9a7bc 100644
--- lib/PatternMultiTopicsConsumerImpl.cc
+++ lib/PatternMultiTopicsConsumerImpl.cc
@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern()

void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
autoDiscoveryRunning_ = false;
- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));

auto weakSelf = weak_from_this();
autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() {
LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");

if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
auto weakSelf = weak_from_this();
autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
if (auto self = weakSelf.lock()) {
@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
MultiTopicsConsumerImpl::closeAsync(callback);
}

-void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
- ASIO_ERROR ec;
- autoDiscoveryTimer_->cancel(ec);
-}
+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); }
diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc
index 9fde012a..4b5f9621 100644
--- lib/PeriodicTask.cc
+++ lib/PeriodicTask.cc
@@ -29,7 +29,7 @@ void PeriodicTask::start() {
state_ = Ready;
if (periodMs_ >= 0) {
std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
+ timer_->expires_after(std::chrono::milliseconds(periodMs_));
timer_->async_wait([weakSelf](const ErrorCode& ec) {
auto self = weakSelf.lock();
if (self) {
@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept {
if (!state_.compare_exchange_strong(state, Closing)) {
return;
}
- ErrorCode ec;
- timer_->cancel(ec);
+ timer_->cancel();
state_ = Pending;
}

@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
// state_ may be changed in handleTimeout, so we check state_ again
if (state_ == Ready) {
auto self = shared_from_this();
- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
+ timer_->expires_after(std::chrono::milliseconds(periodMs_));
timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
}
}
diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc
index 4399ce5f..8b112bf1 100644
--- lib/ProducerImpl.cc
+++ lib/ProducerImpl.cc
@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
bool isFull = batchMessageContainer_->add(msg, callback);
if (isFirstMessage) {
- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
auto weakSelf = weak_from_this();
batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() {

void ProducerImpl::cancelTimers() noexcept {
dataKeyRefreshTask_.stop();
- ASIO_ERROR ec;
- batchTimer_->cancel(ec);
- sendTimer_->cancel(ec);
+ batchTimer_->cancel();
+ sendTimer_->cancel();
}

bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() {
}

void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
- sendTimer_->expires_from_now(expiryTime);
+ sendTimer_->expires_after(expiryTime);

auto weakSelf = weak_from_this();
sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
diff --git lib/RetryableOperation.h lib/RetryableOperation.h
index dba190f4..8a235d3a 100644
--- lib/RetryableOperation.h
+++ lib/RetryableOperation.h
@@ -26,8 +26,8 @@
#include <functional>
#include <memory>

+#include "AsioTimer.h"
#include "Backoff.h"
-#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"
#include "ResultUtils.h"
@@ -68,8 +68,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio

void cancel() {
promise_.setFailed(ResultDisconnected);
- ASIO_ERROR ec;
- timer_->cancel(ec);
+ timer_->cancel();
}

private:
@@ -107,7 +106,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
}

auto delay = std::min(backoff_.next(), remainingTime);
- timer_->expires_from_now(delay);
+ timer_->expires_after(delay);

auto nextRemainingTime = remainingTime - delay;
LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay)
diff --git lib/RetryableOperationCache.h lib/RetryableOperationCache.h
index e42460dd..5030c94e 100644
--- lib/RetryableOperationCache.h
+++ lib/RetryableOperationCache.h
@@ -18,7 +18,6 @@
*/
#pragma once

-#include <chrono>
#include <mutex>
#include <unordered_map>

diff --git lib/SharedBuffer.h lib/SharedBuffer.h
index 26fc59ed..a6ced186 100644
--- lib/SharedBuffer.h
+++ lib/SharedBuffer.h
@@ -151,11 +151,11 @@ class SharedBuffer {

inline bool writable() const { return writableBytes() > 0; }

- ASIO::const_buffers_1 const_asio_buffer() const {
- return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes());
+ ASIO::const_buffer const_asio_buffer() const {
+ return ASIO::const_buffer(ptr_ + readIdx_, readableBytes());
}

- ASIO::mutable_buffers_1 asio_buffer() {
+ ASIO::mutable_buffer asio_buffer() {
assert(data_);
return ASIO::buffer(ptr_ + writeIdx_, writableBytes());
}
diff --git lib/UnAckedMessageTrackerEnabled.cc lib/UnAckedMessageTrackerEnabled.cc
index e371af99..3b959d8a 100644
--- lib/UnAckedMessageTrackerEnabled.cc
+++ lib/UnAckedMessageTrackerEnabled.cc
@@ -34,7 +34,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
timeoutHandlerHelper();
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
- timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
+ timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_));
std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -173,9 +173,8 @@ void UnAckedMessageTrackerEnabled::clear() {
}

void UnAckedMessageTrackerEnabled::stop() {
- ASIO_ERROR ec;
if (timer_) {
- timer_->cancel(ec);
+ timer_->cancel();
}
}
} /* namespace pulsar */
diff --git lib/stats/ConsumerStatsImpl.cc lib/stats/ConsumerStatsImpl.cc
index 0eefabdc..e8bd919a 100644
--- lib/stats/ConsumerStatsImpl.cc
+++ lib/stats/ConsumerStatsImpl.cc
@@ -85,7 +85,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy
}

void ConsumerStatsImpl::scheduleTimer() {
- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
+ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
diff --git lib/stats/ConsumerStatsImpl.h lib/stats/ConsumerStatsImpl.h
index 3333ea85..35fda9b4 100644
--- lib/stats/ConsumerStatsImpl.h
+++ lib/stats/ConsumerStatsImpl.h
@@ -59,10 +59,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const ASIO_ERROR&);
void start() override;
- void stop() override {
- ASIO_ERROR error;
- timer_->cancel(error);
- }
+ void stop() override { timer_->cancel(); }
void receivedMessage(Message&, Result) override;
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
virtual ~ConsumerStatsImpl();
diff --git lib/stats/ProducerStatsImpl.cc lib/stats/ProducerStatsImpl.cc
index 15e9e67e..b5e00794 100644
--- lib/stats/ProducerStatsImpl.cc
+++ lib/stats/ProducerStatsImpl.cc
@@ -109,7 +109,7 @@ void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) {
ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }

void ProducerStatsImpl::scheduleTimer() {
- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
+ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
diff --git tests/AuthPluginTest.cc tests/AuthPluginTest.cc
index 24549d7f..ed0511ea 100644
--- tests/AuthPluginTest.cc
+++ tests/AuthPluginTest.cc
@@ -309,16 +309,17 @@ namespace testAthenz {
std::string principalToken;
void mockZTS(Latch& latch, int port) {
LOG_INFO("-- MockZTS started");
- ASIO::io_service io;
- ASIO::ip::tcp::iostream stream;
+ ASIO::io_context io;
+ ASIO::ip::tcp::socket socket(io);
ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port));

LOG_INFO("-- MockZTS waiting for connnection");
latch.countdown();
- acceptor.accept(*stream.rdbuf());
+ acceptor.accept(socket);
LOG_INFO("-- MockZTS got connection");

std::string headerLine;
+ ASIO::ip::tcp::iostream stream(std::move(socket));
while (getline(stream, headerLine)) {
std::vector<std::string> kv;
boost::algorithm::split(kv, headerLine, boost::is_any_of(" "));
diff --git tests/ConsumerTest.h tests/ConsumerTest.h
index 82482875..9d190c10 100644
--- tests/ConsumerTest.h
+++ tests/ConsumerTest.h
@@ -46,8 +46,8 @@ class ConsumerTest {
return nullptr;
}
auto timer = cnx->executor_->createDeadlineTimer();
- timer->expires_from_now(delaySinceStartGrabCnx -
- std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
+ timer->expires_after(delaySinceStartGrabCnx -
+ std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
return timer;
}