Use libevent, second try. Changes this time:
- remove bogus include of base/completion_callback.h - add DEPS rules to allow including third_party/libevent Review URL: http://codereview.chromium.org/2964 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@2371 0039d316-1c4b-4281-b951-d872f2087c98
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
include_rules = [
|
||||
"+third_party/zlib",
|
||||
"+third_party/libevent",
|
||||
]
|
||||
|
@ -159,6 +159,7 @@ if env['PLATFORM'] == 'posix':
|
||||
'base_paths_linux.cc',
|
||||
'file_util_linux.cc',
|
||||
'hmac_nss.cc',
|
||||
'message_pump_libevent.cc',
|
||||
'nss_init.cc',
|
||||
'sys_string_conversions_linux.cc',
|
||||
'worker_pool.cc',
|
||||
@ -190,6 +191,7 @@ env_tests.Prepend(
|
||||
LIBS = [
|
||||
'base',
|
||||
'base_gfx',
|
||||
'event',
|
||||
'gtest',
|
||||
'icuuc',
|
||||
'libpng',
|
||||
|
@ -13,6 +13,10 @@
|
||||
#include "base/string_util.h"
|
||||
#include "base/thread_local.h"
|
||||
|
||||
#if defined(OS_POSIX)
|
||||
#include "base/message_pump_libevent.h"
|
||||
#endif
|
||||
|
||||
// A lazily created thread local storage for quick access to a thread's message
|
||||
// loop, if one exists. This should be safe and free of static constructors.
|
||||
static base::LazyInstance<base::ThreadLocalPointer<MessageLoop> > lazy_tls_ptr(
|
||||
@ -78,6 +82,12 @@ MessageLoop::MessageLoop(Type type)
|
||||
} else {
|
||||
pump_ = new base::MessagePumpWin();
|
||||
}
|
||||
#elif defined(OS_POSIX)
|
||||
if (type_ == TYPE_IO) {
|
||||
pump_ = new base::MessagePumpLibevent();
|
||||
} else {
|
||||
pump_ = new base::MessagePumpDefault();
|
||||
}
|
||||
#else
|
||||
pump_ = new base::MessagePumpDefault();
|
||||
#endif
|
||||
@ -561,4 +571,14 @@ void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) {
|
||||
pump_win()->WatchObject(object, watcher);
|
||||
}
|
||||
|
||||
#endif // defined(OS_WIN)
|
||||
#elif defined(OS_POSIX)
|
||||
|
||||
void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
|
||||
struct event* e, Watcher* watcher) {
|
||||
pump_libevent()->WatchSocket(socket, interest_mask, e, watcher);
|
||||
}
|
||||
|
||||
void MessageLoopForIO::UnwatchSocket(struct event* e) {
|
||||
pump_libevent()->UnwatchSocket(e);
|
||||
}
|
||||
#endif
|
||||
|
@ -21,6 +21,8 @@
|
||||
// We need this to declare base::MessagePumpWin::Dispatcher, which we should
|
||||
// really just eliminate.
|
||||
#include "base/message_pump_win.h"
|
||||
#elif defined(OS_POSIX)
|
||||
#include "base/message_pump_libevent.h"
|
||||
#endif
|
||||
|
||||
// A MessageLoop is used to process events for a particular thread. There is
|
||||
@ -274,6 +276,11 @@ class MessageLoop : public base::MessagePump::Delegate {
|
||||
base::MessagePumpWin* pump_win() {
|
||||
return static_cast<base::MessagePumpWin*>(pump_.get());
|
||||
}
|
||||
#elif defined(OS_POSIX)
|
||||
base::MessagePumpLibevent* pump_libevent() {
|
||||
return static_cast<base::MessagePumpLibevent*>(pump_.get());
|
||||
}
|
||||
protected:
|
||||
#endif
|
||||
|
||||
// A function to encapsulate all the exception handling capability in the
|
||||
@ -450,6 +457,14 @@ class MessageLoopForIO : public MessageLoop {
|
||||
|
||||
// Please see MessagePumpWin for definitions of these methods.
|
||||
void WatchObject(HANDLE object, Watcher* watcher);
|
||||
|
||||
#elif defined(OS_POSIX)
|
||||
typedef base::MessagePumpLibevent::Watcher Watcher;
|
||||
|
||||
// Please see MessagePumpLibevent for definitions of these methods.
|
||||
void WatchSocket(int socket, short interest_mask,
|
||||
struct event* e, Watcher* watcher);
|
||||
void UnwatchSocket(struct event* e);
|
||||
#endif // defined(OS_WIN)
|
||||
};
|
||||
|
||||
|
179
base/message_pump_libevent.cc
Normal file
179
base/message_pump_libevent.cc
Normal file
@ -0,0 +1,179 @@
|
||||
// Copyright (c) 2008 The Chromium Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
#include "base/message_pump_libevent.h"
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "base/time.h"
|
||||
#include "third_party/libevent/event.h"
|
||||
|
||||
#include <fcntl.h>
|
||||
|
||||
namespace base {
|
||||
|
||||
// Return 0 on success
|
||||
// Too small a function to bother putting in a library?
|
||||
static int SetNonBlocking(int fd)
|
||||
{
|
||||
int flags = fcntl(fd, F_GETFL, 0);
|
||||
if (-1 == flags)
|
||||
flags = 0;
|
||||
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||
}
|
||||
|
||||
// Called if a byte is received on the wakeup pipe.
|
||||
void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
|
||||
|
||||
base::MessagePumpLibevent* that =
|
||||
static_cast<base::MessagePumpLibevent*>(context);
|
||||
DCHECK(that->wakeup_pipe_out_ == socket);
|
||||
|
||||
// Remove and discard the wakeup byte.
|
||||
char buf;
|
||||
int nread = read(socket, &buf, 1);
|
||||
DCHECK(nread == 1);
|
||||
// Tell libevent to break out of inner loop.
|
||||
event_base_loopbreak(that->event_base_);
|
||||
}
|
||||
|
||||
MessagePumpLibevent::MessagePumpLibevent()
|
||||
: keep_running_(true),
|
||||
in_run_(false),
|
||||
event_base_(event_base_new()),
|
||||
wakeup_pipe_in_(-1),
|
||||
wakeup_pipe_out_(-1) {
|
||||
if (!Init())
|
||||
NOTREACHED();
|
||||
}
|
||||
|
||||
bool MessagePumpLibevent::Init() {
|
||||
int fds[2];
|
||||
if (pipe(fds))
|
||||
return false;
|
||||
if (SetNonBlocking(fds[0]))
|
||||
return false;
|
||||
if (SetNonBlocking(fds[1]))
|
||||
return false;
|
||||
wakeup_pipe_out_ = fds[0];
|
||||
wakeup_pipe_in_ = fds[1];
|
||||
|
||||
wakeup_event_ = new event;
|
||||
event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
|
||||
OnWakeup, this);
|
||||
event_base_set(event_base_, wakeup_event_);
|
||||
|
||||
if (event_add(wakeup_event_, 0))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
MessagePumpLibevent::~MessagePumpLibevent() {
|
||||
DCHECK(wakeup_event_);
|
||||
DCHECK(event_base_);
|
||||
event_del(wakeup_event_);
|
||||
delete wakeup_event_;
|
||||
event_base_free(event_base_);
|
||||
}
|
||||
|
||||
void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
|
||||
event* e, Watcher* watcher) {
|
||||
|
||||
// Set current interest mask and message pump for this event
|
||||
event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
|
||||
|
||||
// Tell libevent which message pump this socket will belong to when we add it.
|
||||
event_base_set(event_base_, e);
|
||||
|
||||
// Add this socket to the list of monitored sockets.
|
||||
if (event_add(e, NULL))
|
||||
NOTREACHED();
|
||||
}
|
||||
|
||||
void MessagePumpLibevent::UnwatchSocket(event* e) {
|
||||
// Remove this socket from the list of monitored sockets.
|
||||
if (event_del(e))
|
||||
NOTREACHED();
|
||||
}
|
||||
|
||||
void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
|
||||
void* context) {
|
||||
// The given socket is ready for I/O.
|
||||
// Tell the owner what kind of I/O the socket is ready for.
|
||||
Watcher* watcher = static_cast<Watcher*>(context);
|
||||
watcher->OnSocketReady(flags);
|
||||
}
|
||||
|
||||
// Reentrant!
|
||||
void MessagePumpLibevent::Run(Delegate* delegate) {
|
||||
DCHECK(keep_running_) << "Quit must have been called outside of Run!";
|
||||
|
||||
bool old_in_run = in_run_;
|
||||
in_run_ = true;
|
||||
|
||||
for (;;) {
|
||||
bool did_work = delegate->DoWork();
|
||||
if (!keep_running_)
|
||||
break;
|
||||
|
||||
did_work |= delegate->DoDelayedWork(&delayed_work_time_);
|
||||
if (!keep_running_)
|
||||
break;
|
||||
|
||||
if (did_work)
|
||||
continue;
|
||||
|
||||
did_work = delegate->DoIdleWork();
|
||||
if (!keep_running_)
|
||||
break;
|
||||
|
||||
if (did_work)
|
||||
continue;
|
||||
|
||||
// EVLOOP_ONCE tells libevent to only block once,
|
||||
// but to service all pending events when it wakes up.
|
||||
if (delayed_work_time_.is_null()) {
|
||||
event_base_loop(event_base_, EVLOOP_ONCE);
|
||||
} else {
|
||||
TimeDelta delay = delayed_work_time_ - Time::Now();
|
||||
if (delay > TimeDelta()) {
|
||||
struct timeval poll_tv;
|
||||
poll_tv.tv_sec = delay.InSeconds();
|
||||
poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
|
||||
event_base_loopexit(event_base_, &poll_tv);
|
||||
event_base_loop(event_base_, EVLOOP_ONCE);
|
||||
} else {
|
||||
// It looks like delayed_work_time_ indicates a time in the past, so we
|
||||
// need to call DoDelayedWork now.
|
||||
delayed_work_time_ = Time();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
keep_running_ = true;
|
||||
in_run_ = old_in_run;
|
||||
}
|
||||
|
||||
void MessagePumpLibevent::Quit() {
|
||||
DCHECK(in_run_);
|
||||
// Tell both libevent and Run that they should break out of their loops.
|
||||
keep_running_ = false;
|
||||
ScheduleWork();
|
||||
}
|
||||
|
||||
void MessagePumpLibevent::ScheduleWork() {
|
||||
// Tell libevent (in a threadsafe way) that it should break out of its loop.
|
||||
char buf = 0;
|
||||
int nwrite = write(wakeup_pipe_in_, &buf, 1);
|
||||
DCHECK(nwrite == 1);
|
||||
}
|
||||
|
||||
void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
|
||||
// We know that we can't be blocked on Wait right now since this method can
|
||||
// only be called on the same thread as Run, so we only need to update our
|
||||
// record of how long to sleep when we do sleep.
|
||||
delayed_work_time_ = delayed_work_time;
|
||||
}
|
||||
|
||||
} // namespace base
|
||||
|
89
base/message_pump_libevent.h
Normal file
89
base/message_pump_libevent.h
Normal file
@ -0,0 +1,89 @@
|
||||
// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
#ifndef BASE_MESSAGE_PUMP_LIBEVENT_H_
|
||||
#define BASE_MESSAGE_PUMP_LIBEVENT_H_
|
||||
|
||||
#include "base/message_pump.h"
|
||||
#include "base/time.h"
|
||||
|
||||
// Declare structs we need from libevent.h rather than including it
|
||||
struct event_base;
|
||||
struct event;
|
||||
|
||||
namespace base {
|
||||
|
||||
// Class to monitor sockets and issue callbacks when sockets are ready for I/O
|
||||
// TODO(dkegel): add support for background file IO somehow
|
||||
class MessagePumpLibevent : public MessagePump {
|
||||
public:
|
||||
// Used with WatchObject to asynchronously monitor the I/O readiness of a
|
||||
// socket.
|
||||
class Watcher {
|
||||
public:
|
||||
virtual ~Watcher() {}
|
||||
// Called from MessageLoop::Run when a ready socket is detected.
|
||||
virtual void OnSocketReady(short eventmask) = 0;
|
||||
};
|
||||
|
||||
MessagePumpLibevent();
|
||||
virtual ~MessagePumpLibevent();
|
||||
|
||||
// Have the current thread's message loop watch for a ready socket.
|
||||
// Caller must provide a struct event for this socket for libevent's use.
|
||||
// The event and interest_mask fields are defined in libevent.
|
||||
// Returns true on success.
|
||||
// TODO(dkegel): hide libevent better; abstraction still too leaky
|
||||
// TODO(dkegel): better error handing
|
||||
// TODO(dkegel): switch to edge-triggered readiness notification
|
||||
void WatchSocket(int socket, short interest_mask, event* e, Watcher*);
|
||||
|
||||
// Stop watching a socket.
|
||||
// Event was previously initialized by WatchSocket.
|
||||
void UnwatchSocket(event* e);
|
||||
|
||||
// MessagePump methods:
|
||||
virtual void Run(Delegate* delegate);
|
||||
virtual void Quit();
|
||||
virtual void ScheduleWork();
|
||||
virtual void ScheduleDelayedWork(const Time& delayed_work_time);
|
||||
|
||||
private:
|
||||
|
||||
// Risky part of constructor. Returns true on success.
|
||||
bool Init();
|
||||
|
||||
// This flag is set to false when Run should return.
|
||||
bool keep_running_;
|
||||
|
||||
// This flag is set when inside Run.
|
||||
bool in_run_;
|
||||
|
||||
// The time at which we should call DoDelayedWork.
|
||||
Time delayed_work_time_;
|
||||
|
||||
// Libevent dispatcher. Watches all sockets registered with it, and sends
|
||||
// readiness callbacks when a socket is ready for I/O.
|
||||
event_base* event_base_;
|
||||
|
||||
// Called by libevent to tell us a registered socket is ready
|
||||
static void OnReadinessNotification(int socket, short flags, void* context);
|
||||
|
||||
// Unix pipe used to implement ScheduleWork()
|
||||
// ... callback; called by libevent inside Run() when pipe is ready to read
|
||||
static void OnWakeup(int socket, short flags, void* context);
|
||||
// ... write end; ScheduleWork() writes a single byte to it
|
||||
int wakeup_pipe_in_;
|
||||
// ... read end; OnWakeup reads it and then breaks Run() out of its sleep
|
||||
int wakeup_pipe_out_;
|
||||
// ... libevent wrapper for read end
|
||||
event* wakeup_event_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(MessagePumpLibevent);
|
||||
};
|
||||
|
||||
} // namespace base
|
||||
|
||||
#endif // BASE_MESSAGE_PUMP_LIBEVENT_H_
|
||||
|
@ -63,6 +63,7 @@ env = Environment(
|
||||
BSPATCH_DIR = '$THIRD_PARTY_DIR/bspatch',
|
||||
BZIP2_DIR = '$THIRD_PARTY_DIR/bzip2',
|
||||
ICU38_DIR = '$THIRD_PARTY_DIR/icu38',
|
||||
LIBEVENT_DIR = '$THIRD_PARTY_DIR/libevent',
|
||||
LIBJPEG_DIR = '$THIRD_PARTY_DIR/libjpeg',
|
||||
LIBPNG_DIR = '$THIRD_PARTY_DIR/libpng',
|
||||
LIBXML_DIR = '$THIRD_PARTY_DIR/libxml',
|
||||
@ -500,6 +501,10 @@ if LoadComponent('third_party'):
|
||||
'$LIBXML_DIR/SConscript',
|
||||
'$LIBXSLT_DIR/SConscript',
|
||||
])
|
||||
if env['PLATFORM'] == 'posix':
|
||||
sconscripts.extend([
|
||||
'$LIBEVENT_DIR/SConscript',
|
||||
])
|
||||
# This is temporary until we get this lib to build on other platforms.
|
||||
if env['PLATFORM'] == 'win32':
|
||||
sconscripts.extend([
|
||||
|
@ -186,6 +186,7 @@ libs = [
|
||||
'plugin/plugin.lib',
|
||||
'renderer/renderer.lib',
|
||||
'third_party/hunspell/hunspell.lib',
|
||||
'third_party/libevent/libevent.lib',
|
||||
'third_party/sqlite/sqlite.lib',
|
||||
'views/views.lib',
|
||||
'$V8_DIR/v8.lib',
|
||||
|
1
net/DEPS
1
net/DEPS
@ -1,3 +1,4 @@
|
||||
include_rules = [
|
||||
"+third_party/modp_b64",
|
||||
"+third_party/libevent",
|
||||
]
|
||||
|
@ -133,6 +133,7 @@ if env['PLATFORM'] == 'posix':
|
||||
input_files.extend([
|
||||
# TODO(tc): gnome-vfs? xdgmime? /etc/mime.types?
|
||||
'base/platform_mime_util_linux.cc',
|
||||
'base/tcp_client_socket_libevent.cc',
|
||||
])
|
||||
|
||||
if env['PLATFORM'] in ('darwin', 'posix'):
|
||||
@ -169,6 +170,7 @@ env_tests.Prepend(
|
||||
'net', # net must come before base and modp_b64
|
||||
'bzip2', # bzip2 must come before base
|
||||
'base',
|
||||
'event',
|
||||
'googleurl',
|
||||
'gtest',
|
||||
'icuuc',
|
||||
@ -224,6 +226,7 @@ unittest_files = [
|
||||
'base/net_util_unittest.cc',
|
||||
'base/registry_controlled_domain_unittest.cc',
|
||||
'base/run_all_unittests.cc',
|
||||
'base/tcp_client_socket_unittest.cc',
|
||||
'base/test_completion_callback_unittest.cc',
|
||||
'disk_cache/addr_unittest.cc',
|
||||
'disk_cache/block_files_unittest.cc',
|
||||
@ -242,7 +245,6 @@ if env['PLATFORM'] == 'win32':
|
||||
'base/directory_lister_unittest.cc',
|
||||
'base/ssl_config_service_unittest.cc',
|
||||
'base/ssl_client_socket_unittest.cc',
|
||||
'base/tcp_client_socket_unittest.cc',
|
||||
'base/wininet_util_unittest.cc',
|
||||
'disk_cache/backend_unittest.cc',
|
||||
'http/http_cache_unittest.cc',
|
||||
|
@ -5,11 +5,21 @@
|
||||
#ifndef NET_BASE_TCP_CLIENT_SOCKET_H_
|
||||
#define NET_BASE_TCP_CLIENT_SOCKET_H_
|
||||
|
||||
#include <ws2tcpip.h>
|
||||
#include "build/build_config.h"
|
||||
|
||||
#if defined(OS_WIN)
|
||||
#include <ws2tcpip.h>
|
||||
#include "base/object_watcher.h"
|
||||
#elif defined(OS_POSIX)
|
||||
struct event; // From libevent
|
||||
#define SOCKET int
|
||||
#include "base/message_pump_libevent.h"
|
||||
#endif
|
||||
|
||||
#include "base/scoped_ptr.h"
|
||||
#include "net/base/address_list.h"
|
||||
#include "net/base/client_socket.h"
|
||||
#include "net/base/completion_callback.h"
|
||||
|
||||
namespace net {
|
||||
|
||||
@ -18,7 +28,12 @@ namespace net {
|
||||
// NOTE: The implementation supports half duplex only. Read and Write calls
|
||||
// must not be in progress at the same time.
|
||||
class TCPClientSocket : public ClientSocket,
|
||||
public base::ObjectWatcher::Delegate {
|
||||
#if defined(OS_WIN)
|
||||
public base::ObjectWatcher::Delegate
|
||||
#elif defined(OS_POSIX)
|
||||
public base::MessagePumpLibevent::Watcher
|
||||
#endif
|
||||
{
|
||||
public:
|
||||
// The IP address(es) and port number to connect to. The TCP socket will try
|
||||
// each IP address in the list until it succeeds in establishing a
|
||||
@ -34,31 +49,23 @@ class TCPClientSocket : public ClientSocket,
|
||||
virtual bool IsConnected() const;
|
||||
|
||||
// Socket methods:
|
||||
// Try to transfer buf_len bytes to/from socket.
|
||||
// If a result is available now, return it; else call back later with one.
|
||||
// Do not call again until a result is returned!
|
||||
// If any bytes were transferred, the result is the byte count.
|
||||
// On error, result is a negative error code; see net/base/net_error_list.h
|
||||
// TODO: what would a zero return value indicate?
|
||||
// TODO: support multiple outstanding requests?
|
||||
virtual int Read(char* buf, int buf_len, CompletionCallback* callback);
|
||||
virtual int Write(const char* buf, int buf_len, CompletionCallback* callback);
|
||||
|
||||
private:
|
||||
int CreateSocket(const struct addrinfo* ai);
|
||||
void DoCallback(int rv);
|
||||
void DidCompleteConnect();
|
||||
void DidCompleteIO();
|
||||
|
||||
// base::ObjectWatcher::Delegate methods:
|
||||
virtual void OnObjectSignaled(HANDLE object);
|
||||
|
||||
SOCKET socket_;
|
||||
OVERLAPPED overlapped_;
|
||||
WSABUF buffer_;
|
||||
|
||||
base::ObjectWatcher watcher_;
|
||||
|
||||
CompletionCallback* callback_;
|
||||
|
||||
// The list of addresses we should try in order to establish a connection.
|
||||
AddressList addresses_;
|
||||
|
||||
// The addrinfo that we are attempting to use or NULL if all addrinfos have
|
||||
// been tried.
|
||||
// Where we are in above list, or NULL if all addrinfos have been tried.
|
||||
const struct addrinfo* current_ai_;
|
||||
|
||||
enum WaitState {
|
||||
@ -68,6 +75,34 @@ class TCPClientSocket : public ClientSocket,
|
||||
WAITING_WRITE
|
||||
};
|
||||
WaitState wait_state_;
|
||||
|
||||
#if defined(OS_WIN)
|
||||
// base::ObjectWatcher::Delegate methods:
|
||||
virtual void OnObjectSignaled(HANDLE object);
|
||||
|
||||
OVERLAPPED overlapped_;
|
||||
WSABUF buffer_;
|
||||
|
||||
base::ObjectWatcher watcher_;
|
||||
#elif defined(OS_POSIX)
|
||||
// The socket's libevent wrapper
|
||||
scoped_ptr<event> event_;
|
||||
|
||||
// Called by MessagePumpLibevent when the socket is ready to do I/O
|
||||
void OnSocketReady(short flags);
|
||||
|
||||
// The buffer used by OnSocketReady to retry Read and Write requests
|
||||
char* buf_;
|
||||
int buf_len_;
|
||||
#endif
|
||||
|
||||
// External callback; called when read or write is complete.
|
||||
CompletionCallback* callback_;
|
||||
|
||||
int CreateSocket(const struct addrinfo* ai);
|
||||
void DoCallback(int rv);
|
||||
void DidCompleteConnect();
|
||||
void DidCompleteIO();
|
||||
};
|
||||
|
||||
} // namespace net
|
||||
|
288
net/base/tcp_client_socket_libevent.cc
Normal file
288
net/base/tcp_client_socket_libevent.cc
Normal file
@ -0,0 +1,288 @@
|
||||
// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
#include "net/base/tcp_client_socket.h"
|
||||
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include "base/message_loop.h"
|
||||
#include "net/base/net_errors.h"
|
||||
#include "third_party/libevent/event.h"
|
||||
|
||||
|
||||
namespace net {
|
||||
|
||||
const int kInvalidSocket = -1;
|
||||
|
||||
// Return 0 on success
|
||||
// Too small a function to bother putting in a library?
|
||||
static int SetNonBlocking(int fd)
|
||||
{
|
||||
int flags = fcntl(fd, F_GETFL, 0);
|
||||
if (-1 == flags)
|
||||
flags = 0;
|
||||
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||
}
|
||||
|
||||
// Convert values from <errno.h> to values from "net/base/net_errors.h"
|
||||
static int MapPosixError(int err) {
|
||||
// There are numerous posix error codes, but these are the ones we thus far
|
||||
// find interesting.
|
||||
// TODO(port): fill this with a real conversion table
|
||||
switch (err) {
|
||||
case EWOULDBLOCK: return ERR_IO_PENDING;
|
||||
default:
|
||||
return ERR_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
TCPClientSocket::TCPClientSocket(const AddressList& addresses)
|
||||
: socket_(kInvalidSocket),
|
||||
addresses_(addresses),
|
||||
current_ai_(addresses_.head()),
|
||||
wait_state_(NOT_WAITING),
|
||||
event_(new event) {
|
||||
}
|
||||
|
||||
TCPClientSocket::~TCPClientSocket() {
|
||||
Disconnect();
|
||||
}
|
||||
|
||||
int TCPClientSocket::Connect(CompletionCallback* callback) {
|
||||
|
||||
// If already connected, then just return OK.
|
||||
if (socket_ != kInvalidSocket)
|
||||
return OK;
|
||||
|
||||
DCHECK(wait_state_ == NOT_WAITING);
|
||||
|
||||
const addrinfo* ai = current_ai_;
|
||||
DCHECK(ai);
|
||||
|
||||
int rv = CreateSocket(ai);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
|
||||
if (!connect(socket_, ai->ai_addr, static_cast<int>(ai->ai_addrlen))) {
|
||||
// Connected without waiting!
|
||||
return OK;
|
||||
}
|
||||
|
||||
// Synchronous operation not supported
|
||||
DCHECK(callback);
|
||||
|
||||
if (errno != EINPROGRESS && errno != EWOULDBLOCK) {
|
||||
LOG(ERROR) << "connect failed: " << errno;
|
||||
return MapPosixError(errno);
|
||||
}
|
||||
|
||||
// Initialize event_ and link it to our MessagePump.
|
||||
// POLLOUT is set if the connection is established.
|
||||
// POLLIN is set if the connection fails,
|
||||
// so select for both read and write.
|
||||
MessageLoopForIO::current()->WatchSocket(
|
||||
socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this);
|
||||
|
||||
wait_state_ = WAITING_CONNECT;
|
||||
callback_ = callback;
|
||||
return ERR_IO_PENDING;
|
||||
}
|
||||
|
||||
int TCPClientSocket::ReconnectIgnoringLastError(CompletionCallback* callback) {
|
||||
// No ignorable errors!
|
||||
return ERR_FAILED;
|
||||
}
|
||||
|
||||
void TCPClientSocket::Disconnect() {
|
||||
if (socket_ == kInvalidSocket)
|
||||
return;
|
||||
|
||||
MessageLoopForIO::current()->UnwatchSocket(event_.get());
|
||||
close(socket_);
|
||||
socket_ = kInvalidSocket;
|
||||
|
||||
// Reset for next time.
|
||||
current_ai_ = addresses_.head();
|
||||
}
|
||||
|
||||
bool TCPClientSocket::IsConnected() const {
|
||||
if (socket_ == kInvalidSocket || wait_state_ == WAITING_CONNECT)
|
||||
return false;
|
||||
|
||||
// Check if connection is alive.
|
||||
char c;
|
||||
int rv = recv(socket_, &c, 1, MSG_PEEK);
|
||||
if (rv == 0)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int TCPClientSocket::Read(char* buf,
|
||||
int buf_len,
|
||||
CompletionCallback* callback) {
|
||||
DCHECK(socket_ != kInvalidSocket);
|
||||
DCHECK(wait_state_ == NOT_WAITING);
|
||||
DCHECK(!callback_);
|
||||
// Synchronous operation not supported
|
||||
DCHECK(callback);
|
||||
DCHECK(buf_len > 0);
|
||||
|
||||
int nread = read(socket_, buf, buf_len);
|
||||
if (nread > 0) {
|
||||
return nread;
|
||||
}
|
||||
if (nread == -1 && errno != EWOULDBLOCK)
|
||||
return MapPosixError(errno);
|
||||
|
||||
MessageLoopForIO::current()->WatchSocket(
|
||||
socket_, EV_READ|EV_PERSIST, event_.get(), this);
|
||||
|
||||
buf_ = buf;
|
||||
buf_len_ = buf_len;
|
||||
wait_state_ = WAITING_READ;
|
||||
callback_ = callback;
|
||||
return ERR_IO_PENDING;
|
||||
}
|
||||
|
||||
int TCPClientSocket::Write(const char* buf,
|
||||
int buf_len,
|
||||
CompletionCallback* callback) {
|
||||
DCHECK(socket_ != kInvalidSocket);
|
||||
DCHECK(wait_state_ == NOT_WAITING);
|
||||
DCHECK(!callback_);
|
||||
// Synchronous operation not supported
|
||||
DCHECK(callback);
|
||||
DCHECK(buf_len > 0);
|
||||
|
||||
int nwrite = write(socket_, buf, buf_len);
|
||||
if (nwrite > 0) {
|
||||
return nwrite;
|
||||
}
|
||||
if (nwrite == -1 && errno != EWOULDBLOCK)
|
||||
return MapPosixError(errno);
|
||||
|
||||
MessageLoopForIO::current()->WatchSocket(
|
||||
socket_, EV_WRITE|EV_PERSIST, event_.get(), this);
|
||||
|
||||
buf_ = const_cast<char*>(buf);
|
||||
buf_len_ = buf_len;
|
||||
wait_state_ = WAITING_WRITE;
|
||||
callback_ = callback;
|
||||
return ERR_IO_PENDING;
|
||||
}
|
||||
|
||||
int TCPClientSocket::CreateSocket(const addrinfo* ai) {
|
||||
socket_ = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
|
||||
if (socket_ == kInvalidSocket)
|
||||
return MapPosixError(errno);
|
||||
|
||||
// All our socket I/O is nonblocking
|
||||
if (SetNonBlocking(socket_))
|
||||
return MapPosixError(errno);
|
||||
|
||||
return OK;
|
||||
}
|
||||
|
||||
void TCPClientSocket::DoCallback(int rv) {
|
||||
DCHECK(rv != ERR_IO_PENDING);
|
||||
DCHECK(callback_);
|
||||
|
||||
// since Run may result in Read being called, clear callback_ up front.
|
||||
CompletionCallback* c = callback_;
|
||||
callback_ = NULL;
|
||||
c->Run(rv);
|
||||
}
|
||||
|
||||
void TCPClientSocket::DidCompleteConnect() {
|
||||
int result = ERR_UNEXPECTED;
|
||||
|
||||
wait_state_ = NOT_WAITING;
|
||||
|
||||
// Check to see if connect succeeded
|
||||
int error_code = -1;
|
||||
socklen_t len = sizeof(error_code);
|
||||
if (getsockopt(socket_, SOL_SOCKET, SO_ERROR,
|
||||
reinterpret_cast<char*>(&error_code), &len) < 0) {
|
||||
result = MapPosixError(errno);
|
||||
} else if (error_code == EINPROGRESS) {
|
||||
result = ERR_IO_PENDING;
|
||||
// And await next callback. Haven't seen this case yet myself.
|
||||
} else if (current_ai_->ai_next && (
|
||||
error_code == EADDRNOTAVAIL ||
|
||||
error_code == EAFNOSUPPORT ||
|
||||
error_code == ECONNREFUSED ||
|
||||
error_code == ENETUNREACH ||
|
||||
error_code == EHOSTUNREACH ||
|
||||
error_code == ETIMEDOUT)) {
|
||||
// This address failed, try next one in list.
|
||||
const addrinfo* next = current_ai_->ai_next;
|
||||
Disconnect();
|
||||
current_ai_ = next;
|
||||
result = Connect(callback_);
|
||||
} else if (error_code) {
|
||||
result = MapPosixError(error_code);
|
||||
} else {
|
||||
result = 0;
|
||||
MessageLoopForIO::current()->UnwatchSocket(event_.get());
|
||||
}
|
||||
|
||||
if (result != ERR_IO_PENDING)
|
||||
DoCallback(result);
|
||||
}
|
||||
|
||||
void TCPClientSocket::DidCompleteIO() {
|
||||
int bytes_transferred;
|
||||
switch (wait_state_) {
|
||||
case WAITING_READ:
|
||||
bytes_transferred = read(socket_, buf_, buf_len_);
|
||||
break;
|
||||
case WAITING_WRITE:
|
||||
bytes_transferred = write(socket_, buf_, buf_len_);
|
||||
break;
|
||||
default:
|
||||
NOTREACHED();
|
||||
}
|
||||
|
||||
int result;
|
||||
if (bytes_transferred > 0) {
|
||||
result = bytes_transferred;
|
||||
} else if (bytes_transferred == 0) {
|
||||
// TODO(port): can we tell why it closed, and return a more informative
|
||||
// message? And why does the unit test want to see zero?
|
||||
//result = ERR_CONNECTION_CLOSED;
|
||||
result = 0;
|
||||
} else {
|
||||
result = MapPosixError(errno);
|
||||
}
|
||||
|
||||
if (result != ERR_IO_PENDING) {
|
||||
wait_state_ = NOT_WAITING;
|
||||
MessageLoopForIO::current()->UnwatchSocket(event_.get());
|
||||
DoCallback(result);
|
||||
}
|
||||
}
|
||||
|
||||
void TCPClientSocket::OnSocketReady(short flags) {
|
||||
switch (wait_state_) {
|
||||
case WAITING_CONNECT:
|
||||
DidCompleteConnect();
|
||||
break;
|
||||
case WAITING_READ:
|
||||
case WAITING_WRITE:
|
||||
DidCompleteIO();
|
||||
break;
|
||||
default:
|
||||
NOTREACHED();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace net
|
||||
|
Reference in New Issue
Block a user