@ -2,19 +2,21 @@ use std::collections::HashSet;
use std ::sync ::Arc ;
use std ::time ::{ Duration , Instant } ;
use elefren ::{ FediClient , StatusBuilder } ;
use elefren ::debug ::EventDisplay ;
use elefren ::debug ::NotificationDisplay ;
use elefren ::debug ::StatusDisplay ;
use elefren ::entities ::event ::Event ;
use elefren ::entities ::notification ::{ Notification , NotificationType } ;
use elefren ::entities ::status ::Status ;
use elefren ::status_builder ::Visibility ;
use elefren ::{ FediClient , StatusBuilder } ;
use futures ::StreamExt ;
use crate ::command ::StatusCommand ;
use crate ::error ::GroupError ;
use crate ::store ::data ::GroupConfig ;
use crate ::store ::ConfigStore ;
use crate ::utils ::{ normalize_acct , LogError } ;
use crate ::store ::data ::GroupConfig ;
use crate ::utils ::{ LogError , normalize_acct } ;
/// This is one group's config store capable of persistence
#[ derive(Debug) ]
@ -27,6 +29,9 @@ pub struct GroupHandle {
const DELAY_BEFORE_ACTION : Duration = Duration ::from_millis ( 250 ) ;
const DELAY_REOPEN_STREAM : Duration = Duration ::from_millis ( 1000 ) ;
const MAX_CATCHUP_NOTIFS : usize = 25 ;
// also statuses
const MAX_CATCHUP_STATUSES : usize = 100 ;
// higher because we can expect a lot of non-hashtag statuses here
const PERIODIC_SAVE : Duration = Duration ::from_secs ( 60 ) ;
const PING_INTERVAL : Duration = Duration ::from_secs ( 15 ) ; // must be < periodic save!
@ -45,6 +50,7 @@ impl GroupHandle {
Ok ( ( ) )
}
/*
pub async fn reload ( & mut self ) -> Result < ( ) , GroupError > {
if let Some ( g ) = self . store . get_group_config ( self . config . get_acct ( ) ) . await {
self . config = g ;
@ -53,6 +59,7 @@ impl GroupHandle {
Err ( GroupError ::GroupNotExist )
}
}
* /
}
trait NotifTimestamp {
@ -65,6 +72,14 @@ impl NotifTimestamp for Notification {
}
}
impl NotifTimestamp for Status {
fn timestamp_millis ( & self ) -> u64 {
// this may not work well for unseen status tracking,
// if ancient statuses were to appear in the timeline :(
self . created_at . timestamp_millis ( ) . max ( 0 ) as u64
}
}
impl GroupHandle {
pub async fn run ( & mut self ) -> Result < ( ) , GroupError > {
assert! ( PERIODIC_SAVE > = PING_INTERVAL ) ;
@ -89,6 +104,20 @@ impl GroupHandle {
}
}
match self . catch_up_with_missed_statuses ( ) . await {
Ok ( true ) = > {
debug ! ( "Some missed statuses handled" ) ;
// Save asap!
next_save = Instant ::now ( ) - PERIODIC_SAVE
}
Ok ( false ) = > {
debug ! ( "No statuses missed" ) ;
}
Err ( e ) = > {
error ! ( "Failed to handle missed statuses: {}" , e ) ;
}
}
loop {
if next_save < Instant ::now ( ) {
self . save_if_needed ( ) . await . log_error ( "Failed to save group" ) ;
@ -104,7 +133,9 @@ impl GroupHandle {
Ok ( Some ( event ) ) = > {
debug ! ( "(@{}) Event: {}" , self . config . get_acct ( ) , EventDisplay ( & event ) ) ;
match event {
Event ::Update ( _status ) = > { }
Event ::Update ( status ) = > {
self . handle_status ( status ) . await . log_error ( "Error handling a status" ) ;
}
Event ::Notification ( n ) = > {
self . handle_notification ( n ) . await . log_error ( "Error handling a notification" ) ;
}
@ -156,7 +187,7 @@ impl GroupHandle {
return Ok ( ( ) ) ;
}
let commands = crate ::command ::parse_statu s ( & status . content ) ;
let commands = crate ::command ::parse_slash_command s ( & status . content ) ;
let mut replies = vec! [ ] ;
let mut announcements = vec! [ ] ;
@ -205,6 +236,9 @@ impl GroupHandle {
any_admin_cmd = true ;
replies . push ( format! ( "User {} banned from group!" , u ) ) ;
self . unfollow_user ( & u ) . await
. log_error ( "Failed to unfollow" ) ;
// no announcement here
}
Err ( e ) = > {
@ -280,11 +314,7 @@ impl GroupHandle {
Ok ( _ ) = > {
any_admin_cmd = true ;
replies . push ( format! ( "User {} added to the group!" , u ) ) ;
if self . config . is_member_only ( ) {
announcements
. push ( format! ( "Welcome new member @{} to the group!" , u ) ) ;
}
self . follow_user ( & u ) . await . log_error ( "Failed to follow" ) ;
}
Err ( e ) = > {
replies . push ( format! ( "Failed to add user {} to group: {}" , u , e ) ) ;
@ -303,6 +333,9 @@ impl GroupHandle {
Ok ( _ ) = > {
any_admin_cmd = true ;
replies . push ( format! ( "User {} removed from the group." , u ) ) ;
self . unfollow_user ( & u ) . await
. log_error ( "Failed to unfollow" ) ;
}
Err ( _ ) = > {
unreachable! ( )
@ -313,12 +346,33 @@ impl GroupHandle {
replies . push ( "Only admins can manage members" . to_string ( ) ) ;
}
}
StatusCommand ::AddTag ( tag ) = > {
if is_admin {
any_admin_cmd = true ;
self . config . add_tag ( & tag ) ;
replies . push ( format! ( "Tag #{} added to the group!" , tag ) ) ;
} else {
replies . push ( "Only admins can manage group tags" . to_string ( ) ) ;
}
}
StatusCommand ::RemoveTag ( tag ) = > {
if is_admin {
any_admin_cmd = true ;
self . config . remove_tag ( & tag ) ;
replies . push ( format! ( "Tag #{} removed from the group!" , tag ) ) ;
} else {
replies . push ( "Only admins can manage group tags" . to_string ( ) ) ;
}
}
StatusCommand ::GrantAdmin ( u ) = > {
let u = normalize_acct ( & u , & group_acct ) ? ;
if is_admin {
if ! self . config . is_admin ( & u ) {
match self . config . set_admin ( & u , true ) {
Ok ( _ ) = > {
// try to make the config a little more sane, admins should be members
let _ = self . config . set_member ( & u , true ) ;
any_admin_cmd = true ;
replies . push ( format! ( "User {} is now a group admin!" , u ) ) ;
announcements
@ -409,12 +463,13 @@ impl GroupHandle {
* * Supported commands :* * \ n \
` / boost , / b ` - boost the replied - to post into the group \ n \
` / ignore , / i ` - make the group completely ignore the post \ n \
` / ping ` - check that the service is alive " . to_string ( ) ,
` / ping ` - check that the service is alive \ n \
` / join ` - join the group \ n \
` / leave ` - leave the group " . to_string ( ) ,
) ;
if self . config . is_member_only ( ) {
replies . push ( "`/members, /who` - show group members / admins" . to_string ( ) ) ;
replies . push ( "`/leave` - leave the group" . to_string ( ) ) ;
} else {
replies . push ( "`/members, /who` - show group admins" . to_string ( ) ) ;
}
@ -441,18 +496,7 @@ impl GroupHandle {
if is_admin {
if self . config . is_member_only ( ) {
replies . push ( "Group members:" . to_string ( ) ) ;
let admins = self . config . get_admins ( ) . collect ::< HashSet < _ > > ( ) ;
let mut members = self . config . get_members ( ) . collect ::< Vec < _ > > ( ) ;
members . extend ( admins . iter ( ) ) ;
members . sort ( ) ;
members . dedup ( ) ;
for m in members {
if admins . contains ( & m ) {
replies . push ( format! ( "{} [admin]" , m ) ) ;
} else {
replies . push ( m . to_string ( ) ) ;
}
}
self . list_members ( & mut replies ) ;
} else {
show_admins = true ;
}
@ -462,18 +506,55 @@ impl GroupHandle {
if show_admins {
replies . push ( "Group admins:" . to_string ( ) ) ;
let mut admins = self . config . get_admins ( ) . collect ::< Vec < _ > > ( ) ;
admins . sort ( ) ;
for a in admins {
replies . push ( a . to_string ( ) ) ;
}
self . list_admins ( & mut replies ) ;
}
}
StatusCommand ::ListTags = > {
replies . push ( "Group tags:" . to_string ( ) ) ;
let mut tags = self . config . get_tags ( ) . collect ::< Vec < _ > > ( ) ;
tags . sort ( ) ;
for t in tags {
replies . push ( format! ( "#{}" , t ) ) ;
}
}
StatusCommand ::Leave = > {
if self . config . is_member ( & notif_acct ) {
if self . config . is_member_or_admin ( & notif_acct ) {
// admin can leave but that's a bad idea
any_admin_cmd = true ;
let _ = self . config . set_member ( & notif_acct , false ) ;
replies . push ( "You left the group." . to_string ( ) ) ;
replies . push ( "You're no longer a group member. Unfollow the group user to stop receiving group messages." . to_string ( ) ) ;
self . unfollow_user ( & notif_acct ) . await
. log_error ( "Failed to unfollow" ) ;
}
}
StatusCommand ::Join = > {
if self . config . is_member_or_admin ( & notif_acct ) {
// Already a member, so let's try to follow the user
// again, maybe first time it failed
self . follow_user ( & notif_acct ) . await
. log_error ( "Failed to follow" ) ;
} else {
// Not a member yet
if self . config . is_member_only ( ) {
// No you can't
replies . push ( format! (
" Hi , this group is closed to new sign - ups . \ n \
Please ask one of the group admins to add you :" ) ) ;
self . list_admins ( & mut replies ) ;
} else {
// Open access
self . follow_user ( & notif_acct ) . await
. log_error ( "Failed to follow" ) ;
// This only fails if the user is banned, but that is filtered above
let _ = self . config . set_member ( & notif_acct , true ) ;
replies . push ( format! ( " \
Thanks for joining , you are now a member and the group user will \
follow you so you can use group hashtags . Make sure you follow the \
group user to receive group messages . " ) ) ;
}
}
}
StatusCommand ::Ping = > {
@ -524,43 +605,106 @@ impl GroupHandle {
}
NotificationType ::Follow = > {
info ! ( "New follower!" ) ;
tokio ::time ::sleep ( Duration ::from_millis ( 500 ) ) . await ;
let text = if self . config . is_member_only ( ) {
// Admins are listed without @, so they won't become handles here.
// Tagging all admins would be annoying.
let mut admins = self . config . get_admins ( ) . cloned ( ) . collect ::< Vec < _ > > ( ) ;
admins . sort ( ) ;
format! (
" @ { user } welcome to the group ! This is a member - only group , you won ' t be \
able to post . Ask the group admins if you wish to join ! \ n \ n \
Admins : { admins } " ,
user = notif_acct ,
admins = admins . join ( ", " )
)
if self . config . is_member_or_admin ( & notif_acct ) {
// Already joined, just doing something silly, ignore this
debug ! ( "User already a member, ignoring" ) ;
} else {
format! (
" @ { user } welcome to the group ! \
To share a post , tag the group user . Use / help for more info . " ,
user = notif_acct
)
} ;
let post = StatusBuilder ::new ( )
. status ( text )
. content_type ( "text/markdown" )
. visibility ( Visibility ::Direct )
. build ( )
. expect ( "error build status" ) ;
let _ = self . client . new_status ( post ) . await . log_error ( "Failed to post" ) ;
let text = if self . config . is_member_only ( ) {
// Admins are listed without @, so they won't become handles here.
// Tagging all admins would be annoying.
let mut admins = self . config . get_admins ( ) . cloned ( ) . collect ::< Vec < _ > > ( ) ;
admins . sort ( ) ;
format! (
" @ { user } Hi , this is a member - only group , you won ' t be \
able to post . You can still receive group posts though . If you ' d like to join , \
please ask one of the group admins to add you :\ n \ n \
{ admins } " ,
user = notif_acct ,
admins = admins . join ( ", " )
)
} else {
self . follow_user ( & notif_acct ) . await
. log_error ( "Failed to follow" ) ;
make_welcome_text ( & notif_acct )
} ;
let post = StatusBuilder ::new ( )
. status ( text )
. content_type ( "text/markdown" )
. visibility ( Visibility ::Direct )
. build ( )
. expect ( "error build status" ) ;
tokio ::time ::sleep ( Duration ::from_millis ( 500 ) ) . await ;
let _ = self . client . new_status ( post ) . await . log_error ( "Failed to post" ) ;
}
}
_ = > { }
NotificationType ::Favourite = > { }
NotificationType ::Reblog = > { }
}
Ok ( ( ) )
}
/// Handle a non-mention status
async fn handle_status ( & mut self , s : Status ) -> Result < ( ) , GroupError > {
debug ! ( "Handling status #{}" , s . id ) ;
let ts = s . timestamp_millis ( ) ;
self . config . set_last_status ( ts ) ;
if ! s . content . contains ( '#' ) {
debug ! ( "No tags in status" ) ;
return Ok ( ( ) ) ;
}
if s . visibility . is_private ( ) {
debug ! ( "Status is direct/private, not boosting" ) ;
return Ok ( ( ) ) ;
}
if s . content . contains ( "/add " ) | | s . content . contains ( "/remove " ) {
debug ! ( "Discard, looks like a hashtag manipulation command" ) ;
return Ok ( ( ) ) ;
}
let gu = self . config . get_acct ( ) ;
let su = normalize_acct ( & s . account . acct , gu ) ? ;
if self . config . is_banned ( & su ) {
debug ! ( "Status author @{} is banned." , su ) ;
return Ok ( ( ) ) ;
}
if ! self . config . is_member_or_admin ( & su ) {
debug ! ( "Status author @{} is not a member." , su ) ;
return Ok ( ( ) ) ;
}
let tags = crate ::command ::parse_status_tags ( & s . content ) ;
debug ! ( "Tags in status: {:?}" , tags ) ;
for t in tags {
if self . config . is_tag_followed ( & t ) {
self . client . reblog ( & s . id ) . await
. log_error ( "Failed to reblog" ) ;
break ;
}
}
Ok ( ( ) )
}
async fn follow_user ( & mut self , acct : & str ) -> Result < ( ) , GroupError > {
self . client . follow ( acct ) . await ? ;
Ok ( ( ) )
}
async fn unfollow_user ( & mut self , acct : & str ) -> Result < ( ) , GroupError > {
self . client . unfollow ( acct ) . await ? ;
Ok ( ( ) )
}
/// Catch up with missed notifications, returns true if any were handled
async fn catch_up_with_missed_notifications ( & mut self ) -> Result < bool , GroupError > {
let last_notif = self . config . get_last_notif ( ) ;
@ -601,4 +745,92 @@ impl GroupHandle {
Ok ( true )
}
/// Catch up with missed statuses, returns true if any were handled
async fn catch_up_with_missed_statuses ( & mut self ) -> Result < bool , GroupError > {
let last_status = self . config . get_last_status ( ) ;
let notifications = self . client . get_home_timeline ( ) . await ? ;
let mut iter = notifications . items_iter ( ) ;
let mut statuses_to_handle = vec! [ ] ;
// They are retrieved newest first, but we want oldest first for chronological handling
let mut num = 0 ;
while let Some ( s ) = iter . next_item ( ) . await {
let ts = s . timestamp_millis ( ) ;
if ts < = last_status {
break ; // reached our last seen status (hopefully there arent any retro-bumped)
}
if s . content . contains ( '#' ) & & ! s . visibility . is_private ( ) {
statuses_to_handle . push ( s ) ;
}
num + = 1 ;
if num > MAX_CATCHUP_STATUSES {
warn ! ( "Too many statuses missed to catch up!" ) ;
break ;
}
}
if statuses_to_handle . is_empty ( ) {
return Ok ( false ) ;
}
statuses_to_handle . reverse ( ) ;
debug ! ( "{} statuses to catch up!" , statuses_to_handle . len ( ) ) ;
for s in statuses_to_handle {
debug ! ( "Handling missed status: {}" , StatusDisplay ( & s ) ) ;
self . handle_status ( s ) . await
. log_error ( "Error handling a status" ) ;
}
Ok ( true )
}
fn list_admins ( & self , replies : & mut Vec < String > ) {
let mut admins = self . config . get_admins ( ) . collect ::< Vec < _ > > ( ) ;
admins . sort ( ) ;
for a in admins {
replies . push ( a . to_string ( ) ) ;
}
}
fn list_members ( & self , replies : & mut Vec < String > ) {
let admins = self . config . get_admins ( ) . collect ::< HashSet < _ > > ( ) ;
let mut members = self . config . get_members ( ) . collect ::< Vec < _ > > ( ) ;
members . extend ( admins . iter ( ) ) ;
members . sort ( ) ;
members . dedup ( ) ;
for m in members {
if admins . contains ( & m ) {
replies . push ( format! ( "{} [admin]" , m ) ) ;
} else {
replies . push ( m . to_string ( ) ) ;
}
}
}
}
fn make_welcome_text ( user : & str ) -> String {
format! (
" @ { user } Welcome to the group ! To share a post , tag the group user \
or use one of the group hashtags . Use / help for more info . " ,
user = user
)
}
trait VisExt : Copy {
/// Check if is private or direct
fn is_private ( self ) -> bool ;
}
impl VisExt for Visibility {
fn is_private ( self ) -> bool {
self = = Visibility ::Direct | | self = = Visibility ::Private
}
}