aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2013-01-30 10:36:47 +0100
committerMax Kellermann <max@duempel.org>2013-01-30 11:03:44 +0100
commita291415326f87afe5b42a89e9a90029b876895f2 (patch)
tree0ecd6f1e42a443ed5c96457682af3cafc39bc16c
parent4ad7456428913f5232768367f2c0694bcb4540bb (diff)
event/BufferedSocket: move output buffer to FullyBufferedSocket
BufferedSocket has just an input buffer, and FullyBufferedSocket adds the output buffer.
-rw-r--r--Makefile.am1
-rw-r--r--src/ClientExpire.cxx2
-rw-r--r--src/ClientInternal.hxx10
-rw-r--r--src/ClientNew.cxx2
-rw-r--r--src/event/BufferedSocket.cxx90
-rw-r--r--src/event/BufferedSocket.hxx29
-rw-r--r--src/event/FullyBufferedSocket.cxx132
-rw-r--r--src/event/FullyBufferedSocket.hxx63
8 files changed, 210 insertions, 119 deletions
diff --git a/Makefile.am b/Makefile.am
index e2977a43..97b8f7ef 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -326,6 +326,7 @@ libevent_a_SOURCES = \
src/event/TimeoutMonitor.hxx src/event/TimeoutMonitor.cxx \
src/event/SocketMonitor.cxx src/event/SocketMonitor.hxx \
src/event/BufferedSocket.cxx src/event/BufferedSocket.hxx \
+ src/event/FullyBufferedSocket.cxx src/event/FullyBufferedSocket.hxx \
src/event/MultiSocketMonitor.cxx src/event/MultiSocketMonitor.hxx \
src/event/Loop.hxx
diff --git a/src/ClientExpire.cxx b/src/ClientExpire.cxx
index 8f57f5b3..6bb0a43a 100644
--- a/src/ClientExpire.cxx
+++ b/src/ClientExpire.cxx
@@ -26,7 +26,7 @@ Client::SetExpired()
if (IsExpired())
return;
- BufferedSocket::Close();
+ FullyBufferedSocket::Close();
TimeoutMonitor::Schedule(0);
}
diff --git a/src/ClientInternal.hxx b/src/ClientInternal.hxx
index d538cfb3..06a60be7 100644
--- a/src/ClientInternal.hxx
+++ b/src/ClientInternal.hxx
@@ -24,7 +24,7 @@
#include "Client.hxx"
#include "ClientMessage.hxx"
#include "CommandListBuilder.hxx"
-#include "event/BufferedSocket.hxx"
+#include "event/FullyBufferedSocket.hxx"
#include "event/TimeoutMonitor.hxx"
#include "command.h"
@@ -44,7 +44,7 @@ enum {
struct Partition;
-class Client final : private BufferedSocket, TimeoutMonitor {
+class Client final : private FullyBufferedSocket, TimeoutMonitor {
public:
Partition &partition;
struct playlist &playlist;
@@ -89,7 +89,7 @@ public:
int fd, int uid, int num);
bool IsConnected() const {
- return BufferedSocket::IsDefined();
+ return FullyBufferedSocket::IsDefined();
}
gcc_pure
@@ -99,13 +99,13 @@ public:
gcc_pure
bool IsExpired() const {
- return !BufferedSocket::IsDefined();
+ return !FullyBufferedSocket::IsDefined();
}
void Close();
void SetExpired();
- using BufferedSocket::Write;
+ using FullyBufferedSocket::Write;
/**
* Send "idle" response to this client.
diff --git a/src/ClientNew.cxx b/src/ClientNew.cxx
index 74cb0cc8..9ac66079 100644
--- a/src/ClientNew.cxx
+++ b/src/ClientNew.cxx
@@ -49,7 +49,7 @@ static const char GREETING[] = "OK MPD " PROTOCOL_VERSION "\n";
Client::Client(EventLoop &_loop, Partition &_partition,
int _fd, int _uid, int _num)
- :BufferedSocket(_fd, _loop, 16384, client_max_output_buffer_size),
+ :FullyBufferedSocket(_fd, _loop, 16384, client_max_output_buffer_size),
TimeoutMonitor(_loop),
partition(_partition),
playlist(partition.playlist), player_control(&partition.pc),
diff --git a/src/event/BufferedSocket.cxx b/src/event/BufferedSocket.cxx
index dec60b5f..05e70344 100644
--- a/src/event/BufferedSocket.cxx
+++ b/src/event/BufferedSocket.cxx
@@ -33,26 +33,6 @@ BufferedSocket::~BufferedSocket()
}
BufferedSocket::ssize_t
-BufferedSocket::DirectWrite(const void *data, size_t length)
-{
- const auto nbytes = SocketMonitor::Write((const char *)data, length);
- if (gcc_unlikely(nbytes < 0)) {
- const auto code = GetSocketError();
- if (IsSocketErrorAgain(code))
- return 0;
-
- Cancel();
-
- if (IsSocketErrorClosed(code))
- OnSocketClosed();
- else
- OnSocketError(NewSocketError(code));
- }
-
- return nbytes;
-}
-
-BufferedSocket::ssize_t
BufferedSocket::DirectRead(void *data, size_t length)
{
const auto nbytes = SocketMonitor::Read((char *)data, length);
@@ -76,30 +56,6 @@ BufferedSocket::DirectRead(void *data, size_t length)
}
bool
-BufferedSocket::WriteFromBuffer()
-{
- assert(IsDefined());
-
- size_t length;
- const void *data = output.Read(&length);
- if (data == nullptr) {
- CancelWrite();
- return true;
- }
-
- auto nbytes = DirectWrite(data, length);
- if (gcc_unlikely(nbytes <= 0))
- return nbytes == 0;
-
- output.Consume(nbytes);
-
- if (output.IsEmpty())
- CancelWrite();
-
- return true;
-}
-
-bool
BufferedSocket::ReadToBuffer()
{
assert(IsDefined());
@@ -119,38 +75,6 @@ BufferedSocket::ReadToBuffer()
}
bool
-BufferedSocket::Write(const void *data, size_t length)
-{
- assert(IsDefined());
-
-#if 0
- /* TODO: disabled because this would add overhead on some callers (the ones that often), but it may be useful */
-
- if (output.IsEmpty()) {
- /* try to write it directly first */
- const auto nbytes = DirectWrite(data, length);
- if (gcc_likely(nbytes > 0)) {
- data = (const uint8_t *)data + nbytes;
- length -= nbytes;
- if (length == 0)
- return true;
- } else if (nbytes < 0)
- return false;
- }
-#endif
-
- if (!output.Append(data, length)) {
- // TODO
- OnSocketError(g_error_new_literal(g_quark_from_static_string("buffered_socket"),
- 0, "Output buffer is full"));
- return false;
- }
-
- ScheduleWrite();
- return true;
-}
-
-bool
BufferedSocket::ResumeInput()
{
assert(IsDefined());
@@ -220,20 +144,6 @@ BufferedSocket::OnSocketReady(unsigned flags)
if (input == nullptr || !fifo_buffer_is_full(input))
ScheduleRead();
-
- /* just in case the OnSocketInput() method has added
- data to the output buffer: try to send it now
- instead of waiting for the next event loop
- iteration */
- if (!output.IsEmpty())
- flags |= WRITE;
- }
-
- if (flags & WRITE) {
- assert(!output.IsEmpty());
-
- if (!WriteFromBuffer())
- return false;
}
return true;
diff --git a/src/event/BufferedSocket.hxx b/src/event/BufferedSocket.hxx
index 23fd97d3..86deb8d9 100644
--- a/src/event/BufferedSocket.hxx
+++ b/src/event/BufferedSocket.hxx
@@ -22,21 +22,19 @@
#include "check.h"
#include "SocketMonitor.hxx"
-#include "util/PeakBuffer.hxx"
#include "gcc.h"
struct fifo_buffer;
-class EventLoop;
-class BufferedSocket : private SocketMonitor {
+/**
+ * A #SocketMonitor specialization that adds an input buffer.
+ */
+class BufferedSocket : protected SocketMonitor {
fifo_buffer *input;
- PeakBuffer output;
public:
- BufferedSocket(int _fd, EventLoop &_loop,
- size_t normal_size, size_t peak_size=0)
- :SocketMonitor(_fd, _loop), input(nullptr),
- output(normal_size, peak_size) {
+ BufferedSocket(int _fd, EventLoop &_loop)
+ :SocketMonitor(_fd, _loop), input(nullptr) {
ScheduleRead();
}
@@ -44,19 +42,12 @@ public:
using SocketMonitor::IsDefined;
using SocketMonitor::Close;
+ using SocketMonitor::Write;
private:
- ssize_t DirectWrite(const void *data, size_t length);
ssize_t DirectRead(void *data, size_t length);
/**
- * Send data from the output buffer to the socket.
- *
- * @return false if the socket has been closed
- */
- bool WriteFromBuffer();
-
- /**
* Receive data from the socket to the input buffer.
*
* @return false if the socket has been closed
@@ -67,11 +58,6 @@ protected:
/**
* @return false if the socket has been closed
*/
- bool Write(const void *data, size_t length);
-
- /**
- * @return false if the socket has been closed
- */
bool ResumeInput();
/**
@@ -112,7 +98,6 @@ protected:
virtual void OnSocketError(GError *error) = 0;
virtual void OnSocketClosed() = 0;
-private:
virtual bool OnSocketReady(unsigned flags) override;
};
diff --git a/src/event/FullyBufferedSocket.cxx b/src/event/FullyBufferedSocket.cxx
new file mode 100644
index 00000000..a92cb68a
--- /dev/null
+++ b/src/event/FullyBufferedSocket.cxx
@@ -0,0 +1,132 @@
+/*
+ * Copyright (C) 2003-2013 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "config.h"
+#include "FullyBufferedSocket.hxx"
+#include "SocketError.hxx"
+#include "util/fifo_buffer.h"
+
+#include <assert.h>
+#include <stdint.h>
+#include <string.h>
+
+#ifndef WIN32
+#include <sys/types.h>
+#include <sys/socket.h>
+#endif
+
+FullyBufferedSocket::ssize_t
+FullyBufferedSocket::DirectWrite(const void *data, size_t length)
+{
+ const auto nbytes = SocketMonitor::Write((const char *)data, length);
+ if (gcc_unlikely(nbytes < 0)) {
+ const auto code = GetSocketError();
+ if (IsSocketErrorAgain(code))
+ return 0;
+
+ Cancel();
+
+ if (IsSocketErrorClosed(code))
+ OnSocketClosed();
+ else
+ OnSocketError(NewSocketError(code));
+ }
+
+ return nbytes;
+}
+
+bool
+FullyBufferedSocket::WriteFromBuffer()
+{
+ assert(IsDefined());
+
+ size_t length;
+ const void *data = output.Read(&length);
+ if (data == nullptr) {
+ CancelWrite();
+ return true;
+ }
+
+ auto nbytes = DirectWrite(data, length);
+ if (gcc_unlikely(nbytes <= 0))
+ return nbytes == 0;
+
+ output.Consume(nbytes);
+
+ if (output.IsEmpty())
+ CancelWrite();
+
+ return true;
+}
+
+bool
+FullyBufferedSocket::Write(const void *data, size_t length)
+{
+ assert(IsDefined());
+
+#if 0
+ /* TODO: disabled because this would add overhead on some callers (the ones that often), but it may be useful */
+
+ if (output.IsEmpty()) {
+ /* try to write it directly first */
+ const auto nbytes = DirectWrite(data, length);
+ if (gcc_likely(nbytes > 0)) {
+ data = (const uint8_t *)data + nbytes;
+ length -= nbytes;
+ if (length == 0)
+ return true;
+ } else if (nbytes < 0)
+ return false;
+ }
+#endif
+
+ if (!output.Append(data, length)) {
+ // TODO
+ OnSocketError(g_error_new_literal(g_quark_from_static_string("buffered_socket"),
+ 0, "Output buffer is full"));
+ return false;
+ }
+
+ ScheduleWrite();
+ return true;
+}
+
+bool
+FullyBufferedSocket::OnSocketReady(unsigned flags)
+{
+ const bool was_empty = output.IsEmpty();
+ if (!BufferedSocket::OnSocketReady(flags))
+ return false;
+
+ if (was_empty && !output.IsEmpty())
+ /* just in case the OnSocketInput() method has added
+ data to the output buffer: try to send it now
+ instead of waiting for the next event loop
+ iteration */
+ flags |= WRITE;
+
+ if (flags & WRITE) {
+ assert(!output.IsEmpty());
+
+ if (!WriteFromBuffer())
+ return false;
+ }
+
+ return true;
+}
diff --git a/src/event/FullyBufferedSocket.hxx b/src/event/FullyBufferedSocket.hxx
new file mode 100644
index 00000000..c67c2c78
--- /dev/null
+++ b/src/event/FullyBufferedSocket.hxx
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2003-2013 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef MPD_FULLY_BUFFERED_SOCKET_HXX
+#define MPD_FULLY_BUFFERED_SOCKET_HXX
+
+#include "check.h"
+#include "BufferedSocket.hxx"
+#include "util/PeakBuffer.hxx"
+#include "gcc.h"
+
+/**
+ * A #BufferedSocket specialization that adds an output buffer.
+ */
+class FullyBufferedSocket : protected BufferedSocket {
+ PeakBuffer output;
+
+public:
+ FullyBufferedSocket(int _fd, EventLoop &_loop,
+ size_t normal_size, size_t peak_size=0)
+ :BufferedSocket(_fd, _loop),
+ output(normal_size, peak_size) {
+ }
+
+ using BufferedSocket::IsDefined;
+ using BufferedSocket::Close;
+
+private:
+ ssize_t DirectWrite(const void *data, size_t length);
+
+ /**
+ * Send data from the output buffer to the socket.
+ *
+ * @return false if the socket has been closed
+ */
+ bool WriteFromBuffer();
+
+protected:
+ /**
+ * @return false if the socket has been closed
+ */
+ bool Write(const void *data, size_t length);
+
+ virtual bool OnSocketReady(unsigned flags) override;
+};
+
+#endif