2026-02-10 08:36:04 +01:00
|
|
|
# frozen_string_literal: true
|
|
|
|
|
|
|
|
|
|
# CdrSignalPoller handles polling Signal CLI for incoming messages and group membership changes.
|
|
|
|
|
# This replaces the bridge-worker tasks:
|
|
|
|
|
# - fetch-signal-messages.ts
|
|
|
|
|
# - check-group-membership.ts
|
|
|
|
|
#
|
|
|
|
|
# It runs via Zammad schedulers to poll at regular intervals.
|
|
|
|
|
|
|
|
|
|
class CdrSignalPoller
|
|
|
|
|
class << self
|
|
|
|
|
# Fetch messages from all active Signal channels
|
|
|
|
|
# This is called by the scheduler every 30 seconds
|
|
|
|
|
def fetch_messages
|
|
|
|
|
api = CdrSignalApi.new
|
|
|
|
|
channels = Channel.where(area: 'Signal::Number', active: true)
|
|
|
|
|
|
|
|
|
|
channels.each do |channel|
|
|
|
|
|
phone_number = channel.options[:phone_number]
|
|
|
|
|
bot_token = channel.options[:bot_token]
|
|
|
|
|
next unless phone_number.present?
|
|
|
|
|
|
|
|
|
|
Rails.logger.debug { "CdrSignalPoller: Fetching messages for #{phone_number}" }
|
|
|
|
|
|
|
|
|
|
messages = api.fetch_messages(phone_number)
|
|
|
|
|
process_messages(channel, messages, api)
|
|
|
|
|
rescue StandardError => e
|
|
|
|
|
Rails.logger.error "CdrSignalPoller: Error fetching messages for #{phone_number}: #{e.message}"
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Check group membership for all active Signal channels
|
|
|
|
|
# This is called by the scheduler every 2 minutes
|
|
|
|
|
def check_group_membership
|
|
|
|
|
api = CdrSignalApi.new
|
|
|
|
|
channels = Channel.where(area: 'Signal::Number', active: true)
|
|
|
|
|
|
|
|
|
|
channels.each do |channel|
|
|
|
|
|
phone_number = channel.options[:phone_number]
|
|
|
|
|
next unless phone_number.present?
|
|
|
|
|
|
|
|
|
|
Rails.logger.debug { "CdrSignalPoller: Checking groups for #{phone_number}" }
|
|
|
|
|
|
|
|
|
|
groups = api.list_groups(phone_number)
|
|
|
|
|
process_group_membership(channel, groups)
|
|
|
|
|
rescue StandardError => e
|
|
|
|
|
Rails.logger.error "CdrSignalPoller: Error checking groups for #{phone_number}: #{e.message}"
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
private
|
|
|
|
|
|
|
|
|
|
def process_messages(channel, messages, api)
|
|
|
|
|
messages.each do |msg|
|
|
|
|
|
envelope = msg['envelope']
|
|
|
|
|
next unless envelope
|
|
|
|
|
|
|
|
|
|
source = envelope['source']
|
|
|
|
|
source_uuid = envelope['sourceUuid']
|
|
|
|
|
data_message = envelope['dataMessage']
|
|
|
|
|
sync_message = envelope['syncMessage']
|
|
|
|
|
|
|
|
|
|
# Log envelope types for debugging
|
|
|
|
|
Rails.logger.debug do
|
|
|
|
|
"CdrSignalPoller: Received envelope - source: #{source}, uuid: #{source_uuid}, " \
|
|
|
|
|
"dataMessage: #{data_message.present?}, syncMessage: #{sync_message.present?}"
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Handle group join events from groupInfo
|
|
|
|
|
if data_message && data_message['groupInfo']
|
|
|
|
|
handle_group_info_event(channel, data_message['groupInfo'], source)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Process data messages with content
|
|
|
|
|
next unless data_message
|
|
|
|
|
|
|
|
|
|
process_data_message(channel, data_message, source, source_uuid, api)
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def handle_group_info_event(channel, group_info, source)
|
|
|
|
|
type = group_info['type']
|
|
|
|
|
return unless %w[JOIN JOINED].include?(type)
|
|
|
|
|
|
|
|
|
|
group_id_raw = group_info['groupId']
|
|
|
|
|
return unless group_id_raw
|
|
|
|
|
|
|
|
|
|
group_id = "group.#{Base64.strict_encode64(group_id_raw.pack('c*'))}"
|
|
|
|
|
|
|
|
|
|
Rails.logger.info "CdrSignalPoller: User #{source} joined group #{group_id}"
|
|
|
|
|
notify_group_member_joined(channel, group_id, source)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def process_data_message(channel, data_message, source, source_uuid, api)
|
|
|
|
|
# Determine if this is a group message
|
|
|
|
|
is_group = data_message['groupV2'].present? ||
|
|
|
|
|
data_message['groupContext'].present? ||
|
|
|
|
|
data_message['groupInfo'].present?
|
|
|
|
|
|
|
|
|
|
# Get group ID if applicable
|
|
|
|
|
group_id_raw = data_message.dig('groupV2', 'id') ||
|
|
|
|
|
data_message.dig('groupContext', 'id') ||
|
|
|
|
|
data_message.dig('groupInfo', 'groupId')
|
|
|
|
|
|
|
|
|
|
phone_number = channel.options[:phone_number]
|
|
|
|
|
to_recipient = if group_id_raw
|
|
|
|
|
"group.#{Base64.strict_encode64(group_id_raw.is_a?(Array) ? group_id_raw.pack('c*') : group_id_raw)}"
|
|
|
|
|
else
|
|
|
|
|
phone_number
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Skip if message is from self
|
|
|
|
|
return if source == phone_number
|
|
|
|
|
|
|
|
|
|
message_text = data_message['message']
|
|
|
|
|
raw_timestamp = data_message['timestamp']
|
|
|
|
|
attachments = data_message['attachments']
|
|
|
|
|
|
|
|
|
|
# Generate unique message ID
|
|
|
|
|
message_id = "#{source_uuid}-#{raw_timestamp}"
|
|
|
|
|
|
|
|
|
|
# Check for duplicate
|
|
|
|
|
return if Ticket::Article.exists?(message_id: "cdr_signal.#{message_id}")
|
|
|
|
|
|
|
|
|
|
# Fetch and encode attachments
|
|
|
|
|
attachment_data = fetch_attachments(attachments, api)
|
|
|
|
|
|
|
|
|
|
# Process the message through the webhook handler
|
|
|
|
|
process_incoming_message(
|
|
|
|
|
channel: channel,
|
|
|
|
|
to: to_recipient,
|
|
|
|
|
from: source,
|
|
|
|
|
user_id: source_uuid,
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
message: message_text,
|
|
|
|
|
sent_at: raw_timestamp ? Time.at(raw_timestamp / 1000).iso8601 : Time.current.iso8601,
|
|
|
|
|
attachments: attachment_data,
|
|
|
|
|
is_group: is_group
|
|
|
|
|
)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def fetch_attachments(attachments, api)
|
|
|
|
|
return [] unless attachments.is_a?(Array)
|
|
|
|
|
|
|
|
|
|
attachments.filter_map do |att|
|
|
|
|
|
id = att['id']
|
|
|
|
|
content_type = att['contentType']
|
|
|
|
|
filename = att['filename']
|
|
|
|
|
|
|
|
|
|
blob = api.fetch_attachment(id)
|
|
|
|
|
next unless blob
|
|
|
|
|
|
|
|
|
|
# Generate filename if not provided
|
|
|
|
|
default_filename = filename
|
|
|
|
|
unless default_filename
|
|
|
|
|
extension = content_type&.split('/')&.last || 'bin'
|
|
|
|
|
default_filename = id.include?('.') ? id : "#{id}.#{extension}"
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
filename: default_filename,
|
|
|
|
|
mime_type: content_type,
|
|
|
|
|
data: Base64.strict_encode64(blob)
|
|
|
|
|
}
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def process_incoming_message(channel:, to:, from:, user_id:, message_id:, message:, sent_at:, attachments:, is_group:)
|
|
|
|
|
# Find or create customer
|
|
|
|
|
customer = find_or_create_customer(from, user_id)
|
|
|
|
|
return unless customer
|
|
|
|
|
|
|
|
|
|
# Set current user context
|
|
|
|
|
UserInfo.current_user_id = customer.id
|
|
|
|
|
|
|
|
|
|
# Find or create ticket
|
|
|
|
|
ticket = find_or_create_ticket(
|
|
|
|
|
channel: channel,
|
|
|
|
|
customer: customer,
|
|
|
|
|
to: to,
|
|
|
|
|
from: from,
|
|
|
|
|
user_id: user_id,
|
|
|
|
|
is_group: is_group,
|
|
|
|
|
sent_at: sent_at
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Create article
|
|
|
|
|
create_article(
|
|
|
|
|
ticket: ticket,
|
|
|
|
|
from: from,
|
|
|
|
|
to: to,
|
|
|
|
|
user_id: user_id,
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
message: message || 'No text content',
|
|
|
|
|
sent_at: sent_at,
|
|
|
|
|
attachments: attachments
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
Rails.logger.info "CdrSignalPoller: Created article for ticket ##{ticket.number} from #{from}"
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def find_or_create_customer(phone_number, user_id)
|
|
|
|
|
# Try phone number first
|
|
|
|
|
customer = User.find_by(phone: phone_number) if phone_number.present?
|
|
|
|
|
customer ||= User.find_by(mobile: phone_number) if phone_number.present?
|
|
|
|
|
|
|
|
|
|
# Try user ID
|
|
|
|
|
if customer.nil? && user_id.present?
|
|
|
|
|
customer = User.find_by(signal_uid: user_id)
|
|
|
|
|
customer ||= User.find_by(phone: user_id)
|
|
|
|
|
customer ||= User.find_by(mobile: user_id)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Create new customer if not found
|
|
|
|
|
unless customer
|
|
|
|
|
role_ids = Role.signup_role_ids
|
|
|
|
|
customer = User.create!(
|
|
|
|
|
firstname: '',
|
|
|
|
|
lastname: '',
|
|
|
|
|
email: '',
|
|
|
|
|
password: '',
|
|
|
|
|
phone: phone_number.presence || user_id,
|
|
|
|
|
signal_uid: user_id,
|
|
|
|
|
note: 'CDR Signal',
|
|
|
|
|
active: true,
|
|
|
|
|
role_ids: role_ids,
|
|
|
|
|
updated_by_id: 1,
|
|
|
|
|
created_by_id: 1
|
|
|
|
|
)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Update signal_uid if needed
|
|
|
|
|
customer.update!(signal_uid: user_id) if user_id.present? && customer.signal_uid.blank?
|
|
|
|
|
|
|
|
|
|
# Update phone if customer only has user_id
|
|
|
|
|
customer.update!(phone: phone_number) if phone_number.present? && customer.phone == user_id
|
|
|
|
|
|
|
|
|
|
customer
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def find_or_create_ticket(channel:, customer:, to:, from:, user_id:, is_group:, sent_at:)
|
|
|
|
|
state_ids = Ticket::State.where(name: %w[closed merged removed]).pluck(:id)
|
|
|
|
|
sender_display = from.presence || user_id
|
|
|
|
|
|
|
|
|
|
if is_group
|
|
|
|
|
# Find ticket by group ID
|
|
|
|
|
ticket = Ticket.where.not(state_id: state_ids)
|
|
|
|
|
.where('preferences LIKE ?', "%channel_id: #{channel.id}%")
|
|
|
|
|
.where('preferences LIKE ?', "%chat_id: #{to}%")
|
|
|
|
|
.order(updated_at: :desc)
|
|
|
|
|
.first
|
|
|
|
|
else
|
|
|
|
|
# Find ticket by customer
|
|
|
|
|
ticket = Ticket.where(customer_id: customer.id)
|
|
|
|
|
.where.not(state_id: state_ids)
|
|
|
|
|
.order(:updated_at)
|
|
|
|
|
.first
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
if ticket
|
|
|
|
|
ticket.title = "Message from #{sender_display} at #{sent_at}" if ticket.title == '-'
|
|
|
|
|
new_state = Ticket::State.find_by(default_create: true)
|
|
|
|
|
ticket.state = Ticket::State.find_by(default_follow_up: true) if ticket.state_id != new_state.id
|
|
|
|
|
else
|
|
|
|
|
chat_id = is_group ? to : (user_id.presence || from)
|
|
|
|
|
|
|
|
|
|
cdr_signal_prefs = {
|
|
|
|
|
bot_token: channel.options[:bot_token],
|
|
|
|
|
chat_id: chat_id,
|
|
|
|
|
user_id: user_id
|
|
|
|
|
}
|
|
|
|
|
cdr_signal_prefs[:original_recipient] = from if is_group
|
|
|
|
|
|
|
|
|
|
ticket = Ticket.new(
|
|
|
|
|
group_id: channel.group_id,
|
|
|
|
|
title: "Message from #{sender_display} at #{sent_at}",
|
|
|
|
|
customer_id: customer.id,
|
|
|
|
|
preferences: {
|
|
|
|
|
channel_id: channel.id,
|
|
|
|
|
cdr_signal: cdr_signal_prefs
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
ticket.save!
|
|
|
|
|
ticket.update!(create_article_type_id: Ticket::Article::Type.find_by(name: 'cdr_signal').id)
|
|
|
|
|
ticket
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def create_article(ticket:, from:, to:, user_id:, message_id:, message:, sent_at:, attachments:)
|
|
|
|
|
sender_display = from.presence || user_id
|
|
|
|
|
|
|
|
|
|
article_params = {
|
|
|
|
|
ticket_id: ticket.id,
|
|
|
|
|
type_id: Ticket::Article::Type.find_by(name: 'cdr_signal').id,
|
|
|
|
|
sender_id: Ticket::Article::Sender.find_by(name: 'Customer').id,
|
|
|
|
|
from: sender_display,
|
|
|
|
|
to: to,
|
|
|
|
|
subject: "Message from #{sender_display} at #{sent_at}",
|
|
|
|
|
body: message,
|
|
|
|
|
content_type: 'text/plain',
|
|
|
|
|
message_id: "cdr_signal.#{message_id}",
|
|
|
|
|
internal: false,
|
|
|
|
|
preferences: {
|
|
|
|
|
cdr_signal: {
|
|
|
|
|
timestamp: sent_at,
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
from: from,
|
|
|
|
|
user_id: user_id
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Add primary attachment if present
|
|
|
|
|
if attachments.present? && attachments.first
|
|
|
|
|
primary = attachments.first
|
|
|
|
|
article_params[:attachments] = [{
|
|
|
|
|
'filename' => primary[:filename],
|
|
|
|
|
filename: primary[:filename],
|
|
|
|
|
data: primary[:data],
|
|
|
|
|
'data' => primary[:data],
|
|
|
|
|
'mime-type' => primary[:mime_type]
|
|
|
|
|
}]
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
ticket.with_lock do
|
|
|
|
|
article = Ticket::Article.create!(article_params)
|
|
|
|
|
|
|
|
|
|
# Create additional articles for extra attachments
|
2026-02-10 15:58:26 +01:00
|
|
|
((attachments || [])[1..] || []).each_with_index do |att, index|
|
2026-02-10 08:36:04 +01:00
|
|
|
Ticket::Article.create!(
|
|
|
|
|
article_params.merge(
|
|
|
|
|
message_id: "cdr_signal.#{message_id}-#{index + 1}",
|
|
|
|
|
subject: att[:filename],
|
|
|
|
|
body: att[:filename],
|
|
|
|
|
attachments: [{
|
|
|
|
|
'filename' => att[:filename],
|
|
|
|
|
filename: att[:filename],
|
|
|
|
|
data: att[:data],
|
|
|
|
|
'data' => att[:data],
|
|
|
|
|
'mime-type' => att[:mime_type]
|
|
|
|
|
}]
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
article
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def process_group_membership(channel, groups)
|
|
|
|
|
groups.each do |group|
|
|
|
|
|
group_id = group['id']
|
|
|
|
|
internal_id = group['internalId']
|
|
|
|
|
members = group['members'] || []
|
|
|
|
|
next unless group_id && internal_id
|
|
|
|
|
|
|
|
|
|
Rails.logger.debug do
|
|
|
|
|
"CdrSignalPoller: Group #{group['name']} - #{members.length} members, " \
|
|
|
|
|
"#{(group['pendingInvites'] || []).length} pending"
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
members.each do |member_phone|
|
|
|
|
|
notify_group_member_joined(channel, group_id, member_phone)
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def notify_group_member_joined(channel, group_id, member_phone)
|
|
|
|
|
# Find ticket with this group_id
|
|
|
|
|
state_ids = Ticket::State.where(name: %w[closed merged removed]).pluck(:id)
|
|
|
|
|
|
|
|
|
|
ticket = Ticket.where.not(state_id: state_ids)
|
|
|
|
|
.where('preferences LIKE ?', "%chat_id: #{group_id}%")
|
|
|
|
|
.order(updated_at: :desc)
|
|
|
|
|
.first
|
|
|
|
|
|
|
|
|
|
return unless ticket
|
|
|
|
|
|
|
|
|
|
# Idempotency check
|
|
|
|
|
return if ticket.preferences.dig('cdr_signal', 'group_joined') == true
|
|
|
|
|
|
|
|
|
|
# Update group_joined flag
|
|
|
|
|
ticket.preferences[:cdr_signal] ||= {}
|
|
|
|
|
ticket.preferences[:cdr_signal][:group_joined] = true
|
|
|
|
|
ticket.preferences[:cdr_signal][:group_joined_at] = Time.current.iso8601
|
|
|
|
|
ticket.preferences[:cdr_signal][:group_joined_by] = member_phone
|
|
|
|
|
ticket.save!
|
|
|
|
|
|
|
|
|
|
Rails.logger.info "CdrSignalPoller: Member #{member_phone} joined group #{group_id} for ticket ##{ticket.number}"
|
|
|
|
|
|
|
|
|
|
# Add resolution note if there were pending notifications
|
|
|
|
|
add_group_join_resolution_note(ticket)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def add_group_join_resolution_note(ticket)
|
|
|
|
|
# Check if any articles had a group_not_joined notification
|
|
|
|
|
articles_with_pending = Ticket::Article.where(ticket_id: ticket.id)
|
|
|
|
|
.where('preferences LIKE ?', '%group_not_joined_note_added: true%')
|
|
|
|
|
|
|
|
|
|
return unless articles_with_pending.exists?
|
|
|
|
|
|
|
|
|
|
# Check if resolution note already exists
|
|
|
|
|
resolution_exists = Ticket::Article.where(ticket_id: ticket.id)
|
|
|
|
|
.where('preferences LIKE ?', '%group_joined_resolution: true%')
|
|
|
|
|
.exists?
|
|
|
|
|
|
|
|
|
|
return if resolution_exists
|
|
|
|
|
|
|
|
|
|
Ticket::Article.create!(
|
|
|
|
|
ticket_id: ticket.id,
|
|
|
|
|
content_type: 'text/plain',
|
|
|
|
|
body: 'Recipient has now joined the Signal group. Pending messages will be delivered shortly.',
|
|
|
|
|
internal: true,
|
|
|
|
|
sender: Ticket::Article::Sender.find_by(name: 'System'),
|
|
|
|
|
type: Ticket::Article::Type.find_by(name: 'note'),
|
|
|
|
|
preferences: {
|
|
|
|
|
delivery_message: true,
|
|
|
|
|
group_joined_resolution: true
|
|
|
|
|
},
|
|
|
|
|
updated_by_id: 1,
|
|
|
|
|
created_by_id: 1
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
Rails.logger.info "CdrSignalPoller: Added resolution note for ticket ##{ticket.number}"
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
end
|