Skip to content

Commit

Permalink
Streamline socket server
Browse files Browse the repository at this point in the history
  • Loading branch information
bcosca committed Aug 16, 2018
1 parent b7cf8c8 commit 62e39c6
Showing 1 changed file with 67 additions and 93 deletions.
160 changes: 67 additions & 93 deletions cli/ws.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class WS {
$ctx,
$wait,
$sockets,
$protocol,
$agents=[],
$events=[];

Expand All @@ -61,16 +62,14 @@ class WS {
* @param $socket resource
**/
function alloc($socket) {
if (is_bool($str=$this->read($socket))) {
$this->close($socket);
if (is_bool($buf=$this->read($socket)))
return;
}
// Get WebSocket headers
$hdrs=[];
$CRLF="\r\n";
$EOL="\r\n";
$verb=NULL;
$uri=NULL;
foreach (explode($CRLF,trim($str)) as $line)
foreach (explode($EOL,trim($buf)) as $line)
if (preg_match('/^(\w+)\s(.+)\sHTTP\/1\.\d$/',
trim($line),$match)) {
$verb=$match[1];
Expand Down Expand Up @@ -98,36 +97,29 @@ function alloc($socket) {
if ($verb && $uri)
$this->write(
$socket,
$str='HTTP/1.1 400 Bad Request'.$CRLF.
'Connection: close'.$CRLF.$CRLF
'HTTP/1.1 400 Bad Request'.$EOL.
'Connection: close'.$EOL.$EOL
);
$this->close($socket);
return;
}
// Handshake
$str='HTTP/1.1 101 Switching Protocols'.$CRLF.
'Upgrade: websocket'.$CRLF.
'Connection: Upgrade'.$CRLF;
$buf='HTTP/1.1 101 Switching Protocols'.$EOL.
'Upgrade: websocket'.$EOL.
'Connection: Upgrade'.$EOL;
if (isset($hdrs['Sec-Websocket-Protocol']))
$str.='Sec-WebSocket-Protocol: '.
$hdrs['Sec-Websocket-Protocol'].$CRLF;
$str.='Sec-WebSocket-Accept: '.
$buf.='Sec-WebSocket-Protocol: '.
$hdrs['Sec-Websocket-Protocol'].$EOL;
$buf.='Sec-WebSocket-Accept: '.
base64_encode(
sha1(
$hdrs['Sec-Websocket-Key'].
self::Magic,
TRUE
)
).$CRLF.$CRLF;
$bytes=$this->write($socket,$str);
if ($bytes) {
sha1($hdrs['Sec-Websocket-Key'].WS::Magic,TRUE)
).$EOL.$EOL;
if ($this->write($socket,$buf)) {
// Connect agent to server
$this->sockets[]=$socket;
$this->sockets[(int)$socket]=$socket;
$this->agents[(int)$socket]=
new Agent($this,$socket,$verb,$uri,$hdrs);
}
else
$this->close($socket);
}

/**
Expand All @@ -136,51 +128,44 @@ function alloc($socket) {
* @param $socket resource
**/
function close($socket) {
if (isset($this->agents[(int)$socket]))
unset($this->sockets[(int)$socket],$this->agents[(int)$socket]);
stream_socket_shutdown($socket,STREAM_SHUT_WR);
@fclose($socket);
}

/**
* Free stream socket
* @return bool
* @param $socket resource
**/
function free($socket) {
unset($this->sockets[array_search($socket,$this->sockets)]);
unset($this->agents[(int)$socket]);
$this->close($socket);
}

/**
* Read from stream socket
* @return string|FALSE
* @param $socket resource
**/
function read($socket) {
if (is_string($str=@fread($socket,self::Packet)) &&
strlen($str) &&
strlen($str)<self::Packet)
return $str;
if (is_string($buf=@fread($socket,WS::Packet)) &&
strlen($buf) &&
strlen($buf)<WS::Packet)
return $buf;
if (isset($this->events['error']) &&
is_callable($func=$this->events['error']))
$func($this);
$this->close($socket);
return FALSE;
}

/**
* Write to stream socket
* @return int|FALSE
* @param $socket resource
* @param $str string
* @param $buf string
**/
function write($socket,$str) {
for ($i=0,$bytes=0;$i<strlen($str);$i+=$bytes) {
if (($bytes=@fwrite($socket,substr($str,$i))) &&
function write($socket,$buf) {
for ($i=0,$bytes=0;$i<strlen($buf);$i+=$bytes) {
if (($bytes=@fwrite($socket,substr($buf,$i))) &&
@fflush($socket))
continue;
if (isset($this->events['error']) &&
is_callable($func=$this->events['error']))
$func($this);
$this->close($socket);
return FALSE;
}
return $bytes;
Expand Down Expand Up @@ -249,7 +234,7 @@ function run() {
register_shutdown_function(function() use($listen) {
foreach ($this->sockets as $socket)
if ($socket!=$listen)
$this->free($socket);
$this->close($socket);
$this->close($listen);
if (isset($this->events['stop']) &&
is_callable($func=$this->events['stop']))
Expand All @@ -260,7 +245,7 @@ function run() {
if (isset($this->events['start']) &&
is_callable($func=$this->events['start']))
$func($this);
$this->sockets=[$listen];
$this->sockets=[(int)$listen=>$listen];
$empty=[];
$wait=$this->wait;
while (TRUE) {
Expand Down Expand Up @@ -290,26 +275,8 @@ function run() {
}
else {
$id=(int)$socket;
if (isset($this->agents[$id]) &&
$raw=$this->agents[$id]->fetch()) {
list($op,$data)=$raw;
// Dispatch
switch ($op & self::OpCode) {
case self::Ping:
$this->agents[$id]->send(self::Pong);
break;
case self::Close:
$this->free($socket);
break;
case self::Text:
$data=trim($data);
case self::Binary:
if (isset($this->events['receive']) &&
is_callable($func=$this->events['receive']))
$func($this->agents[$id],$op,$data);
break;
}
}
if (isset($this->agents[$id]))
$this->agents[$id]->fetch();
}
}
$wait-=microtime(TRUE)-$mark;
Expand All @@ -320,10 +287,9 @@ function run() {
}
if (!$count) {
$mark=microtime(TRUE);
foreach ($this->sockets as $socket) {
foreach ($this->sockets as $id=>$socket) {
if (!is_resource($socket))
continue;
$id=(int)$socket;
if ($socket!=$listen &&
isset($this->agents[$id]) &&
isset($this->events['idle']) &&
Expand Down Expand Up @@ -363,8 +329,7 @@ class Agent {
$verb,
$uri,
$headers,
$events,
$buffer;
$events;

/**
* Return server instance
Expand All @@ -382,6 +347,14 @@ function id() {
return $this->id;
}

/**
* Return socket
* @return object
**/
function socket() {
return $this->socket;
}

/**
* Return request method
* @return string
Expand Down Expand Up @@ -414,22 +387,20 @@ function headers() {
* @param $payload string
**/
function send($op,$data='') {
$server=$this->server;
$mask=WS::Finale | $op & WS::OpCode;
$len=strlen($data);
$str='';
$buf='';
if ($len>0xffff)
$str=pack('CCNN',$mask,0x7f,$len);
$buf=pack('CCNN',$mask,0x7f,$len);
else
if ($len>0x7d)
$str=pack('CCn',$mask,0x7e,$len);
$buf=pack('CCn',$mask,0x7e,$len);
else
$str=pack('CC',$mask,$len);
$str.=$data;
$server=$this->server();
if (is_bool($server->write($this->socket,$str))) {
$this->free();
$buf=pack('CC',$mask,$len);
$buf.=$data;
if (is_bool($server->write($this->socket,$buf)))
return FALSE;
}
if (!in_array($op,[WS::Pong,WS::Close]) &&
isset($this->events['send']) &&
is_callable($func=$this->events['send']))
Expand All @@ -443,12 +414,9 @@ function send($op,$data='') {
**/
function fetch() {
// Unmask payload
$server=$this->server();
if (is_bool($buf=$server->read($this->socket))) {
$this->free();
$server=$this->server;
if (is_bool($buf=$server->read($this->socket)))
return FALSE;
}
$buf=($this->buffer.=$buf);
$op=ord($buf[0]) & WS::OpCode;
$len=ord($buf[1]) & WS::Length;
$pos=2;
Expand All @@ -469,18 +437,25 @@ function fetch() {
return FALSE;
for ($i=0,$data='';$i<$len;$i++)
$data.=chr(ord($buf[$pos+$i])^$mask[$i%4]);
$this->buffer='';
// Dispatch
switch ($op & WS::OpCode) {
case WS::Ping:
$this->send(WS::Pong);
break;
case WS::Close:
$server->close($this->socket);
break;
case WS::Text:
$data=trim($data);
case WS::Binary:
if (isset($this->events['receive']) &&
is_callable($func=$this->events['receive']))
$func($this,$op,$data);
break;
}
return [$op,$data];
}

/**
* Free stream socket
* @return NULL
**/
function free() {
$this->server->free($this->socket);
}

/**
* Destroy object
* @return NULL
Expand Down Expand Up @@ -508,7 +483,6 @@ function __construct($server,$socket,$verb,$uri,array $hdrs) {
$this->uri=$uri;
$this->headers=$hdrs;
$this->events=$server->events();
$this->buffer='';
if (isset($this->events['connect']) &&
is_callable($func=$this->events['connect']))
$func($this);
Expand Down

0 comments on commit 62e39c6

Please sign in to comment.