Skip to content

Commit

Permalink
* Flash client fixed
Browse files Browse the repository at this point in the history
* Support for multiple frames in one read, still cant handle
incomplete frames.
* Now using stream_socket_ functions instead of socket_ functions, will
aid the implementation of WSS (WebSockets over SSL). If you use
socket_ functions in your phpws observers / urihandlers you will need to
change those to the corresponding stream functions.
  • Loading branch information
Devristo committed Dec 14, 2011
1 parent b3aea03 commit 85ea812
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 33 deletions.
19 changes: 19 additions & 0 deletions phpws/tests/server.test.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ function test_echoResourceHandlerResponse(){
$this->assertEqual($input, $msg->getData());
}

function test_DoubleEchoResourceHandlerResponse(){
$input = "Hello World!";
$msg = WebSocketMessage::create($input);

$client = new WebSocket("ws://127.0.0.1:12345/echo/");
$client->setTimeOut(1000);
$client->open();
$client->sendMessage($msg);
$client->sendMessage($msg);

$msg = $client->readMessage();
$msg2= $client->readMessage();

$client->close();
$this->assertEqual($input, $msg->getData());

$this->assertEqual($input, $msg2->getData());
}

function test_AdminPing(){
$msg = WebSocketAdminMessage::create("shutdown");

Expand Down
30 changes: 18 additions & 12 deletions phpws/websocket.client.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ public function __construct($url){

$this->url = $url;

if(in_array($parts['scheme'], array('ws')) === false)
if(in_array($parts['scheme'], array('ws','wss')) === false)
throw new WebSocketInvalidUrlScheme();

$this->scheme = $parts['scheme'];

$this->host = $parts['host'];
$this->port = $parts['port'];

Expand All @@ -34,30 +36,34 @@ public function __construct($url){
if(isset($parts['query']))
$this->requestUri .= "?".$parts['query'];

$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$this->setTimeOut(1);

$this->buildHeaderArray();
}

public function setTimeOut($seconds){
socket_set_option($this->socket,SOL_SOCKET, SO_RCVTIMEO, array("sec" => $seconds, "usec" => 0));
socket_set_option($this->socket,SOL_SOCKET, SO_SNDTIMEO, array("sec" => $seconds, "usec" => 0));
$this->_timeOut = $seconds;
}

public function getTimeOut(){
return $this->_timeOut;
}

/**
* TODO: Proper header generation!
* TODO: Check server response!
*/
public function open(){
socket_connect($this->socket, $this->host, $this->port);
$errno = $errstr = null;

$protocol = $this->scheme == 'ws' ? "tcp" : "ssl";

$this->socket = stream_socket_client("$protocol://{$this->host}:{$this->port}", $errno, $errstr, $this->getTimeOut());// socket_connect($this->socket, $this->host, $this->port);

$buffer = $this->serializeHeaders();

socket_write($this->socket, $buffer, strlen($buffer));
fwrite($this->socket, $buffer, strlen($buffer));

// wait for response
$buffer = socket_read($this->socket, 2048,PHP_BINARY_READ);
$buffer = fread($this->socket, 2048);
$headers = WebSocketFunctions::parseHeaders($buffer);

if($headers['Sec-Websocket-Accept'] != WebSocketFunctions::calcHybiResponse($this->handshakeChallenge)){
Expand Down Expand Up @@ -112,14 +118,14 @@ public function sendMessage(IWebSocketMessage $msg){

public function sendFrame(IWebSocketFrame $frame){
$msg = $frame->encode();
socket_write($this->socket, $msg,strlen($msg));
fwrite($this->socket, $msg,strlen($msg));
}

/**
* @return WebSocketFrame
*/
public function readFrame(){
$data = socket_read($this->socket,2048,PHP_BINARY_READ);
$data = fread($this->socket,2048);

if($data === false)
return null;
Expand Down Expand Up @@ -159,7 +165,7 @@ public function close(){
}while($i < 2 && $frame && $frame->getType == WebSocketOpcode::CloseFrame);


socket_close($this->socket);
fclose($this->socket);
}

}
25 changes: 20 additions & 5 deletions phpws/websocket.framing.php
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public function getType(){
return $this->opcode;
}

public static function decode($raw){
return self::consume($raw, $unconsumed = NULL);
}

public function encode(){
$this->payloadLength = strlen($this->payloadData);

Expand Down Expand Up @@ -186,9 +190,12 @@ public function encode(){
return $encoded;
}

public static function decode($raw){
public static function consume($raw, &$unconsumed){
$frame = new self();

if(strlen($raw) < 2)
return false;

// Read the first two bytes, then chop them off
list($firstByte, $secondByte) = substr($raw,0,2);
$raw = substr($raw,2);
Expand Down Expand Up @@ -223,12 +230,20 @@ public static function decode($raw){
$raw = substr($raw,4);
}

if(strlen($raw) != $frame->payloadLength)
throw new WebSocketFrameSizeMismatch($frame);
if(strlen($raw) < $frame->payloadLength)
return FALSE;

$payload = substr($raw, 0, $frame->payloadLength);

// Return unconsumed part
if($unconsumed !== null){
$unconsumed = substr($raw, $frame->payloadLength);
}


if($frame->mask)
$frame->payloadData = self::rotMask($raw, $frame->maskingKey);
else $frame->payloadData = $raw;
$frame->payloadData = self::rotMask($payload, $frame->maskingKey);
else $frame->payloadData = $payload;

return $frame;
}
Expand Down
29 changes: 22 additions & 7 deletions phpws/websocket.protocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public static function fromSocketData(WebSocket $socket, $data){
if(isset($headers['Sec-Websocket-Key1'])) {
return new WebSocketConnectionHixie($socket, $headers, $data);
} else if(strpos($data,'<policy-file-request/>') === 0) {
return new WebSocketConnectionFlash($data);
return new WebSocketConnectionFlash($socket, $data);
} else{
return new WebSocketConnectionHybi($socket, $headers);
}
Expand Down Expand Up @@ -114,10 +114,15 @@ protected function getQueryParts(){
public function getAdminKey(){
return isset($this->_headers['Admin-Key']) ? $this->_headers['Admin-Key'] : null;
}

public function getSocket(){
return $this->_socket;
}
}

class WebSocketConnectionFlash{
public function __construct($data){
public function __construct($socket, $data){
$this->_socket = $socket;
$this->_socket->onFlashXMLRequest($this);
}

Expand All @@ -126,13 +131,16 @@ public function sendString($msg){
}

public function disconnect(){
$this->_socket->close();
$this->_socket->disconnect();
}
}

class WebSocketConnectionHybi extends WebSocketConnection{
private $_openMessage = null;

private $_buffer = "";


public function sendHandshakeResponse(){
// Check for newer handshake
$challenge = isset($this->_headers['Sec-Websocket-Key']) ? $this->_headers['Sec-Websocket-Key'] : null;
Expand All @@ -153,11 +161,18 @@ public function sendHandshakeResponse(){
}

public function readFrame($data){
$frame = WebSocketFrame::decode($data);
$unconsumed = "";



if(WebSocketOpcode::isControlFrame($frame->getType()))
$this->processControlFrame($frame);
else $this->processMessageFrame($frame);
while($frame = WebSocketFrame::consume($data, &$unconsumed)) {
$data = $unconsumed;
$this->_buffer = $unconsumed;

if(WebSocketOpcode::isControlFrame($frame->getType()))
$this->processControlFrame($frame);
else $this->processMessageFrame($frame);
}
}

/**
Expand Down
22 changes: 15 additions & 7 deletions phpws/websocket.server.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ public function __construct($address,$port, $adminKey){

$this->FLASH_POLICY_FILE = str_replace('to-ports="*','to-ports="'.$port,$this->FLASH_POLICY_FILE);

$this->master=socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die("socket_create() failed");
socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("socket_option() failed");
//$this->master=socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die("socket_create() failed");

$this->master = stream_socket_server("tcp://0.0.0.0:{$port}");


/*socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("socket_option() failed");
socket_bind($this->master, $address, $port) or die("socket_bind() failed");
socket_listen($this->master,20) or die("socket_listen() failed");
socket_listen($this->master,20) or die("socket_listen() failed");*/

$this->say("PHP WebSocket Server");
$this->say("========================================");
Expand All @@ -109,8 +113,10 @@ public function run(){

// Retreive sockets which are 'Changed'
$changed = $this->getResources();
$write = null;
$except = null;

socket_select($changed,$write=NULL,$except=NULL,NULL);
stream_select($changed,$write,$except,NULL);

$this->debug("Socket selected");

Expand All @@ -123,7 +129,9 @@ public function run(){


// TODO: only reads up to 2048 bytes ?
$bytes = @socket_recv($resource, $buffer, 2048, 0);
$buffer = @fread($resource, 2048);
$bytes = strlen($buffer);

$socket = $this->getSocketByResource($resource);
$this->debug($bytes);

Expand All @@ -144,8 +152,8 @@ public function run(){

private function acceptSocket(){
try{
$client=socket_accept($this->master);
if($client<0){
$client=stream_socket_accept($this->master);
if($client === false){
self::log('socket_accept() failed'); continue;
}

Expand Down
7 changes: 5 additions & 2 deletions phpws/websocket.socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ public function onMessage(IWebSocketMessage $m){
public function establishConnection($data){
$this->_connection = WebSocketConnectionFactory::fromSocketData($this, $data);

if($this->_connection instanceof WebSocketConnectionFlash)
return;

foreach($this->_observers as $observer){
$observer->onConnectionEstablished($this);
}
}

public function write($data){
if(@socket_write($this->_socket, $data,strlen($data)) === false)
if(@fwrite($this->_socket, $data,strlen($data)) === false)
$this->disconnect();
}

Expand All @@ -73,7 +76,7 @@ public function onFlashXMLRequest(WebSocketConnectionFlash $connection){
}

public function disconnect(){
socket_close($this->_socket);
@fclose($this->_socket);

foreach($this->_observers as $observer){
$observer->onDisconnect($this);
Expand Down

0 comments on commit 85ea812

Please sign in to comment.