changeset 12:7917bd536187 draft

Added hook for new articles, detect send/write failures, fixed handling of multiline headers, add Date header if it doesn't exist, add option to disable peers, fixes for synchronization with INN, added streaming mode support, small fixes
author Ivo Smits <Ivo@UCIS.nl>
date Wed, 12 Jun 2013 22:22:07 +0200
parents e0807e0b1a67
children cccd73f72bf6
files common.php database.mysql fetchnews.php server.php
diffstat 4 files changed, 116 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/common.php	Sat Jun 18 15:59:11 2011 +0200
+++ b/common.php	Wed Jun 12 22:22:07 2013 +0200
@@ -24,6 +24,8 @@
    authors and should not be interpreted as representing official policies, either expressed
    or implied, of Ivo Smits.*/
 
+$pnewss_hooks = array('article_stored' => array());
+
 require_once './pdo.php';
 require_once './config.php';
 
@@ -40,7 +42,12 @@
 }
 function nntp_writeline($socket, $line) {
 	writelog('W: '.$line);
-	fwrite($socket, $line."\r\n");
+	$line .= "\r\n";
+	while (strlen($line)) {
+		$written = fwrite($socket, $line);
+		if ($written === FALSE || $written <= 0) throw new Exception('Write operation failed');
+		$line = substr($line, $written);
+	}
 }
 function nntp_readlines($socket) {
 	$line = nntp_readline($socket);
@@ -54,8 +61,12 @@
 	return $lines;
 }
 
+function pnewss_call_hooks($hooks, $args) {
+	foreach ($hooks as $hook) call_user_func_array($hook, $args);
+}
+
 function nntp_article_store($lines, $header = array()) {
-	global $db;
+	global $db, $pnewss_hooks;
 	$headers = array();
 	if (!count($header)) {
 		while (count($lines)) {
@@ -64,9 +75,15 @@
 			$header[] = $line;
 		}
 	}
+	$headername = NULL;
 	foreach ($header as $headerid => $line) {
 		if (!strlen($line) || $line == '.') throw new Exception('Empty or terminating header line');
 		if (strpos($line, "\r") !== FALSE || strpos($line, "\n") !== FALSE || strpos($line, "\0")) throw new Exception('Invalid newline or NUL character in header line');
+		if (strlen($line) && strlen($headername) && ($line[0] == ' ' || $line[0] == "\t") && isset($headers[$headername])) {
+			$headers[$headername] .= ' '.trim($line, " \t");
+			unset($header[$headerid]);
+			continue;
+		}
 		$parts = explode(': ', $line, 2);
 		$headername = strtoupper($parts[0]);
 		switch ($headername) {
@@ -86,6 +103,7 @@
 				$line = NULL;
 				break;
 			default:
+				if ($headername[0] == 'X' && $headername[1] == '-') break;
 				writelog("Received unknown header $headername");
 		}
 	}
@@ -102,6 +120,7 @@
 	}
 	if (!count($newsgroups)) throw new Exception('No known newsgroups listed');
 	if (!isset($headers['MESSAGE-ID'])) $headers['MESSAGE-ID'] = '<'.md5(time().rand()).'@pNewss.Core.UCIS.nl>';
+	if (!isset($headers['DATE'])) $headers['DATE'] = gmdate('r');
 	$messageid = $headers['MESSAGE-ID'];
 	if (strlen($messageid) < 3 || strlen($messageid) > 250 || $messageid[0] != '<' || strpos($messageid, '>') !== strlen($messageid) - 1) throw new Exception('Bad Message-ID');
 	$messageid = substr($messageid, 1, -1);
@@ -113,5 +132,6 @@
 	}
 	$id = $db->insert('INSERT INTO `messages` (`messageid`, `header`, `body`) VALUES (?, ?, ?)', array($messageid, implode("\r\n", $header), implode("\r\n", $lines)));
 	foreach ($newsgroups as $groupid) $db->insert('INSERT INTO `groupmessages` (`group`, `message`) VALUES (?, ?)', array($groupid, $id));
+	pnewss_call_hooks($pnewss_hooks['article_stored'], array(array('messageid' => $messageid, 'headers' => $headers, 'body' => $lines, 'dbid' => $id)));
 	return $messageid;
 }
--- a/database.mysql	Sat Jun 18 15:59:11 2011 +0200
+++ b/database.mysql	Wed Jun 12 22:22:07 2013 +0200
@@ -78,6 +78,7 @@
 CREATE TABLE IF NOT EXISTS `peers` (
   `id` int(10) unsigned NOT NULL auto_increment,
   `address` varchar(255) collate utf8_unicode_ci NOT NULL,
+  `enabled` tinyiny(1) unsigned NOT NULL DEFAULT '1',
   `post` tinyint(1) unsigned NOT NULL,
   `lastposted` int(10) unsigned default NULL,
   PRIMARY KEY  (`id`)
--- a/fetchnews.php	Sat Jun 18 15:59:11 2011 +0200
+++ b/fetchnews.php	Wed Jun 12 22:22:07 2013 +0200
@@ -29,6 +29,7 @@
 require_once './common.php';
 
 foreach ($db->evalAllAssoc('SELECT * FROM `peers`') as $peer) {
+	if (!$peer['enabled']) continue;
 	$socket = stream_socket_client($peer['address']);
 	if ($socket === FALSE) {
 		print("Could not connect to peer $peer[address]\n");
@@ -40,6 +41,38 @@
 	} else if ($code == 201) {
 		$peer['post'] = 0;
 	} else die("Error code $code from $peer[address]\n");
+	while ($peer['post'] == 2) {
+		if ($peer['lastposted'] === NULL) {
+			$articles = $db->evalAllAssoc('SELECT * FROM `messages` ORDER BY `id` ASC LIMIT 10');
+		} else {
+			$articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? ORDER BY `id` ASC LIMIT 10', $peer['lastposted']);
+		}
+		if (!count($articles)) break;
+		foreach ($articles as $article) {
+			nntp_writeline($socket, 'IHAVE <'.$article['messageid'].'>');
+			$line = nntp_readline($socket);
+			$code = strtok($line, " \t");
+			if ($code == 435) { //Duplicate
+			} else if ($code == 335) { //Please send
+				foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line);
+				nntp_writeline($socket, '');
+				foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line);
+				nntp_writeline($socket, '.');
+				$line = nntp_readline($socket);
+				$code = strtok($line, " \t");
+				if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n");
+				if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id'];
+			} else {
+				print("IHAVE rejected by remote server, falling back to POST\n");
+				$peer['post'] = 1;
+				break;
+			}
+			if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id'];
+		}
+		$db->update('UPDATE `peers` SET `lastposted` = ? WHERE `id` = ?', array($peer['lastposted'], $peer['id']));
+	}
+	nntp_writeline($socket, 'MODE READER');
+	$line = nntp_readline($socket);
 	foreach ($db->evalAllAssoc('SELECT * FROM `peergroups` WHERE `peer` = ?', $peer['id']) as $peergroup) {
 		$group = $db->evalRowAssoc('SELECT * FROM `groups` WHERE `id` = ?', $peergroup['group']);
 		nntp_writeline($socket, 'GROUP '.$group['name']);
@@ -88,36 +121,25 @@
 			$db->update('UPDATE `peergroups` SET `low` = ?, `high` = ? WHERE `peer` = ? AND `group` = ?', array($low, $high, $peergroup['peer'], $peergroup['group']));
 		}
 	}
-	while ($peer['post']) {
+	while ($peer['post'] == 1) {
 		if ($peer['lastposted'] === NULL) {
-			$articles = $db->evalAllAssoc('SELECT * FROM `messages` LIMIT 10');
+			$articles = $db->evalAllAssoc('SELECT * FROM `messages` ORDER BY `id` ASC LIMIT 5');
 		} else {
-			$articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? LIMIT 10', $peer['lastposted']);
+			$articles = $db->evalAllAssoc('SELECT * FROM `messages` WHERE `id` > ? ORDER BY `id` ASC LIMIT 5', $peer['lastposted']);
 		}
 		if (!count($articles)) break;
 		foreach ($articles as $article) {
-			$dopost = FALSE;
-			if ($peer['post'] == 2) {
-				nntp_writeline($socket, 'IHAVE <'.$article['messageid'].'>');
+			nntp_writeline($socket, 'STAT <'.$article['messageid'].'>');
+			$line = nntp_readline($socket);
+			$code = strtok($line, " \t");
+			if ($code == 501) { //Argument error
+				print("STAT rejected by remote server, skipping message\n");
+			} elseif ($code == 223) { //Exists
+			} elseif ($code == 430) { //Not found
+				nntp_writeline($socket, 'POST');
 				$line = nntp_readline($socket);
 				$code = strtok($line, " \t");
-				if ($code == 335) $dopost = TRUE;
-				elseif ($code != 435) $peer['post'] = 1;
-			}
-			if ($peer['post'] != 2) {
-				nntp_writeline($socket, 'STAT <'.$article['messageid'].'>');
-				$line = nntp_readline($socket);
-				$code = strtok($line, " \t");
-				if ($code == 430) $dopost = TRUE;
-				elseif ($code != 223) die("Error code $code from $peer[address]\n");
-				if ($dopost) {
-					nntp_writeline($socket, 'POST');
-					$line = nntp_readline($socket);
-					$code = strtok($line, " \t");
-					if ($code != 340) die("Error code $code from $peer[address]\n");
-				}
-			}
-			if ($dopost) {
+				if ($code != 340) die("Error code $code from $peer[address]\n");
 				foreach (explode("\r\n", $article['header']) as $line) nntp_writeline_data($socket, $line);
 				nntp_writeline($socket, '');
 				foreach (explode("\r\n", $article['body']) as $line) nntp_writeline_data($socket, $line);
@@ -125,6 +147,8 @@
 				$line = nntp_readline($socket);
 				$code = strtok($line, " \t");
 				if ($code != 240 && $code != 235) print("Article $article[messageid] was not accepted ($code)\n");
+			} else {
+				die("Error code $code from $peer[address]\n");
 			}
 			if ($article['id'] > $peer['lastposted']) $peer['lastposted'] = $article['id'];
 		}
--- a/server.php	Sat Jun 18 15:59:11 2011 +0200
+++ b/server.php	Wed Jun 12 22:22:07 2013 +0200
@@ -51,6 +51,16 @@
 	if ($line === FALSE || $line === NULL) break;
 	$cmd = strtoupper(strtok($line, " \t"));
 	switch ($cmd) {
+		case '':
+			break;
+		case 'MODE':
+			$mode = strtok(" \t");
+			switch (strtoupper($mode)) {
+				case 'READER': nntp_writeline(STDOUT, '200 Hello, you can post'); break;
+				case 'STREAM': nntp_writeline(STDOUT, '203 Streaming permitted'); break;
+				default: nntp_writeline(STDOUT, '501 Unknown MODE variant'); break;
+			}
+			break;
 		case 'CAPABILITIES':
 			nntp_writeline(STDOUT, '101 Capability list:');
 			nntp_writeline_data(STDOUT, 'VERSION 2');
@@ -59,6 +69,8 @@
 			nntp_writeline_data(STDOUT, 'READER');
 			nntp_writeline_data(STDOUT, 'IHAVE');
 			nntp_writeline_data(STDOUT, 'LIST ACTIVE');
+			nntp_writeline_data(STDOUT, 'MODE-READER');
+			nntp_writeline_data(STDOUT, 'STREAMING');
 			nntp_writeline(STDOUT, '.');
 			break;
 		case 'DATE':
@@ -73,7 +85,7 @@
 			nntp_writeline(STDOUT, '215 list of groups follows');
 			foreach ($db->evalAllAssoc('SELECT * FROM `groups`') as $group) {
 				$groupmessages = $db->evalRow('SELECT MIN(`number`), MAX(`number`) FROM `groupmessages` WHERE `group` = ?', $group['id']);
-				nntp_writeline_data(STDOUT, $group['name'].' '.intval($groupmessages[1]).' '.intval($groupmessages[0]).' n');
+				nntp_writeline_data(STDOUT, $group['name'].' '.intval($groupmessages[1]).' '.intval($groupmessages[0]).' y');
 			}
 			nntp_writeline(STDOUT, '.');
 			break;
@@ -179,7 +191,7 @@
 			break;
 		case 'IHAVE':
 			$messageid = strtok(" \t");
-			if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] == '>') {
+			if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] != '>') {
 				nntp_writeline(STDOUT, '435 Argument error');
 				break;
 			}
@@ -197,6 +209,38 @@
 				nntp_writeline(STDOUT, '437 '.$ex->getMessage());
 			}
 			break;
+		case 'CHECK':
+			$messageid = strtok(" \t");
+			if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] != '>') {
+				nntp_writeline(STDOUT, '435 Argument error');
+				break;
+			}
+			$messageid = substr($messageid, 1, -1);
+			if ($db->evalRow('SELECT `id` FROM `messages` WHERE `messageid` = ?', $messageid) !== FALSE) {
+				nntp_writeline(STDOUT, '438 <'.$messageid.'> Duplicate');
+				break;
+			}
+			nntp_writeline(STDOUT, '238 <'.$messageid.'> Send article to be transferred');
+			break;
+		case 'TAKETHIS':
+			$messageid = strtok(" \t");
+			$lines = nntp_readlines(STDIN);
+			if ($messageid === FALSE || strlen($messageid) <= 2 || $messageid[0] != '<' || $messageid[strlen($messageid)-1] != '>') {
+				nntp_writeline(STDOUT, '435 Argument error');
+				break;
+			}
+			$messageid = substr($messageid, 1, -1);
+			if ($db->evalRow('SELECT `id` FROM `messages` WHERE `messageid` = ?', $messageid) !== FALSE) {
+				nntp_writeline(STDOUT, '439 <'.$messageid.'> Duplicate');
+				break;
+			}
+			try {
+				$messageid = nntp_article_store($lines);
+				nntp_writeline(STDOUT, '239 <'.$messageid.'> Article stored');
+			} catch (Exception $ex) {
+				nntp_writeline(STDOUT, '437 '.$ex->getMessage());
+			}
+			break;
 		case 'QUIT':
 			nntp_writeline(STDOUT, '205 Bye.');
 			return;