comparison fetchnews.php @ 12:7917bd536187 draft

Added hook for new articles, detect send/write failures, fixed handling of multiline headers, add Date header if it doesn't exist, add option to disable peers, fixes for synchronization with INN, added streaming mode support, small fixes
author Ivo Smits <Ivo@UCIS.nl>
date Wed, 12 Jun 2013 22:22:07 +0200
parents e0807e0b1a67
children cccd73f72bf6
comparison
equal deleted inserted replaced
11:e0807e0b1a67 12:7917bd536187
27 27
28 chdir(__DIR__); 28 chdir(__DIR__);
29 require_once './common.php'; 29 require_once './common.php';
30 30
31 foreach ($db->evalAllAssoc('SELECT * FROM `peers`') as $peer) { 31 foreach ($db->evalAllAssoc('SELECT * FROM `peers`') as $peer) {
32 if (!$peer['enabled']) continue;
32 $socket = stream_socket_client($peer['address']); 33 $socket = stream_socket_client($peer['address']);
33 if ($socket === FALSE) { 34 if ($socket === FALSE) {
34 print("Could not connect to peer $peer[address]\n"); 35 print("Could not connect to peer $peer[address]\n");
35 continue; 36 continue;
36 } 37 }
38 $code = strtok($line, " \t"); 39 $code = strtok($line, " \t");
39 if ($code == 200) { 40 if ($code == 200) {
40 } else if ($code == 201) { 41 } else if ($code == 201) {
41 $peer['post'] = 0; 42 $peer['post'] = 0;
42 } else die("Error code $code from $peer[address]\n"); 43 } else die("Error code $code from $peer[address]\n");
44 while ($peer['post'] == 2) {
45 if ($peer['lastposted'] === NULL) {
46 $articles = $db->evalAllAssoc('SELECT * FROM `messages` ORDER BY `id` ASC LIMIT 10');
47 } else {
48 $articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? ORDER BY `id` ASC LIMIT 10', $peer['lastposted']);
49 }
50 if (!count($articles)) break;
51 foreach ($articles as $article) {
52 nntp_writeline($socket, 'IHAVE <'.$article['messageid'].'>');
53 $line = nntp_readline($socket);
54 $code = strtok($line, " \t");
55 if ($code == 435) { //Duplicate
56 } else if ($code == 335) { //Please send
57 foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line);
58 nntp_writeline($socket, '');
59 foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line);
60 nntp_writeline($socket, '.');
61 $line = nntp_readline($socket);
62 $code = strtok($line, " \t");
63 if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n");
64 if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id'];
65 } else {
66 print("IHAVE rejected by remote server, falling back to POST\n");
67 $peer['post'] = 1;
68 break;
69 }
70 if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id'];
71 }
72 $db->update('UPDATE `peers` SET `lastposted` = ? WHERE `id` = ?', array($peer['lastposted'], $peer['id']));
73 }
74 nntp_writeline($socket, 'MODE READER');
75 $line = nntp_readline($socket);
43 foreach ($db->evalAllAssoc('SELECT * FROM `peergroups` WHERE `peer` = ?', $peer['id']) as $peergroup) { 76 foreach ($db->evalAllAssoc('SELECT * FROM `peergroups` WHERE `peer` = ?', $peer['id']) as $peergroup) {
44 $group = $db->evalRowAssoc('SELECT * FROM `groups` WHERE `id` = ?', $peergroup['group']); 77 $group = $db->evalRowAssoc('SELECT * FROM `groups` WHERE `id` = ?', $peergroup['group']);
45 nntp_writeline($socket, 'GROUP '.$group['name']); 78 nntp_writeline($socket, 'GROUP '.$group['name']);
46 $line = nntp_readline($socket); 79 $line = nntp_readline($socket);
47 $code = strtok($line, " \t"); 80 $code = strtok($line, " \t");
86 } 119 }
87 } 120 }
88 $db->update('UPDATE `peergroups` SET `low` = ?, `high` = ? WHERE `peer` = ? AND `group` = ?', array($low, $high, $peergroup['peer'], $peergroup['group'])); 121 $db->update('UPDATE `peergroups` SET `low` = ?, `high` = ? WHERE `peer` = ? AND `group` = ?', array($low, $high, $peergroup['peer'], $peergroup['group']));
89 } 122 }
90 } 123 }
91 while ($peer['post']) { 124 while ($peer['post'] == 1) {
92 if ($peer['lastposted'] === NULL) { 125 if ($peer['lastposted'] === NULL) {
93 $articles = $db->evalAllAssoc('SELECT * FROM `messages` LIMIT 10'); 126 $articles = $db->evalAllAssoc('SELECT * FROM `messages` ORDER BY `id` ASC LIMIT 5');
94 } else { 127 } else {
95 $articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? LIMIT 10', $peer['lastposted']); 128 $articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? ORDER BY `id` ASC LIMIT 5', $peer['lastposted']);
96 } 129 }
97 if (!count($articles)) break; 130 if (!count($articles)) break;
98 foreach ($articles as $article) { 131 foreach ($articles as $article) {
99 $dopost = FALSE; 132 nntp_writeline($socket, 'STAT <'.$article['messageid'].'>');
100 if ($peer['post'] == 2) { 133 $line = nntp_readline($socket);
101 nntp_writeline($socket, 'IHAVE <'.$article['messageid'].'>'); 134 $code = strtok($line, " \t");
135 if ($code == 501) { //Argument error
136 print("STAT rejected by remote server, skipping message\n");
137 } elseif ($code == 223) { //Exists
138 } elseif ($code == 430) { //Not found
139 nntp_writeline($socket, 'POST');
102 $line = nntp_readline($socket); 140 $line = nntp_readline($socket);
103 $code = strtok($line, " \t"); 141 $code = strtok($line, " \t");
104 if ($code == 335) $dopost = TRUE; 142 if ($code != 340) die("Error code $code from $peer[address]\n");
105 elseif ($code != 435) $peer['post'] = 1;
106 }
107 if ($peer['post'] != 2) {
108 nntp_writeline($socket, 'STAT <'.$article['messageid'].'>');
109 $line = nntp_readline($socket);
110 $code = strtok($line, " \t");
111 if ($code == 430) $dopost = TRUE;
112 elseif ($code != 223) die("Error code $code from $peer[address]\n");
113 if ($dopost) {
114 nntp_writeline($socket, 'POST');
115 $line = nntp_readline($socket);
116 $code = strtok($line, " \t");
117 if ($code != 340) die("Error code $code from $peer[address]\n");
118 }
119 }
120 if ($dopost) {
121 foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line); 143 foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line);
122 nntp_writeline($socket, ''); 144 nntp_writeline($socket, '');
123 foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line); 145 foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line);
124 nntp_writeline($socket, '.'); 146 nntp_writeline($socket, '.');
125 $line = nntp_readline($socket); 147 $line = nntp_readline($socket);
126 $code = strtok($line, " \t"); 148 $code = strtok($line, " \t");
127 if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n"); 149 if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n");
150 } else {
151 die("Error code $code from $peer[address]\n");
128 } 152 }
129 if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id']; 153 if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id'];
130 } 154 }
131 $db->update('UPDATE `peers` SET `lastposted` = ? WHERE `id` = ?', array($peer['lastposted'], $peer['id'])); 155 $db->update('UPDATE `peers` SET `lastposted` = ? WHERE `id` = ?', array($peer['lastposted'], $peer['id']));
132 } 156 }