diff hub/hub.php @ 0:dd81c38b513a

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Mon, 28 Feb 2011 00:49:07 +0100
parents
children 7e342a0a3b74
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hub/hub.php	Mon Feb 28 00:49:07 2011 +0100
@@ -0,0 +1,281 @@
+<?php
+/* Copyright 2010 Ivo Smits <Ivo@UCIS.nl>. All rights reserved.
+   Redistribution and use in source and binary forms, with or without modification, are
+   permitted provided that the following conditions are met:
+
+   1. Redistributions of source code must retain the above copyright notice, this list of
+      conditions and the following disclaimer.
+
+   2. Redistributions in binary form must reproduce the above copyright notice, this list
+      of conditions and the following disclaimer in the documentation and/or other materials
+      provided with the distribution.
+
+   THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
+   WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+   FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR
+   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+   CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+   SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+   ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+   NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+   ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+   The views and conclusions contained in the software and documentation are those of the
+   authors and should not be interpreted as representing official policies, either expressed
+   or implied, of Ivo Smits.*/
+
+if (defined('APP_LOADED')) return;
+define('APP_LOADED', TRUE);
+if (!defined('CONFIGFILE')) define('CONFIGFILE', './config.php');
+
+print("UCIS UDPMSG3 Hub (c) 2010 Ivo Smits <Ivo@UCIS.nl>\n");
+print("More information: http://wiki.ucis.nl/UDPMSG3\n");
+print("\n");
+
+class StreamPeer {
+	public $socket = NULL;
+	public $buffer = '';
+	public function select_read() {
+		if ($this->udpmsg_read()) return;
+		socket_delete($this);
+		socket_close($this->socket);
+		fprintf(STDERR, "UDPMSG: Error: closing connection.\n");
+		peer_delete($this);
+		$this->closed();
+	}
+	public function select_error() {
+		socket_delete($this);
+		socket_close($this->socket);
+		fprintf(STDERR, "UDPMSG: Error: select error.\n");
+		peer_delete($this);
+		$this->closed();
+	}
+	private function udpmsg_read() {
+		$msg = socket_read($this->socket, 1024);
+		if (!strlen($msg)) {
+			fprintf(STDERR, "UDPMSG: End of file\n");
+			return FALSE;
+		}
+		$udpmsg_buffer = &$this->buffer;
+		$udpmsg_buffer .= $msg;
+		while (strlen($udpmsg_buffer) > 2) {
+			$len = ord($udpmsg_buffer[0]) * 256 + ord($udpmsg_buffer[1]);
+			if ($len <= 0 || $len > 1024) {
+				fprintf(STDERR, "UDPMSG: Error: protocol error\n");
+				return FALSE;
+			}
+			if (strlen($udpmsg_buffer) < 2 + $len) break;
+			udpmsg_receive($this, substr($udpmsg_buffer, 2, $len));
+			$udpmsg_buffer = substr($udpmsg_buffer, 2 + $len);
+		}
+		return TRUE;
+	}
+	public function send($msg) {
+		$len = strlen($msg);
+		if ($len > 1024) {
+			fprintf(STDERR, "UDPMSG: Error: message too long!\n");
+			return FALSE;
+		}
+		$lens = chr(floor($len / 256)).chr($len % 256);
+		socket_write($this->socket, $lens.$msg);
+		return TRUE;
+	}
+}
+class StreamServer {
+	public $host, $port, $bindhost, $bindport, $ipv6;
+	public $socket = NULL;
+	function __construct($host, $port, $ipv6 = FALSE) {
+		$this->host = $host; $this->port = $port; $this->ipv6 = $ipv6;
+		if (!$this->select_timeout()) socket_add_timeout($this);
+	}
+	public function select_timeout() {
+		socket_delete($this);
+		$this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP);
+		socket_set_nonblock($this->socket);
+		if (!socket_bind($this->socket, $this->host, $this->port)) return FALSE;
+		if (!socket_listen($this->socket)) return FALSE;
+		socket_delete($this);
+		socket_add_connected($this);
+		fprintf(STDERR, "UDPMSG: Listening\n");
+		return TRUE;
+	}
+	public function select_read() {
+		$client = socket_accept($this->socket);
+		if (!$client) return;
+		if (!socket_getpeername($client, &$addr, &$port)) $addr = 'unknown';
+		fprintf(STDERR, 'UDPMSG: Accepted client '.$addr.':'.$port."\n");
+		new StreamServerClient($client);
+	}
+	public function select_error() {
+		socket_close($this->socket);
+		fprintf(STDERR, "UDPMSG: Error: select error on listener. Closing listener.\n");
+		socket_delete($this);
+		socket_add_timeout($this);
+	}
+}
+class StreamClient extends StreamPeer {
+	public $host, $port, $bindhost, $bindport, $ipv6;
+	function __construct($host, $port, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) {
+		$this->host = $host; $this->port = $port; $this->ipv6 = $ipv6; $this->bindhost = $bindhost; $this->bindport = $bindport;
+		$this->select_timeout();
+	}
+	public function closed() {
+		socket_add_timeout($this);
+	}
+	public function select_timeout() {
+		socket_delete($this);
+		$this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP);
+		socket_set_nonblock($this->socket);
+		if ($this->bindhost || $this->bindport) socket_bind($this->socket, $this->bindhost, $this->bindport);
+		socket_connect($this->socket, $this->host, $this->port);
+		socket_add_connecting($this);
+	}
+	public function select_write() {
+		if (!socket_getpeername($this->socket, &$addr, &$port)) $addr = 'unknown';
+		fprintf(STDERR, 'UDPMSG: Connected to '.$addr.':'.$port."\n");
+		socket_delete($this);
+		socket_add_connected($this);
+		peer_add($this);
+	}
+}
+class StreamServerClient extends StreamPeer {
+	public function closed() { }
+	function __construct($socket) {
+		socket_set_nonblock($socket);
+		$this->socket = $socket;
+		socket_add_connected($this);
+		peer_add($this);
+	}
+}
+class UDPPeer {
+	public $socket = NULL;
+	function __construct($host = NULL, $port = NULL, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) {
+		$this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP);
+		socket_set_nonblock($this->socket);
+		if ($bindhost || $bindport) socket_bind($this->socket, $bindhost, $bindport);
+		socket_add_connected($this);
+		if ($host) {
+			socket_connect($this->socket, $host, $port);
+			peer_add($this);
+		}
+	}
+	public function select_read() {
+		if ($this->udpmsg_read()) return;
+		fprintf(STDERR, "UDPMSG: Error: closing connection.\n");
+	}
+	public function select_error() {
+		fprintf(STDERR, "UDPMSG: Error: select error.\n");
+	}
+	private function udpmsg_read() {
+		$msg = socket_read($this->socket, 1024 + 20);
+		if (!strlen($msg)) {
+			fprintf(STDERR, "UDPMSG: End of file\n");
+			return FALSE;
+		}
+		if (strlen($msg) < 20 || substr($msg, 0, 20) != sha(substr($msg, 20), TRUE)) {
+			fprintf(STDERR, "UDPMSG: Short message or checksum mismatch\n");
+			return FALSE;
+		}
+		udpmsg_receive($this, $msg);
+		return TRUE;
+	}
+	public function send($msg) {
+		socket_write($this->socket, sha1($msg, TRUE).$msg);
+		return TRUE;
+	}
+}
+
+$msgi = 0;
+$msglog = array();
+
+$sock_fd_connecting = array();
+$sock_fd_connected = array();
+$sock_timeout = array();
+$sock_obj = array();
+
+$peers = array();
+
+function peer_add($obj) {
+	global $peers;
+	$peers[spl_object_hash($obj)] = $obj;
+}
+function peer_delete($obj) {
+	global $peers;
+	unset($peers[spl_object_hash($obj)]);
+}
+
+function socket_add_timeout($obj) {
+	global $sock_timeout;
+	$sock_timeout[spl_object_hash($obj)] = $obj;
+}
+function socket_add_connecting($obj) {
+	global $sock_fd_connecting, $sock_obj;
+	$sock_fd_connecting[(int)$obj->socket] = $obj->socket;
+	$sock_obj[(int)$obj->socket] = $obj;
+}
+function socket_add_connected($obj) {
+	global $sock_fd_connected, $sock_obj;
+	$sock_fd_connected[(int)$obj->socket] = $obj->socket;
+	$sock_obj[(int)$obj->socket] = $obj;
+}
+function socket_delete($obj) {
+	global $sock_fd_connecting, $sock_fd_connected, $sock_obj, $sock_timeout;
+	unset($sock_fd_connecting[(int)$obj->socket]);
+	unset($sock_fd_connected[(int)$obj->socket]);
+	unset($sock_obj[(int)$obj->socket]);
+	unset($sock_timeout[spl_object_hash($obj)]);
+}
+
+print("Loading configuration...\n");
+if (!isset($config)) require constant('CONFIGFILE');
+
+$logsize = isset($config['logsize'])?$config['logsize']:512;
+if (isset($config['tcplisten']))
+	foreach ($config['tcplisten'] as $e)
+		new StreamServer(
+			isset($e['host']) ? $e['host'] : '0.0.0.0',
+			isset($e['port']) ? $e['port'] : 15387,
+			isset($e['ipv6']) && $e['ipv6']);
+if (isset($config['tcpconnect']))
+	foreach ($config['tcpconnect'] as $e)
+		new StreamClient(
+			isset($e['host']) ? $e['host'] : '127.0.0.1',
+			isset($e['port']) ? $e['port'] : 15387,
+			isset($e['ipv6']) && $e['ipv6'],
+			isset($e['bindhost']) ? $e['bindhost'] : NULL,
+			isset($e['bindport']) ? $e['bindport'] : NULL);
+if (isset($config['udppeer']))
+	foreach ($config['udppeer'] as $e)
+		new StreamClient(
+			isset($e['host']) ? $e['host'] : NULL,
+			isset($e['port']) ? $e['port'] : 15387,
+			isset($e['ipv6']) && $e['ipv6'],
+			isset($e['bindhost']) ? $e['bindhost'] : NULL,
+			isset($e['bindport']) ? $e['bindport'] : NULL);
+
+function udpmsg_receive($from, $msg) {
+	global $peers, $msgi, $msglog, $logsize;
+	$hash = md5($msg);
+	if (in_array($hash, $msglog)) return FALSE;
+	$msglog[$msgi] = $hash;
+	$msgi++;
+	if ($msgi >= $logsize) $msgi = 0;
+	$count = 0;
+	foreach ($peers as $peer) if ($peer !== $from) if ($peer->send($msg)) $count++;
+	return $count;
+}
+
+while (TRUE) {
+	$selread = $sock_fd_connected;
+	$selwrite = $sock_fd_connecting;
+	$selerror = array_merge($selread, $selwrite);
+	if (count($selerror)) {
+		socket_select(&$selread, &$selwrite, &$selerror, 60);
+		foreach ($selread as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_read($socket);
+		foreach ($selwrite as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_write($socket);
+		foreach ($selerror as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_error($socket);
+	} else {
+		sleep(10);
+	}
+	foreach ($sock_timeout as $obj) $obj->select_timeout();
+}