Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of
the codebase.

Originally landed in the QUIC repo

Original review metadata:

```
  PR-URL: nodejs/quic#165
  Reviewed-By: James M Snell <[email protected]>
  Reviewed-By: Daniel Bevenius <[email protected]>
```

Signed-off-by: James M Snell <[email protected]>

PR-URL: #31871
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Ben Noordhuis <[email protected]>
  • Loading branch information
addaleax authored and MylesBorins committed Mar 4, 2020
commit 4dc59b91a71099f054e9ece5fb05657b1b916af6
4 changes: 3 additions & 1 deletion lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
this.on('listening', onListening);
}

if (port instanceof UDP) {
if (port !== null &&
typeof port === 'object' &&
typeof port.recvStart === 'function') {
replaceHandle(this, port);
startListening(this);
return this;
Expand Down
205 changes: 151 additions & 54 deletions src/udp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
}


inline bool SendWrap::have_callback() const {
bool SendWrap::have_callback() const {
return have_callback_;
}

UDPListener::~UDPListener() {
if (wrap_ != nullptr)
wrap_->set_listener(nullptr);
}

UDPWrapBase::~UDPWrapBase() {
set_listener(nullptr);
}

UDPListener* UDPWrapBase::listener() const {
CHECK_NOT_NULL(listener_);
return listener_;
}

void UDPWrapBase::set_listener(UDPListener* listener) {
if (listener_ != nullptr)
listener_->wrap_ = nullptr;
listener_ = listener;
if (listener_ != nullptr) {
CHECK_NULL(listener_->wrap_);
listener_->wrap_ = this;
}
}

UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
return static_cast<UDPWrapBase*>(
obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
}

void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
env->SetProtoMethod(t, "recvStart", RecvStart);
env->SetProtoMethod(t, "recvStop", RecvStop);
}

UDPWrap::UDPWrap(Environment* env, Local<Object> object)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(&handle_),
AsyncWrap::PROVIDER_UDPWRAP) {
object->SetAlignedPointerInInternalField(
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));

int r = uv_udp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // can't fail anyway

set_listener(this);
}


Expand All @@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
Environment* env = Environment::GetCurrent(context);

Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
t->InstanceTemplate()->SetInternalFieldCount(
UDPWrapBase::kInternalFieldCount);
Local<String> udpString =
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
t->SetClassName(udpString);
Expand All @@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);

UDPWrapBase::AddMethods(env, t);
env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "connect", Connect);
Expand All @@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "connect6", Connect6);
env->SetProtoMethod(t, "send6", Send6);
env->SetProtoMethod(t, "disconnect", Disconnect);
env->SetProtoMethod(t, "recvStart", RecvStart);
env->SetProtoMethod(t, "recvStop", RecvStop);
env->SetProtoMethod(t, "getpeername",
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
env->SetProtoMethod(t, "getsockname",
Expand Down Expand Up @@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
flags);
}

if (err == 0)
wrap->listener()->OnAfterBind();

args.GetReturnValue().Set(err);
}

Expand Down Expand Up @@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK(args[3]->IsBoolean());
}

Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
// it is faster to fetch the length of the
// array in js-land
size_t count = args[2].As<Uint32>()->Value();
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();

size_t msg_size = 0;

MaybeStackBuffer<uv_buf_t, 16> bufs(count);

Expand All @@ -482,7 +520,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
size_t length = Buffer::Length(chunk);

bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
msg_size += length;
}

int err = 0;
Expand All @@ -492,14 +529,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
const unsigned short port = args[3].As<Uint32>()->Value();
node::Utf8Value address(env->isolate(), args[4]);
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
if (err == 0) {
if (err == 0)
addr = reinterpret_cast<sockaddr*>(&addr_storage);
}
}

uv_buf_t* bufs_ptr = *bufs;
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
if (err == 0) {
wrap->current_send_req_wrap_ = args[0].As<Object>();
wrap->current_send_has_callback_ =
sendto ? args[5]->IsTrue() : args[3]->IsTrue();

err = wrap->Send(*bufs, count, addr);

wrap->current_send_req_wrap_.Clear();
wrap->current_send_has_callback_ = false;
}

args.GetReturnValue().Set(err);
}

ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
size_t count,
const sockaddr* addr) {
if (IsHandleClosing()) return UV_EBADF;

size_t msg_size = 0;
for (size_t i = 0; i < count; i++)
msg_size += bufs_ptr[i].len;

int err = 0;
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
if (err == UV_ENOSYS || err == UV_EAGAIN) {
err = 0;
} else if (err >= 0) {
Expand All @@ -517,28 +576,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
CHECK_EQ(static_cast<size_t>(err), msg_size);
// + 1 so that the JS side can distinguish 0-length async sends from
// 0-length sync sends.
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
return;
return msg_size + 1;
}
}
}

if (err == 0) {
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
req_wrap->msg_size = msg_size;

err = req_wrap->Dispatch(uv_udp_send,
&wrap->handle_,
bufs_ptr,
count,
addr,
OnSend);
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
if (req_wrap == nullptr) return UV_ENOSYS;

err = req_wrap->Dispatch(
uv_udp_send,
&handle_,
bufs_ptr,
count,
addr,
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
self->listener()->OnSendDone(
ReqWrap<uv_udp_send_t>::from_req(req), status);
}});
if (err)
delete req_wrap;
}

args.GetReturnValue().Set(err);
return err;
}


ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
SendWrap* req_wrap = new SendWrap(env(),
current_send_req_wrap_,
current_send_has_callback_);
req_wrap->msg_size = msg_size;
return req_wrap;
}


Expand All @@ -552,31 +624,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
}


void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
AsyncWrap* UDPWrap::GetAsyncWrap() {
return this;
}

int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
return uv_udp_getpeername(&handle_, name, namelen);
}

int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
return uv_udp_getsockname(&handle_, name, namelen);
}

void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
}

int UDPWrap::RecvStart() {
if (IsHandleClosing()) return UV_EBADF;
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
// UV_EALREADY means that the socket is already bound but that's okay
if (err == UV_EALREADY)
err = 0;
args.GetReturnValue().Set(err);
return err;
}


void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int r = uv_udp_recv_stop(&wrap->handle_);
args.GetReturnValue().Set(r);
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
}

int UDPWrap::RecvStop() {
if (IsHandleClosing()) return UV_EBADF;
return uv_udp_recv_stop(&handle_);
}


void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
if (req_wrap->have_callback()) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Expand All @@ -593,43 +680,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
void UDPWrap::OnAlloc(uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
*buf = wrap->env()->AllocateManaged(suggested_size).release();
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
reinterpret_cast<uv_udp_t*>(handle));
*buf = wrap->listener()->OnAlloc(suggested_size);
}

uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
return env()->AllocateManaged(suggested_size).release();
}

void UDPWrap::OnRecv(uv_udp_t* handle,
ssize_t nread,
const uv_buf_t* buf_,
const struct sockaddr* addr,
const uv_buf_t* buf,
const sockaddr* addr,
unsigned int flags) {
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
Environment* env = wrap->env();
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
wrap->listener()->OnRecv(nread, *buf, addr, flags);
}

AllocatedBuffer buf(env, *buf_);
void UDPWrap::OnRecv(ssize_t nread,
const uv_buf_t& buf_,
const sockaddr* addr,
unsigned int flags) {
Environment* env = this->env();
AllocatedBuffer buf(env, buf_);
if (nread == 0 && addr == nullptr) {
return;
}

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

Local<Object> wrap_obj = wrap->object();
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
wrap_obj,
object(),
Undefined(env->isolate()),
Undefined(env->isolate())
};

if (nread < 0) {
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
return;
}

buf.Resize(nread);
argv[2] = buf.ToBuffer().ToLocalChecked();
argv[3] = AddressToJS(env, addr);
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
}

MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,
Expand Down
Loading