python - recv_pyobj behaves differently in a Thread than in a Process
I'm trying to read data from sockets in 6 different processes at a time, for the sake of performance. I made one test that opens 6 threads, each reading from one socket, and another test that opens 6 sub-processes, each reading from a different socket. Reading from threads works fine, and looks like this:
    import threading
    import zmq

    # dtypes.ports_recregister (the list of ports) is defined elsewhere

    class zmqserver:
        context = zmq.Context()  # one context shared by all instances

        def __init__(self, port, max_size):
            self.port = port
            self.zmqthread = None
            self.socket = self.context.socket(zmq.SUB)
            self.socket.setsockopt(zmq.SUBSCRIBE, b'')  # subscribe to everything
            self.socket.connect("tcp://127.0.0.1:" + str(port))

        def startasync(self):
            self.zmqthread = threading.Thread(target=self.start)
            self.zmqthread.start()

        def start(self):
            print("zmqserver:wait next request client on port: %d" % self.port)
            while True:
                print("running loop")
                try:
                    message = self.socket.recv_pyobj()
                except:
                    print("zmqserver:error receiving messages")

    if __name__ == '__main__':
        zmqservers = [None] * 6
        for idx in range(0, 6):
            zmqservers[idx] = zmqserver(dtypes.ports_recregister[idx], 1024)
            zmqservers[idx].startasync()

This prints:
zmqserver:wait next request client on port: 4994
running loop
zmqserver:wait next request client on port: 4995
running loop
zmqserver:wait next request client on port: 4996
running loop
zmqserver:wait next request client on port: 4997
running loop
zmqserver:wait next request client on port: 4998
running loop
zmqserver:wait next request client on port: 4999
Important: I do receive the data on each socket when I send it.
Now I need to achieve the same behaviour with processes instead of threads, since the machine is octa-core and I want to use more of the processor. The code looks like this:
    from multiprocessing import Process
    import zmq

    context = zmq.Context()

    def createsocket(port):
        socket = context.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, b'')
        socket.connect("tcp://127.0.0.1:" + str(port))

    def listen(socket, port):
        print("zmqserver:wait next request client on port: %d" % port)
        while True:
            print("running loop")
            try:
                message = socket.recv_pyobj()
                print("zmqserver:received request: %s" % message)
            except:
                print("zmqserver:error receiving messages")
                continue

    # I'm trying first with 1 process - 1 socket:
    if __name__ == '__main__':
        port = dtypes.ports_recregister[0]
        socket = createsocket(port)
        proc = Process(target=listen, args=(socket, port))
        proc.start()
        proc.join()

The output is strange:
zmqserver:wait next request client on port: 4994
running loop
zmqserver:error receiving messages
running loop
zmqserver:error receiving messages
running loop
zmqserver:error receiving messages
............
And, importantly, I do not receive the data on the socket when I send it.
So, as far as I can understand, either:
1. The method is entered and the socket changes on every iteration of the while loop?
2. Or recv_pyobj is not blocking anymore?
Has anyone experienced this before? Does anyone know how to correctly read sockets from multiple processes?
Thank you.
Actually, the problem was caused by passing the socket as an argument to the process/thread. It seems that pickling, which behind the scenes serializes the data passed to the callback method, does not serialize the socket argument.
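The except clause in the question prints a fixed string, so the real cause of each failure is hidden. A minimal sketch of how the underlying exception could be surfaced is below. `receive_or_report` is a hypothetical helper name, not part of pyzmq, and `None` stands in for a socket that never reached the child (as shown in the question, `createsocket` has no return statement, so `listen` would be handed `None`).

```python
def receive_or_report(sock):
    """Try one receive; on failure, describe the real exception."""
    try:
        return sock.recv_pyobj()
    except Exception as exc:
        # Keep the exception type and message instead of a fixed error string.
        return "error receiving message: %s: %s" % (type(exc).__name__, exc)


# Stand-in for the object the child process actually received (assumption).
report = receive_or_report(None)
print(report)
```

Printing the exception type this way should make it easier to tell a missing or unusable socket object apart from a genuine ZeroMQ receive error.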
The fix was to stop passing the socket as an argument and to create it inside the process instead:
    from multiprocessing import Process
    import zmq

    context = zmq.Context()

    def listetosocket(port):
        # The socket is created here, inside the process, not passed in.
        socket = context.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, b'')
        socket.connect("tcp://127.0.0.1:" + str(port))
        print("wait data on port: %d" % port)
        while True:
            try:
                message = socket.recv_pyobj()
                print("port:%d received data: %s" % (port, message))
            except:
                print("port:%d: error receiving messages" % port)

    if __name__ == '__main__':
        for idx in range(0, dtypes.socket_max):
            proc1 = Process(target=listetosocket, args=(dtypes.ports_recregister[idx], ))
            proc1.start()
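A possible variation, offered only as a sketch: the code above still creates the `zmq.Context` in the parent at module level, and ZeroMQ contexts are generally not meant to be shared across a fork, so it may be safer to create the context inside the child as well. The helper names `endpoint`, `listen_on_port` and `build_listeners` are made up for this illustration, and the host `127.0.0.1` is taken from the code above.

```python
from multiprocessing import Process


def endpoint(port, host="127.0.0.1"):
    """Build the TCP endpoint string passed to connect()."""
    return "tcp://%s:%d" % (host, port)


def listen_on_port(port):
    """Runs in the child: context and socket are both created here."""
    import zmq  # imported in the child so all ZeroMQ state lives in this process

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: subscribe to everything
    socket.connect(endpoint(port))
    while True:
        message = socket.recv_pyobj()  # blocks until a message arrives
        print("port:%d received data: %s" % (port, message))


def build_listeners(ports):
    """Create (but do not start) one process per port; only ints are passed."""
    return [Process(target=listen_on_port, args=(port,)) for port in ports]
```

To run it, something like `for p in build_listeners([4994, 4995]): p.start()` under an `if __name__ == '__main__':` guard would start one reader per port. Only the port number crosses the process boundary, so nothing ZeroMQ-related has to be pickled or inherited.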