comparison hub/hub.php @ 0:dd81c38b513a

Initial commit
author Ivo Smits <Ivo@UCIS.nl>
date Mon, 28 Feb 2011 00:49:07 +0100
parents
children 7e342a0a3b74
comparison
equal deleted inserted replaced
-1:000000000000 0:dd81c38b513a
1 <?php
2 /* Copyright 2010 Ivo Smits <Ivo@UCIS.nl>. All rights reserved.
3 Redistribution and use in source and binary forms, with or without modification, are
4 permitted provided that the following conditions are met:
5
6 1. Redistributions of source code must retain the above copyright notice, this list of
7 conditions and the following disclaimer.
8
9 2. Redistributions in binary form must reproduce the above copyright notice, this list
10 of conditions and the following disclaimer in the documentation and/or other materials
11 provided with the distribution.
12
13 THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
14 WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
15 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR
16 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
17 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
18 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
19 ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
20 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21 ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
23 The views and conclusions contained in the software and documentation are those of the
24 authors and should not be interpreted as representing official policies, either expressed
25 or implied, of Ivo Smits.*/
26
// Guard against the hub being included twice: the second inclusion returns immediately.
if (defined('APP_LOADED')) return;
define('APP_LOADED', TRUE);

// The configuration file path may be predefined by the including script;
// otherwise it defaults to config.php in the current working directory.
defined('CONFIGFILE') || define('CONFIGFILE', './config.php');

// Startup banner.
print(
	"UCIS UDPMSG3 Hub (c) 2010 Ivo Smits <Ivo@UCIS.nl>\n" .
	"More information: http://wiki.ucis.nl/UDPMSG3\n" .
	"\n"
);
34
/**
 * Base class for a peer reached over a stream (TCP) socket.
 *
 * Messages are framed on the wire as a 2-byte big-endian length followed by
 * that many payload bytes (1..1024). Subclasses must provide a closed()
 * method, which is invoked after the connection has been torn down.
 */
class StreamPeer {
	/** Socket handle used for this connection. */
	public $socket = NULL;
	/** Bytes received so far that do not yet form a complete frame. */
	public $buffer = '';

	/**
	 * Called by the main loop when the socket is readable.
	 * Tears the connection down when reading fails or the peer violates the framing.
	 */
	public function select_read() {
		if ($this->udpmsg_read()) return;
		$this->disconnect("UDPMSG: Error: closing connection.\n");
	}

	/** Called by the main loop when select reports an exceptional condition. */
	public function select_error() {
		$this->disconnect("UDPMSG: Error: select error.\n");
	}

	/**
	 * Unregisters and closes the socket, logs $logline, removes this object
	 * from the peer list and notifies the subclass through closed().
	 */
	private function disconnect($logline) {
		// Unregister first: the registry is keyed by the socket handle.
		socket_delete($this);
		socket_close($this->socket);
		fwrite(STDERR, $logline);
		peer_delete($this);
		$this->closed();
	}

	/**
	 * Reads pending data, appends it to the buffer and hands every complete
	 * frame to udpmsg_receive().
	 *
	 * @return bool FALSE on end of file, read error or an invalid frame length.
	 */
	private function udpmsg_read() {
		$msg = socket_read($this->socket, 1024);
		if ($msg === FALSE || $msg === '') {
			fprintf(STDERR, "UDPMSG: End of file\n");
			return FALSE;
		}
		$this->buffer .= $msg;
		// A length header is two bytes; validate it as soon as it is complete.
		while (strlen($this->buffer) >= 2) {
			$len = (ord($this->buffer[0]) << 8) | ord($this->buffer[1]);
			if ($len <= 0 || $len > 1024) {
				fprintf(STDERR, "UDPMSG: Error: protocol error\n");
				return FALSE;
			}
			// Wait for the rest of the frame to arrive.
			if (strlen($this->buffer) < 2 + $len) break;
			$payload = substr($this->buffer, 2, $len);
			$this->buffer = (string)substr($this->buffer, 2 + $len);
			udpmsg_receive($this, $payload);
		}
		return TRUE;
	}

	/**
	 * Sends one framed message to the peer.
	 *
	 * @param string $msg Payload of 1 to 1024 bytes.
	 * @return bool TRUE when the whole frame was written, FALSE otherwise.
	 */
	public function send($msg) {
		$len = strlen($msg);
		if ($len > 1024) {
			fprintf(STDERR, "UDPMSG: Error: message too long!\n");
			return FALSE;
		}
		if ($len == 0) {
			// The receiving side treats a zero length as a protocol error
			// and drops the connection, so never put such a frame on the wire.
			fprintf(STDERR, "UDPMSG: Error: empty message!\n");
			return FALSE;
		}
		$frame = chr($len >> 8).chr($len & 0xFF).$msg;
		$written = socket_write($this->socket, $frame);
		if ($written === FALSE) {
			fprintf(STDERR, "UDPMSG: Error: write failed\n");
			return FALSE;
		}
		if ($written < strlen($frame)) {
			// The socket is non-blocking, so a short write is possible.
			fprintf(STDERR, "UDPMSG: Error: incomplete write\n");
			return FALSE;
		}
		return TRUE;
	}
}
/**
 * Listening TCP socket that accepts incoming stream peers.
 *
 * When the listener cannot be set up, or later fails, the object registers
 * itself in the timeout list so that the main loop calls select_timeout()
 * again and another attempt is made.
 */
class StreamServer {
	public $host, $port, $bindhost, $bindport, $ipv6;
	public $socket = NULL;

	/**
	 * @param string $host Local address to listen on.
	 * @param int    $port Local port to listen on.
	 * @param bool   $ipv6 Create an AF_INET6 socket instead of AF_INET.
	 */
	function __construct($host, $port, $ipv6 = FALSE) {
		$this->host = $host; $this->port = $port; $this->ipv6 = $ipv6;
		// select_timeout() schedules its own retry when it fails.
		$this->select_timeout();
	}

	/**
	 * (Re)creates the listening socket.
	 *
	 * @return bool TRUE when listening, FALSE when a retry has been scheduled.
	 */
	public function select_timeout() {
		// Drop any registration under the previous socket and the pending retry.
		socket_delete($this);
		$sock = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP);
		if ($sock === FALSE) return $this->listen_failed(NULL, 'socket_create');
		socket_set_nonblock($sock);
		if (!socket_bind($sock, $this->host, $this->port)) return $this->listen_failed($sock, 'socket_bind');
		if (!socket_listen($sock)) return $this->listen_failed($sock, 'socket_listen');
		$this->socket = $sock;
		// Clear anything still registered under the new socket's key before adding it.
		socket_delete($this);
		socket_add_connected($this);
		fprintf(STDERR, "UDPMSG: Listening\n");
		return TRUE;
	}

	/**
	 * Logs a failed setup step, releases the half-initialised socket and
	 * schedules another attempt. Always returns FALSE.
	 */
	private function listen_failed($sock, $step) {
		$errno = $sock ? socket_last_error($sock) : socket_last_error();
		fprintf(STDERR, "UDPMSG: Error: %s failed: %s\n", $step, socket_strerror($errno));
		if ($sock) socket_close($sock);
		$this->socket = NULL;
		socket_add_timeout($this);
		return FALSE;
	}

	/** Called by the main loop when the listener is readable: accepts one client. */
	public function select_read() {
		$client = socket_accept($this->socket);
		if (!$client) return;
		$addr = NULL;
		$port = NULL;
		// The arguments are declared by-reference by socket_getpeername itself;
		// call-time "&" is a fatal error since PHP 5.4.
		if (!socket_getpeername($client, $addr, $port)) {
			$addr = 'unknown';
			$port = '';
		}
		fprintf(STDERR, 'UDPMSG: Accepted client '.$addr.':'.$port."\n");
		// The new object registers itself in the global socket and peer lists.
		new StreamServerClient($client);
	}

	/** Called by the main loop on a select error: closes the listener and schedules a retry. */
	public function select_error() {
		// Unregister before closing: the registry is keyed by the socket handle.
		socket_delete($this);
		socket_close($this->socket);
		fprintf(STDERR, "UDPMSG: Error: select error on listener. Closing listener.\n");
		socket_add_timeout($this);
	}
}
/**
 * Outgoing TCP connection to another hub.
 *
 * The connection is established asynchronously; whenever it is lost (or
 * cannot be set up) the object places itself in the timeout list so that the
 * main loop calls select_timeout() and a new attempt is made.
 */
class StreamClient extends StreamPeer {
	public $host, $port, $bindhost, $bindport, $ipv6;

	/**
	 * @param string      $host     Remote address.
	 * @param int         $port     Remote port.
	 * @param bool        $ipv6     Create an AF_INET6 socket instead of AF_INET.
	 * @param string|null $bindhost Optional local address to bind to.
	 * @param int|null    $bindport Optional local port to bind to.
	 */
	function __construct($host, $port, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) {
		$this->host = $host; $this->port = $port; $this->ipv6 = $ipv6; $this->bindhost = $bindhost; $this->bindport = $bindport;
		$this->select_timeout();
	}

	/** Invoked after the connection has been closed: schedule a reconnect. */
	public function closed() {
		socket_add_timeout($this);
	}

	/** Starts a new non-blocking connection attempt. */
	public function select_timeout() {
		socket_delete($this);
		$sock = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP);
		if ($sock === FALSE) {
			fprintf(STDERR, "UDPMSG: Error: socket_create failed: %s\n", socket_strerror(socket_last_error()));
			$this->socket = NULL;
			socket_add_timeout($this);
			return;
		}
		$this->socket = $sock;
		// A partial frame left over from the previous connection must not be
		// prepended to data arriving on the new one.
		$this->buffer = '';
		socket_set_nonblock($this->socket);
		if ($this->bindhost || $this->bindport) {
			// Fall back to the wildcard address when only a port was configured.
			$bindhost = $this->bindhost ? $this->bindhost : ($this->ipv6 ? '::' : '0.0.0.0');
			socket_bind($this->socket, $bindhost, (int)$this->bindport);
		}
		// The result is intentionally not checked: the socket is non-blocking,
		// so the outcome is observed later through select_write()/select_error().
		socket_connect($this->socket, $this->host, $this->port);
		socket_add_connecting($this);
	}

	/**
	 * Called by the main loop when the connecting socket becomes writable,
	 * i.e. when the connection attempt has finished (successfully or not).
	 */
	public function select_write() {
		// SO_ERROR holds the outcome of a non-blocking connect.
		$err = socket_get_option($this->socket, SOL_SOCKET, SO_ERROR);
		if ($err) {
			fprintf(STDERR, "UDPMSG: Error: connect failed: %s\n", socket_strerror($err));
			$this->select_error();
			return;
		}
		$addr = NULL;
		$port = NULL;
		// The arguments are declared by-reference by socket_getpeername itself;
		// call-time "&" is a fatal error since PHP 5.4.
		if (!socket_getpeername($this->socket, $addr, $port)) {
			$addr = 'unknown';
			$port = '';
		}
		fprintf(STDERR, 'UDPMSG: Connected to '.$addr.':'.$port."\n");
		// Move the socket from the "connecting" set to the "connected" set.
		socket_delete($this);
		socket_add_connected($this);
		peer_add($this);
	}
}
/**
 * Stream peer for a connection that was accepted by a listener.
 * It registers itself as a connected socket and as a peer on construction.
 */
class StreamServerClient extends StreamPeer {
	/**
	 * @param mixed $socket Already connected socket to take over.
	 */
	function __construct($socket) {
		socket_set_nonblock($socket);
		$this->socket = $socket;
		socket_add_connected($this);
		peer_add($this);
	}

	/** Nothing to do once the connection has been closed. */
	public function closed() {
	}
}
/**
 * Peer reached over UDP datagrams.
 *
 * Every datagram consists of the raw 20-byte SHA-1 digest of the payload
 * followed by the payload itself.
 */
class UDPPeer {
	public $socket = NULL;

	/**
	 * @param string|null $host     Remote address; when given, the socket is
	 *                              connected to it and the object becomes a peer.
	 * @param int|null    $port     Remote port.
	 * @param bool        $ipv6     Create an AF_INET6 socket instead of AF_INET.
	 * @param string|null $bindhost Optional local address to bind to.
	 * @param int|null    $bindport Optional local port to bind to.
	 */
	function __construct($host = NULL, $port = NULL, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) {
		// Use the constructor argument (there is no ipv6 property on this
		// class) and create a datagram socket, not a TCP stream.
		$this->socket = socket_create($ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, SOL_UDP);
		if ($this->socket === FALSE) {
			fprintf(STDERR, "UDPMSG: Error: socket_create failed: %s\n", socket_strerror(socket_last_error()));
			return;
		}
		socket_set_nonblock($this->socket);
		if ($bindhost || $bindport) {
			// Fall back to the wildcard address when only a port was configured.
			socket_bind($this->socket, $bindhost ? $bindhost : ($ipv6 ? '::' : '0.0.0.0'), (int)$bindport);
		}
		socket_add_connected($this);
		if ($host) {
			socket_connect($this->socket, $host, $port);
			peer_add($this);
		}
	}

	/** Called by the main loop when the socket is readable. */
	public function select_read() {
		if ($this->udpmsg_read()) return;
		fprintf(STDERR, "UDPMSG: Error: closing connection.\n");
	}

	/** Called by the main loop when select reports an exceptional condition. */
	public function select_error() {
		fprintf(STDERR, "UDPMSG: Error: select error.\n");
	}

	/**
	 * Reads one datagram, verifies its checksum and passes the payload to
	 * udpmsg_receive().
	 *
	 * @return bool FALSE when nothing could be read or the datagram is invalid.
	 */
	private function udpmsg_read() {
		$msg = socket_read($this->socket, 1024 + 20);
		if ($msg === FALSE || $msg === '') {
			fprintf(STDERR, "UDPMSG: End of file\n");
			return FALSE;
		}
		// A valid datagram carries the 20-byte digest plus at least one payload byte.
		if (strlen($msg) <= 20) {
			fprintf(STDERR, "UDPMSG: Short message or checksum mismatch\n");
			return FALSE;
		}
		$payload = substr($msg, 20);
		// Strict comparison: these are binary strings, not numbers.
		if (substr($msg, 0, 20) !== sha1($payload, TRUE)) {
			fprintf(STDERR, "UDPMSG: Short message or checksum mismatch\n");
			return FALSE;
		}
		// Forward the payload only; send() adds a fresh checksum per peer.
		udpmsg_receive($this, $payload);
		return TRUE;
	}

	/**
	 * Sends $msg to the peer, prefixed with its raw SHA-1 digest.
	 *
	 * @param string $msg Payload.
	 * @return bool TRUE when the datagram was handed to the socket.
	 */
	public function send($msg) {
		return socket_write($this->socket, sha1($msg, TRUE).$msg) !== FALSE;
	}
}
187
// Log of message hashes used for duplicate detection, and the index of the
// slot that the next hash will be written to.
$msglog = array();
$msgi = 0;

// Socket registry: sockets that are still connecting, sockets that are
// connected, and the object that owns each registered socket.
$sock_fd_connecting = array();
$sock_fd_connected = array();
$sock_obj = array();

// Objects whose select_timeout() method the main loop has to call.
$sock_timeout = array();

// Objects that messages are forwarded to.
$peers = array();
197
/**
 * Adds $obj to the global peer list, keyed by its object hash.
 */
function peer_add($obj) {
	global $peers;
	$key = spl_object_hash($obj);
	$peers[$key] = $obj;
}
/**
 * Removes $obj from the global peer list; does nothing when it is not listed.
 */
function peer_delete($obj) {
	global $peers;
	$key = spl_object_hash($obj);
	unset($peers[$key]);
}
206
/**
 * Registers $obj in the global timeout list, keyed by its object hash.
 */
function socket_add_timeout($obj) {
	global $sock_timeout;
	$key = spl_object_hash($obj);
	$sock_timeout[$key] = $obj;
}
/**
 * Registers the socket of $obj in the "connecting" set and records $obj as
 * its owner. Both entries are keyed by the integer cast of the socket.
 */
function socket_add_connecting($obj) {
	global $sock_fd_connecting, $sock_obj;
	$fd = (int)$obj->socket;
	$sock_fd_connecting[$fd] = $obj->socket;
	$sock_obj[$fd] = $obj;
}
/**
 * Registers the socket of $obj in the "connected" set and records $obj as
 * its owner. Both entries are keyed by the integer cast of the socket.
 */
function socket_add_connected($obj) {
	global $sock_fd_connected, $sock_obj;
	$fd = (int)$obj->socket;
	$sock_fd_connected[$fd] = $obj->socket;
	$sock_obj[$fd] = $obj;
}
/**
 * Removes every registration of $obj: its socket from the "connecting" and
 * "connected" sets and from the owner map, and the object itself from the
 * timeout list.
 */
function socket_delete($obj) {
	global $sock_fd_connecting, $sock_fd_connected, $sock_obj, $sock_timeout;
	$fd = (int)$obj->socket;
	unset(
		$sock_fd_connecting[$fd],
		$sock_fd_connected[$fd],
		$sock_obj[$fd],
		$sock_timeout[spl_object_hash($obj)]
	);
}
228
print("Loading configuration...\n");
// An including script may provide $config itself; otherwise load it from the
// configuration file.
if (!isset($config)) require constant('CONFIGFILE');

// Number of message hashes remembered for duplicate detection.
$logsize = isset($config['logsize'])?$config['logsize']:512;

// TCP listeners. Each object registers itself in the global socket lists.
if (isset($config['tcplisten'])) {
	foreach ($config['tcplisten'] as $e) {
		new StreamServer(
			isset($e['host']) ? $e['host'] : '0.0.0.0',
			isset($e['port']) ? $e['port'] : 15387,
			isset($e['ipv6']) && $e['ipv6']);
	}
}

// Outgoing TCP connections.
if (isset($config['tcpconnect'])) {
	foreach ($config['tcpconnect'] as $e) {
		new StreamClient(
			isset($e['host']) ? $e['host'] : '127.0.0.1',
			isset($e['port']) ? $e['port'] : 15387,
			isset($e['ipv6']) && $e['ipv6'],
			isset($e['bindhost']) ? $e['bindhost'] : NULL,
			isset($e['bindport']) ? $e['bindport'] : NULL);
	}
}

// UDP peers: these must be UDPPeer objects, not TCP stream clients.
if (isset($config['udppeer'])) {
	foreach ($config['udppeer'] as $e) {
		new UDPPeer(
			isset($e['host']) ? $e['host'] : NULL,
			isset($e['port']) ? $e['port'] : 15387,
			isset($e['ipv6']) && $e['ipv6'],
			isset($e['bindhost']) ? $e['bindhost'] : NULL,
			isset($e['bindport']) ? $e['bindport'] : NULL);
	}
}
255
/**
 * Handles a message received from a peer: drops it when it was seen recently,
 * otherwise records it and forwards it to every other peer.
 *
 * The hashes of the last $logsize messages are kept in $msglog, which is
 * overwritten in a circular fashion using $msgi as the write index.
 *
 * @param object|null $from Peer the message came from; it is skipped when forwarding.
 * @param string      $msg  Message payload.
 * @return int|false Number of peers whose send() succeeded, or FALSE for a duplicate.
 */
function udpmsg_receive($from, $msg) {
	global $peers, $msgi, $msglog, $logsize;
	$hash = md5($msg);
	// Strict search is required: with loose comparison two different hashes
	// that both look like numbers (e.g. "0e4620..." and "0e8304...") compare
	// equal, and a new message would be dropped as a duplicate.
	if (in_array($hash, $msglog, TRUE)) return FALSE;
	$msglog[$msgi] = $hash;
	$msgi++;
	if ($msgi >= $logsize) $msgi = 0;
	$count = 0;
	foreach ($peers as $peer) {
		if ($peer === $from) continue;
		if ($peer->send($msg)) $count++;
	}
	return $count;
}
267
// Main loop: wait for socket activity, dispatch it to the owning objects and
// then run the pending timeout handlers (reconnects and listener retries).
while (TRUE) {
	$selread = $sock_fd_connected;
	$selwrite = $sock_fd_connecting;
	$selerror = array_merge($selread, $selwrite);
	if (count($selerror)) {
		// socket_select takes its arrays by reference by declaration;
		// call-time "&" is a fatal error since PHP 5.4.
		$ready = socket_select($selread, $selwrite, $selerror, 60);
		if ($ready === FALSE) {
			// On failure the arrays are left unmodified, so dispatching would
			// treat every socket as ready. Log, back off and try again.
			fprintf(STDERR, "UDPMSG: Error: select failed: %s\n", socket_strerror(socket_last_error()));
			sleep(1);
		} else {
			// A handler may unregister sockets, hence the isset() checks.
			foreach ($selread as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_read($socket);
			foreach ($selwrite as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_write($socket);
			foreach ($selerror as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_error($socket);
		}
	} else {
		// Nothing to wait on.
		sleep(10);
	}
	foreach ($sock_timeout as $obj) $obj->select_timeout();
}