E.2.2 Hauptprogramm des Brokers

#!/usr/users2/diplom/hans/python/python/bin/python
# -------------------------------------------------------------
# Projekt : Digitale Bibliotheken Projekt
# Uni-Frankfurt/M, Professur Telematik und
# verteilte Systeme, Prof. O. Drobnik
# Diplomarbeit, Matzen,Hans, 1997
# Dateiname : broker.py
# Datum : 03.11.1997
# letzte Änderung :
# Autor : Hans Matzen, 1997, Frankfurt/M, Deutschland
# Sprache : Python v1.4
# Beschreibung : Hauptprogramm zum Broker
# Anmerkungen :
#
# -------------------------------------------------------------
 
# globale Variable
CONFIG_FILE="broker.conf"
 
# imports
import sys
import string
import signal
import confparse
import c_multiconn
import c_log
 
 
#
# Hilfsfunktionen
#
# Loescht das Element mit dem Wert x aus der Liste Y
def dellist(y,x):
    erg=[]
    for i in range(len(y)):
        if y[i]!=x:
            erg.append(y[i])
    return erg
 
 
# Verarbeitung Kommandozeilenparameter
# pruefe parameter
if len(sys.argv)!=2:
    print "Usage: broker.py <name> [-f <configfile>]"
    sys.exit(1)
 
# hole Kommandozeilenparameter und setzte Statusinformationen
# des Brokers
brokername=sys.argv[1]
brokerstate="run"
brokerqueue=0
brokercounter=0
# Wie wird verteilt automatisch=1, manuell=0
#
brokerdistmethod = 1
  
# installiere Signalhandler
doit=1
def sighup(a,b):
    global brokerstate
    if brokerstate=="run":
        brokerstate="paused"
    else:
        brokerstate="run"
 
def sigint(a,b):
    global doit
    doit=0
 
 
signal.signal(signal.SIGHUP ,sighup)
signal.signal(signal.SIGINT ,sigint)
 
# instanziiere Socketmengenobjekt
conn=c_multiconn.c_multiconn()
 
# Verarbeite Konfigurationsfile
params=confparse.confparse(CONFIG_FILE)
 
# setze defaultwerte
maxorders=10
ifobj_connlist=[]
ctrl_connlist=[]
rep_connlist=[]
lfile="./broker.log"
i=0
 
# setzte Parameter aus Konfigurationsfile
while i<len(params):
    el=params[i]
    if el[0]=="RECEIVE_PORT":
        rport=eval(el[1])
        ifobj_connlist.append(conn.open_sock("",rport))
    elif el[0]=="CONNECT_PORT":
        cport=eval(el[1])
        rep_connlist.append(conn.open_sock("",cport))
    elif el[0]=="CONTROL_PORT":
        ctrlport=eval(el[1])
        ctrl_connlist.append(conn.open_sock("",ctrlport))
    elif el[0]=="MAX_ORDERS":
        maxorders=eval(el[1])
    elif el[0]=="LOG_FILE":
        lfile=el[1]
     
    i=i+1
 
# Oeffne Logfile
log=c_log.c_log("Broker "+str(brokername),lfile)
log.log("Gestartet")
 
# init. Variablen fuer Auftragslistenverwaltung
ifobj_list=[]
ctrl_list=[]
rep_list=[]
 
 
# instanziiere Speicherliste und Jobliste
import c_orgalists
connected_reps=c_orgalists.c_replist()
jobs=c_orgalists.c_joblist()
 
# initialisere Sockets
# Schnittstellenkomponenten-Sockets vorbereiten
i=0
log.log("Verbindung fuer Schnittstellenobjekte unter:")
while (i<len(ifobj_connlist)):
    log.log(str(conn.get_localaddr(ifobj_connlist[i])))
    conn.listen(ifobj_connlist[i])
    i=i+1
 
# Speicher-Sockets vorbereiten
i=0
log.log("Verbindung fuer Speicher unter:")
while (i<len(rep_connlist)):
    log.log(str(conn.get_localaddr(rep_connlist[i])))
    conn.listen(rep_connlist[i])
    i=i+1
 
# Steuerungs-Sockets vorbereiten
i=0
log.log("Verbindung fuer Managementobjekte unter:")
while (i<len(ctrl_connlist)):
    log.log(str(conn.get_localaddr(ctrl_connlist[i])))
    conn.listen(ctrl_connlist[i])
    i=i+1
 
# ----------------------------------------------------------
# Mainloop
# ----------------------------------------------------------
doit=1
while doit==1:
    log.log("Warte auf Auftraege")
    active_socks=[]
    # auf Ereignissen an Sockets warten
    active_socks=conn.wait_event()
 
    # ereignisse abarbeiten
    i=0
    while i<len(active_socks):
        actsock=active_socks[i]
 
        # ist der broker angehalten ?
        if brokerstate=="paused":
            for h in active_socks:
                try:
                    conn.write_sock("Der Broker ist angehalten.\n")
                except:
                    pass
            # while schleife verlassen
            break
 
        #
        # Verbindungsaufbau der verschiedenen Objekte
        #
        if actsock in rep_connlist:
            log.log("Ein Speicher bittet um Verbindung.")
            # Speicher baut verbindung auf
            newfd=conn.accept(actsock)
            rep_list.append(newfd)
 
        if actsock in ifobj_connlist:
            log.log("Eine Schnittstelle bittet um Verbindung.")
            # Schnittstellenobjekt baut verbindung auf
            newfd=conn.accept(actsock)
            ifobj_list.append(newfd)
 
        if actsock in ctrl_connlist:
            log.log("Eine Steuerinstanz bittet um Verbindung.")
            # M/S Werkzeug baut verbindung auf
            newfd=conn.accept(actsock)
            ctrl_list.append(newfd)
         
        #
        # Verarbeitung verschiedener Nachrichten
        #
 
        # Nachricht von einem Speicher
        if actsock in rep_list:
            # Nachricht ueber bestehende Verbindung
            log.log("Ein Speicher bittet um Gehoer.")
            msg=""
            msg=conn.read_sock(actsock)
            log.log("Nachricht : "+msg[0:40])
            if msg=="":
                log.log("Oops, der hat einfach aufgelegt ["+str(actsock)+"].")
                conn.close_sock(actsock)
                rep_list=dellist(rep_list,actsock)
            else:
                reply=""
                parts=string.split(msg," ")
                # Anmeldung eines Speichers
                if parts[0]=="CMD_CONN":
                    if connected_reps.addrep(parts[1], parts[2], parts[3])!=-1:
                        reply="CMD_CONNCONFIRM"
                    else:
                        reply="CMD_CONNDENY"
                else:
                    # Auftragsrueckmeldung empfangen
                    jobid=jobs.get_job_by_repfd(actsock)
                    njob=jobs.get_jobdata(jobid)
                    # Rueckmeldung an Schnittstellenobjekt schicken
                    conn.write_sock(njob[2],msg)
                    # Speicher als idle markieren
                    connected_reps.set_idle(njob[6])
                    # Auftrag aus Jobliste loeschen
                    jobs.remjob(jobid)
 
     
                if reply!="":
                    conn.write_sock(actsock,reply)
                    log.log("Sende Antwort "+reply[0:40])
 
        # Nachricht von einer Schnittstelle
        if actsock in ifobj_list:
            # Nachricht ueber bestehende Verbindung
            log.log("Eine Schnittstelle bittet um Gehoer.")
            msg=conn.read_sock(actsock)
            log.log("Nachricht : "+msg[0:40])
            if msg=="":
                log.log("Oops, der hat einfach aufgelegt ["+str(actsock)+"].")
                conn.close_sock(actsock)
                ifobj_list=dellist(ifobj_list,actsock)
            else:
                reply=""
                freerep=connected_reps.get_idlerep()
                if freerep[0]==-1:
                    reply="Alle Speicher ausgelastet."
                else:
                    # neuen auftrag erfassen und weiterleiten
                    brokercounter=brokercounter + 1
                    ns=conn.open_sock("",0)
                    rep_list.append(ns)
                    conn.connect(ns, freerep[2], eval(freerep[3]))
                    jobs.addjob(actsock,"w",freerep[0],ns)
                    conn.write_sock(ns,msg)
 
                if reply!="":
                    conn.write_sock(actsock,reply)
 
        # Nachricht von einem Managementobjekt
        if actsock in ctrl_list:
            # Nachricht ueber bestehende Verbindung
            log.log("Eine Managementinstanz bittet um Gehoer.")
            msg=conn.read_sock(actsock)
            log.log("Nachricht : "+msg[0:40])
            if msg=="":
                log.log("Oops, der hat einfach aufgelegt ["+str(actsock)+"].")
                conn.close_sock(actsock)
                ctrl_list=dellist(ctrl_list,actsock)
            else:
                reply=""
                if msg=="CMD_CONFGET":
                    # erstelle Konfigurationsstring
                    brokerqueue=len(jobs.get_runningjobs())
                    brstr="{(" + brokername + "," + brokerstate + "," + str(brokercounter) + "," + str(brokerqueue) +")}"
                    reply=brstr + ";" + str(connected_reps.get_replist()) + ";" + str(jobs.get_runningjobs())
 
                elif msg=="CMD_EXIT":
                    # Broker anhalten
                    brokerstate="paused"
                    # Alle Speicher beenden
                    for i in connected_reps.get_replist():
                        conn.send(i[2],"CMD_EXIT")
                    # Abbruchbedingung fuer Mainloop setzen
                    doit=0
                elif msg=="CMD_QUIT":
                    # Notausstieg
                    del conn
                    raise SystemExit
 
                elif msg[:8]=="CMD_CHGST":
                    # zustand einens speichers veraendern
                    mlst=string.split(msg," ")
                    chgid=connected_reps.get_repbyname(mlst[1])
                    if chgid!=-1:
                        if mlst[2]=="paused":
                            # anhalten
                            connected_reps.set_paused(chgid)
                        else:
                            # fortsetzen
                            connected_reps.set_idle(chgid)
                elif msg[:9]=="CMD_MAPJOB":
                    # Auftrag zu Speicher zuordnen und
                    # abschicken
                    mlst=string.split(msg," ")
                    freerep=connected_reps.get_repinfo(mlst[2])
                    # neuen auftrag erfassen und weiterleiten
                    brokercounter=brokercounter + 1
                    ns=conn.open_sock("",0)
                    rep_list.append(ns)
                    conn.connect(ns, freerep[2], eval(freerep[3]))
                    jobs.set_repstate(actsock,"w")
                    conn.write_sock(ns,msg)
 
                elif msg=="CMD_PAUSE":
                    brokerstate="paused"
                elif msg=="CMD_CONT":
                    brokerstate="run"
                elif msg="CMD_AUTO":
                    brokerdistmethod=1
                elif msg="CMD_MAN":
                    brokerdistmethod=0
                if reply!="":
                    conn.write_sock(actsock,reply)
 
        i=i+1
del conn