#!/usr/bin/env python3

"""

q2n - QEC to NNTP sync

This script syncs QEC logs to NNTP.





again


- read from argv or a config file

- set up a cron job

- put it on tildegit (once my application issue get sorted out)

"""

from dataclasses import dataclass

import datetime as dt
import io
import logging
import os
import pickle
import pwd
import random
import subprocess as sp
import time
import typing as t

_LOGGER = logging.getLogger(__name__)

Path = str
User = str
NntpArticleBody = str
LogEntryHash = str

@dataclass
class Config:
    listing_dir: str
    listing_filename: str
    nntp_group: str
    nntp_server: str
    max_submission: int
    submission_store_dir: Path

    @classmethod
    def create(cls):
        return Config(
            listing_dir="/var/gopher/",
            listing_filename="listing.gophermap",
            nntp_server="localhost",
            # TODO: find more appropriate one
            nntp_group="cosmic.worldbuilding",
            max_submission=5,
            submission_store_dir="/var/tmp/q2n",
        )

@dataclass
class Ship:
    name: str
    owner: User


@dataclass
class LogEntry:
    ship: Ship
    author: User
    title: str
    file_name: str


class LogIterator(t.Protocol):
    def __call__(self) -> t.List[LogEntry]: ...


class SubmitCondition(t.Protocol):
    def __call__(self, log_entry: LogEntry) -> bool: ...


class LogSubmitter(t.Protocol):
    def __call__(self, log: LogEntry) -> None: ...
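
# How these protocols fit together (see main() below): a LogIterator yields
# LogEntry items from the gophermap listing, a SubmitCondition filters out
# entries that were already posted, and a LogSubmitter posts what remains to
# NNTP and records the submission.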

@dataclass
class Utils:
    config: Config

    def ship_owner(self, ship_name: str) -> User:
        return self._get_path_user(
            f"{self.config.listing_dir}/{ship_name}"
        )

    def read_log_content(self, log: LogEntry) -> str:
        return self._read_log_entry(
            f"{self.config.listing_dir}/{log.ship.name}/{log.file_name}"
        )

    @staticmethod
    def _read_log_entry(path: str) -> str:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()

    @staticmethod
    def _get_path_user(fp: str) -> User:
        st = os.stat(fp)
        return pwd.getpwuid(st.st_uid).pw_name
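
# Note: ship_owner() attributes a log to the Unix owner of the ship's directory
# under listing_dir (via os.stat + pwd). Illustrative example with made-up names:
# if /var/gopher/betsy is owned by user "alice", ship_owner("betsy") returns
# "alice", and that name becomes both the ship owner and the log author.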

@dataclass
class SubmittedLogsStore:
    store_dir: str

    def __post_init__(self):
        import subprocess as sp

        sp.check_call(
            f"mkdir -p {self.store_dir}",
            shell=True
        )

    def record_submission(self, log: LogEntry):
        with open(f"{self.store_dir}/{self.checksum(log)}", "wb") as f:
            pickle.dump(log, f)

    def load_submitted_logs(self) -> t.List[LogEntryHash]:
        return os.listdir(self.store_dir)

    @staticmethod
    def checksum(log: LogEntry) -> LogEntryHash:
        import hashlib

        checked_str = f"{log.ship.name}{log.file_name}"
        return hashlib.md5(checked_str.encode("utf-8")).hexdigest()
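
# Illustrative usage (the variable names are hypothetical): submissions are
# deduplicated by the MD5 of "<ship name><file name>", and each submitted log is
# stored as a pickle file named after that hash.
#
#   store = SubmittedLogsStore(store_dir="/var/tmp/q2n")
#   store.record_submission(log)
#   SubmittedLogsStore.checksum(log) in store.load_submitted_logs()  # -> True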

# Throttles log entries to submit. Just in case there's a bug.
# Usually we'd limit logs to submit to a small number, and maybe also
# send out some alert.
SubmissionThrottle = t.Callable[[t.List[LogEntry]], t.List[LogEntry]]
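
# A minimal sketch of a SubmissionThrottle that also emits the alert mentioned
# above (not used below; the limit of 5 is an assumption mirroring Config):
#
#   def alerting_throttle(logs: t.List[LogEntry]) -> t.List[LogEntry]:
#       if len(logs) > 5:
#           _LOGGER.warning(f"Throttling {len(logs)} logs down to 5")
#       return logs[:5]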

@dataclass
class ListingFileLogIterator(LogIterator):
    listing_dir: str
    listing_filename: str
    utils: Utils

    def __call__(self) -> t.List[LogEntry]:
        with open(
            f"{self.listing_dir}/{self.listing_filename}",
            "r",
            encoding="utf-8"
        ) as f:
            entries = f.readlines()
        return [self._parse(ent) for ent in entries]

    def _parse(self, entry: str) -> LogEntry:
        """Parse a listing file entry into a `LogEntry`

        An entry looks like this:

            0betsy - About QEC /betsy/qec.txt

        I.e.

            0<ship> - <title><TAB><file_path>

        Note:

        - <file_path> is rooted at /var/gopher, i.e., where the listing
          file resides.
        """
        import re

        res = re.match(r"^0(.+?) - (.+)\t(.+)$", entry)
        if not res: raise ValueError(f"Cannot parse: {entry}")

        # It's more robust to use the file path (/ship/fn.txt) to obtain ship's
        # name, rather than res.group(1). This is b/c there're duplicated
        # entries in the listing:
        #
        #   0Polonia - 24131 /Polonia-II/24131.txt
        #   0Polonia-II - 24131 /Polonia-II/24131.txt
        title = res.group(2)
        log_path = res.group(3)
        ship, log_fn = self._parse_log_file_name(log_path)
        ship_owner = self.utils.ship_owner(ship)
        return LogEntry(
            ship=Ship(name=ship, owner=ship_owner),
            author=ship_owner,
            title=title,
            file_name=log_fn,
        )

    @staticmethod
    def _parse_log_file_name(ship_and_file: str) -> t.Tuple[str, str]:
        "/<ship>/file.txt -> (<ship>, file.txt)"
        return t.cast(
            t.Tuple[str, str],
            tuple(x for x in ship_and_file.split("/") if x),
        )


@dataclass
class SubmitConditionImpl(SubmitCondition):
    submission_store: SubmittedLogsStore

    def __call__(self, log_entry: LogEntry) -> bool:
        return (
            self.submission_store.checksum(log_entry)
            not in self.submission_store.load_submitted_logs()
        )


@dataclass
class NntpLogSubmitter(LogSubmitter):
    @dataclass
    class NntpLogFormat:
        subject: str
        body: str
        from_: str

    submission_store: SubmittedLogsStore
    read_log_entry: t.Callable[[LogEntry], NntpArticleBody]
    nntp_group: str
    nntp_server: str
    dry_run: bool = False

    def __call__(self, log: LogEntry) -> None:
        self.nntp_submit(log)
        self.submission_store.record_submission(log)

    def add_envelope(self, article: str, log: LogEntry) -> str:
        return f"""\
TIMESTAMP: {int(time.time())} SGT
AUTHOR: {log.author}
ORIGINATING SHIP: {log.ship.name}
QEC GATEWAY: QG-{random.randint(0, 31)}

{article}
"""

    def nntp_submit(self, log: LogEntry) -> None:
        import nntplib as nn

        s = nn.NNTP(self.nntp_server, readermode=True)
        article_body = self.read_log_entry(log)
        article_body = self.add_envelope(article_body, log)
        msg = f"""\
Newsgroups: {self.nntp_group}
Subject: [QEC] {log.title}
From: {log.author} "{log.author}@cosmic.voyage"

{article_body}
"""
        f = io.BytesIO(msg.encode("utf-8"))
        f.seek(0)
        _LOGGER.info(f"About to submit log:\n{msg}")
        if not self.dry_run:
            s.post(f)


@dataclass
class SubmissionThrottler:
    max_submission: int

    def __call__(self, logs: t.List[LogEntry]) -> t.List[LogEntry]:
        return logs[0:self.max_submission]


def main():
    logging.basicConfig()
    logging.root.setLevel(logging.INFO)

    config = Config.create()
    _LOGGER.info(f"Running with config: {config}")

    utils = Utils(config=config)
    iterate_logs = ListingFileLogIterator(
        listing_dir=config.listing_dir,
        listing_filename=config.listing_filename,
        utils=utils,
    )
    throttler = SubmissionThrottler(config.max_submission)
    submission_store = SubmittedLogsStore(store_dir=config.submission_store_dir)
    should_submit = SubmitConditionImpl(submission_store=submission_store)
    submit_log = NntpLogSubmitter(
        submission_store=submission_store,
        read_log_entry=utils.read_log_content,
        nntp_group=config.nntp_group,
        nntp_server=config.nntp_server,
        dry_run=True,  # TODO remove
    )

    logs_to_submit = [log for log in iterate_logs() if should_submit(log)]
    # FOR TEST: remove - randomly choose one log
    # logs_to_submit = logs_to_submit[random.randint(0, len(logs_to_submit)-2):][0:]
    logs_to_submit = throttler(logs_to_submit)
    _LOGGER.info(f"Submitting {len(logs_to_submit)} logs...")
    for log in logs_to_submit: submit_log(log)


if __name__ == "__main__":
    main()
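
# For reference, an article assembled by NntpLogSubmitter looks roughly like this
# (all values are placeholders; the timestamp and gateway number vary per run):
#
#   Newsgroups: cosmic.worldbuilding
#   Subject: [QEC] About QEC
#   From: alice "alice@cosmic.voyage"
#
#   TIMESTAMP: 1700000000 SGT
#   AUTHOR: alice
#   ORIGINATING SHIP: betsy
#   QEC GATEWAY: QG-7
#
#   <log file contents>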