Mercurial > hg > udpmsg3
diff hub/hub.php @ 0:dd81c38b513a
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Mon, 28 Feb 2011 00:49:07 +0100 |
parents | |
children | 7e342a0a3b74 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hub/hub.php Mon Feb 28 00:49:07 2011 +0100 @@ -0,0 +1,281 @@ +<?php +/* Copyright 2010 Ivo Smits <Ivo@UCIS.nl>. All rights reserved. + Redistribution and use in source and binary forms, with or without modification, are + permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, this list + of conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED + WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + The views and conclusions contained in the software and documentation are those of the + authors and should not be interpreted as representing official policies, either expressed + or implied, of Ivo Smits.*/ + +if (defined('APP_LOADED')) return; +define('APP_LOADED', TRUE); +if (!defined('CONFIGFILE')) define('CONFIGFILE', './config.php'); + +print("UCIS UDPMSG3 Hub (c) 2010 Ivo Smits <Ivo@UCIS.nl>\n"); +print("More information: http://wiki.ucis.nl/UDPMSG3\n"); +print("\n"); + +class StreamPeer { + public $socket = NULL; + public $buffer = ''; + public function select_read() { + if ($this->udpmsg_read()) return; + socket_delete($this); + socket_close($this->socket); + fprintf(STDERR, "UDPMSG: Error: closing connection.\n"); + peer_delete($this); + $this->closed(); + } + public function select_error() { + socket_delete($this); + socket_close($this->socket); + fprintf(STDERR, "UDPMSG: Error: select error.\n"); + peer_delete($this); + $this->closed(); + } + private function udpmsg_read() { + $msg = socket_read($this->socket, 1024); + if (!strlen($msg)) { + fprintf(STDERR, "UDPMSG: End of file\n"); + return FALSE; + } + $udpmsg_buffer = &$this->buffer; + $udpmsg_buffer .= $msg; + while (strlen($udpmsg_buffer) > 2) { + $len = ord($udpmsg_buffer[0]) * 256 + ord($udpmsg_buffer[1]); + if ($len <= 0 || $len > 1024) { + fprintf(STDERR, "UDPMSG: Error: protocol error\n"); + return FALSE; + } + if (strlen($udpmsg_buffer) < 2 + $len) break; + udpmsg_receive($this, substr($udpmsg_buffer, 2, $len)); + $udpmsg_buffer = substr($udpmsg_buffer, 2 + $len); + } + return TRUE; + } + public function send($msg) { + $len = strlen($msg); + if ($len > 1024) { + fprintf(STDERR, "UDPMSG: Error: message too long!\n"); + return FALSE; + } + $lens = chr(floor($len / 256)).chr($len % 256); + socket_write($this->socket, $lens.$msg); + return TRUE; + } +} +class StreamServer { + public $host, $port, $bindhost, $bindport, $ipv6; + public $socket = NULL; + function __construct($host, $port, $ipv6 = FALSE) { + $this->host = $host; $this->port = $port; $this->ipv6 = $ipv6; + if (!$this->select_timeout()) socket_add_timeout($this); + } + public function select_timeout() { + socket_delete($this); + $this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP); + socket_set_nonblock($this->socket); + if (!socket_bind($this->socket, $this->host, $this->port)) return FALSE; + if (!socket_listen($this->socket)) return FALSE; + socket_delete($this); + socket_add_connected($this); + fprintf(STDERR, "UDPMSG: Listening\n"); + return TRUE; + } + public function select_read() { + $client = socket_accept($this->socket); + if (!$client) return; + if (!socket_getpeername($client, &$addr, &$port)) $addr = 'unknown'; + fprintf(STDERR, 'UDPMSG: Accepted client '.$addr.':'.$port."\n"); + new StreamServerClient($client); + } + public function select_error() { + socket_close($this->socket); + fprintf(STDERR, "UDPMSG: Error: select error on listener. Closing listener.\n"); + socket_delete($this); + socket_add_timeout($this); + } +} +class StreamClient extends StreamPeer { + public $host, $port, $bindhost, $bindport, $ipv6; + function __construct($host, $port, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) { + $this->host = $host; $this->port = $port; $this->ipv6 = $ipv6; $this->bindhost = $bindhost; $this->bindport = $bindport; + $this->select_timeout(); + } + public function closed() { + socket_add_timeout($this); + } + public function select_timeout() { + socket_delete($this); + $this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP); + socket_set_nonblock($this->socket); + if ($this->bindhost || $this->bindport) socket_bind($this->socket, $this->bindhost, $this->bindport); + socket_connect($this->socket, $this->host, $this->port); + socket_add_connecting($this); + } + public function select_write() { + if (!socket_getpeername($this->socket, &$addr, &$port)) $addr = 'unknown'; + fprintf(STDERR, 'UDPMSG: Connected to '.$addr.':'.$port."\n"); + socket_delete($this); + socket_add_connected($this); + peer_add($this); + } +} +class StreamServerClient extends StreamPeer { + public function closed() { } + function __construct($socket) { + socket_set_nonblock($socket); + $this->socket = $socket; + socket_add_connected($this); + peer_add($this); + } +} +class UDPPeer { + public $socket = NULL; + function __construct($host = NULL, $port = NULL, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) { + $this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP); + socket_set_nonblock($this->socket); + if ($bindhost || $bindport) socket_bind($this->socket, $bindhost, $bindport); + socket_add_connected($this); + if ($host) { + socket_connect($this->socket, $host, $port); + peer_add($this); + } + } + public function select_read() { + if ($this->udpmsg_read()) return; + fprintf(STDERR, "UDPMSG: Error: closing connection.\n"); + } + public function select_error() { + fprintf(STDERR, "UDPMSG: Error: select error.\n"); + } + private function udpmsg_read() { + $msg = socket_read($this->socket, 1024 + 20); + if (!strlen($msg)) { + fprintf(STDERR, "UDPMSG: End of file\n"); + return FALSE; + } + if (strlen($msg) < 20 || substr($msg, 0, 20) != sha(substr($msg, 20), TRUE)) { + fprintf(STDERR, "UDPMSG: Short message or checksum mismatch\n"); + return FALSE; + } + udpmsg_receive($this, $msg); + return TRUE; + } + public function send($msg) { + socket_write($this->socket, sha1($msg, TRUE).$msg); + return TRUE; + } +} + +$msgi = 0; +$msglog = array(); + +$sock_fd_connecting = array(); +$sock_fd_connected = array(); +$sock_timeout = array(); +$sock_obj = array(); + +$peers = array(); + +function peer_add($obj) { + global $peers; + $peers[spl_object_hash($obj)] = $obj; +} +function peer_delete($obj) { + global $peers; + unset($peers[spl_object_hash($obj)]); +} + +function socket_add_timeout($obj) { + global $sock_timeout; + $sock_timeout[spl_object_hash($obj)] = $obj; +} +function socket_add_connecting($obj) { + global $sock_fd_connecting, $sock_obj; + $sock_fd_connecting[(int)$obj->socket] = $obj->socket; + $sock_obj[(int)$obj->socket] = $obj; +} +function socket_add_connected($obj) { + global $sock_fd_connected, $sock_obj; + $sock_fd_connected[(int)$obj->socket] = $obj->socket; + $sock_obj[(int)$obj->socket] = $obj; +} +function socket_delete($obj) { + global $sock_fd_connecting, $sock_fd_connected, $sock_obj, $sock_timeout; + unset($sock_fd_connecting[(int)$obj->socket]); + unset($sock_fd_connected[(int)$obj->socket]); + unset($sock_obj[(int)$obj->socket]); + unset($sock_timeout[spl_object_hash($obj)]); +} + +print("Loading configuration...\n"); +if (!isset($config)) require constant('CONFIGFILE'); + +$logsize = isset($config['logsize'])?$config['logsize']:512; +if (isset($config['tcplisten'])) + foreach ($config['tcplisten'] as $e) + new StreamServer( + isset($e['host']) ? $e['host'] : '0.0.0.0', + isset($e['port']) ? $e['port'] : 15387, + isset($e['ipv6']) && $e['ipv6']); +if (isset($config['tcpconnect'])) + foreach ($config['tcpconnect'] as $e) + new StreamClient( + isset($e['host']) ? $e['host'] : '127.0.0.1', + isset($e['port']) ? $e['port'] : 15387, + isset($e['ipv6']) && $e['ipv6'], + isset($e['bindhost']) ? $e['bindhost'] : NULL, + isset($e['bindport']) ? $e['bindport'] : NULL); +if (isset($config['udppeer'])) + foreach ($config['udppeer'] as $e) + new StreamClient( + isset($e['host']) ? $e['host'] : NULL, + isset($e['port']) ? $e['port'] : 15387, + isset($e['ipv6']) && $e['ipv6'], + isset($e['bindhost']) ? $e['bindhost'] : NULL, + isset($e['bindport']) ? $e['bindport'] : NULL); + +function udpmsg_receive($from, $msg) { + global $peers, $msgi, $msglog, $logsize; + $hash = md5($msg); + if (in_array($hash, $msglog)) return FALSE; + $msglog[$msgi] = $hash; + $msgi++; + if ($msgi >= $logsize) $msgi = 0; + $count = 0; + foreach ($peers as $peer) if ($peer !== $from) if ($peer->send($msg)) $count++; + return $count; +} + +while (TRUE) { + $selread = $sock_fd_connected; + $selwrite = $sock_fd_connecting; + $selerror = array_merge($selread, $selwrite); + if (count($selerror)) { + socket_select(&$selread, &$selwrite, &$selerror, 60); + foreach ($selread as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_read($socket); + foreach ($selwrite as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_write($socket); + foreach ($selerror as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_error($socket); + } else { + sleep(10); + } + foreach ($sock_timeout as $obj) $obj->select_timeout(); +}