# HG changeset patch # User Ivo Smits # Date 1371068527 -7200 # Node ID 7917bd536187e9c315ccae13ec7f08893b42a989 # Parent e0807e0b1a678d34f58ac7625e3ffc02c87ec2ae Added hook for new articles, detect send/write failures, fixed handling of multiline headers, add Date header if it doesn't exist, add option to disable peers, fixes for synchronization with INN, added streaming mode support, small fixes diff -r e0807e0b1a67 -r 7917bd536187 common.php --- a/common.php Sat Jun 18 15:59:11 2011 +0200 +++ b/common.php Wed Jun 12 22:22:07 2013 +0200 @@ -24,6 +24,8 @@ authors and should not be interpreted as representing official policies, either expressed or implied, of Ivo Smits.*/ +$pnewss_hooks = array('article_stored' => array()); + require_once './pdo.php'; require_once './config.php'; @@ -40,7 +42,12 @@ } function nntp_writeline($socket, $line) { writelog('W: '.$line); - fwrite($socket, $line."\r\n"); + $line .= "\r\n"; + while (strlen($line)) { + $written = fwrite($socket, $line); + if ($written === FALSE || $written <= 0) throw new Exception('Write operation failed'); + $line = substr($line, $written); + } } function nntp_readlines($socket) { $line = nntp_readline($socket); @@ -54,8 +61,12 @@ return $lines; } +function pnewss_call_hooks($hooks, $args) { + foreach ($hooks as $hook) call_user_func_array($hook, $args); +} + function nntp_article_store($lines, $header = array()) { - global $db; + global $db, $pnewss_hooks; $headers = array(); if (!count($header)) { while (count($lines)) { @@ -64,9 +75,15 @@ $header[] = $line; } } + $headername = NULL; foreach ($header as $headerid => $line) { if (!strlen($line) || $line == '.') throw new Exception('Empty or terminating header line'); if (strpos($line, "\r") !== FALSE || strpos($line, "\n") !== FALSE || strpos($line, "\0")) throw new Exception('Invalid newline or NUL character in header line'); + if (strlen($line) && strlen($headername) && ($line[0] == ' ' || $line[0] == "\t") && isset($headers[$headername])) { + $headers[$headername] .= ' '.trim($line, " \t"); + unset($header[$headerid]); + continue; + } $parts = explode(': ', $line, 2); $headername = strtoupper($parts[0]); switch ($headername) { @@ -86,6 +103,7 @@ $line = NULL; break; default: + if ($headername[0] == 'X' && $headername[1] == '-') break; writelog("Received unknown header $headername"); } } @@ -102,6 +120,7 @@ } if (!count($newsgroups)) throw new Exception('No known newsgroups listed'); if (!isset($headers['MESSAGE-ID'])) $headers['MESSAGE-ID'] = '<'.md5(time().rand()).'@pNewss.Core.UCIS.nl>'; + if (!isset($headers['DATE'])) $headers['DATE'] = gmdate('r'); $messageid = $headers['MESSAGE-ID']; if (strlen($messageid) < 3 || strlen($messageid) > 250 || $messageid[0] != '<' || strpos($messageid, '>') !== strlen($messageid) - 1) throw new Exception('Bad Message-ID'); $messageid = substr($messageid, 1, -1); @@ -113,5 +132,6 @@ } $id = $db->insert('INSERT INTO `messages` (`messageid`, `header`, `body`) VALUES (?, ?, ?)', array($messageid, implode("\r\n", $header), implode("\r\n", $lines))); foreach ($newsgroups as $groupid) $db->insert('INSERT INTO `groupmessages` (`group`, `message`) VALUES (?, ?)', array($groupid, $id)); + pnewss_call_hooks($pnewss_hooks['article_stored'], array(array('messageid' => $messageid, 'headers' => $headers, 'body' => $lines, 'dbid' => $id))); return $messageid; } diff -r e0807e0b1a67 -r 7917bd536187 database.mysql --- a/database.mysql Sat Jun 18 15:59:11 2011 +0200 +++ b/database.mysql Wed Jun 12 22:22:07 2013 +0200 @@ -78,6 +78,7 @@ CREATE TABLE IF NOT EXISTS `peers` ( `id` int(10) unsigned NOT NULL auto_increment, `address` varchar(255) collate utf8_unicode_ci NOT NULL, + `enabled` tinyiny(1) unsigned NOT NULL DEFAULT '1', `post` tinyint(1) unsigned NOT NULL, `lastposted` int(10) unsigned default NULL, PRIMARY KEY (`id`) diff -r e0807e0b1a67 -r 7917bd536187 fetchnews.php --- a/fetchnews.php Sat Jun 18 15:59:11 2011 +0200 +++ b/fetchnews.php Wed Jun 12 22:22:07 2013 +0200 @@ -29,6 +29,7 @@ require_once './common.php'; foreach ($db->evalAllAssoc('SELECT * FROM `peers`') as $peer) { + if (!$peer['enabled']) continue; $socket = stream_socket_client($peer['address']); if ($socket === FALSE) { print("Could not connect to peer $peer[address]\n"); @@ -40,6 +41,38 @@ } else if ($code == 201) { $peer['post'] = 0; } else die("Error code $code from $peer[address]\n"); + while ($peer['post'] == 2) { + if ($peer['lastposted'] === NULL) { + $articles = $db->evalAllAssoc('SELECT * FROM `messages` ORDER BY `id` ASC LIMIT 10'); + } else { + $articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? ORDER BY `id` ASC LIMIT 10', $peer['lastposted']); + } + if (!count($articles)) break; + foreach ($articles as $article) { + nntp_writeline($socket, 'IHAVE <'.$article['messageid'].'>'); + $line = nntp_readline($socket); + $code = strtok($line, " \t"); + if ($code == 435) { //Duplicate + } else if ($code == 335) { //Please send + foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line); + nntp_writeline($socket, ''); + foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line); + nntp_writeline($socket, '.'); + $line = nntp_readline($socket); + $code = strtok($line, " \t"); + if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n"); + if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id']; + } else { + print("IHAVE rejected by remote server, falling back to POST\n"); + $peer['post'] = 1; + break; + } + if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id']; + } + $db->update('UPDATE `peers` SET `lastposted` = ? WHERE `id` = ?', array($peer['lastposted'], $peer['id'])); + } + nntp_writeline($socket, 'MODE READER'); + $line = nntp_readline($socket); foreach ($db->evalAllAssoc('SELECT * FROM `peergroups` WHERE `peer` = ?', $peer['id']) as $peergroup) { $group = $db->evalRowAssoc('SELECT * FROM `groups` WHERE `id` = ?', $peergroup['group']); nntp_writeline($socket, 'GROUP '.$group['name']); @@ -88,36 +121,25 @@ $db->update('UPDATE `peergroups` SET `low` = ?, `high` = ? WHERE `peer` = ? AND `group` = ?', array($low, $high, $peergroup['peer'], $peergroup['group'])); } } - while ($peer['post']) { + while ($peer['post'] == 1) { if ($peer['lastposted'] === NULL) { - $articles = $db->evalAllAssoc('SELECT * FROM `messages` LIMIT 10'); + $articles = $db->evalAllAssoc('SELECT * FROM `messages` ORDER BY `id` ASC LIMIT 5'); } else { - $articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? LIMIT 10', $peer['lastposted']); + $articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? ORDER BY `id` ASC LIMIT 5', $peer['lastposted']); } if (!count($articles)) break; foreach ($articles as $article) { - $dopost = FALSE; - if ($peer['post'] == 2) { - nntp_writeline($socket, 'IHAVE <'.$article['messageid'].'>'); + nntp_writeline($socket, 'STAT <'.$article['messageid'].'>'); + $line = nntp_readline($socket); + $code = strtok($line, " \t"); + if ($code == 501) { //Argument error + print("STAT rejected by remote server, skipping message\n"); + } elseif ($code == 223) { //Exists + } elseif ($code == 430) { //Not found + nntp_writeline($socket, 'POST'); $line = nntp_readline($socket); $code = strtok($line, " \t"); - if ($code == 335) $dopost = TRUE; - elseif ($code != 435) $peer['post'] = 1; - } - if ($peer['post'] != 2) { - nntp_writeline($socket, 'STAT <'.$article['messageid'].'>'); - $line = nntp_readline($socket); - $code = strtok($line, " \t"); - if ($code == 430) $dopost = TRUE; - elseif ($code != 223) die("Error code $code from $peer[address]\n"); - if ($dopost) { - nntp_writeline($socket, 'POST'); - $line = nntp_readline($socket); - $code = strtok($line, " \t"); - if ($code != 340) die("Error code $code from $peer[address]\n"); - } - } - if ($dopost) { + if ($code != 340) die("Error code $code from $peer[address]\n"); foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line); nntp_writeline($socket, ''); foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line); @@ -125,6 +147,8 @@ $line = nntp_readline($socket); $code = strtok($line, " \t"); if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n"); + } else { + die("Error code $code from $peer[address]\n"); } if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id']; } diff -r e0807e0b1a67 -r 7917bd536187 server.php --- a/server.php Sat Jun 18 15:59:11 2011 +0200 +++ b/server.php Wed Jun 12 22:22:07 2013 +0200 @@ -51,6 +51,16 @@ if ($line === FALSE || $line === NULL) break; $cmd = strtoupper(strtok($line, " \t")); switch ($cmd) { + case '': + break; + case 'MODE': + $mode = strtok(" \t"); + switch (strtoupper($mode)) { + case 'READER': nntp_writeline(STDOUT, '200 Hello, you can post'); break; + case 'STREAM': nntp_writeline(STDOUT, '203 Streaming permitted'); break; + default: nntp_writeline(STDOUT, '501 Unknown MODE variant'); break; + } + break; case 'CAPABILITIES': nntp_writeline(STDOUT, '101 Capability list:'); nntp_writeline_data(STDOUT, 'VERSION 2'); @@ -59,6 +69,8 @@ nntp_writeline_data(STDOUT, 'READER'); nntp_writeline_data(STDOUT, 'IHAVE'); nntp_writeline_data(STDOUT, 'LIST ACTIVE'); + nntp_writeline_data(STDOUT, 'MODE-READER'); + nntp_writeline_data(STDOUT, 'STREAMING'); nntp_writeline(STDOUT, '.'); break; case 'DATE': @@ -73,7 +85,7 @@ nntp_writeline(STDOUT, '215 list of groups follows'); foreach ($db->evalAllAssoc('SELECT * FROM `groups`') as $group) { $groupmessages = $db->evalRow('SELECT MIN(`number`), MAX(`number`) FROM `groupmessages` WHERE `group` = ?', $group['id']); - nntp_writeline_data(STDOUT, $group['name'].' '.intval($groupmessages[1]).' '.intval($groupmessages[0]).' n'); + nntp_writeline_data(STDOUT, $group['name'].' '.intval($groupmessages[1]).' '.intval($groupmessages[0]).' y'); } nntp_writeline(STDOUT, '.'); break; @@ -179,7 +191,7 @@ break; case 'IHAVE': $messageid = strtok(" \t"); - if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] == '>') { + if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] != '>') { nntp_writeline(STDOUT, '435 Argument error'); break; } @@ -197,6 +209,38 @@ nntp_writeline(STDOUT, '437 '.$ex->getMessage()); } break; + case 'CHECK': + $messageid = strtok(" \t"); + if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] != '>') { + nntp_writeline(STDOUT, '435 Argument error'); + break; + } + $messageid = substr($messageid, 1, -1); + if ($db->evalRow('SELECT `id` FROM `messages` WHERE `messageid` = ?', $messageid) !== FALSE) { + nntp_writeline(STDOUT, '438 <'.$messageid.'> Duplicate'); + break; + } + nntp_writeline(STDOUT, '238 <'.$messageid.'> Send article to be transferred'); + break; + case 'TAKETHIS': + $messageid = strtok(" \t"); + $lines = nntp_readlines(STDIN); + if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] != '>') { + nntp_writeline(STDOUT, '435 Argument error'); + break; + } + $messageid = substr($messageid, 1, -1); + if ($db->evalRow('SELECT `id` FROM `messages` WHERE `messageid` = ?', $messageid) !== FALSE) { + nntp_writeline(STDOUT, '439 <'.$messageid.'> Duplicate'); + break; + } + try { + $messageid = nntp_article_store($lines); + nntp_writeline(STDOUT, '239 <'.$messageid.'> Article stored'); + } catch (Exception $ex) { + nntp_writeline(STDOUT, '437 '.$ex->getMessage()); + } + break; case 'QUIT': nntp_writeline(STDOUT, '205 Bye.'); return;