aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikkelsv <mikkel.svagard@gmail.com>2017-12-03 13:23:42 +0100
committerMikkelsv <mikkel.svagard@gmail.com>2017-12-03 13:23:42 +0100
commitf9d3860f9536f1a2e2827cc9f91b977c70fa11e5 (patch)
tree698a5986d022d6af7325f271ac9c6a4b51d43258
parentbc2bf8033812373466c4cfba8d10c8095d2d7893 (diff)
downloadsaitama-f9d3860f9536f1a2e2827cc9f91b977c70fa11e5.tar.gz
Add all queue functionality into queue class
-rw-r--r--timeEventQueuer.py (renamed from threadTest.py)52
1 files changed, 24 insertions, 28 deletions
diff --git a/threadTest.py b/timeEventQueuer.py
index 166c739..aa08df6 100644
--- a/threadTest.py
+++ b/timeEventQueuer.py
@@ -1,6 +1,3 @@
-
-from fbchat import log, Client
-from fbchat.models import *
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
@@ -12,15 +9,16 @@ import random
from datetime import datetime, timedelta
import heapq
-TOPIC_TIME = "Time Topic"
-TOPIC_START = "Start Topic"
-TOPIC_STOP = "Stop Topic"
+EVENTID_TIME = "Topic: Time Event"
+EVENTID_QUEUE_START = "Topic: Start Event Queue"
+EVENTID_QUEUE_STOP = "Topic: Stop Event Queue"
"""
Event Queuer for Time Events
Executes the heap on top of heap sorted on time.
Post timeEvents to eventBus with desired topics and possible other data
+
Edit execute_event to fit execution process of event
The Queuer must be added as a subscriber to the event bus on the desired topics
@@ -43,19 +41,19 @@ class Time_event_queuer(threading.Thread,subscriber):
if not isinstance(new_event,event):
print("Invalid event type passed")
return
- if new_event.get_topic() == TOPIC_TIME:
+ if new_event.get_topic() == EVENTID_TIME:
self.add_event(new_event)
- elif new_event.get_topic() == TOPIC_START:
+ elif new_event.get_topic() == EVENTID_QUEUE_START:
self.start_queue_loop()
- elif new_event.get_topic() == TOPIC_STOP:
+ elif new_event.get_topic() == EVENTID_QUEUE_STOP:
self.stop_queue_loop()
- #Starts the queue loop on TOPIC_START event
+ #Starts the queue loop on EVENTID_QUEUE_START event
def start_queue_loop(self):
self.execute_flag = True
self.run_state()
- #Stops the queue loop on TOPIC_STOP event
+ #Stops the queue loop on EVENTID_QUEUE_STOP event
def stop_queue_loop(self):
print("Stopping queuer in {}".format(threading.current_thread().__class__.__name__))
self.execute_flag = False
@@ -122,6 +120,13 @@ class Time_event_queuer(threading.Thread,subscriber):
#Returns True if Heap is Empty
def heap_empty(self):
return len(self.heap)==0
+
+ @staticmethod
+ def post_time_event(eb,data,delta_time):
+ event_time = datetime.now() + delta_time #ms removed
+ print("{} scheduled at {}".format(data,event_time))
+ data = (event_time,data)
+ eb.post(event(EVENTID_TIME,data))
class Thread_handler():
def __init__(self,eb):
@@ -134,23 +139,18 @@ class Thread_handler():
def start_threads(self):
for t in self.threads:
print("Registering consumer")
- self.eb.register_consumer(t,TOPIC_TIME)
- self.eb.register_consumer(t,TOPIC_START)
- self.eb.register_consumer(t,TOPIC_STOP)
+ self.eb.register_consumer(t,EVENTID_TIME)
+ self.eb.register_consumer(t,EVENTID_QUEUE_START)
+ self.eb.register_consumer(t,EVENTID_QUEUE_STOP)
t.start()
- self.eb.post(event(TOPIC_START,""))
+ self.eb.post(event(EVENTID_QUEUE_START,""))
def join_threads(self):
- self.eb.post(event(TOPIC_STOP,""))
+ self.eb.post(event(EVENTID_QUEUE_STOP,""))
for t in self.threads:
print("Joining thread: {}".format(t.getName()))
t.join()
-def post_time_event(eb,data,delta_time):
- event_time = datetime.now() + delta_time #ms removed
- print("{} scheduled at {}".format(data,event_time))
- data = (event_time,data)
- eb.post(event(TOPIC_TIME,data))
def main():
# Create Thread Handler
@@ -166,16 +166,12 @@ def main():
th.start_threads()
# Post Information
- post_time_event(eb,"post1",timedelta(seconds=3))
+ teq.post_time_event(eb,"post1",timedelta(seconds=3))
time.sleep(2)
- post_time_event(eb,"post2",timedelta(seconds=10))
+ teq.post_time_event(eb,"post2",timedelta(seconds=10))
time.sleep(2)
- post_time_event(eb,"post3",timedelta(seconds=6))
+ teq.post_time_event(eb,"post3",timedelta(seconds=6))
time.sleep(1)
- post_time_event(eb,"post4",timedelta(seconds=7))
- time.sleep(2)
- post_time_event(eb,"post5",timedelta(seconds=1))
- time.sleep(10)
# Stopping threads
th.join_threads()