aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikkel Svagård <mikkel.svagard@gmail.com>2017-12-03 13:24:32 +0100
committerGitHub <noreply@github.com>2017-12-03 13:24:32 +0100
commitac7900cfd87452ea5ee6ee9a15b99256ff5e8166 (patch)
tree698a5986d022d6af7325f271ac9c6a4b51d43258
parentdc8334b434ce68522e8c16b598379c27ad3ef1d2 (diff)
parentf9d3860f9536f1a2e2827cc9f91b977c70fa11e5 (diff)
downloadsaitama-ac7900cfd87452ea5ee6ee9a15b99256ff5e8166.tar.gz
Merge pull request #1 from kapteinstein/mikkel
Event Queue
-rw-r--r--fb-bot.py68
-rw-r--r--timeEventQueuer.py185
2 files changed, 246 insertions, 7 deletions
diff --git a/fb-bot.py b/fb-bot.py
index 75b0766..3f59bec 100644
--- a/fb-bot.py
+++ b/fb-bot.py
@@ -1,11 +1,19 @@
from fbchat import log, Client
from fbchat.models import *
+from geeteventbus.subscriber import subscriber
+from geeteventbus.eventbus import eventbus
+from geeteventbus.event import event
+
import time
import requests
import threading
import random
import datetime
+#class types
+ID_TIMER = 0;
+ID_LISTENER = 1;
+
_BASE_URL = 'http://spaghettiprojecti.no/saitama/'
_THREAD_ID = '1434497743266652'
@@ -60,6 +68,13 @@ class EchoBot(Client):
if now.hour == 0 and sent_today:
sent_today = False
+ def stopClient(self):
+ while not self.logout_request:
+ time.sleep(1)
+ self.logout_request = True
+ print("Stopping listening")
+ self.stopListening()
+
class Bot_thread(threading.Thread):
def __init__(self, threadID, name, client):
@@ -68,6 +83,7 @@ class Bot_thread(threading.Thread):
self.name = name
self.client = client
def run(self):
+ self.runFlag = True
print ("Starting " + self.name)
if self.name == "listener":
self.client.listen()
@@ -75,22 +91,60 @@ class Bot_thread(threading.Thread):
self.client.timer()
print ("Exiting " + self.name)
+ def stopRequest(self):
+ print("Stopping thread {} {}".format(self.threadID,self.name))
+ self.client.stopClient()
+
+
+
+class ThreadHandler():
+ def __init__():
+ self.threads = []
+ def addThread(threadObject):
+ self.threads.append(threadObject)
+
+ def startThreads():
+ for t in self.threads:
+ t.start()
+
+ def joinThreads():
+ for t in self.threads:
+ t.join()
+
+class TimeSubsrciber(threading.Thread, subscriber):
+ def process(self,timeEvent):
+ if not isinstance(timeEvent):
+ print("Invalid event type passed")
+ return
+ print(timeEvent.getTopic())
+
+class GenericEvent():
+ def __init__(self,topic,data):
+ self._topic = topic
+ self._data = data
+ def getTopic(): return self._topic
+ def getData(): return self._data
+
def main():
with open('passwd.txt', 'r') as f:
passwd = [a.strip() for a in f.readlines()]
client = EchoBot(passwd[0], passwd[1])
client.logout_request = False
+ # Create Thread Handler
+ th = ThreadHandler()
# Create new threads
- thread1 = Bot_thread(1, "listener", client)
- thread2 = Bot_thread(2, "timer", client)
-
+ th.addThread(Bot_thread(1, "listener", client))
+ th.addThread(Bot_thread(2, "timer", client))
+ th.addThread(TimeSubscriber())
+
# Start new Threads
- thread1.start()
- thread2.start()
- thread1.join()
- thread2.join()
+ th.startThreads()
+
+ client.stopClient()
+
+ th.joinThreads()
if client.isLoggedIn():
client.logout()
diff --git a/timeEventQueuer.py b/timeEventQueuer.py
new file mode 100644
index 0000000..aa08df6
--- /dev/null
+++ b/timeEventQueuer.py
@@ -0,0 +1,185 @@
+from geeteventbus.subscriber import subscriber
+from geeteventbus.eventbus import eventbus
+from geeteventbus.event import event
+
+import time
+import requests
+import threading
+import random
+from datetime import datetime, timedelta
+import heapq
+
+EVENTID_TIME = "Topic: Time Event"
+EVENTID_QUEUE_START = "Topic: Start Event Queue"
+EVENTID_QUEUE_STOP = "Topic: Stop Event Queue"
+
+"""
+Event Queuer for Time Events
+
+Executes the heap on top of heap sorted on time.
+Post timeEvents to eventBus with desired topics and possible other data
+
+Edit execute_event to fit execution process of event
+
+The Queuer must be added as a subscriber to the event bus on the desired topics
+"""
+class Time_event_queuer(threading.Thread,subscriber):
+ def __init__(self,cv):
+ super().__init__()
+ self.cv = cv #Conditional Variable, used for notifying on events
+ self.heap = []
+ self.execute_flag = False
+ self.new_event_flag = False #Set when a new event is added to the queue
+
+ #Executes the event on top of heap. Must be popped
+ def execute_event(self):
+ e = heapq.heappop(self.heap)
+ print("{} - Executing {}".format(datetime.now(),e[1]))
+
+ #Listens on the bus
+ def process(self,new_event):
+ if not isinstance(new_event,event):
+ print("Invalid event type passed")
+ return
+ if new_event.get_topic() == EVENTID_TIME:
+ self.add_event(new_event)
+ elif new_event.get_topic() == EVENTID_QUEUE_START:
+ self.start_queue_loop()
+ elif new_event.get_topic() == EVENTID_QUEUE_STOP:
+ self.stop_queue_loop()
+
+ #Starts the queue loop on EVENTID_QUEUE_START event
+ def start_queue_loop(self):
+ self.execute_flag = True
+ self.run_state()
+
+ #Stops the queue loop on EVENTID_QUEUE_STOP event
+ def stop_queue_loop(self):
+ print("Stopping queuer in {}".format(threading.current_thread().__class__.__name__))
+ self.execute_flag = False
+ with self.cv:
+ self.cv.notify()
+
+ #Runs the queue is not empty and the execution flag is True
+ def run_state(self):
+ print("TimeQueuer Running")
+ while(self.execute_flag):
+ if(self.heap_empty()):
+ self.idle_state()
+ return
+ with self.cv:
+ self.new_event_flag = False
+ t = self.get_sleep_time()
+ ne = self.get_event()
+ print("Next event: {} in {}".format(ne[1],t))
+ self.cv.wait(t)
+ if(self.new_event_flag):
+ continue
+ if(self.execute_flag):
+ self.execute_event()
+ print("Queuer Stopped from Running State")
+
+ #Idle when the queue is empty and the exectuion flag is True
+ def idle_state(self):
+ while(self.execute_flag):
+ if(not self.heap_empty()):
+ self.run_state()
+ return
+ print("TimeQueuer Idle - Execute _flag: {}".format(self.execute_flag))
+ with self.cv:
+ self.cv.wait(3)
+ print("Queuer Stopped from Idle State")
+
+ #Calculage sleep time until top of heap event
+ def get_sleep_time(self):
+ if(len(self.heap)==0):
+ print("ERROR: No event in queue to evaluate")
+ return None
+ te = self.heap[0][0]
+ tn = datetime.now()
+
+ td = te - tn
+ td = td.total_seconds()
+ return td
+
+ #Add new event to heap, set new_event_flag and notify Conditional Variable
+ def add_event(self,new_event):
+ heapq.heappush(self.heap,new_event.get_data())
+ self.new_event_flag = True
+ with self.cv:
+ self.cv.notify()
+
+ #Pops the event on top of heap
+ def pop_event(self):
+ return heapq.heappop(self.heap)
+
+ #Gets the event on top of heap
+ def get_event(self):
+ return self.heap[0]
+
+ #Returns True if Heap is Empty
+ def heap_empty(self):
+ return len(self.heap)==0
+
+ @staticmethod
+ def post_time_event(eb,data,delta_time):
+ event_time = datetime.now() + delta_time #ms removed
+ print("{} scheduled at {}".format(data,event_time))
+ data = (event_time,data)
+ eb.post(event(EVENTID_TIME,data))
+
+class Thread_handler():
+ def __init__(self,eb):
+ self.threads = []
+ self.eb = eb
+
+ def add_thread(self,threadObject):
+ self.threads.append(threadObject)
+
+ def start_threads(self):
+ for t in self.threads:
+ print("Registering consumer")
+ self.eb.register_consumer(t,EVENTID_TIME)
+ self.eb.register_consumer(t,EVENTID_QUEUE_START)
+ self.eb.register_consumer(t,EVENTID_QUEUE_STOP)
+ t.start()
+ self.eb.post(event(EVENTID_QUEUE_START,""))
+
+ def join_threads(self):
+ self.eb.post(event(EVENTID_QUEUE_STOP,""))
+ for t in self.threads:
+ print("Joining thread: {}".format(t.getName()))
+ t.join()
+
+
+def main():
+ # Create Thread Handler
+ eb = eventbus()
+ th = Thread_handler(eb)
+ cv = threading.Condition()
+
+ # Create new threads
+ teq = Time_event_queuer(cv)
+ th.add_thread(teq)
+
+ # Start new Threads
+ th.start_threads()
+
+ # Post Information
+ teq.post_time_event(eb,"post1",timedelta(seconds=3))
+ time.sleep(2)
+ teq.post_time_event(eb,"post2",timedelta(seconds=10))
+ time.sleep(2)
+ teq.post_time_event(eb,"post3",timedelta(seconds=6))
+ time.sleep(1)
+
+ # Stopping threads
+ th.join_threads()
+
+
+ print ("Exiting Main Thread")
+ eb.shutdown()
+
+
+if __name__ == "__main__":
+ main()