Mercurial > hg > udpmsg3
comparison hub/hub.php @ 0:dd81c38b513a
Initial commit
author | Ivo Smits <Ivo@UCIS.nl> |
---|---|
date | Mon, 28 Feb 2011 00:49:07 +0100 |
parents | |
children | 7e342a0a3b74 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:dd81c38b513a |
---|---|
1 <?php | |
2 /* Copyright 2010 Ivo Smits <Ivo@UCIS.nl>. All rights reserved. | |
3 Redistribution and use in source and binary forms, with or without modification, are | |
4 permitted provided that the following conditions are met: | |
5 | |
6 1. Redistributions of source code must retain the above copyright notice, this list of | |
7 conditions and the following disclaimer. | |
8 | |
9 2. Redistributions in binary form must reproduce the above copyright notice, this list | |
10 of conditions and the following disclaimer in the documentation and/or other materials | |
11 provided with the distribution. | |
12 | |
13 THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED | |
14 WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND | |
15 FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR | |
16 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
17 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | |
18 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON | |
19 ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | |
20 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF | |
21 ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
22 | |
23 The views and conclusions contained in the software and documentation are those of the | |
24 authors and should not be interpreted as representing official policies, either expressed | |
25 or implied, of Ivo Smits.*/ | |
26 | |
27 if (defined('APP_LOADED')) return; | |
28 define('APP_LOADED', TRUE); | |
29 if (!defined('CONFIGFILE')) define('CONFIGFILE', './config.php'); | |
30 | |
31 print("UCIS UDPMSG3 Hub (c) 2010 Ivo Smits <Ivo@UCIS.nl>\n"); | |
32 print("More information: http://wiki.ucis.nl/UDPMSG3\n"); | |
33 print("\n"); | |
34 | |
35 class StreamPeer { | |
36 public $socket = NULL; | |
37 public $buffer = ''; | |
38 public function select_read() { | |
39 if ($this->udpmsg_read()) return; | |
40 socket_delete($this); | |
41 socket_close($this->socket); | |
42 fprintf(STDERR, "UDPMSG: Error: closing connection.\n"); | |
43 peer_delete($this); | |
44 $this->closed(); | |
45 } | |
46 public function select_error() { | |
47 socket_delete($this); | |
48 socket_close($this->socket); | |
49 fprintf(STDERR, "UDPMSG: Error: select error.\n"); | |
50 peer_delete($this); | |
51 $this->closed(); | |
52 } | |
53 private function udpmsg_read() { | |
54 $msg = socket_read($this->socket, 1024); | |
55 if (!strlen($msg)) { | |
56 fprintf(STDERR, "UDPMSG: End of file\n"); | |
57 return FALSE; | |
58 } | |
59 $udpmsg_buffer = &$this->buffer; | |
60 $udpmsg_buffer .= $msg; | |
61 while (strlen($udpmsg_buffer) > 2) { | |
62 $len = ord($udpmsg_buffer[0]) * 256 + ord($udpmsg_buffer[1]); | |
63 if ($len <= 0 || $len > 1024) { | |
64 fprintf(STDERR, "UDPMSG: Error: protocol error\n"); | |
65 return FALSE; | |
66 } | |
67 if (strlen($udpmsg_buffer) < 2 + $len) break; | |
68 udpmsg_receive($this, substr($udpmsg_buffer, 2, $len)); | |
69 $udpmsg_buffer = substr($udpmsg_buffer, 2 + $len); | |
70 } | |
71 return TRUE; | |
72 } | |
73 public function send($msg) { | |
74 $len = strlen($msg); | |
75 if ($len > 1024) { | |
76 fprintf(STDERR, "UDPMSG: Error: message too long!\n"); | |
77 return FALSE; | |
78 } | |
79 $lens = chr(floor($len / 256)).chr($len % 256); | |
80 socket_write($this->socket, $lens.$msg); | |
81 return TRUE; | |
82 } | |
83 } | |
84 class StreamServer { | |
85 public $host, $port, $bindhost, $bindport, $ipv6; | |
86 public $socket = NULL; | |
87 function __construct($host, $port, $ipv6 = FALSE) { | |
88 $this->host = $host; $this->port = $port; $this->ipv6 = $ipv6; | |
89 if (!$this->select_timeout()) socket_add_timeout($this); | |
90 } | |
91 public function select_timeout() { | |
92 socket_delete($this); | |
93 $this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP); | |
94 socket_set_nonblock($this->socket); | |
95 if (!socket_bind($this->socket, $this->host, $this->port)) return FALSE; | |
96 if (!socket_listen($this->socket)) return FALSE; | |
97 socket_delete($this); | |
98 socket_add_connected($this); | |
99 fprintf(STDERR, "UDPMSG: Listening\n"); | |
100 return TRUE; | |
101 } | |
102 public function select_read() { | |
103 $client = socket_accept($this->socket); | |
104 if (!$client) return; | |
105 if (!socket_getpeername($client, &$addr, &$port)) $addr = 'unknown'; | |
106 fprintf(STDERR, 'UDPMSG: Accepted client '.$addr.':'.$port."\n"); | |
107 new StreamServerClient($client); | |
108 } | |
109 public function select_error() { | |
110 socket_close($this->socket); | |
111 fprintf(STDERR, "UDPMSG: Error: select error on listener. Closing listener.\n"); | |
112 socket_delete($this); | |
113 socket_add_timeout($this); | |
114 } | |
115 } | |
116 class StreamClient extends StreamPeer { | |
117 public $host, $port, $bindhost, $bindport, $ipv6; | |
118 function __construct($host, $port, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) { | |
119 $this->host = $host; $this->port = $port; $this->ipv6 = $ipv6; $this->bindhost = $bindhost; $this->bindport = $bindport; | |
120 $this->select_timeout(); | |
121 } | |
122 public function closed() { | |
123 socket_add_timeout($this); | |
124 } | |
125 public function select_timeout() { | |
126 socket_delete($this); | |
127 $this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP); | |
128 socket_set_nonblock($this->socket); | |
129 if ($this->bindhost || $this->bindport) socket_bind($this->socket, $this->bindhost, $this->bindport); | |
130 socket_connect($this->socket, $this->host, $this->port); | |
131 socket_add_connecting($this); | |
132 } | |
133 public function select_write() { | |
134 if (!socket_getpeername($this->socket, &$addr, &$port)) $addr = 'unknown'; | |
135 fprintf(STDERR, 'UDPMSG: Connected to '.$addr.':'.$port."\n"); | |
136 socket_delete($this); | |
137 socket_add_connected($this); | |
138 peer_add($this); | |
139 } | |
140 } | |
141 class StreamServerClient extends StreamPeer { | |
142 public function closed() { } | |
143 function __construct($socket) { | |
144 socket_set_nonblock($socket); | |
145 $this->socket = $socket; | |
146 socket_add_connected($this); | |
147 peer_add($this); | |
148 } | |
149 } | |
150 class UDPPeer { | |
151 public $socket = NULL; | |
152 function __construct($host = NULL, $port = NULL, $ipv6 = FALSE, $bindhost = NULL, $bindport = NULL) { | |
153 $this->socket = socket_create($this->ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP); | |
154 socket_set_nonblock($this->socket); | |
155 if ($bindhost || $bindport) socket_bind($this->socket, $bindhost, $bindport); | |
156 socket_add_connected($this); | |
157 if ($host) { | |
158 socket_connect($this->socket, $host, $port); | |
159 peer_add($this); | |
160 } | |
161 } | |
162 public function select_read() { | |
163 if ($this->udpmsg_read()) return; | |
164 fprintf(STDERR, "UDPMSG: Error: closing connection.\n"); | |
165 } | |
166 public function select_error() { | |
167 fprintf(STDERR, "UDPMSG: Error: select error.\n"); | |
168 } | |
169 private function udpmsg_read() { | |
170 $msg = socket_read($this->socket, 1024 + 20); | |
171 if (!strlen($msg)) { | |
172 fprintf(STDERR, "UDPMSG: End of file\n"); | |
173 return FALSE; | |
174 } | |
175 if (strlen($msg) < 20 || substr($msg, 0, 20) != sha(substr($msg, 20), TRUE)) { | |
176 fprintf(STDERR, "UDPMSG: Short message or checksum mismatch\n"); | |
177 return FALSE; | |
178 } | |
179 udpmsg_receive($this, $msg); | |
180 return TRUE; | |
181 } | |
182 public function send($msg) { | |
183 socket_write($this->socket, sha1($msg, TRUE).$msg); | |
184 return TRUE; | |
185 } | |
186 } | |
187 | |
188 $msgi = 0; | |
189 $msglog = array(); | |
190 | |
191 $sock_fd_connecting = array(); | |
192 $sock_fd_connected = array(); | |
193 $sock_timeout = array(); | |
194 $sock_obj = array(); | |
195 | |
196 $peers = array(); | |
197 | |
198 function peer_add($obj) { | |
199 global $peers; | |
200 $peers[spl_object_hash($obj)] = $obj; | |
201 } | |
202 function peer_delete($obj) { | |
203 global $peers; | |
204 unset($peers[spl_object_hash($obj)]); | |
205 } | |
206 | |
207 function socket_add_timeout($obj) { | |
208 global $sock_timeout; | |
209 $sock_timeout[spl_object_hash($obj)] = $obj; | |
210 } | |
211 function socket_add_connecting($obj) { | |
212 global $sock_fd_connecting, $sock_obj; | |
213 $sock_fd_connecting[(int)$obj->socket] = $obj->socket; | |
214 $sock_obj[(int)$obj->socket] = $obj; | |
215 } | |
216 function socket_add_connected($obj) { | |
217 global $sock_fd_connected, $sock_obj; | |
218 $sock_fd_connected[(int)$obj->socket] = $obj->socket; | |
219 $sock_obj[(int)$obj->socket] = $obj; | |
220 } | |
221 function socket_delete($obj) { | |
222 global $sock_fd_connecting, $sock_fd_connected, $sock_obj, $sock_timeout; | |
223 unset($sock_fd_connecting[(int)$obj->socket]); | |
224 unset($sock_fd_connected[(int)$obj->socket]); | |
225 unset($sock_obj[(int)$obj->socket]); | |
226 unset($sock_timeout[spl_object_hash($obj)]); | |
227 } | |
228 | |
229 print("Loading configuration...\n"); | |
230 if (!isset($config)) require constant('CONFIGFILE'); | |
231 | |
232 $logsize = isset($config['logsize'])?$config['logsize']:512; | |
233 if (isset($config['tcplisten'])) | |
234 foreach ($config['tcplisten'] as $e) | |
235 new StreamServer( | |
236 isset($e['host']) ? $e['host'] : '0.0.0.0', | |
237 isset($e['port']) ? $e['port'] : 15387, | |
238 isset($e['ipv6']) && $e['ipv6']); | |
239 if (isset($config['tcpconnect'])) | |
240 foreach ($config['tcpconnect'] as $e) | |
241 new StreamClient( | |
242 isset($e['host']) ? $e['host'] : '127.0.0.1', | |
243 isset($e['port']) ? $e['port'] : 15387, | |
244 isset($e['ipv6']) && $e['ipv6'], | |
245 isset($e['bindhost']) ? $e['bindhost'] : NULL, | |
246 isset($e['bindport']) ? $e['bindport'] : NULL); | |
247 if (isset($config['udppeer'])) | |
248 foreach ($config['udppeer'] as $e) | |
249 new StreamClient( | |
250 isset($e['host']) ? $e['host'] : NULL, | |
251 isset($e['port']) ? $e['port'] : 15387, | |
252 isset($e['ipv6']) && $e['ipv6'], | |
253 isset($e['bindhost']) ? $e['bindhost'] : NULL, | |
254 isset($e['bindport']) ? $e['bindport'] : NULL); | |
255 | |
256 function udpmsg_receive($from, $msg) { | |
257 global $peers, $msgi, $msglog, $logsize; | |
258 $hash = md5($msg); | |
259 if (in_array($hash, $msglog)) return FALSE; | |
260 $msglog[$msgi] = $hash; | |
261 $msgi++; | |
262 if ($msgi >= $logsize) $msgi = 0; | |
263 $count = 0; | |
264 foreach ($peers as $peer) if ($peer !== $from) if ($peer->send($msg)) $count++; | |
265 return $count; | |
266 } | |
267 | |
268 while (TRUE) { | |
269 $selread = $sock_fd_connected; | |
270 $selwrite = $sock_fd_connecting; | |
271 $selerror = array_merge($selread, $selwrite); | |
272 if (count($selerror)) { | |
273 socket_select(&$selread, &$selwrite, &$selerror, 60); | |
274 foreach ($selread as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_read($socket); | |
275 foreach ($selwrite as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_write($socket); | |
276 foreach ($selerror as $socket) if (isset($sock_obj[(int)$socket])) $sock_obj[(int)$socket]->select_error($socket); | |
277 } else { | |
278 sleep(10); | |
279 } | |
280 foreach ($sock_timeout as $obj) $obj->select_timeout(); | |
281 } |