-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrosana.py
More file actions
259 lines (225 loc) · 10.3 KB
/
rosana.py
File metadata and controls
259 lines (225 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import asyncio
import logging
from datetime import datetime, timedelta
import re
from zoneinfo import ZoneInfo
import os
import random
# Try to import optional libraries
try:
from fuzzywuzzy import fuzz, process
except ImportError:
print("fuzzywuzzy library not found. Please install it using 'pip install fuzzywuzzy python-Levenshtein'")
fuzz = process = None
try:
from telethon import TelegramClient, events, utils
except ImportError:
print("telethon library not found. Please install it using 'pip install telethon'")
TelegramClient = events = utils = None
try:
from dotenv import load_dotenv
except ImportError:
print("python-dotenv library not found. Please install it using 'pip install python-dotenv'")
load_dotenv = lambda x: None
# Load environment variables
load_dotenv('.env.local')
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("\nStarting SSHJarvis...")
# Get API credentials from environment variables
api_id = os.getenv('TELEGRAM_API_ID')
api_hash = os.getenv('TELEGRAM_API_HASH')
# Configurable settings
inclusion_keywords = ["SHIFT AVAILABLE", "MULTIPLE SHIFTS AVAILABLE", "URGENT SHIFT AVAILABLE"]
inclusion_locations = ["Calvary Oakland", "Calvary Brighton",
"Calvary Kingswood", "Helping Hand North Adelaide",
"HH North Adelaide", "Helping Hand Golden Grove",
"HH Golden Grove", "Amber Age Care"]
exclusion_locations = ["SNOWTOWN"]
relevant_roles = ["PCW", "PCA"]
facility_wards = {
"Calvary Oakland": ["Glen", "Oak"],
"Helping Hand North Adelaide": ["Garden Court", "Gill", "First Floor"],
"HH North Adelaide": ["Garden Court", "Gill", "First Floor"],
"Helping Hand Golden Grove" : ["Ground Floor"],
"HH Golden Grove" : ["Ground Floor"],
"Amber Age Care": ["Paradise"]
}
chat_names = ["WorkforceXS Carers (PCA, PCW,CWK) chat", "test", "state"]
chat_ids = {}
bot_active = False # Initial state of the bot
RESPONSE_DELAY = 0 # Default delay
ADELAIDE_TZ = ZoneInfo("Australia/Adelaide")
async def get_chat_ids(client):
global chat_ids
async for dialog in client.iter_dialogs():
if dialog.name in chat_names:
chat_ids[dialog.name] = dialog.id
logger.info(f"Chat IDs: {chat_ids}")
def fuzzy_match(target, choices, threshold=80):
best_match, score = process.extractOne(target, choices)
if score >= threshold:
return best_match
return None
def parse_shift_message(message):
lines = message.split('\n')
date = None
current_venue = None
current_ward = None
shifts = []
is_urgent = False
for line in lines:
if any(keyword in line.upper() for keyword in inclusion_keywords):
if "URGENT" in line.upper():
is_urgent = True
continue
if not date:
if "TODAY" in line.upper():
date = "TODAY"
continue
elif "TOMORROW" in line.upper():
date = "TOMORROW"
continue
date_match = re.match(r'(?:MON|TUE|WED|THU|FRI|SAT|SUN)\s+(\d{1,2}/\d{1,2})', line)
if date_match:
date = date_match.group(1)
continue
possible_venue = fuzzy_match(line, inclusion_locations, threshold=70)
if possible_venue:
current_venue = possible_venue
# Check for ward information in the same line
ward_match = re.search(r'(?:in|IN)\s+(.+)$', line)
if ward_match:
current_ward = ward_match.group(1).strip().title()
else:
current_ward = None
continue
shift_match = re.match(r'(?:(\w+)\s+)?(\d{4})-(\d{4})(?:\s+IN\s+(.+?))?(?:\s*x\s*(\d+))?$', line)
if shift_match:
role, start_time, end_time, ward, multiplier = shift_match.groups()
if not role or role.upper() in (r.upper() for r in relevant_roles):
shift_time = f"{start_time}-{end_time}"
ward = ward.strip().title() if ward else current_ward
multiplier = int(multiplier) if multiplier else 1
shifts.extend([(shift_time, ward, current_venue)] * multiplier)
elif "ASAP" in line.upper():
asap_match = re.search(r'ASAP-(\d{4})(?:\s+IN\s+(.+))?$', line.upper())
if asap_match:
end_time, ward = asap_match.groups()
shift_time = f"ASAP-{end_time}"
ward = ward.strip().title() if ward else current_ward
shifts.append((shift_time, ward, current_venue))
else:
shifts.append(("ASAP-2359", current_ward, current_venue)) # Default end time if not specified
is_urgent = True
logger.info(f"Parsed message - Date: {date}, Shifts: {shifts}, Urgent: {is_urgent}")
return date, shifts, is_urgent
def format_date(date_str):
today = datetime.now(ADELAIDE_TZ)
if not date_str or date_str == "TODAY":
return today.strftime("%d %B")
if date_str == "TOMORROW":
tomorrow = today + timedelta(days=1)
return tomorrow.strftime("%d %B")
else:
try:
day, month = map(int, date_str.split('/'))
current_year = today.year
date_obj = datetime(current_year, month, day, tzinfo=ADELAIDE_TZ)
if date_obj < today:
date_obj = date_obj.replace(year=current_year + 1)
return date_obj.strftime("%d %B")
except ValueError:
return today.strftime("%d %B")
def format_response(venue, date, shift, is_urgent):
time, ward, _ = shift
formatted_date = format_date(date)
response = f"I can {venue}"
if ward:
response += f" in {ward}"
response += f"/{formatted_date}/"
if is_urgent and time.startswith("ASAP"):
response = f"I can in 20 mins {venue}"
if ward:
response += f" in {ward}"
response += f"/{formatted_date}/{time}"
else:
start_time, end_time = time.split('-')
formatted_time = f"{start_time[:2]}{start_time[2:] or ''}-{end_time[:2]}{end_time[2:] or ''}"
response += formatted_time
return response
def calculate_shift_duration(shift):
time, _, _ = shift
start, end = time.split('-')
if start.upper() == "ASAP":
return float('inf') # Prioritize ASAP shifts
start_minutes = int(start[:2]) * 60 + int(start[2:] or '0')
end_minutes = int(end[:2]) * 60 + int(end[2:] or '0')
duration = end_minutes - start_minutes
if duration < 0:
duration += 24 * 60
return duration
async def main():
if not api_id or not api_hash:
logger.error("API credentials not found. Please check your .env.local file.")
return
client = TelegramClient('rosana_session', api_id, api_hash)
async with client:
await get_chat_ids(client)
@client.on(events.NewMessage(chats=list(chat_ids.values())))
async def my_event_handler(event):
global bot_active, RESPONSE_DELAY
message = event.raw_text
sender = await event.get_sender()
sender_name = utils.get_display_name(sender)
chat_name = next((name for name, id in chat_ids.items() if id == event.chat_id), None)
logger.info(f"Message from {sender_name} in {chat_name}: {message}")
if chat_name == "state":
command_parts = message.strip().lower().split()
if command_parts[0] == "namaste":
if len(command_parts) == 2 and command_parts[1].isdigit():
delay = int(command_parts[1])
if 0 <= delay <= 4:
RESPONSE_DELAY = delay
bot_active = True
logger.info(f"Bot activated with delay {RESPONSE_DELAY}")
await client.send_message(event.chat_id, f'\n--------------------\nTURNED ON.\nShift Pick Gardinchu Hai!\nDelay set to {RESPONSE_DELAY} seconds.\n--------------------')
else:
await client.send_message(event.chat_id, "Invalid delay. Please use a number between 0 and 4.")
else:
await client.send_message(event.chat_id, "Please specify a delay between 0 and 4 seconds. Example: 'namaste 2'")
elif message.strip().lower() == "bye":
bot_active = False
logger.info("Bot deactivated.")
await client.send_message(event.chat_id, '\n--------------------\nTURNED OFF!\nMa Sutna Gaye!\n--------------------')
print("\n-------------------------\n")
return
if bot_active and any(keyword in message.upper() for keyword in inclusion_keywords):
try:
date, shifts, is_urgent = parse_shift_message(message)
if shifts:
longest_shift = max(shifts, key=calculate_shift_duration)
time, ward, venue = longest_shift
if venue not in exclusion_locations:
response = format_response(venue, date, longest_shift, is_urgent)
logger.info(f"Preparing to send response: {response}")
await asyncio.sleep(RESPONSE_DELAY)
await client.send_message(event.chat_id, response)
logger.info(f"Response sent after {RESPONSE_DELAY} seconds delay.")
else:
logger.info("Excluded location, not responding.")
else:
logger.info("No valid shifts found, not responding.")
except Exception as e:
logger.error(f"Error processing message: {e}")
else:
logger.info("Bot is inactive or no inclusion keyword found, not processing message.")
await client.run_until_disconnected()
if __name__ == "__main__":
if None in (fuzz, process, TelegramClient, load_dotenv):
print("Error: Some required libraries are missing. Please install them and try again.")
else:
print("SSHJarvis is now running!")
asyncio.run(main())
print("SSHJarvis has stopped.")