From 7a3b0ee961c9bc69d2088b23bda4be580eeaf182 Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 26 Nov 2013 18:59:21 +0100 Subject: [PATCH] Breaking changes, UriHandlers are now managed by the ClientRouter. Better PSR-2 compliance. Some changes to the examples --- TcpStream.php | 141 ------- composer.json | 3 +- client.html => examples/demo.html | 0 demo.php => examples/demo.php | 41 ++- client_ssl.html => examples/demo_ssl.html | 0 demo_ssl.php => examples/demo_ssl.php | 25 +- examples/echo_client.php | 31 ++ tcp.html => examples/tcp_proxy_example.html | 0 examples/tcp_proxy_example.php | 185 ++++++++++ sockets.php | 239 ------------ src/Devristo/Phpws/Client/HixieKey.php | 29 -- src/Devristo/Phpws/Client/WebSocket.php | 223 +++++------ .../Phpws/Client/WebSocketFunctions.php | 142 ------- ...tFrame.php => WebSocketFrameInterface.php} | 0 ...sage.php => WebSocketMessageInterface.php} | 0 .../Phpws/Protocol/WebSocketConnection.php | 16 +- .../Protocol/WebSocketConnectionFactory.php | 5 +- .../Protocol/WebSocketConnectionFlash.php | 9 +- .../Protocol/WebSocketConnectionHixie.php | 14 +- .../Protocol/WebSocketConnectionHybi.php | 16 +- ...n.php => WebSocketConnectionInterface.php} | 6 +- .../Phpws/Protocol/WebSocketObserver.php | 11 +- .../Phpws/Protocol/WebSocketServerClient.php | 104 ++++++ .../Phpws/Protocol/WebSocketStream.php | 188 ---------- src/Devristo/Phpws/Server/ISocketStream.php | 21 -- .../Phpws/Server/IWebSocketServerObserver.php | 22 -- src/Devristo/Phpws/Server/SocketServer.php | 153 -------- .../Phpws/Server/UriHandler/ClientRouter.php | 76 ++++ .../Server/UriHandler/WebSocketUriHandler.php | 14 +- ...r.php => WebSocketUriHandlerInterface.php} | 12 +- src/Devristo/Phpws/Server/WebSocketServer.php | 346 ++++-------------- .../Phpws/Server/WebSocketServerObserver.php | 28 -- tests/echo_client.php | 20 - 33 files changed, 662 insertions(+), 1458 deletions(-) delete mode 100644 TcpStream.php rename client.html => examples/demo.html (100%) rename demo.php => examples/demo.php (62%) rename client_ssl.html => examples/demo_ssl.html (100%) rename demo_ssl.php => examples/demo_ssl.php (75%) create mode 100644 examples/echo_client.php rename tcp.html => examples/tcp_proxy_example.html (100%) create mode 100644 examples/tcp_proxy_example.php delete mode 100644 sockets.php delete mode 100644 src/Devristo/Phpws/Client/HixieKey.php delete mode 100644 src/Devristo/Phpws/Client/WebSocketFunctions.php rename src/Devristo/Phpws/Framing/{IWebSocketFrame.php => WebSocketFrameInterface.php} (100%) rename src/Devristo/Phpws/Messaging/{IWebSocketMessage.php => WebSocketMessageInterface.php} (100%) rename src/Devristo/Phpws/Protocol/{IWebSocketConnection.php => WebSocketConnectionInterface.php} (87%) create mode 100644 src/Devristo/Phpws/Protocol/WebSocketServerClient.php delete mode 100644 src/Devristo/Phpws/Protocol/WebSocketStream.php delete mode 100644 src/Devristo/Phpws/Server/ISocketStream.php delete mode 100644 src/Devristo/Phpws/Server/IWebSocketServerObserver.php delete mode 100644 src/Devristo/Phpws/Server/SocketServer.php create mode 100644 src/Devristo/Phpws/Server/UriHandler/ClientRouter.php rename src/Devristo/Phpws/Server/UriHandler/{IWebSocketUriHandler.php => WebSocketUriHandlerInterface.php} (52%) delete mode 100644 src/Devristo/Phpws/Server/WebSocketServerObserver.php delete mode 100644 tests/echo_client.php diff --git a/TcpStream.php b/TcpStream.php deleted file mode 100644 index 498997b..0000000 --- a/TcpStream.php +++ /dev/null @@ -1,141 +0,0 @@ -id = uniqid("tcp-$address-"); - - $this->address = $address; - $this->socketServer = $server; - $this->logger = $logger; - $this->socket = stream_socket_client("tcp://$address", $error_number, $error, 5, STREAM_CLIENT_CONNECT); - $server->attachStream($this); - - if(!$this->socket) - throw new BadMethodCallException("Cannot connect to $address"); - - $this->_eventManager = new \Zend\EventManager\EventManager(__CLASS__); - } - - public function getId(){ - return $this->id; - } - - public function onData($data) - { - $this->_eventManager->trigger('data', $this, array( - 'data' => $data - )); - } - - public function close($triggerEvent=true) - { - if(!$this->isClosed()){ - $this->logger->debug(sprintf("Closing connection %s to %s", $this->getId(), $this->address)); - @fclose($this->getSocket()); - $this->closed = true; - $this->writeBuffer = ''; - - $this->_eventManager->trigger('close', $this); - } - } - - public function mayWrite() - { - $bytesWritten = fwrite($this->getSocket(), $this->writeBuffer, strlen($this->writeBuffer)); - - if ($bytesWritten === false) - $this->close(); - - $this->writeBuffer = substr($this->writeBuffer, $bytesWritten); - - if (strlen($this->writeBuffer) == 0 && $this->isClosing()) - $this->close(); - } - - public function requestsWrite() - { - return !$this->closed && strlen($this->writeBuffer) > 0; - } - - public function getSocket() - { - return $this->socket; - } - - public function acceptConnection() - { - throw new BadMethodCallException(); - } - - public function isServer() - { - return false; - } - - public function write($data) - { - if(!$this->closed) - $this->writeBuffer .= $data; - } - - public function requestClose() - { - if($this->requestsWrite()) - $this->closing = true; - else $this->close(); - } - - public function isClosing() - { - return $this->closing; - } - - public function isClosed(){ - return $this->closed; - } - - public function getAddress(){ - return $this->address; - } - - /** - * Inject an EventManager instance - * - * @param \Zend\EventManager\EventManagerInterface $eventManager - * @return void - */ - public function setEventManager(\Zend\EventManager\EventManagerInterface $eventManager) - { - $this->_eventManager = $eventManager; - } - - /** - * Retrieve the event manager - * - * Lazy-loads an EventManager instance if none registered. - * - * @return \Zend\EventManager\EventManagerInterface - */ - public function getEventManager() - { - return $this->_eventManager; - } -} \ No newline at end of file diff --git a/composer.json b/composer.json index c99c0ce..33ffbf2 100644 --- a/composer.json +++ b/composer.json @@ -15,6 +15,7 @@ }, "require": { "zendframework/zend-log": "2.*", - "zendframework/zend-eventmanager": "2.2.5" + "react/socket": "v0.3.2", + "react/socket-client": "v0.3.2" } } diff --git a/client.html b/examples/demo.html similarity index 100% rename from client.html rename to examples/demo.html diff --git a/demo.php b/examples/demo.php similarity index 62% rename from demo.php rename to examples/demo.php index 438d631..cd2f61a 100644 --- a/demo.php +++ b/examples/demo.php @@ -1,14 +1,14 @@ #!/php -q php demo.php use Devristo\Phpws\Framing\WebSocketFrame; use Devristo\Phpws\Framing\WebSocketOpcode; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\IWebSocketConnection; +use Devristo\Phpws\Protocol\WebSocketConnectionInterface; use Devristo\Phpws\Server\IWebSocketServerObserver; use Devristo\Phpws\Server\UriHandler\WebSocketUriHandler; use Devristo\Phpws\Server\WebSocketServer; @@ -22,13 +22,25 @@ */ class DemoEchoHandler extends WebSocketUriHandler { - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg) { + public function __construct(\Zend\Log\LoggerInterface $logger, \React\EventLoop\LoopInterface $loop){ + parent::__construct($logger); + + $that = $this; + + $loop->addPeriodicTimer(1, function() use ($that){ + foreach($that->getConnections() as $client){ + $client->sendString("Hello world!"); + } + }); + } + + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { $this->logger->notice("[ECHO] " . strlen($msg->getData()) . " bytes"); // Echo $user->sendMessage($msg); } - public function onAdminMessage(IWebSocketConnection $user, IWebSocketMessage $obj) { + public function onAdminMessage(WebSocketConnectionInterface $user, IWebSocketMessage $obj) { $this->logger->notice("[DEMO] Admin TEST received!"); $frame = WebSocketFrame::create(WebSocketOpcode::PongFrame); @@ -49,31 +61,31 @@ class DemoSocketServer implements IWebSocketServerObserver { protected $debug = true; protected $server; - public function __construct() { + public function __construct(\React\EventLoop\LoopInterface $loop) { $logger = new \Zend\Log\Logger(); $logger->addWriter(new Zend\Log\Writer\Stream("php://output")); $this->logger = $logger; - $this->server = new WebSocketServer("tcp://0.0.0.0:12345", $logger); + $this->server = new WebSocketServer("tcp://0.0.0.0:12345", $loop, $logger); $this->server->addObserver($this); - $this->server->addUriHandler("echo", new DemoEchoHandler($logger)); + $this->server->addUriHandler("echo", new DemoEchoHandler($logger, $loop)); } - public function onConnect(IWebSocketConnection $user) { + public function onConnect(WebSocketConnectionInterface $user) { $this->logger->notice("[DEMO] {$user->getId()} connected"); } - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg) { + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { //$this->logger->notice("[DEMO] {$user->getId()} says '{$msg->getData()}'"); } - public function onDisconnect(IWebSocketConnection $user) { + public function onDisconnect(WebSocketConnectionInterface $user) { $this->logger->notice("[DEMO] {$user->getId()} disconnected"); } - public function onAdminMessage(IWebSocketConnection $user, IWebSocketMessage $msg) { + public function onAdminMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { $this->logger->notice("[DEMO] Admin Message received!"); $frame = WebSocketFrame::create(WebSocketOpcode::PongFrame); @@ -86,6 +98,9 @@ public function run() { } +$loop = \React\EventLoop\Factory::create(); + // Start server -$server = new DemoSocketServer(); -$server->run(); \ No newline at end of file +$server = new DemoSocketServer($loop); +$server->run(); +$loop->run(); \ No newline at end of file diff --git a/client_ssl.html b/examples/demo_ssl.html similarity index 100% rename from client_ssl.html rename to examples/demo_ssl.html diff --git a/demo_ssl.php b/examples/demo_ssl.php similarity index 75% rename from demo_ssl.php rename to examples/demo_ssl.php index e1039e7..05e24e7 100644 --- a/demo_ssl.php +++ b/examples/demo_ssl.php @@ -8,7 +8,7 @@ use Devristo\Phpws\Framing\WebSocketFrame; use Devristo\Phpws\Framing\WebSocketOpcode; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\IWebSocketConnection; +use Devristo\Phpws\Protocol\WebSocketConnectionInterface; use Devristo\Phpws\Server\IWebSocketServerObserver; use Devristo\Phpws\Server\UriHandler\WebSocketUriHandler; use Devristo\Phpws\Server\WebSocketServer; @@ -22,13 +22,13 @@ */ class DemoSslEchoHandler extends WebSocketUriHandler { - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg) { + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { $this->logger->notice("[ECHO] {$msg->getData()}"); // Echo $user->sendMessage($msg); } - public function onAdminMessage(IWebSocketConnection $user, IWebSocketMessage $obj) { + public function onAdminMessage(WebSocketConnectionInterface $user, IWebSocketMessage $obj) { $this->logger->notice("[DEMO] Admin TEST received!"); $frame = WebSocketFrame::create(WebSocketOpcode::PongFrame); @@ -49,16 +49,16 @@ class DemoSslSocketServer implements IWebSocketServerObserver { protected $debug = true; protected $server; - public function __construct() { + public function __construct($loop) { $logger = new \Zend\Log\Logger(); $logger->addWriter(new Zend\Log\Writer\Stream("php://output")); $this->logger = $logger; - $this->server = new WebSocketServer("ssl://0.0.0.0:12345", $logger); + $this->server = new WebSocketServer("ssl://0.0.0.0:12345", $loop, $logger); $this->server->addObserver($this); - $this->server->addUriHandler("echo", new DemoSslEchoHandler($logger)); + $this->server->addUriHandler("echo", new ProxyHandler($logger)); $this->setupSSL(); } @@ -79,19 +79,19 @@ public function setupSSL() { $this->server->setStreamContext($context); } - public function onConnect(IWebSocketConnection $user) { + public function onConnect(WebSocketConnectionInterface $user) { $this->logger->notice("[DEMO] {$user->getId()} connected"); } - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg) { + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { $this->logger->notice("[DEMO] {$user->getId()} says '{$msg->getData()}'"); } - public function onDisconnect(IWebSocketConnection $user) { + public function onDisconnect(WebSocketConnectionInterface $user) { $this->logger->notice("[DEMO] {$user->getId()} disconnected"); } - public function onAdminMessage(IWebSocketConnection $user, IWebSocketMessage $msg) { + public function onAdminMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { $this->logger->notice("[DEMO] Admin Message received!"); $frame = WebSocketFrame::create(WebSocketOpcode::PongFrame); @@ -104,6 +104,9 @@ public function run() { } +$loop = \React\EventLoop\Factory::create(); + // Start server -$server = new DemoSslSocketServer(); +$server = new ProxyWebSocketServer($loop); $server->run(); +$loop->run(); \ No newline at end of file diff --git a/examples/echo_client.php b/examples/echo_client.php new file mode 100644 index 0000000..1183dfd --- /dev/null +++ b/examples/echo_client.php @@ -0,0 +1,31 @@ +addWriter($writer); + +$client = new \Devristo\Phpws\Client\WebSocket("ws://echo.websocket.org/?encoding=text", $loop, $logger); +$client->on("connected", function($headers) use ($logger, $client){ + $logger->notice("Connected!"); + $client->send("Hello world!"); +}); + +$client->on("message", function($message) use ($client, $logger){ + $logger->notice("Got message: ".$message->getData()); + $client->close(); +}); + + +$client->open(); +$loop->run(); \ No newline at end of file diff --git a/tcp.html b/examples/tcp_proxy_example.html similarity index 100% rename from tcp.html rename to examples/tcp_proxy_example.html diff --git a/examples/tcp_proxy_example.php b/examples/tcp_proxy_example.php new file mode 100644 index 0000000..8f10fad --- /dev/null +++ b/examples/tcp_proxy_example.php @@ -0,0 +1,185 @@ +#!/php -q + php demo.php +use Devristo\Phpws\Messaging\IWebSocketMessage; +use Devristo\Phpws\Protocol\WebSocketConnectionInterface; +use Devristo\Phpws\Server\UriHandler\WebSocketUriHandler; +use Devristo\Phpws\Server\WebSocketServer; + +/** + * This demo resource handler will respond to all messages sent to /echo/ on the socketserver below + * + * All this handler does is echoing the responds to the user + * @author Chris + * + */ +class ProxyHandler extends WebSocketUriHandler +{ + /** + * @var \React\Stream\Stream[][] + */ + protected $streams = array(); + protected $server; + + public function __construct(\React\EventLoop\LoopInterface $loop, $logger) + { + parent::__construct($logger); + $this->loop = $loop; + } + + public function onDisconnect(WebSocketConnectionInterface $user) + { + $this->logger->notice(sprintf("User %s has been removed from proxy", $user->getId())); + foreach ($this->getStreamsByUser($user) as $stream) { + $stream->close(); + } + unset($this->streams[$user->getId()]); + } + + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) + { + try { + $message = json_decode($msg->getData()); + + if ($message->command == 'connect') + $this->requestConnect($user, $message); + elseif ($message->command == 'write') + $this->requestWrite($user, $message); + elseif ($message->command == 'close') + $this->requestClose($user, $message); + + } catch (Exception $e) { + $this->logger->err($e->getMessage()); + } + } + + protected function requestConnect(WebSocketConnectionInterface $user, $message) + { + $address = $message->address; + $this->logger->notice(sprintf("User %s requests connection to %s", $user->getId(), $address)); + + try { + $dnsResolverFactory = new React\Dns\Resolver\Factory(); + $dns = $dnsResolverFactory->createCached('8.8.8.8', $this->loop); + $stream = new \React\SocketClient\Connector($this->loop, $dns); + + list($host, $port) = explode(":", $address); + + $logger = $this->logger; + $that = $this; + + $stream->create($host, $port)->then(function (\React\Stream\Stream $stream) use($user, $logger, $message, $address, $that){ + $id = uniqid("stream-$address-"); + $that->addStream($user, $id, $stream); + + $user->sendString(json_encode(array( + 'connection' => $id, + 'event' => 'connected', + 'tag' => property_exists($message, 'tag') ? $message->tag : null + ))); + + $stream->on("data", function ($data) use ($stream, $id, $user, $logger){ + $message = array( + 'connection' => $id, + 'event' => 'data', + 'data' => $data + ); + + $user->sendString(json_encode($message)); + }); + + $stream->on("close", function() use($user, $id, $logger, $address){ + $logger->notice(sprintf("Connection %s of user %s to %s has been closed", $id, $user->getId(), $address)); + + $message = + array( + 'connection' => $id, + 'event' => 'close' + ); + + $user->sendString(json_encode($message)); + }); + }); + } catch (Exception $e) { + $user->sendString(json_encode(array( + 'event' => 'error', + 'tag' => property_exists($message, 'tag') ? $message->tag : null, + 'message' => $e->getMessage() + ))); + } + } + + protected function requestWrite(WebSocketConnectionInterface $user, $message) + { + $stream = $this->getStream($user, $message->connection); + $this->logger->notice(sprintf("User %s writes %d bytes to connection %s", $user->getId(), strlen($message->data), $message->connection)); + $stream->write($message->data); + } + + protected function requestClose(WebSocketConnectionInterface $user, $message) + { + $stream = $this->getStream($user, $message->connection); + + if($stream){ + $this->logger->notice(sprintf("User %s closes connection %s", $user->getId(), $message->connection)); + $stream->close(); + $this->removeStream($user, $message->connection); + + $user->sendString(json_encode(array( + 'event' => 'close', + 'connection' => $message->connection, + 'tag' => property_exists($message, 'tag') ? $message->tag : null + ))); + } else { + $user->sendString(json_encode(array( + 'event' => 'error', + 'tag' => property_exists($message, 'tag') ? $message->tag : null, + 'message' => 'Connection was already closed' + ))); + } + } + + /** + * @param WebSocketConnectionInterface $user + * @param $id + * @return \React\Stream\Stream + */ + protected function getStream(WebSocketConnectionInterface $user, $id) + { + $userStreams = $this->getStreamsByUser($user); + + return array_key_exists($id, $userStreams) ? $userStreams[$id] : null; + } + + /** + * @param WebSocketConnectionInterface $user + * @return \React\Stream\Stream[] + */ + protected function getStreamsByUser(WebSocketConnectionInterface $user) + { + return array_key_exists($user->getId(), $this->streams) ? $this->streams[$user->getId()] : array(); + } + + protected function removeStream(WebSocketConnectionInterface $user, $id) + { + unset($this->streams[$user->getId()][$id]); + } + + protected function addStream(WebSocketConnectionInterface $user, $id, \React\Stream\Stream $stream){ + $this->streams[$user->getId()][$id] = $stream; + } +} +$loop = \React\EventLoop\Factory::create(); +$logger = new \Zend\Log\Logger(); +$writer = new Zend\Log\Writer\Stream("php://output"); +$logger->addWriter($writer); + +$server = new WebSocketServer("tcp://0.0.0.0:12345", $loop, $logger); +$router = new \Devristo\Phpws\Server\UriHandler\ClientRouter($server, $logger); +$router->addUriHandler("#^/proxy$#i", new ProxyHandler($loop, $logger)); + +$server->bind(); +$loop->run(); \ No newline at end of file diff --git a/sockets.php b/sockets.php deleted file mode 100644 index eec5080..0000000 --- a/sockets.php +++ /dev/null @@ -1,239 +0,0 @@ -#!/php -q - php demo.php -use Devristo\Phpws\Framing\WebSocketFrame; -use Devristo\Phpws\Framing\WebSocketOpcode; -use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\IWebSocketConnection; -use Devristo\Phpws\Server\IWebSocketServerObserver; -use Devristo\Phpws\Server\UriHandler\WebSocketUriHandler; -use Devristo\Phpws\Server\WebSocketServer; -use Devristo\Phpws\Utilities\DefaultDict; - -/** - * This demo resource handler will respond to all messages sent to /echo/ on the socketserver below - * - * All this handler does is echoing the responds to the user - * @author Chris - * - */ -class DemoSslEchoHandler extends WebSocketUriHandler -{ - /** - * @var TcpStream[][] - */ - protected $streams = array(); - protected $server; - - /** - * @param IWebSocketConnection $user - * @param $id - * @return TcpStream|null - */ - protected function getStream(IWebSocketConnection $user, $id) - { - $userStreams = $this->getStreamsByUser($user); - - return array_key_exists($id, $userStreams) ? $userStreams[$id] : null; - } - - /** - * @param IWebSocketConnection $user - * @return TcpStream[] - */ - protected function getStreamsByUser(IWebSocketConnection $user) - { - return array_key_exists($user->getId(), $this->streams) ? $this->streams[$user->getId()] : []; - } - - protected function removeStream(IWebSocketConnection $user, TcpStream $stream) - { - unset($this->streams[$user->getId()][$stream->getId()]); - } - - protected function addStream(IWebSocketConnection $user, TcpStream $stream){ - $this->streams[$user->getId()][$stream->getId()] = $stream; - } - - public function __construct(\Devristo\Phpws\Server\SocketServer $server, $logger) - { - parent::__construct($logger); - $this->socketServer = $server; - } - - public function onDisconnect(IWebSocketConnection $user) - { - $this->logger->notice(sprintf("User %s has been removed from proxy", $user->getId())); - foreach ($this->getStreamsByUser($user) as $stream) { - $stream->close(); - } - unset($this->streams[$user->getId()]); - } - - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg) - { - try { - $message = json_decode($msg->getData()); - - if ($message->command == 'connect') - $this->requestConnect($user, $message); - elseif ($message->command == 'write') - $this->requestWrite($user, $message); - elseif ($message->command == 'close') - $this->requestClose($user, $message); - - } catch (Exception $e) { - $this->logger->err($e->getMessage()); - } - } - - protected function requestConnect(IWebSocketConnection $user, $message) - { - $address = $message->address; - $this->logger->notice(sprintf("User %s requests connection to %s", $user->getId(), $address)); - - try { - $stream = new TcpStream($this->socketServer, $address, $this->logger); - $uriHandler = $this; - - $stream->getEventManager()->attach("data", function (\Zend\EventManager\Event $event) use ($user, $uriHandler) { - $stream = $event->getTarget(); - $data = $event->getParam('data'); - - $uriHandler->logger->notice(sprintf("Proxying %d bytes on %s from %s to user %s", strlen($data), $stream->getId(), $stream->getAddress(), $user->getId())); - $message = array( - 'connection' => $stream->getId(), - 'event' => 'data', - 'data' => $event->getParam('data') - ); - - $user->sendString(json_encode($message)); - }); - - $stream->getEventManager()->attach("close", function (\Zend\EventManager\Event $event) use ($user, $uriHandler) { - /** - * @var $stream TcpStream - */ - $stream = $event->getTarget(); - - $uriHandler->logger->notice(sprintf("Connection %s of user %s to %s has been closed", $stream->getId(), $user->getId(), $stream->getAddress())); - - $message = - array( - 'connection' => $stream->getId(), - 'event' => 'close' - ); - - $user->sendString(json_encode($message)); - - // Remove stream from the user's list - $uriHandler->removeStream($user, $stream); - }); - - $this->addStream($user, $stream); - - $user->sendString(json_encode(array( - 'connection' => $stream->getId(), - 'event' => 'connected', - 'tag' => property_exists($message, 'tag') ? $message->tag : null - ))); - } catch (Exception $e) { - $user->sendString(json_encode(array( - 'event' => 'error', - 'tag' => property_exists($message, 'tag') ? $message->tag : null, - 'message' => $e->getMessage() - ))); - } - } - - protected function requestWrite(IWebSocketConnection $user, $message) - { - $stream = $this->getStream($user, $message->connection); - $this->logger->notice(sprintf("User %s writes %d bytes to connection %s to %s", $user->getId(), strlen($message->data), $stream->getId(), $stream->getAddress())); - $stream->write($message->data); - } - - protected function requestClose(IWebSocketConnection $user, $message) - { - $stream = $this->getStream($user, $message->connection); - - if($stream){ - $this->logger->notice(sprintf("User %s closes connection %s to %s", $user->getId(), $stream->getId(), $stream->getAddress())); - $stream->requestClose(); - $this->removeStream($user, $stream); - - $user->sendString(json_encode(array( - 'event' => 'close', - 'connection' => $stream->getId(), - 'tag' => property_exists($message, 'tag') ? $message->tag : null - ))); - } else { - $user->sendString(json_encode(array( - 'event' => 'error', - 'tag' => property_exists($message, 'tag') ? $message->tag : null, - 'message' => 'Connection was already closed' - ))); - } - } -} - -/** - * Demo socket server. Implements the basic eventlisteners and attaches a resource handler for /echo/ urls. - * - * - * @author Chris - * - */ -class DemoSslSocketServer extends \Devristo\Phpws\Server\WebSocketServerObserver -{ - - protected $debug = true; - protected $server; - - public function __construct() - { - $logger = new \Zend\Log\Logger(); - $writer = new Zend\Log\Writer\Stream("php://output"); - $writer->addFilter(new Zend\Log\Filter\Priority(\Zend\Log\Logger::NOTICE)); - $logger->addWriter($writer); - $this->logger = $logger; - - $this->server = new WebSocketServer("tcp://0.0.0.0:12345", $logger); - $this->server->addObserver($this); - - $this->server->addUriHandler("proxy", new DemoSslEchoHandler($this->server->_server, $logger)); - } - - private function getPEMFilename() - { - return './democert.pem'; - } - - public function setupSSL() - { - $context = stream_context_create(); - - // local_cert must be in PEM format - stream_context_set_option($context, 'ssl', 'local_cert', $this->getPEMFilename()); - - stream_context_set_option($context, 'ssl', 'allow_self_signed', true); - stream_context_set_option($context, 'ssl', 'verify_peer', false); - - $this->server->setStreamContext($context); - } - - public function run() - { - $this->server->run(); - } - -} - -// Start server -$server = new DemoSslSocketServer(); -$server->run(); diff --git a/src/Devristo/Phpws/Client/HixieKey.php b/src/Devristo/Phpws/Client/HixieKey.php deleted file mode 100644 index 412f297..0000000 --- a/src/Devristo/Phpws/Client/HixieKey.php +++ /dev/null @@ -1,29 +0,0 @@ -number = $number; - $this->key = $key; - } - -} \ No newline at end of file diff --git a/src/Devristo/Phpws/Client/WebSocket.php b/src/Devristo/Phpws/Client/WebSocket.php index 5a34e95..e8d3552 100644 --- a/src/Devristo/Phpws/Client/WebSocket.php +++ b/src/Devristo/Phpws/Client/WebSocket.php @@ -7,46 +7,49 @@ use Devristo\Phpws\Framing\WebSocketFrame; use Devristo\Phpws\Framing\WebSocketOpcode; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\IWebSocketConnection; use Devristo\Phpws\Protocol\WebSocketConnection; use Devristo\Phpws\Protocol\WebSocketConnectionFactory; -use Devristo\Phpws\Protocol\WebSocketConnectionFlash; -use Devristo\Phpws\Protocol\WebSocketConnectionHixie; use Devristo\Phpws\Protocol\WebSocketConnectionHybi; -use Devristo\Phpws\Protocol\WebSocketObserver; -use Devristo\Phpws\Protocol\WebSocketStream; +use Devristo\Phpws\Protocol\WebSocketServerClient; +use Evenement\EventEmitter; +use React\EventLoop\LoopInterface; +use React\Stream\Stream; -class WebSocket implements WebSocketObserver +class WebSocket extends EventEmitter { + const STATE_HANDSHAKE_SENT = 0; + const STATE_CONNECTED = 1; + const STATE_CLOSING = 2; + const STATE_CLOSED = 3; + + protected $state = self::STATE_CLOSED; - protected $socket; - protected $handshakeChallenge; - protected $hixieKey1; - protected $hixieKey2; protected $host; protected $port; - protected $origin; protected $requestUri; protected $url; - protected $hybi; - protected $_frames = array(); - protected $_messages = array(); - protected $_head = ''; protected $_timeOut = 1; + /** + * @var WebSocketServerClient + */ + protected $stream; + + /** * @var WebSocketConnection */ protected $_connection = null; protected $headers; + protected $loop; - public function __construct($url, $useHybie = true, $showHeaders = false) - { - if (defined('WS_DEBUG_HEADER')) - define("WS_DEBUG_HEADER", $showHeaders); + protected $isClosing = false; - $this->hybi = $useHybie; + public function __construct($url, LoopInterface $loop, $logger) + { + $this->logger = $logger; + $this->loop = $loop; $parts = parse_url($url); $this->url = $url; @@ -77,76 +80,83 @@ public function __construct($url, $useHybie = true, $showHeaders = false) } - /** - * @return string - */ - public function getOrigin() - { - return $this->origin; - } - - /** - * @param string $origin - */ - public function setOrigin($origin) + private static function randHybiKey() { - $this->origin = $origin; + return base64_encode( + chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) + . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) + . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) + . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) . chr(rand(0, 255)) + ); } - public function onDisconnect(WebSocketStream $s) - { - - } - public function onConnectionEstablished(WebSocketStream $s) + protected function sendHandshake() { + $challenge = self::randHybiKey(); - } + $this->addHeader("Connection","Upgrade"); + $this->addHeader("Host","{$this->host}"); + $this->addHeader("Sec-WebSocket-Key",$challenge); + $this->addHeader("Origin","{$this->origin}"); + $this->addHeader("Sec-WebSocket-Version",13); + $this->addHeader("Upgrade","websocket"); - public function onMessage(IWebSocketConnection $s, IWebSocketMessage $msg) - { - $this->_messages[] = $msg; - } + $strHandshake = "GET {$this->requestUri} HTTP/1.1\r\n"; - public function onFlashXMLRequest(WebSocketConnectionFlash $connection) - { + foreach ($this->headers as $k => $v) { + $strHandshake .= $k . " " . $v . "\r\n"; + } + $strHandshake .= "\r\n"; + $this->stream->write($strHandshake); + } + + public function onData($data) + { + switch ($this->state) { + case (self::STATE_HANDSHAKE_SENT): + $headers = WebSocketConnectionFactory::parseHeaders($data); + $this->_connection = new WebSocketConnectionHybi($this->stream, $headers); + $myself = $this; + $this->_connection->on("message", function($message) use ($myself){ + $myself->emit("message", array("message" => $message)); + }); + $this->state = self::STATE_CONNECTED; + $this->emit("connected", array("headers" => $headers)); + break; + case (self::STATE_CONNECTED): + $this->_connection->onData($data); + } } - /** - * TODO: Proper header generation! - * TODO: Check server response! - */ public function open() { $errno = $errstr = null; + $that = $this; $protocol = $this->scheme == 'ws' ? "tcp" : "ssl"; $this->socket = stream_socket_client("$protocol://{$this->host}:{$this->port}", $errno, $errstr, $this->getTimeOut()); + $stream = new Stream($this->socket, $this->loop); - // mamta - if ($this->hybi) { - $this->buildHeaderArray(); - } else { - $this->buildHeaderArrayHixie76(); - } - $buffer = $this->serializeHeaders(); + $stream->on('data', array($this, 'onData')); - fwrite($this->socket, $buffer, strlen($buffer)); + $stream->on('message', function($message) use($that){ + $that->emit("message", array("message" => $message)); + }); - // wait for response - $buffer = fread($this->socket, 8192); - $headers = WebSocketConnectionFactory::parseHeaders($buffer); + $this->stream = $stream; - $s = new WebSocketStream($this, $this->socket, $immediateWrite = true); + $this->sendHandshake(); + $this->state = self::STATE_HANDSHAKE_SENT; - $this->_connection = $this->hybi ? new WebSocketConnectionHybi($s, $headers) : new WebSocketConnectionHixie($s, $headers, $buffer); + return $this; + } - $s->setConnection($this->_connection); + public function onMessage($message){ - return true; } public function getTimeOut() @@ -159,36 +169,6 @@ public function setTimeOut($seconds) $this->_timeOut = $seconds; } - protected function buildHeaderArray() - { - $this->handshakeChallenge = WebSocketFunctions::randHybiKey(); - $this->headers = array("GET" => "{$this->requestUri} HTTP/1.1", "Connection:" => "Upgrade", "Host:" => "{$this->host}", "Sec-WebSocket-Key:" => "{$this->handshakeChallenge}", "Origin:" => "{$this->origin}", "Sec-WebSocket-Version:" => 13, "Upgrade:" => "websocket"); - - return $this->headers; - } - - protected function buildHeaderArrayHixie76() - { - $this->hixieKey1 = WebSocketFunctions::randHixieKey(); - $this->hixieKey2 = WebSocketFunctions::randHixieKey(); - $this->headers = array("GET" => "{$this->requestUri} HTTP/1.1", "Connection:" => "Upgrade", "Host:" => "{$this->host}", "Origin:" => "{$this->origin}", "Sec-WebSocket-Key1:" => "{$this->hixieKey1->key}", "Sec-WebSocket-Key2:" => "{$this->hixieKey2->key}", "Upgrade:" => "websocket", "Sec-WebSocket-Protocol: " => "hiwavenet"); - - return $this->headers; - } - - private function serializeHeaders() - { - $str = ''; - - foreach ($this->headers as $k => $v) { - $str .= $k . " " . $v . "\r\n"; - } - - return $str . "\r\n"; - } - - # mamta: hixie 76 - public function addHeader($key, $value) { $this->headers[$key . ":"] = $value; @@ -199,56 +179,35 @@ public function send($string) $this->_connection->sendString($string); } - public function sendMessage($msg) + public function sendMessage(IWebSocketMessage $msg) { $this->_connection->sendMessage($msg); } - /** - * - * @return IWebSocketMessage - */ - public function readMessage() - { - while (count($this->_messages) == 0) - $this->readFrame(); - - - return array_shift($this->_messages); - } - - /** - * @return WebSocketFrame - */ - public function readFrame() + public function sendFrame(IWebSocketFrame $frame) { - $buffer = fread($this->socket, 8192); - - $this->_frames = array_merge($this->_frames, $this->_connection->readFrame($buffer)); - - return array_shift($this->_frames); + $this->_connection->sendFrame($frame); } public function close() { - /** - * @var WebSocketFrame - */ - $frame = null; + if($this->isClosing) + return; + + $this->isClosing = true; $this->sendFrame(WebSocketFrame::create(WebSocketOpcode::CloseFrame)); - $i = 0; - do { - $i++; - $frame = @$this->readFrame(); - } while ($i < 2 && $frame && $frame->getType() == WebSocketOpcode::CloseFrame); + $this->state = self::STATE_CLOSING; + $stream = $this->stream; - @fclose($this->socket); - } + $closeTimer = $this->loop->addTimer(5, function() use ($stream){ + $stream->close(); + }); - public function sendFrame(IWebSocketFrame $frame) - { - $this->_connection->sendFrame($frame); + $loop = $this->loop; + $stream->once("close", function() use ($closeTimer, $loop){ + if($closeTimer) + $loop->cancelTimer($closeTimer); + }); } - } diff --git a/src/Devristo/Phpws/Client/WebSocketFunctions.php b/src/Devristo/Phpws/Client/WebSocketFunctions.php deleted file mode 100644 index 00e506b..0000000 --- a/src/Devristo/Phpws/Client/WebSocketFunctions.php +++ /dev/null @@ -1,142 +0,0 @@ - 0) { - $c = chr(rand(0x21, 0x2f + 1)); #random.choice(_AVAILABLE_KEY_CHARS) - } else { - $c = chr(rand(0x3a, 0x7e + 1)); #random.choice(_AVAILABLE_KEY_CHARS) - } - # $c = chr(65); - $len = strlen($key_n); - # $pos = 2; - $pos = rand(0, $len); - $key_n1 = substr($key_n, 0, $pos); - $key_n2 = substr($key_n, $pos); - $key_n = $key_n1 . $c . $key_n2; - } - for ($i = 0; $i < $spaces_n; $i++) { - $len = strlen($key_n); - # $pos = 2; - $pos = rand(1, $len - 1); - $key_n1 = substr($key_n, 0, $pos); - $key_n2 = substr($key_n, $pos); - $key_n = $key_n1 . " " . $key_n2; - } - - return new HixieKey($number_n, $key_n); - } - -} \ No newline at end of file diff --git a/src/Devristo/Phpws/Framing/IWebSocketFrame.php b/src/Devristo/Phpws/Framing/WebSocketFrameInterface.php similarity index 100% rename from src/Devristo/Phpws/Framing/IWebSocketFrame.php rename to src/Devristo/Phpws/Framing/WebSocketFrameInterface.php diff --git a/src/Devristo/Phpws/Messaging/IWebSocketMessage.php b/src/Devristo/Phpws/Messaging/WebSocketMessageInterface.php similarity index 100% rename from src/Devristo/Phpws/Messaging/IWebSocketMessage.php rename to src/Devristo/Phpws/Messaging/WebSocketMessageInterface.php diff --git a/src/Devristo/Phpws/Protocol/WebSocketConnection.php b/src/Devristo/Phpws/Protocol/WebSocketConnection.php index b285570..0d60ade 100644 --- a/src/Devristo/Phpws/Protocol/WebSocketConnection.php +++ b/src/Devristo/Phpws/Protocol/WebSocketConnection.php @@ -10,11 +10,12 @@ use Devristo\Phpws\Framing\IWebSocketFrame; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\WebSocketStream; +use Evenement\EventEmitter; +use React\Stream\WritableStreamInterface; use Zend\Log\LoggerAwareInterface; use Zend\Log\LoggerInterface; -abstract class WebSocketConnection implements IWebSocketConnection, LoggerAwareInterface +abstract class WebSocketConnection extends EventEmitter implements WebSocketConnectionInterface, LoggerAwareInterface { protected $_headers = array(); @@ -26,27 +27,30 @@ abstract class WebSocketConnection implements IWebSocketConnection, LoggerAwareI /** * - * @var WebSocketStream + * @var WebSocketServerClient */ protected $_socket = null; protected $_cookies = array(); public $parameters = null; protected $_role = WebSocketConnectionRole::CLIENT; - public function __construct(WebSocketStream $socket, array $headers) + protected $_eventManger; + + public function __construct(WritableStreamInterface $socket, array $headers) { $this->setHeaders($headers); $this->_socket = $socket; + $this->_id = uniqid("connection-"); } public function getIp() { - return stream_socket_get_name($this->_socket->getSocket(), true); + return $this->_socket->getRemoteAddress(); } public function getId() { - return (int)$this->_socket->getSocket(); + return $this->_id; } public function sendFrame(IWebSocketFrame $frame) diff --git a/src/Devristo/Phpws/Protocol/WebSocketConnectionFactory.php b/src/Devristo/Phpws/Protocol/WebSocketConnectionFactory.php index 61532fa..131c2ac 100644 --- a/src/Devristo/Phpws/Protocol/WebSocketConnectionFactory.php +++ b/src/Devristo/Phpws/Protocol/WebSocketConnectionFactory.php @@ -12,13 +12,14 @@ use Devristo\Phpws\Protocol\WebSocketConnectionHixie; use Devristo\Phpws\Protocol\WebSocketConnectionHybi; use Devristo\Phpws\Protocol\WebSocketConnectionRole; -use Devristo\Phpws\Protocol\WebSocketStream; +use Devristo\Phpws\Protocol\WebSocketServerClient; +use React\Socket\ConnectionInterface; use Zend\Log\LoggerInterface; class WebSocketConnectionFactory { - public static function fromSocketData(WebSocketStream $socket, $data, LoggerInterface $logger) + public static function fromSocketData(ConnectionInterface $socket, $data, LoggerInterface $logger) { $headers = self::parseHeaders($data); diff --git a/src/Devristo/Phpws/Protocol/WebSocketConnectionFlash.php b/src/Devristo/Phpws/Protocol/WebSocketConnectionFlash.php index 983a9ce..f549b15 100644 --- a/src/Devristo/Phpws/Protocol/WebSocketConnectionFlash.php +++ b/src/Devristo/Phpws/Protocol/WebSocketConnectionFlash.php @@ -16,7 +16,8 @@ class WebSocketConnectionFlash extends WebSocketConnection public function __construct($socket, $data) { $this->_socket = $socket; - $this->_socket->onFlashXMLRequest($this); + + $this->emit("flashXmlRequest"); } public function sendString($msg) @@ -24,9 +25,9 @@ public function sendString($msg) $this->_socket->write($msg); } - public function disconnect() + public function close() { - $this->_socket->disconnect(); + $this->_socket->close(); } public function sendHandshakeResponse() @@ -34,7 +35,7 @@ public function sendHandshakeResponse() throw new Exception("Not supported!"); } - public function readFrame($data) + public function onData($data) { throw new Exception("Not supported!"); } diff --git a/src/Devristo/Phpws/Protocol/WebSocketConnectionHixie.php b/src/Devristo/Phpws/Protocol/WebSocketConnectionHixie.php index 5b4c21c..91bafe4 100644 --- a/src/Devristo/Phpws/Protocol/WebSocketConnectionHixie.php +++ b/src/Devristo/Phpws/Protocol/WebSocketConnectionHixie.php @@ -7,14 +7,14 @@ use Devristo\Phpws\Framing\WebSocketFrame76; use Devristo\Phpws\Messaging\WebSocketMessage; use Devristo\Phpws\Messaging\WebSocketMessage76; -use Devristo\Phpws\Protocol\WebSocketStream; +use Devristo\Phpws\Protocol\WebSocketServerClient; class WebSocketConnectionHixie extends WebSocketConnection { private $_clientHandshake; - public function __construct(WebSocketStream $socket, array $headers, $clientHandshake) + public function __construct(WebSocketServerClient $socket, array $headers, $clientHandshake) { $this->_clientHandshake = $clientHandshake; parent::__construct($socket, $headers); @@ -78,12 +78,12 @@ private static function calcHixieResponse($key1, $key2, $l8b) } - public function readFrame($data) + public function onData($data) { $f = WebSocketFrame76::decode($data); - $m = WebSocketMessage76::fromFrame($f); + $message = WebSocketMessage76::fromFrame($f); - $this->_socket->onMessage($m); + $this->emit("message", array('message' => $message)); return array($f); } @@ -95,9 +95,9 @@ public function sendString($msg) return $this->sendMessage($m); } - public function disconnect() + public function close() { - $this->_socket->disconnect(); + $this->_socket->close(); } } diff --git a/src/Devristo/Phpws/Protocol/WebSocketConnectionHybi.php b/src/Devristo/Phpws/Protocol/WebSocketConnectionHybi.php index a35b2d9..3c0fbea 100644 --- a/src/Devristo/Phpws/Protocol/WebSocketConnectionHybi.php +++ b/src/Devristo/Phpws/Protocol/WebSocketConnectionHybi.php @@ -40,7 +40,7 @@ public function sendHandshakeResponse() $this->_socket->write($response); - $this->logger->debug("HYBI Response SENT!"); + $this->logger->debug("Got an HYBI style request, sent HYBY handshake response"); } private static function calcHybiResponse($challenge) @@ -48,7 +48,7 @@ private static function calcHybiResponse($challenge) return base64_encode(sha1($challenge . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)); } - public function readFrame($data) + public function onData($data) { $frames = array(); while (!empty($data)) { @@ -103,7 +103,7 @@ protected function processMessageFrame(WebSocketFrame $frame) } if ($this->_openMessage && $this->_openMessage->isFinalised()) { - $this->_socket->onMessage($this->_openMessage); + $this->emit("message", array('message' => $this->_openMessage)); $this->_openMessage = null; } } @@ -119,10 +119,12 @@ protected function processControlFrame(WebSocketFrame $frame) { switch ($frame->getType()) { case WebSocketOpcode::CloseFrame : + $this->logger->notice("Got CLOSE frame"); + $frame = WebSocketFrame::create(WebSocketOpcode::CloseFrame); $this->sendFrame($frame); - $this->_socket->disconnect(); + $this->_socket->close(); break; case WebSocketOpcode::PingFrame : $frame = WebSocketFrame::create(WebSocketOpcode::PongFrame); @@ -138,18 +140,18 @@ public function sendString($msg) return $this->sendMessage($m); } catch (Exception $e) { - $this->disconnect(); + $this->close(); } return false; } - public function disconnect() + public function close() { $f = WebSocketFrame::create(WebSocketOpcode::CloseFrame); $this->sendFrame($f); - $this->_socket->disconnect(); + $this->_socket->close(); } } \ No newline at end of file diff --git a/src/Devristo/Phpws/Protocol/IWebSocketConnection.php b/src/Devristo/Phpws/Protocol/WebSocketConnectionInterface.php similarity index 87% rename from src/Devristo/Phpws/Protocol/IWebSocketConnection.php rename to src/Devristo/Phpws/Protocol/WebSocketConnectionInterface.php index 8ef4395..9f5fc90 100644 --- a/src/Devristo/Phpws/Protocol/IWebSocketConnection.php +++ b/src/Devristo/Phpws/Protocol/WebSocketConnectionInterface.php @@ -11,7 +11,7 @@ use Devristo\Phpws\Framing\IWebSocketFrame; use Devristo\Phpws\Messaging\IWebSocketMessage; -interface IWebSocketConnection +interface WebSocketConnectionInterface { public function getId(); @@ -20,7 +20,7 @@ public function sendHandshakeResponse(); public function setRole($role); - public function readFrame($data); + public function onData($data); public function sendFrame(IWebSocketFrame $frame); @@ -36,5 +36,5 @@ public function getCookies(); public function getIp(); - public function disconnect(); + public function close(); } \ No newline at end of file diff --git a/src/Devristo/Phpws/Protocol/WebSocketObserver.php b/src/Devristo/Phpws/Protocol/WebSocketObserver.php index c613261..e450175 100644 --- a/src/Devristo/Phpws/Protocol/WebSocketObserver.php +++ b/src/Devristo/Phpws/Protocol/WebSocketObserver.php @@ -9,16 +9,13 @@ namespace Devristo\Phpws\Protocol; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\WebSocketStream; +use Devristo\Phpws\Protocol\WebSocketServerClient; interface WebSocketObserver { + public function onDisconnect(WebSocketServerClient $s); - public function onDisconnect(WebSocketStream $s); + public function onConnectionEstablished(WebSocketServerClient $s); - public function onConnectionEstablished(WebSocketStream $s); - - public function onMessage(IWebSocketConnection $s, IWebSocketMessage $msg); - - public function onFlashXMLRequest(WebSocketConnectionFlash $connection); + public function onMessage(WebSocketConnectionInterface $s, IWebSocketMessage $msg); } \ No newline at end of file diff --git a/src/Devristo/Phpws/Protocol/WebSocketServerClient.php b/src/Devristo/Phpws/Protocol/WebSocketServerClient.php new file mode 100644 index 0000000..bfbe0c7 --- /dev/null +++ b/src/Devristo/Phpws/Protocol/WebSocketServerClient.php @@ -0,0 +1,104 @@ +_lastChanged = time(); + $this->on("data", array($this, 'onData')); + $this->logger = $logger; + } + + public function handleData($stream) + { + $data = @fread($stream, $this->bufferSize); + if ('' === $data || false === $data) { + $this->end(); + } else { + $this->emit('data', array($data, $this)); + } + } + + public function onData($data) + { + try { + $this->_lastChanged = time(); + + if ($this->_connection) + $this->_connection->onData($data); + else + $this->establishConnection($data); + } catch (Exception $e) { + $this->close(); + } + } + + public function setConnection(WebSocketConnectionInterface $con) + { + $this->_connection = $con; + } + + public function establishConnection($data) + { + $this->_connection = WebSocketConnectionFactory::fromSocketData($this, $data, $this->logger); + $myself = $this; + $this->_connection->on("message", function($message) use($myself){ + $myself->emit("message", array("message" => $message)); + }); + + $this->_connection->on("flashXmlRequest", function($message) use($myself){ + $myself->emit("flashXmlRequest"); + }); + + if ($this->_connection instanceof WebSocketConnectionFlash) + return; + + $this->emit("connect"); + } + + public function getLastChanged() + { + return $this->_lastChanged; + } + + /** + * + * @return WebSocketConnectionInterface + */ + public function getConnection() + { + return $this->_connection; + } + + public function setLogger(LoggerInterface $logger) + { + $this->logger = $logger; + } +} \ No newline at end of file diff --git a/src/Devristo/Phpws/Protocol/WebSocketStream.php b/src/Devristo/Phpws/Protocol/WebSocketStream.php deleted file mode 100644 index 34efb1c..0000000 --- a/src/Devristo/Phpws/Protocol/WebSocketStream.php +++ /dev/null @@ -1,188 +0,0 @@ -_socket = $socket; - $this->_lastChanged = time(); - $this->_immediateWrite = $immediateWrite; - - $this->addObserver($server); - } - - public function onData($data) - { - try { - $this->_lastChanged = time(); - - if ($this->_connection) - $this->_connection->readFrame($data); - else - $this->establishConnection($data); - } catch (Exception $e) { - $this->disconnect(); - } - } - - public function setConnection(IWebSocketConnection $con) - { - $this->_connection = $con; - } - - public function onMessage(IWebSocketMessage $m) - { - foreach ($this->_observers as $observer) { - $observer->onMessage($this->getConnection(), $m); - } - } - - public function establishConnection($data) - { - $this->_connection = WebSocketConnectionFactory::fromSocketData($this, $data, $this->logger); - - if ($this->_connection instanceof WebSocketConnectionFlash) - return; - - foreach ($this->_observers as $observer) { - $observer->onConnectionEstablished($this); - } - } - - public function write($data) - { - $this->_writeBuffer .= $data; - - if ($this->_immediateWrite == true) { - while ($this->_writeBuffer != '') - $this->mayWrite(); - } - } - - public function requestsWrite() - { - return strlen($this->_writeBuffer); - } - - public function mayWrite() - { - $bytesWritten = fwrite($this->_socket, $this->_writeBuffer, strlen($this->_writeBuffer)); - - if ($bytesWritten === false) - $this->close(); - - $this->_writeBuffer = substr($this->_writeBuffer, $bytesWritten); - - if (strlen($this->_writeBuffer) == 0 && $this->isClosing()) - $this->close(); - } - - public function getLastChanged() - { - return $this->_lastChanged; - } - - public function onFlashXMLRequest(WebSocketConnectionFlash $connection) - { - foreach ($this->_observers as $observer) { - $observer->onFlashXMLRequest($connection); - } - } - - public function disconnect() - { - $this->_disconnecting = true; - - if ($this->_writeBuffer == '') - $this->close(); - } - - public function isClosing() - { - return $this->_disconnecting; - } - - public function close() - { - fclose($this->_socket); - $this->_closed = true; - foreach ($this->_observers as $observer) { - $observer->onDisconnect($this); - } - } - - public function getSocket() - { - return $this->_socket; - } - - /** - * - * @return IWebSocketConnection - */ - public function getConnection() - { - return $this->_connection; - } - - public function addObserver(WebSocketObserver $s) - { - $this->_observers[] = $s; - } - - public function acceptConnection() - { - throw new \BadMethodCallException(); - } - - public function isServer() - { - return false; - } - - public function setLogger(LoggerInterface $logger) - { - $this->logger = $logger; - } - - - public function isClosed() - { - return $this->_closed; - } -} \ No newline at end of file diff --git a/src/Devristo/Phpws/Server/ISocketStream.php b/src/Devristo/Phpws/Server/ISocketStream.php deleted file mode 100644 index fb51f8c..0000000 --- a/src/Devristo/Phpws/Server/ISocketStream.php +++ /dev/null @@ -1,21 +0,0 @@ -streams = new \SplObjectStorage(); - - $this->_logger = $logger; - } - - public function attachStream(ISocketStream $stream){ - $this->streams->attach($stream); - $this->_logger->info("Attaching stream"); - } - - public function detachStream(ISocketStream $stream){ - $this->streams->detach($stream); - $this->_logger->info("Detaching stream"); - } - - public function run(){ - $this->_eventLoop(); - } - - public function getStreams(){ - return $this->streams; - } - - private function detachClosedStreams(){ - $closed = array(); - foreach($this->streams as $stream) - if($stream->isClosed()) - $closed[] = $stream; - - foreach($closed as $stream) - $this->detachStream($stream); - } - - public function getSockets(){ - $sockets = array(); - - foreach($this->streams as $stream) - $sockets[] = $stream->getSocket(); - - return $sockets; - } - - private function getWriteResources(){ - $wantsToWrite = array(); - foreach($this->getStreams() as $stream){ - if($stream->requestsWrite()) - $wantsToWrite[] = $stream->getSocket(); - } - - return $wantsToWrite; - } - - private function getStreamByResource($resource){ - foreach($this->streams as $stream) - if($stream->getSocket() == $resource) - return $stream; - - return null; - } - - protected function _eventLoop(){ - while (true) { - clearstatcache(); - gc_collect_cycles(); - - $changed = $this->getSockets(); - $write = $this->getWriteResources(); - $except = null; - - if (@stream_select($changed, $write, $except, null) === false) { - $this->_logger->err("Stream select has failed. You might need to restart the server if it happens again"); - break; - } - $this->_logger->debug("Streams selected"); - - - foreach ($changed as $resource) { - $stream = $this->getStreamByResource($resource); - - if(!$stream){ - $this->_logger->err("Cannot find ISocketStream using this socket, skipping it"); - @fclose($resource); - continue; - } - - - if ($stream->isServer()) { - $stream->acceptConnection(); - } else { - $buffer = @fread($resource, 8192); - - // If read returns false, close the stream and continue with the next socket - if ($buffer === false) { - $stream->close(); - $this->detachStream($stream); - // Skip to next stream - continue; - } - - $bytes = strlen($buffer); - - if ($bytes === 0) { - $stream->close(); - $this->detachStream($stream); - } else if ($stream != null) { - $stream->onData($buffer); - } - } - } - - if (is_array($write)) { - foreach ($write as $resource) { - $stream = $this->getStreamByResource($resource); - - if(!$stream){ - $this->_logger->err("Cannot find ISocketStream using this socket, skipping it"); - @fclose($resource); - continue; - } - - $stream->mayWrite(); - } - } - } - } -} \ No newline at end of file diff --git a/src/Devristo/Phpws/Server/UriHandler/ClientRouter.php b/src/Devristo/Phpws/Server/UriHandler/ClientRouter.php new file mode 100644 index 0000000..06161c7 --- /dev/null +++ b/src/Devristo/Phpws/Server/UriHandler/ClientRouter.php @@ -0,0 +1,76 @@ +server = $server; + + $this->membership = new \SplObjectStorage(); + + /** + * @var $membership \SplObjectStorage|WebSocketUriHandlerInterface[] + */ + $membership = $this->membership; + + $that = $this; + $server->on("connect", function(WebSocketConnectionInterface $client) use ($that, $logger, $membership){ + $handler = $that->matchConnection($client); + + if($handler){ + $logger->notice("We have added client {$client->getId()} to ".get_class($handler)); + $membership->attach($client, $handler); + $handler->addConnection($client); + }else + $logger->err("Cannot route {$client->getId()} with request uri {$client->getUriRequested()}"); + }); + + $server->on('disconnect', function(WebSocketConnectionInterface $client) use($that, $logger, $membership){ + if($membership->contains($client)){ + $membership[$client]->removeConnection($client); + } else { + $logger->warn("Client {$client->getId()} not attached to any handler, so cannot remove it!"); + } + }); + + $server->on("message", function(WebSocketConnectionInterface $client, IWebSocketMessage $message) use($that, $logger, $membership){ + if($membership->contains($client)){ + $membership[$client]->onMessage($client, $message); + } else { + $logger->warn("Client {$client->getId()} not attached to any handler, so cannot forward the message!"); + } + }); + + } + + /** + * @param WebSocketConnection $client + * @return null|WebSocketUriHandlerInterface + */ + public function matchConnection(WebSocketConnection $client){ + foreach($this->handlers as $key => $value ){ + if(preg_match($key,$client->getUriRequested())) + return $value; + } + + return null; + } + + public function addUriHandler($matchPattern, WebSocketUriHandlerInterface $handler){ + $this->handlers[$matchPattern] = $handler; + } +} \ No newline at end of file diff --git a/src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandler.php b/src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandler.php index 9705d3a..d6cffb1 100644 --- a/src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandler.php +++ b/src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandler.php @@ -3,12 +3,12 @@ namespace Devristo\Phpws\Server\UriHandler; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\IWebSocketConnection; +use Devristo\Phpws\Protocol\WebSocketConnectionInterface; use Devristo\Phpws\Server\WebSocketServer; use SplObjectStorage; use Zend\Log\LoggerInterface; -abstract class WebSocketUriHandler implements IWebSocketUriHandler +abstract class WebSocketUriHandler implements WebSocketUriHandlerInterface { /** @@ -36,12 +36,12 @@ public function __construct($logger) $this->logger = $logger; } - public function addConnection(IWebSocketConnection $user) + public function addConnection(WebSocketConnectionInterface $user) { $this->users->attach($user); } - public function removeConnection(IWebSocketConnection $user) + public function removeConnection(WebSocketConnectionInterface $user) { $this->users->detach($user); $this->onDisconnect($user); @@ -52,17 +52,17 @@ public function setServer(WebSocketServer $server) $this->server = $server; } - public function send(IWebSocketConnection $client, $str) + public function send(WebSocketConnectionInterface $client, $str) { return $client->sendString($str); } - public function onDisconnect(IWebSocketConnection $user) + public function onDisconnect(WebSocketConnectionInterface $user) { } - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg) + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg) { } diff --git a/src/Devristo/Phpws/Server/UriHandler/IWebSocketUriHandler.php b/src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandlerInterface.php similarity index 52% rename from src/Devristo/Phpws/Server/UriHandler/IWebSocketUriHandler.php rename to src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandlerInterface.php index b088bdd..df22cbe 100644 --- a/src/Devristo/Phpws/Server/UriHandler/IWebSocketUriHandler.php +++ b/src/Devristo/Phpws/Server/UriHandler/WebSocketUriHandlerInterface.php @@ -9,22 +9,22 @@ namespace Devristo\Phpws\Server\UriHandler; use Devristo\Phpws\Messaging\IWebSocketMessage; -use Devristo\Phpws\Protocol\IWebSocketConnection; +use Devristo\Phpws\Protocol\WebSocketConnectionInterface; use Devristo\Phpws\Server\WebSocketServer; -interface IWebSocketUriHandler +interface WebSocketUriHandlerInterface { - public function addConnection(IWebSocketConnection $user); + public function addConnection(WebSocketConnectionInterface $user); - public function removeConnection(IWebSocketConnection $user); + public function removeConnection(WebSocketConnectionInterface $user); - public function onMessage(IWebSocketConnection $user, IWebSocketMessage $msg); + public function onMessage(WebSocketConnectionInterface $user, IWebSocketMessage $msg); public function setServer(WebSocketServer $server); /** - * @return IWebSocketConnection[] + * @return WebSocketConnectionInterface[] */ public function getConnections(); } \ No newline at end of file diff --git a/src/Devristo/Phpws/Server/WebSocketServer.php b/src/Devristo/Phpws/Server/WebSocketServer.php index 3a4707a..07d783a 100644 --- a/src/Devristo/Phpws/Server/WebSocketServer.php +++ b/src/Devristo/Phpws/Server/WebSocketServer.php @@ -1,13 +1,13 @@ _url = $url; + $this->loop = $loop; + $this->_streams = new SplObjectStorage(); $this->_connections = new SplObjectStorage(); $this->_context = stream_context_create(); $this->_logger = $logger; - $this->_server = new SocketServer($logger); } public function getStreamContext() @@ -86,281 +82,93 @@ public function setStreamContext($context) $this->_context = $context; } - /** - * Unassociate a request uri to a IWebSocketResourceHandler. - * - * @param string $script For example 'handler1' to capture request with URI '/handler1/' - * @param bool $disconnectUsers if true, disconnect users assosiated to handler. - * @return bool|\Devristo\Phpws\Server\UriHandler\IWebSocketUriHandler - */ - public function removeUriHandler($script, $disconnectUsers = true) - { - - if (empty($this->uriHandlers[$script])) - return false; - $handler = $this->uriHandlers[$script]; - unset($this->uriHandlers[$script]); - - if ($disconnectUsers) - foreach ($handler->getConnections() as $user) { - - $handler->removeConnection($user); - $user->disconnect(); - } - - return $handler; - } - /** * Start the server */ - public function run() + public function bind() { - error_reporting(E_ALL); - set_time_limit(0); - - ob_implicit_flush(); - - $err = $errno = 0; $port = parse_url($this->_url, PHP_URL_PORT); - - $this->FLASH_POLICY_FILE = str_replace('to-ports="*', 'to-ports="' . $port, $this->FLASH_POLICY_FILE); - $this->master = stream_socket_server($this->_url, $errno, $err, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $this->_context); + $serverSocket = stream_socket_server($this->_url, $errno, $err, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $this->_context); $this->_logger->notice(sprintf("phpws listening on %s", $this->_url)); - if ($this->master == false) { + if ($serverSocket == false) { $this->_logger->err("Error: $err"); return; } - $this->_server->attachStream($this); - $this->_server->run(); - } + $timeOut = & $this->purgeUserTimeOut; + $sockets = $this->_streams; + $that = $this; + $logger = $this->_logger; - public function acceptConnection() - { - try { - $client = stream_socket_accept($this->master); - stream_set_blocking($client, 0); + $this->loop->addReadStream($serverSocket, function ($serverSocket) use ($that, $logger, $sockets) { + $newSocket = stream_socket_accept($serverSocket); - if ($client === false) { - $this->_logger->warn("Failed to accept client connection"); + if (false === $newSocket) { + return; } - $stream = new WebSocketStream($this, $client); - $stream->setLogger($this->_logger); - $this->_server->attachStream($stream); - - $this->_logger->info("WebSocket client accepted"); - } catch (Exception $e) { - $this->_logger->crit("Failed to accept client connection"); - } - } - - public function addObserver(IWebSocketServerObserver $o) - { - $this->_observers[] = $o; - } - - /** - * Associate a request uri to a IWebSocketResourceHandler. - * - * @param string $script For example 'handler1' to capture request with URI '/handler1/' - * @param \Devristo\Phpws\Server\UriHandler\IWebSocketUriHandler $handler Instance of a IWebSocketResourceHandler. This instance will receive the messages. - */ - public function addUriHandler($script, IWebSocketUriHandler $handler) - { - $this->uriHandlers[$script] = $handler; - $handler->setServer($this); - } - - /** - * Dispatch incoming message to the associated resource and to the general onMessage event handler - * @param \Devristo\Phpws\Protocol\IWebSocketConnection $user - * @param IWebSocketMessage $msg - */ - protected function dispatchMessage(IWebSocketConnection $user, IWebSocketMessage $msg) - { - $this->_logger->debug("Dispatching message to URI handlers and Observers"); - - if (array_key_exists($this->_connections[$user], $this->uriHandlers)) { - $this->uriHandlers[$this->_connections[$user]]->onMessage($user, $msg); - } - - foreach ($this->_observers as $o) { - $o->onMessage($user, $msg); - } - } - - /** - * Adds a user to a IWebSocketResourceHandler by using the request uri in the GET request of - * the client's opening handshake - * - * @param \Devristo\Phpws\Protocol\IWebSocketConnection $user - * @param $uri - * @return IWebSocketUriHandler Instance of the resource handler the user has been added to. - */ - protected function addConnectionToUriHandler(IWebSocketConnection $user, $uri) - { - $url = parse_url($uri); - - if (isset($url['query'])) - parse_str($url['query'], $query); - else - $query = array(); - - if (isset($url['path']) == false) - $url['path'] = '/'; - - $pathSplit = preg_split("/\\//", $url['path'], 0, PREG_SPLIT_NO_EMPTY); - $resource = array_pop($pathSplit); - - $user->parameters = $query; - - - if (array_key_exists($resource, $this->uriHandlers)) { - $this->uriHandlers[$resource]->addConnection($user); - $this->_connections[$user] = $resource; - - $this->_logger->notice("User {$user->getId()} has been added to $resource"); - } - } - - public function onConnectionEstablished(WebSocketStream $s) - { - $con = $s->getConnection(); - $this->_connections->attach($con); - $uri = $con->getUriRequested(); - - $this->addConnectionToUriHandler($con, $uri); - - foreach ($this->_observers as $o) { - /** - * @var @o IWebSocketServerObserver - */ - $o->onConnect($con); - } - } - - public function onMessage(IWebSocketConnection $connection, IWebSocketMessage $msg) - { - try { - $this->dispatchMessage($connection, $msg); - } catch (Exception $e) { - $this->_logger->error("Exception occurred while handling message:\r\n" . $e->getTraceAsString()); - } - } - - public function onDisconnect(WebSocketStream $socket) - { - $con = $socket->getConnection(); - try { - if ($con) { - $handler = $this->_connections[$con]; - - if ($handler) - $this->uriHandlers[$handler]->removeConnection($con); - - $this->_connections->detach($socket->getConnection()); - } - } catch (Exception $e) { - $this->_logger->err("Exception occurred while handling message:\r\n" . $e->getTraceAsString()); - } - - - if ($con instanceof IWebSocketConnection) { - foreach ($this->_observers as $o) { - /** - * @var @o IWebSocketServerObserver - */ - $o->onDisconnect($con); + stream_set_blocking($newSocket, 0); + $client = new WebSocketServerClient($newSocket, $that->loop, $logger); + $sockets->attach($client); + + $client->on("connect", function () use ($that, $client, $logger) { + try { + $con = $client->getConnection(); + $that->getConnections()->attach($con); + $that->emit("connect", array("client" => $con)); + } catch (Exception $e) { + $logger->err("[on_connect] Error occurred while running a callback"); + } + }); + + $client->on("message", function ($message) use ($that, $client, $logger) { + try { + $connection = $client->getConnection(); + $that->emit("message", array("client" => $connection, "message" => $message)); + } catch (Exception $e) { + $logger->err("[on_message] Error occurred while running a callback"); + } + }); + + $client->on("close", function () use ($that, $client, $logger) { + try{ + $that->getConnections()->detach($client->getConnection()); + $that->emit("disconnect", array("client" => $client->getConnection())); + }catch (Exception $e) { + $logger->err("[on_message] Error occurred while running a callback"); + } + }); + + $client->on("flashXmlRequest", function () use ($that, $client) { + $client->getConnection()->sendString($that->FLASH_POLICY_FILE); + $client->close(); + }); + }); + + $this->loop->addPeriodicTimer(5, function () use ($timeOut, $sockets, $that) { + $currentTime = time(); + + if ($timeOut == null) + return; + + foreach ($sockets as $s) { + if ($currentTime - $s->getLastChanged() > $timeOut) { + $s->close(); + } } - } - - $this->_server->detachStream($socket); - } - - protected function purgeUsers() - { - $currentTime = time(); - - if ($this->purgeUserTimeOut == null) - return; - - foreach ($this->sockets as $s) { - if ($currentTime - $s->getLastChanged() > $this->purgeUserTimeOut) { - $s->disconnect(); - $this->onDisconnect($s); - } - } + }); } public function getConnections() { return $this->_connections; } - - public function debug($msg) - { - if ($this->debug) - echo date("Y-m-d H:i:s") . " | " . $msg . "\n"; - } - - public function onFlashXMLRequest(WebSocketConnectionFlash $connection) - { - $connection->sendString($this->FLASH_POLICY_FILE); - $connection->disconnect(); - } - - /** - * - * @param \Devristo\Phpws\Server\UriHandler\IWebSocketUriHandler $uri - * - * @return \Devristo\Phpws\Server\UriHandler\IWebSocketUriHandler - */ - public function getUriHandler($uri) - { - return $this->uriHandlers[$uri]; - } - - public function onData($data) - { - throw new \BadMethodCallException(); - } - - public function close() - { - fclose($this->getSocket()); - } - - public function mayWrite() - { - return; - } - - public function requestsWrite() - { - return false; - } - - public function getSocket() - { - return $this->master; - } - - public function isServer() - { - return true; - } - - public function isClosed(){ - return false; - } } diff --git a/src/Devristo/Phpws/Server/WebSocketServerObserver.php b/src/Devristo/Phpws/Server/WebSocketServerObserver.php deleted file mode 100644 index af690cd..0000000 --- a/src/Devristo/Phpws/Server/WebSocketServerObserver.php +++ /dev/null @@ -1,28 +0,0 @@ -open(); - -$client->send("Hello world"); -$msg = $client->readMessage(); - - -echo $msg->getData(); - -$client->close(); \ No newline at end of file