Package cobra :: Module cluster
[hide private]
[frames] | no frames]

Source Code for Module cobra.cluster

  1   
  2  """ 
  3  Cobra's built in clustering framework 
  4  """ 
  5   
  6  import gc 
  7  import sys 
  8  import time 
  9  import Queue 
 10  import struct 
 11  import socket 
 12  import urllib2 
 13  import traceback 
 14  import threading 
 15  import subprocess 
 16  import multiprocessing 
 17   
 18  import cobra 
 19  import dcode 
 20   
 21  queen_port   = 32124 
 22   
 23  cluster_port = 32123 
 24  cluster_ip = "224.69.69.69" 
 25   
 26  sub_cmd = """ 
 27  import cobra.cluster 
 28  cobra.cluster.getAndDoWork("%s", docode=%s) 
 29  """ 
 30   
class InvalidInProgWorkId(Exception):
    """
    Raised by the ClusterServer when asked about a work id that is not in
    its in-progress table (for example after the unit was canceled, timed
    out, or finished).

    The offending id is kept on the ``workid`` attribute.
    """

    def __init__(self, workid):
        message = "Work ID %d is not valid" % workid
        super(InvalidInProgWorkId, self).__init__(message)
        self.workid = workid
35
class ClusterWork(object):
    """
    Extend this object to create your own work units. Do it in
    a proper module (and not __main__ to be able to use this
    in conjunction with cobra.dcode).
    """

    def __init__(self, timeout=None):
        """
        Arguments:
            timeout - seconds that may pass without a touch() before
                      isTimedOut() reports True (None disables the check).
        """
        object.__init__(self)
        self.id = None          # Set by adding to the server
        self.server = None      # Set by the worker before work() runs
        self.starttime = 0
        self.endtime = 0        # Both are set by worker before and after work()
        self.timeout = timeout
        self.touchtime = None
        self.excinfo = None     # Will be exception traceback on work unit fail.

    def touch(self):  # heh...
        """
        Update the internal "touch time" which is used by the timeout
        subsystem to see if this work unit has gone too long without
        making progress...
        """
        self.touchtime = time.time()

    def isTimedOut(self):
        """
        Check if this work unit is timed out.

        Returns False when no timeout is configured, when the unit has never
        been touched, or when it already finished (endtime set); otherwise
        True once more than ``timeout`` seconds passed since the last touch.
        """
        if self.timeout is None:
            return False
        if self.touchtime is None:
            return False
        if self.endtime != 0:
            return False
        return (self.touchtime + self.timeout) < time.time()

    def work(self):
        """
        Actually do the work associated with this work object.

        The default implementation is a demo that reports progress for
        ~10 seconds; subclasses must override it.
        """
        print("OVERRIDE ME")
        for i in range(10):
            self.setCompletion(i * 10)
            self.setStatus("Sleeping: %d" % i)
            time.sleep(1)

    def done(self):
        """
        This is called back on the server once a work unit
        is complete and returned.
        """
        print("OVERRIDE DONE")

    def setCompletion(self, percent):
        """
        Work units may call this whenever they like to
        tell the server how far along their work they are.
        """
        self.touch()
        self.server.setWorkCompletion(self.id, percent)

    def setStatus(self, status):
        """
        Work units may call this to inform the server of
        their status.
        """
        self.touch()
        self.server.setWorkStatus(self.id, status)

    def openSharedFile(self, filename):
        '''
        A helper API to open a file like object on the server.

        Example:
            fd = self.openSharedFile('/foo/bar/baz')
            fbytes = fd.read()

        NOTE: The server must use shareFileToWorkers().
        '''
        uri = self.server.openSharedFile(filename)
        return cobra.CobraProxy(uri)
117
class ClusterCallback:
    """
    Place one of these in the ClusterServer to get synchronous
    event information about what's going on in the cluster server.
    (mostly for the GUI).

    Every hook here does nothing; subclass and override the ones you need.
    """

    def workAdded(self, server, work):
        """Called by ClusterServer.addWork() once *work* is queued."""

    def workGotten(self, server, work):
        """Called by ClusterServer.getWork() when *work* goes to a worker."""

    def workStatus(self, server, workid, status):
        """Called by ClusterServer.setWorkStatus() with the new status."""

    def workCompletion(self, server, workid, completion):
        """Called by ClusterServer.setWorkCompletion() with the percentage."""

    def workDone(self, server, work):
        """Called by ClusterServer.doneWork() after work.done() ran."""

    def workFailed(self, server, work):
        """Called by ClusterServer.failWork() for a failed unit."""

    def workTimeout(self, server, work):
        """Called by ClusterServer.timeoutWork() for a timed-out unit."""

    def workCanceled(self, server, work):
        """Called by cancelWork()/cancelAllWork() for each canceled unit."""
141
class VerboseCallback(ClusterCallback):
    """
    A ClusterCallback that prints one line per cluster event.
    This is mostly for testing...
    """

    def workAdded(self, server, work):
        print("WORK ADDED: %d" % work.id)

    def workGotten(self, server, work):
        print("WORK GOTTEN: %d" % work.id)

    def workStatus(self, server, workid, status):
        print("WORK STATUS: (%d) %s" % (workid, status))

    def workCompletion(self, server, workid, completion):
        print("WORK COMPLETION: (%d) %d%%" % (workid, completion))

    def workDone(self, server, work):
        print("WORK DONE: %d" % work.id)

    def workFailed(self, server, work):
        print("WORK FAILED: %d" % work.id)

    def workTimeout(self, server, work):
        print("WORK TIMEOUT: %d" % work.id)

    def workCanceled(self, server, work):
        # Same "EVENT: id" layout as every other event line.
        print("WORK CANCELED: %d" % work.id)
160 161 import collections 162
class ClusterServer:
    """
    Hands out ClusterWork units to workers and tracks which units are
    queued and which are in progress.
    """

    def __init__(self, name, maxsize=None, docode=False, bindsrc="", cobrad=None):
        """
        The cluster server is the core of the code that manages work units.

        Arguments:
            maxsize - How big should the work queue be before add blocks
            docode  - Should we also be a dcode server?
            bindsrc - Should we bind a src IP for our multicast announcements?
            cobrad  - Should we use an existing cobra daemon to share our objects?
        """
        self.go = True
        self.name = name
        self.queens = []
        self.nextwid = 0
        self.inprog = {}        # work id -> unit currently handed to a worker
        self.sharedfiles = {}   # filename -> True (see shareFileToWorkers)
        self.maxsize = maxsize
        self.queue = collections.deque()
        # Held around queue mutations; addWork() waits on it while full.
        self.qcond = threading.Condition()
        self.widiter = iter(xrange(999999999))

        # Initialize a cobra daemon if needed
        if cobrad is None:
            cobrad = cobra.CobraDaemon(host="", port=0)
        self.cobrad = cobrad
        self.cobraname = self.cobrad.shareObject(self)

        # Setup our transmission socket
        self.sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sendsock.bind((bindsrc, 0))

        # Set this to a ClusterCallback extension if
        # you want notifications.
        self.callback = None

        if docode:
            dcode.enableDcodeServer(daemon=self.cobrad)

        # Fire the timeout monitor thread...
        thr = threading.Thread(target=self.timerThread)
        thr.daemon = True
        thr.start()

    def addClusterQueen(self, queenhost):
        '''
        Inform the ClusterServer about the presence of a ClusterQueen instance
        on the given host. When the ClusterServer begins to announce work,
        he will do so in "infrastructure mode" and ask any set queens for help.
        '''
        queen = cobra.CobraProxy('cobra://%s:%d/ClusterQueen' % (queenhost, queen_port))
        self.queens.append(queen)

    def shareFileToWorkers(self, filename):
        '''
        Add a file to the list of files which are "shared" to worker clients.
        This allows workers to access a file from the server.

        Example:
            s.shareFileToWorkers('/path/to/file')

        NOTE: Workers may use the openSharedFile() API to access them.
        '''
        self.sharedfiles[filename] = True

    def openSharedFile(self, filename):
        '''
        Return a URI for an open file descriptor for the specified filename.

        Raises Exception if the file was not registered with
        shareFileToWorkers().

        NOTE: use openSharedFile() method on work unit to get back a proxy.
        '''
        if not self.sharedfiles.get(filename):
            raise Exception('File %s is not shared!' % (filename,))

        fd = open(filename, 'rb')

        cname = self.cobrad.shareObject(fd, doref=True)
        host, port = cobra.getLocalInfo()

        return 'cobra://%s:%d/%s' % (host, port, cname)

    def __touchWork(self, workid):
        # Used to both validate an in-progress workid *and*
        # update its timestamp for the timeout thread.
        work = self.inprog.get(workid)
        if work is None:
            raise InvalidInProgWorkId(workid)
        work.touch()

    def __cleanWork(self, workid):
        # Used by done/timeout/etc to clean up an in-progress work unit.
        # Returns the removed unit, or None if it was not in progress.
        return self.inprog.pop(workid, None)

    def timerThread(self):
        # Internal function to monitor work unit time (runs every 2 seconds
        # until shutdownServer()).
        while self.go:
            try:
                # Iterate a snapshot: timeoutWork() removes entries from
                # self.inprog, and other threads add/remove them too.
                for work in list(self.inprog.values()):
                    if work.isTimedOut():
                        self.timeoutWork(work)

            except Exception as e:
                print("ClusterTimer: %s" % e)

            time.sleep(2)

    def shutdownServer(self):
        """
        Stop the timeout thread and the runServer() loop (both poll self.go).
        """
        self.go = False

    def announceWork(self):
        """
        Announce to our multicast cluster peers that we have work
        to do! (Or use a queen to proxy to them...)
        """
        if self.queens:
            for q in self.queens:
                try:
                    q.proxyAnnounceWork(self.name, self.cobraname, self.cobrad.port)
                except Exception as e:
                    print('Queen Error: %s' % e)

        else:
            buf = "cobra:%s:%s:%d" % (self.name, self.cobraname, self.cobrad.port)
            self.sendsock.sendto(buf, (cluster_ip, cluster_port))

    def runServer(self, firethread=False):
        """
        Start the cobra daemon thread (cobrad.fireThread()) and announce
        queued work every 2 seconds until shutdownServer() is called.
        With firethread=True, do that on a daemon thread and return at once.
        """
        if firethread:
            thr = threading.Thread(target=self.runServer)
            thr.daemon = True
            thr.start()
            return

        self.cobrad.fireThread()
        while self.go:
            if self.queue:
                self.announceWork()
            time.sleep(2)

    def inQueueCount(self):
        """
        How long is the current work unit queue.
        """
        return len(self.queue)

    def inProgressCount(self):
        """
        How many work units are in progress?
        """
        return len(self.inprog)

    def addWork(self, work):
        """
        Add a work object to the ClusterServer's queue.

        A work unit without an id is assigned the next one.  If the server
        was created with a maxsize, this blocks until the queue has room.
        Raises Exception if *work* is not a ClusterWork instance.
        """
        if not isinstance(work, ClusterWork):
            raise Exception("%s is not a ClusterWork extension!" % (work,))

        # If this work has no ID, give it one
        if work.id is None:
            work.id = next(self.widiter)

        with self.qcond:
            if self.maxsize is not None:
                while len(self.queue) >= self.maxsize:
                    self.qcond.wait()
            self.queue.append(work)

        if self.callback:
            self.callback.workAdded(self, work)

    def getWork(self):
        """
        Pop the next queued work unit and mark it in progress.
        Returns None when the queue is empty.
        """
        with self.qcond:
            if not self.queue:
                return None
            ret = self.queue.popleft()
            # Wake any addWork() blocked on a full queue.
            self.qcond.notify_all()

        self.inprog[ret.id] = ret
        self.__touchWork(ret.id)

        if self.callback:
            self.callback.workGotten(self, ret)

        return ret

    def doneWork(self, work):
        """
        Used by the clients to report work as done.
        """
        # NOTE(review): the result of __cleanWork is ignored, so a unit that
        # was already canceled/timed out still gets done() and workDone.
        self.__cleanWork(work.id)

        work.done()
        if self.callback:
            self.callback.workDone(self, work)

    def timeoutWork(self, work):
        """
        This method may be over-ridden to handle
        work units that time out for whatever reason.
        """
        self.__cleanWork(work.id)
        if self.callback:
            self.callback.workTimeout(self, work)

    def failWork(self, work):
        """
        This is called for a work unit that is in a failed state. This is most
        commonly that the work() method has raised an exception.
        """
        self.__cleanWork(work.id)
        if self.callback:
            self.callback.workFailed(self, work)

    def cancelAllWork(self, inprog=True):
        """
        Cancel all of the currently pending work units. You may
        specify inprog=False to cancel all *queued* work units
        but allow inprogress work units to complete.
        """
        with self.qcond:
            qlist = list(self.queue)
            self.queue.clear()

            if inprog:
                p = self.inprog
                self.inprog = {}
                qlist.extend(p.values())

            self.qcond.notify_all()

        if self.callback:
            for w in qlist:
                self.callback.workCanceled(self, w)

    def cancelWork(self, workid):
        """
        Cancel a work unit by ID (whether in progress or still queued).
        Unknown ids are ignored.
        """
        cwork = self.__cleanWork(workid)

        # Remove it from the work queue
        # (if we didn't find it in inprog)
        if cwork is None:
            with self.qcond:
                qlist = list(self.queue)
                self.queue.clear()
                for work in qlist:
                    if work.id != workid:
                        self.queue.append(work)
                    else:
                        cwork = work

                self.qcond.notify_all()

        if cwork is None:
            return

        if self.callback:
            self.callback.workCanceled(self, cwork)

    def setWorkStatus(self, workid, status):
        """
        Set the human readable status for the given work unit.
        Raises InvalidInProgWorkId if the unit is not in progress.
        """
        self.__touchWork(workid)
        if self.callback:
            self.callback.workStatus(self, workid, status)

    def setWorkCompletion(self, workid, percent):
        """
        Set the percentage completion status for this work unit.
        Raises InvalidInProgWorkId if the unit is not in progress.
        """
        self.__touchWork(workid)
        if self.callback:
            self.callback.workCompletion(self, workid, percent)
class ClusterClient:

    """
    Listen for our name (or any name if name=="*") on the cobra cluster
    multicast address and if we find a server in need, go help.

    maxwidth is the number of work units to do in parallel
    docode will enable code sharing with the server
    """

    def __init__(self, name, maxwidth=multiprocessing.cpu_count(), docode=False):
        self.go = True
        self.name = name
        self.width = 0          # number of runner subprocesses in flight
        self.maxwidth = maxwidth
        self.verbose = False
        self.docode = docode
        # self.width is updated from every runner thread; `+=` on an
        # attribute is not atomic, so serialize the updates.
        self.widthlock = threading.Lock()

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("", cluster_port))
        # Join the announcement multicast group on any interface.
        mreq = struct.pack("4sL", socket.inet_aton(cluster_ip), socket.INADDR_ANY)
        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    def processWork(self):
        """
        Runs handing out work up to maxwidth until self.go == False.

        Accepted announcements look like "cobra:name:object:port", or
        "cobra:name:object:port:host" when relayed by a ClusterQueen; for the
        4-field form the host is the packet's sender address.
        """
        while self.go:

            buf, sockaddr = self.sock.recvfrom(4096)
            if self.width >= self.maxwidth:
                continue

            server = sockaddr[0]

            if not buf.startswith("cobra"):
                continue

            info = buf.split(":")
            if len(info) == 4:
                cc, name, cobject, portstr = info
            elif len(info) == 5:
                cc, name, cobject, portstr, server = info
            else:
                continue

            if name != self.name and self.name != "*":
                continue

            # The packet comes straight off the network; a bogus port field
            # must be skipped, not allowed to kill this loop.
            try:
                port = int(portstr)
            except ValueError:
                continue

            uri = "%s://%s:%d/%s" % (cc, server, port, cobject)
            self.fireRunner(uri)

    def fireRunner(self, uri):
        """
        Run threadForker(uri) on a daemon thread.
        """
        thr = threading.Thread(target=self.threadForker, args=(uri,))
        thr.daemon = True
        thr.start()

    def threadForker(self, uri):
        """
        Run one worker subprocess (sub_cmd) against *uri* and wait for it,
        counting it against maxwidth while it runs.
        """
        with self.widthlock:
            self.width += 1
        try:
            cmd = sub_cmd % (uri, self.docode)
            sub = subprocess.Popen([sys.executable, '-c', cmd], stdin=subprocess.PIPE)
            sub.wait()
        finally:
            with self.widthlock:
                self.width -= 1
525
class ClusterQueen:
    """
    Relays work announcements from ClusterServers onto the cluster
    multicast group (servers reach her through addClusterQueen()).
    """

    def __init__(self, ifip, recast=True):
        """
        Arguments:
            ifip   - local IP the announcement socket is bound to
            recast - not used yet (see the FIXME below)
        """
        # Setup our transmission socket
        announcer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sendsock = announcer
        announcer.bind((ifip, 0))

        # FIXME TODO make her optionally a multicast listener that re-broadcasts
        # to her subjects...

    def proxyAnnounceWork(self, name, cobraname, port):
        """
        Send out a multicast announcement to our subjects to go help
        a cluster server.  The announced host is the caller's address,
        taken from the cobra connection information.
        """
        callerhost, _unused = cobra.getCallerInfo()
        fields = (name, cobraname, port, callerhost)
        self.sendsock.sendto("cobra:%s:%s:%d:%s" % fields,
                             (cluster_ip, cluster_port))
546
def getHostPortFromUri(uri):
    """
    Take the server URI and pull out the
    host and port for use in building the
    dcode uri.

    Example:
        getHostPortFromUri("cobra://10.0.0.1:1234/Obj") -> ("10.0.0.1", 1234)

    The port is None when the URI does not carry one.
    """
    try:
        from urlparse import urlsplit          # Python 2
    except ImportError:
        from urllib.parse import urlsplit      # Python 3

    hparts = urlsplit(uri).netloc.split(":")
    host = hparts[0]
    port = None
    # split() always yields at least one element, so only a second element
    # means a port was actually given.
    if len(hparts) > 1:
        port = int(hparts[1])
    return host, port
560
def workThread(server, work):
    """
    Run *work* and report the outcome to *server*.

    On success, work.starttime/endtime are recorded and server.doneWork()
    is called.  InvalidInProgWorkId is treated as "the work was canceled"
    and ignored.  Any other exception is stored as a formatted traceback in
    work.excinfo, printed locally, and reported via server.failWork().
    """
    try:
        work.server = server
        work.starttime = time.time()
        work.touch()
        work.work()
        work.endtime = time.time()
        work.server.doneWork(work)

    except InvalidInProgWorkId:  # the work was canceled
        pass  # Nothing to do, the server already knows

    except Exception:
        # Tell the server that the work unit failed.
        work.excinfo = traceback.format_exc()
        # Print before contacting the server: if failWork() itself raises
        # (e.g. the server went away) the original traceback is still shown.
        traceback.print_exc()
        work.server.failWork(work)
578
def runAndWaitWork(server, work):
    """
    Run *work* via workThread() on a daemon thread and wait until it
    finishes, times out (work.isTimedOut()), or sys.stdin is closed.
    """
    work.touch()
    thr = threading.Thread(target=workThread, args=(server, work))
    thr.daemon = True
    thr.start()

    # Wait around for done or timeout
    while True:
        if work.isTimedOut():
            break

        # If the thread is done, lets get out.
        if not thr.is_alive():
            break

        # If some thread closes stdin, time to pack up and go.
        # NOTE(review): .closed only reflects close() calls made in this
        # process; the parent closing its end of the pipe does not appear
        # to set it — confirm before relying on that path.
        if sys.stdin.closed:
            break

        # Poll every 2 seconds, but wake immediately when the worker ends.
        thr.join(2)
601
def getAndDoWork(uri, docode=False):
    """
    Worker-subprocess entry point (the script in sub_cmd calls this):
    fetch one work unit from the cluster server at *uri*, run it, then
    terminate this process.

    Arguments:
        uri    - cobra URI of the ClusterServer
        docode - also register the server as a dcode source
    """
    import os

    try:
        # If we wanna use dcode, set it up
        if docode:
            host, port = getHostPortFromUri(uri)
            cobra.dcode.addDcodeServer(host, port=port)

        # Use a cobra proxy with timeout/maxretry so we
        # don't hang forever if the server goes away
        proxy = cobra.CobraProxy(uri, timeout=60, retrymax=3)

        work = proxy.getWork()
        # If we got work, do it.
        if work is not None:
            runAndWaitWork(proxy, work)

    except Exception:
        traceback.print_exc()

    # Any way it goes we wanna exit now. Work units may have spun up
    # non-daemon threads; sys.exit() only raises SystemExit and the
    # interpreter would then wait for those threads, so use os._exit().
    gc.collect()  # Try to call destructors
    # os._exit() does not flush stdio buffers; do it by hand (best effort).
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            pass
    os._exit(0)  # GTFO
626