diff --git a/telegram/client.go b/telegram/client.go index 6f6d719..6033c7c 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -64,6 +64,8 @@ type Client struct { lastMsgHashes map[int64]uint64 msgHashSeed maphash.Seed + mucResources map[int64]map[string]bool + locks clientLocks SendMessageLock sync.Mutex } @@ -73,6 +75,7 @@ type clientLocks struct { chatMessageLocks map[int64]*sync.Mutex resourcesLock sync.Mutex outboxLock sync.Mutex + mucResourcesLock sync.Mutex lastMsgHashesLock sync.Mutex authorizerReadLock sync.Mutex @@ -133,6 +136,7 @@ func NewClient(conf config.TelegramConfig, jid string, component *xmpp.Component Session: session, resources: make(map[string]bool), outbox: make(map[string]string), + mucResources: make(map[int64]map[string]bool), content: &conf.Content, cache: cache.NewCache(), options: options, diff --git a/telegram/commands.go b/telegram/commands.go index 87fff72..d365d4e 100644 --- a/telegram/commands.go +++ b/telegram/commands.go @@ -193,23 +193,6 @@ func (c *Client) unsubscribe(chatID int64) error { ) } -func (c *Client) sendMessagesReverse(chatID int64, messages []*client.Message) { - for i := len(messages) - 1; i >= 0; i-- { - message := messages[i] - reply, _ := c.getMessageReply(message) - - gateway.SendMessage( - c.jid, - strconv.FormatInt(chatID, 10), - c.formatMessage(0, 0, false, message), - strconv.FormatInt(message.Id, 10), - c.xmpp, - reply, - false, - ) - } -} - func (c *Client) usernameOrIDToID(username string) (int64, error) { userID, err := strconv.ParseInt(username, 10, 64) // couldn't parse the id, try to lookup as a username @@ -1005,7 +988,7 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool) return err.Error(), true } - c.sendMessagesReverse(chatID, messages.Messages) + c.sendMessagesReverse(chatID, messages.Messages, true, "") // get latest entries from history case "history": var limit int32 = 10 @@ -1016,32 +999,11 @@ func (c *Client) ProcessChatCommand(chatID int64, cmdline string) (string, bool) } } - var newMessages *client.Messages - var messages []*client.Message - var err error - var fromId int64 - for _ = range make([]struct{}, limit) { // safety limit - if len(messages) > 0 { - fromId = messages[len(messages)-1].Id - } - - newMessages, err = c.client.GetChatHistory(&client.GetChatHistoryRequest{ - ChatId: chatID, - FromMessageId: fromId, - Limit: limit, - }) - if err != nil { - return err.Error(), true - } - - messages = append(messages, newMessages.Messages...) - - if len(newMessages.Messages) == 0 || len(messages) >= int(limit) { - break - } + messages, err := c.getNLastMessages(chatID, limit) + if err != nil { + return err.Error(), true } - - c.sendMessagesReverse(chatID, messages) + c.sendMessagesReverse(chatID, messages, true, "") // chat members case "members": var query string diff --git a/telegram/handlers.go b/telegram/handlers.go index 71b55bc..b7277e8 100644 --- a/telegram/handlers.go +++ b/telegram/handlers.go @@ -265,7 +265,7 @@ func (c *Client) updateMessageContent(update *client.UpdateMessageContent) { markupFunction, )) for _, jid := range jids { - gateway.SendMessage(jid, strconv.FormatInt(update.ChatId, 10), text, "e"+strconv.FormatInt(update.MessageId, 10), c.xmpp, nil, false) + gateway.SendMessage(jid, strconv.FormatInt(update.ChatId, 10), text, "e"+strconv.FormatInt(update.MessageId, 10), c.xmpp, nil, 0, false, false) } } } diff --git a/telegram/utils.go b/telegram/utils.go index 9ecf7af..036956b 100644 --- a/telegram/utils.go +++ b/telegram/utils.go @@ -44,6 +44,12 @@ var replyRegex = regexp.MustCompile("\\A>>? ?([0-9]+)\\n") const newlineChar string = "\n" const messageHeaderSeparator string = " | " +const ( + ChatTypeOther byte = iota + ChatTypePM + ChatTypeGroup +) + // GetContactByUsername resolves username to user id retrieves user and chat information func (c *Client) GetContactByUsername(username string) (*client.Chat, *client.User, error) { if !c.Online() { @@ -121,10 +127,10 @@ func (c *Client) GetContactByID(id int64, chat *client.Chat) (*client.Chat, *cli return chat, user, nil } -// IsPM checks if a chat is PM -func (c *Client) IsPM(id int64) (bool, error) { +// GetChatType checks if a chat is PM or group +func (c *Client) GetChatType(id int64) (byte, error) { if !c.Online() || id == 0 { - return false, errOffline + return ChatTypeOther, errOffline } var err error @@ -135,7 +141,7 @@ func (c *Client) IsPM(id int64) (bool, error) { ChatId: id, }) if err != nil { - return false, err + return ChatTypeOther, err } c.cache.SetChat(id, chat) @@ -143,9 +149,12 @@ func (c *Client) IsPM(id int64) (bool, error) { chatType := chat.Type.ChatTypeType() if chatType == client.TypeChatTypePrivate || chatType == client.TypeChatTypeSecret { - return true, nil + return ChatTypePM, nil } - return false, nil + if c.IsGroup(chat) { + return ChatTypeGroup, nil + } + return ChatTypeOther, nil } func (c *Client) userStatusToText(status client.UserStatus, chatID int64) (string, string, string) { @@ -294,25 +303,52 @@ func (c *Client) ProcessStatusUpdate(chatID int64, status string, show string, o ) } -func (c *Client) SendMUCStatuses(chatID int64) { +// JoinMUC saves MUC join fact and sends initialization data +func (c *Client) JoinMUC(chatId int64, resource string) { + // save the nickname in this MUC, also as a marker of join + c.locks.mucResourcesLock.Lock() + oldMap, ok := c.mucResources[chatId] + if ok { + _, ok := oldMap[resource] + if ok { + // already joined, initializing anyway + } else { + oldMap[resource] = true + } + } else { + newMap := make(map[string]bool) + newMap[resource] = true + c.mucResources[chatId] = newMap + } + c.locks.mucResourcesLock.Unlock() + + c.sendMUCStatuses(chatId) + + messages, err := c.getNLastMessages(chatId, 20) + if err == nil { + c.sendMessagesReverse(chatId, messages, false, c.jid+"/"+resource) + } +} + +func (c *Client) sendMUCStatuses(chatID int64) { + sChatId := strconv.FormatInt(chatID, 10) + myNickname := "me" + if c.me != nil { + myNickname := c.me.FirstName + if c.me.LastName != "" { + myNickname = myNickname + " " + c.me.LastName + } + } + myAffiliation := "member" + members, err := c.client.SearchChatMembers(&client.SearchChatMembersRequest{ ChatId: chatID, Limit: 200, Filter: &client.ChatMembersFilterMembers{}, }) if err == nil { - chatIDString := strconv.FormatInt(chatID, 10) gatewayJidSuffix := "@" + gateway.Jid.Full() - myNickname := "me" - if c.me != nil { - myNickname := c.me.FirstName - if c.me.LastName != "" { - myNickname = myNickname + " " + c.me.LastName - } - } - myAffiliation := "member" - for _, member := range members.Members { var senderId int64 switch member.MemberId.MessageSenderType() { @@ -324,7 +360,7 @@ func (c *Client) SendMUCStatuses(chatID int64) { senderId = memberChat.ChatId } - nickname := c.formatContact(senderId) + nickname := c.getMUCNickname(senderId) affiliation := c.memberStatusToAffiliation(member.Status) if c.me != nil && senderId == c.me.Id { myNickname = nickname @@ -335,25 +371,29 @@ func (c *Client) SendMUCStatuses(chatID int64) { gateway.SendPresence( c.xmpp, c.jid, - gateway.SPFrom(chatIDString), + gateway.SPFrom(sChatId), gateway.SPResource(nickname), gateway.SPImmed(true), gateway.SPMUCAffiliation(affiliation), gateway.SPMUCJid(strconv.FormatInt(senderId, 10) + gatewayJidSuffix), ) } - - // according to the spec, own member entry should be sent the last - gateway.SendPresence( - c.xmpp, - c.jid, - gateway.SPFrom(chatIDString), - gateway.SPResource(myNickname), - gateway.SPImmed(true), - gateway.SPMUCAffiliation(myAffiliation), - gateway.SPMUCStatusCodes([]uint16{100, 110, 210}), - ) } + + // according to the spec, own member entry should be sent the last + gateway.SendPresence( + c.xmpp, + c.jid, + gateway.SPFrom(sChatId), + gateway.SPResource(myNickname), + gateway.SPImmed(true), + gateway.SPMUCAffiliation(myAffiliation), + gateway.SPMUCStatusCodes([]uint16{100, 110, 210}), + ) +} + +func (c *Client) getMUCNickname(chatID int64) string { + return c.formatContact(chatID) } func (c *Client) formatContact(chatID int64) string { @@ -917,13 +957,13 @@ func (c *Client) countCharsInLines(lines *[]string) (count int) { } func (c *Client) messageToPrefix(message *client.Message, previewString string, fileString string, replyMsg *client.Message) (string, int, int) { - isPM, err := c.IsPM(message.ChatId) + chatType, err := c.GetChatType(message.ChatId) if err != nil { - log.Errorf("Could not determine if chat is PM: %v", err) + log.Errorf("Could not determine chat type: %v", err) } isCarbonsEnabled := gateway.MessageOutgoingPermissionVersion > 0 && c.Session.Carbons // with carbons, hide for all messages in PM and only for outgoing in group chats - hideSender := isCarbonsEnabled && (message.IsOutgoing || isPM) + hideSender := (isCarbonsEnabled && (message.IsOutgoing || chatType == ChatTypePM)) || (c.Session.MUC && chatType == ChatTypeGroup) var replyStart, replyEnd int prefix := []string{} @@ -944,7 +984,7 @@ func (c *Client) messageToPrefix(message *client.Message, previewString string, } } } - if !isPM || !c.Session.HideIds { + if chatType != ChatTypePM || !c.Session.HideIds { prefix = append(prefix, directionChar+strconv.FormatInt(message.Id, 10)) } // show sender in group chats @@ -999,8 +1039,20 @@ func (c *Client) ensureDownloadFile(file *client.File) *client.File { // ProcessIncomingMessage transfers a message to XMPP side and marks it as read on Telegram side func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) { - isCarbon := gateway.MessageOutgoingPermissionVersion > 0 && c.Session.Carbons && message.IsOutgoing - jids := c.getCarbonFullJids(isCarbon, "") + c.sendMessageToGateway(chatId, message, false, "", []string{}) +} + +func (c *Client) sendMessageToGateway(chatId int64, message *client.Message, delay bool, groupChatFrom string, groupChatTos []string) { + var isCarbon bool + var jids []string + var isGroupchat bool + if len(groupChatTos) == 0 { + isCarbon = gateway.MessageOutgoingPermissionVersion > 0 && c.Session.Carbons && message.IsOutgoing + jids = c.getCarbonFullJids(isCarbon, "") + } else { + isGroupchat = true + jids = groupChatTos + } var text, oob, auxText string @@ -1073,12 +1125,22 @@ func (c *Client) ProcessIncomingMessage(chatId int64, message *client.Message) { // forward message to XMPP sId := strconv.FormatInt(message.Id, 10) - sChatId := strconv.FormatInt(chatId, 10) + var from string + if groupChatFrom == "" { + from = strconv.FormatInt(chatId, 10) + } else { + from = groupChatFrom + } + + var timestamp int64 + if delay { + timestamp = int64(message.Date) + } for _, jid := range jids { - gateway.SendMessageWithOOB(jid, sChatId, text, sId, c.xmpp, reply, oob, isCarbon) + gateway.SendMessageWithOOB(jid, from, text, sId, c.xmpp, reply, timestamp, oob, isCarbon, isGroupchat) if auxText != "" { - gateway.SendMessage(jid, sChatId, auxText, sId, c.xmpp, reply, isCarbon) + gateway.SendMessage(jid, from, auxText, sId, c.xmpp, reply, timestamp, isCarbon, isGroupchat) } } } @@ -1294,6 +1356,36 @@ func (c *Client) getLastMessages(id int64, query string, from int64, count int32 }) } +func (c *Client) getNLastMessages(chatID int64, limit int32) ([]*client.Message, error) { + var newMessages *client.Messages + var messages []*client.Message + var err error + var fromId int64 + + for _ = range make([]struct{}, limit) { // safety limit + if len(messages) > 0 { + fromId = messages[len(messages)-1].Id + } + + newMessages, err = c.client.GetChatHistory(&client.GetChatHistoryRequest{ + ChatId: chatID, + FromMessageId: fromId, + Limit: limit, + }) + if err != nil { + return nil, err + } + + messages = append(messages, newMessages.Messages...) + + if len(newMessages.Messages) == 0 || len(messages) >= int(limit) { + break + } + } + + return messages, nil +} + // DownloadFile actually obtains a file by id given by TDlib func (c *Client) DownloadFile(id int32, priority int32, synchronous bool) (*client.File, error) { return c.client.DownloadFile(&client.DownloadFileRequest{ @@ -1652,3 +1744,39 @@ func (c *Client) memberStatusToAffiliation(memberStatus client.ChatMemberStatus) } return "member" } + +func (c *Client) sendMessagesReverse(chatID int64, messages []*client.Message, plain bool, toJid string) { + sChatId := strconv.FormatInt(chatID, 10) + var mucJid string + if toJid != "" { + mucJid = sChatId + "@" + gateway.Jid.Bare() + } + + for i := len(messages) - 1; i >= 0; i-- { + message := messages[i] + + if plain { + reply, _ := c.getMessageReply(message) + + gateway.SendMessage( + c.jid, + sChatId, + c.formatMessage(0, 0, false, message), + strconv.FormatInt(message.Id, 10), + c.xmpp, + reply, + 0, + false, + false, + ) + } else { + c.sendMessageToGateway( + chatID, + message, + true, + mucJid + "/" + c.getMUCNickname(c.getSenderId(message)), + []string{toJid}, + ) + } + } +} diff --git a/xmpp/extensions/extensions.go b/xmpp/extensions/extensions.go index 3a2f998..679b428 100644 --- a/xmpp/extensions/extensions.go +++ b/xmpp/extensions/extensions.go @@ -234,6 +234,20 @@ type PresenceXMucUserStatus struct { Code uint16 `xml:"code,attr"` } +// MessageDelay is from XEP-0203 +type MessageDelay struct { + XMLName xml.Name `xml:"urn:xmpp:delay delay"` + From string `xml:"from,attr"` + Stamp string `xml:"stamp,attr"` +} + +// MessageDelayLegacy is from XEP-0203 +type MessageDelayLegacy struct { + XMLName xml.Name `xml:"jabber:x:delay x"` + From string `xml:"from,attr"` + Stamp string `xml:"stamp,attr"` +} + // Namespace is a namespace! func (c PresenceNickExtension) Namespace() string { return c.XMLName.Space @@ -304,6 +318,16 @@ func (c PresenceXMucUserExtension) Namespace() string { return c.XMLName.Space } +// Namespace is a namespace! +func (c MessageDelay) Namespace() string { + return c.XMLName.Space +} + +// Namespace is a namespace! +func (c MessageDelayLegacy) Namespace() string { + return c.XMLName.Space +} + // Name is a packet name func (ClientMessage) Name() string { return "message" @@ -394,4 +418,16 @@ func init() { "http://jabber.org/protocol/muc#user", "x", }, PresenceXMucUserExtension{}) + + // message delay + stanza.TypeRegistry.MapExtension(stanza.PKTMessage, xml.Name{ + "urn:xmpp:delay", + "delay", + }, MessageDelay{}) + + // legacy message delay + stanza.TypeRegistry.MapExtension(stanza.PKTMessage, xml.Name{ + "jabber:x:delay", + "x", + }, MessageDelayLegacy{}) } diff --git a/xmpp/gateway/gateway.go b/xmpp/gateway/gateway.go index d6b7826..074d2d0 100644 --- a/xmpp/gateway/gateway.go +++ b/xmpp/gateway/gateway.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" "strings" "sync" + "time" "dev.narayana.im/narayana/telegabber/badger" "dev.narayana.im/narayana/telegabber/xmpp/extensions" @@ -42,26 +43,26 @@ var DirtySessions = false var MessageOutgoingPermissionVersion = 0 // SendMessage creates and sends a message stanza -func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, isCarbon bool) { - sendMessageWrapper(to, from, body, id, component, reply, "", isCarbon) +func SendMessage(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, timestamp int64, isCarbon, isGroupchat bool) { + sendMessageWrapper(to, from, body, id, component, reply, timestamp, "", isCarbon, isGroupchat) } // SendServiceMessage creates and sends a simple message stanza from transport func SendServiceMessage(to string, body string, component *xmpp.Component) { - sendMessageWrapper(to, "", body, "", component, nil, "", false) + sendMessageWrapper(to, "", body, "", component, nil, 0, "", false, false) } // SendTextMessage creates and sends a simple message stanza func SendTextMessage(to string, from string, body string, component *xmpp.Component) { - sendMessageWrapper(to, from, body, "", component, nil, "", false) + sendMessageWrapper(to, from, body, "", component, nil, 0, "", false, false) } // SendMessageWithOOB creates and sends a message stanza with OOB URL -func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) { - sendMessageWrapper(to, from, body, id, component, reply, oob, isCarbon) +func SendMessageWithOOB(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, timestamp int64, oob string, isCarbon bool, isGroupchat bool) { + sendMessageWrapper(to, from, body, id, component, reply, timestamp, oob, isCarbon, isGroupchat) } -func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, oob string, isCarbon bool) { +func sendMessageWrapper(to string, from string, body string, id string, component *xmpp.Component, reply *Reply, timestamp int64, oob string, isCarbon bool, isGroupchat bool) { toJid, err := stanza.NewJid(to) if err != nil { log.WithFields(log.Fields{ @@ -76,12 +77,17 @@ func sendMessageWrapper(to string, from string, body string, id string, componen var logFrom string var messageFrom string var messageTo string - if from == "" { - logFrom = componentJid - messageFrom = componentJid - } else { + if isGroupchat { logFrom = from - messageFrom = from + "@" + componentJid + messageFrom = from + } else { + if from == "" { + logFrom = componentJid + messageFrom = componentJid + } else { + logFrom = from + messageFrom = from + "@" + componentJid + } } if isCarbon { messageTo = messageFrom @@ -95,11 +101,18 @@ func sendMessageWrapper(to string, from string, body string, id string, componen "to": to, }).Warn("Got message") + var messageType stanza.StanzaType + if isGroupchat { + messageType = stanza.MessageTypeGroupchat + } else { + messageType = stanza.MessageTypeChat + } + message := stanza.Message{ Attrs: stanza.Attrs{ From: messageFrom, To: messageTo, - Type: "chat", + Type: messageType, Id: id, }, Body: body, @@ -122,13 +135,27 @@ func sendMessageWrapper(to string, from string, body string, id string, componen if !isCarbon && toJid.Resource != "" { message.Extensions = append(message.Extensions, stanza.HintNoCopy{}) } + if timestamp != 0 { + var delayFrom string + if isGroupchat { + delayFrom, _, _ = SplitJID(from) + } + message.Extensions = append(message.Extensions, extensions.MessageDelay{ + From: delayFrom, + Stamp: time.Unix(timestamp, 0).UTC().Format(time.RFC3339), + }) + message.Extensions = append(message.Extensions, extensions.MessageDelayLegacy{ + From: delayFrom, + Stamp: time.Unix(timestamp, 0).UTC().Format("20060102T15:04:05"), + }) + } if isCarbon { carbonMessage := extensions.ClientMessage{ Attrs: stanza.Attrs{ From: bareTo, To: to, - Type: "chat", + Type: messageType, }, } carbonMessage.Extensions = append(carbonMessage.Extensions, extensions.CarbonSent{ diff --git a/xmpp/handlers.go b/xmpp/handlers.go index a7e3c55..bb2064a 100644 --- a/xmpp/handlers.go +++ b/xmpp/handlers.go @@ -440,7 +440,7 @@ func handleMUCPresence(s xmpp.Sender, p stanza.Presence) { return } - fromBare, _, ok := gateway.SplitJID(p.From) + fromBare, fromResource, ok := gateway.SplitJID(p.From) if !ok { presenceReplySetError(reply, 400) return @@ -458,7 +458,7 @@ func handleMUCPresence(s xmpp.Sender, p stanza.Presence) { return } - session.SendMUCStatuses(chatId) + session.JoinMUC(chatId, fromResource) } } }