#
# Hilfsfunktionen
#
# Loescht das Element mit dem Wert x aus der Liste Y
def dellist(y,x):
erg=[]
for i in range(len(y)):
if y[i]!=x:
erg.append(y[i])
return erg
# hole Kommandozeilenparameter und setzte Statusinformationen
# des Brokers
brokername=sys.argv[1]
brokerstate="run"
brokerqueue=0
brokercounter=0
# Wie wird verteilt automatisch=1, manuell=0
#
brokerdistmethod = 1
# installiere Signalhandler
doit=1
def sighup(a,b):
global brokerstate
if brokerstate=="run":
brokerstate="paused"
else:
brokerstate="run"
# ----------------------------------------------------------
# Mainloop
# ----------------------------------------------------------
doit=1
while doit==1:
log.log("Warte auf Auftraege")
active_socks=[]
# auf Ereignissen an Sockets warten
active_socks=conn.wait_event()
# ereignisse abarbeiten
i=0
while i<len(active_socks):
actsock=active_socks[i]
# ist der broker angehalten ?
if brokerstate=="paused":
for h in active_socks:
try:
conn.write_sock("Der Broker ist angehalten.\n")
except:
pass
# while schleife verlassen
break
#
# Verbindungsaufbau der verschiedenen Objekte
#
if actsock in rep_connlist:
log.log("Ein Speicher bittet um Verbindung.")
# Speicher baut verbindung auf
newfd=conn.accept(actsock)
rep_list.append(newfd)
if actsock in ifobj_connlist:
log.log("Eine Schnittstelle bittet um Verbindung.")
# Schnittstellenobjekt baut verbindung auf
newfd=conn.accept(actsock)
ifobj_list.append(newfd)
if actsock in ctrl_connlist:
log.log("Eine Steuerinstanz bittet um Verbindung.")
# M/S Werkzeug baut verbindung auf
newfd=conn.accept(actsock)
ctrl_list.append(newfd)
#
# Verarbeitung verschiedener Nachrichten
#
# Nachricht von einem Speicher
if actsock in rep_list:
# Nachricht ueber bestehende Verbindung
log.log("Ein Speicher bittet um Gehoer.")
msg=""
msg=conn.read_sock(actsock)
log.log("Nachricht : "+msg[0:40])
if msg=="":
log.log("Oops, der hat einfach aufgelegt ["+str(actsock)+"].")
conn.close_sock(actsock)
rep_list=dellist(rep_list,actsock)
else:
reply=""
parts=string.split(msg," ")
# Anmeldung eines Speichers
if parts[0]=="CMD_CONN":
if connected_reps.addrep(parts[1], parts[2], parts[3])!=-1:
reply="CMD_CONNCONFIRM"
else:
reply="CMD_CONNDENY"
else:
# Auftragsrueckmeldung empfangen
jobid=jobs.get_job_by_repfd(actsock)
njob=jobs.get_jobdata(jobid)
# Rueckmeldung an Schnittstellenobjekt schicken
conn.write_sock(njob[2],msg)
# Speicher als idle markieren
connected_reps.set_idle(njob[6])
# Auftrag aus Jobliste loeschen
jobs.remjob(jobid)
if reply!="":
conn.write_sock(actsock,reply)
log.log("Sende Antwort "+reply[0:40])
# Nachricht von einer Schnittstelle
if actsock in ifobj_list:
# Nachricht ueber bestehende Verbindung
log.log("Eine Schnittstelle bittet um Gehoer.")
msg=conn.read_sock(actsock)
log.log("Nachricht : "+msg[0:40])
if msg=="":
log.log("Oops, der hat einfach aufgelegt ["+str(actsock)+"].")
conn.close_sock(actsock)
ifobj_list=dellist(ifobj_list,actsock)
else:
reply=""
freerep=connected_reps.get_idlerep()
if freerep[0]==-1:
reply="Alle Speicher ausgelastet."
else:
# neuen auftrag erfassen und weiterleiten
brokercounter=brokercounter + 1
ns=conn.open_sock("",0)
rep_list.append(ns)
conn.connect(ns, freerep[2], eval(freerep[3]))
jobs.addjob(actsock,"w",freerep[0],ns)
conn.write_sock(ns,msg)
if reply!="":
conn.write_sock(actsock,reply)
# Nachricht von einem Managementobjekt
if actsock in ctrl_list:
# Nachricht ueber bestehende Verbindung
log.log("Eine Managementinstanz bittet um Gehoer.")
msg=conn.read_sock(actsock)
log.log("Nachricht : "+msg[0:40])
if msg=="":
log.log("Oops, der hat einfach aufgelegt ["+str(actsock)+"].")
conn.close_sock(actsock)
ctrl_list=dellist(ctrl_list,actsock)
else:
reply=""
if msg=="CMD_CONFGET":
# erstelle Konfigurationsstring
brokerqueue=len(jobs.get_runningjobs())
brstr="{(" + brokername + "," + brokerstate + "," + str(brokercounter) + "," + str(brokerqueue) +")}"
reply=brstr + ";" + str(connected_reps.get_replist()) + ";" + str(jobs.get_runningjobs())
elif msg=="CMD_EXIT":
# Broker anhalten
brokerstate="paused"
# Alle Speicher beenden
for i in connected_reps.get_replist():
conn.send(i[2],"CMD_EXIT")
# Abbruchbedingung fuer Mainloop setzen
doit=0
elif msg=="CMD_QUIT":
# Notausstieg
del conn
raise SystemExit
elif msg[:8]=="CMD_CHGST":
# zustand einens speichers veraendern
mlst=string.split(msg," ")
chgid=connected_reps.get_repbyname(mlst[1])
if chgid!=-1:
if mlst[2]=="paused":
# anhalten
connected_reps.set_paused(chgid)
else:
# fortsetzen
connected_reps.set_idle(chgid)
elif msg[:9]=="CMD_MAPJOB":
# Auftrag zu Speicher zuordnen und
# abschicken
mlst=string.split(msg," ")
freerep=connected_reps.get_repinfo(mlst[2])
# neuen auftrag erfassen und weiterleiten
brokercounter=brokercounter + 1
ns=conn.open_sock("",0)
rep_list.append(ns)
conn.connect(ns, freerep[2], eval(freerep[3]))
jobs.set_repstate(actsock,"w")
conn.write_sock(ns,msg)