- การเพิ่มจำนวนของ loop ที่ใช้ในการประมวลผล
- การปรับค่าตัวแปรใน loop ที่ใช้ในการประมวลผล
- การเพิ่มจำนวนของ process ที่ใช้ในการประมวลผล
การเพิ่มจำนวนของ loop ที่ใช้ในการประมวลผลจะใช้ฟังค์ชันสำหรับคำนวณเลขขกกำลัง 4 ตามโค้ดข้างต้น
def print_quad(num,loop):
""" function to print cube of given num """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for i in range(0,loop):
data = num * num * num * num
print("Quad: {}".format(data))
และฟังค์ชันที่ใช้ทดสอบจะแบ่งเป็น 2 ส่วนคือ แบบเรียกใช้ปกติ และแบบเรียกใช้เป็น Multiprocessing""" function to print cube of given num """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for i in range(0,loop):
data = num * num * num * num
print("Quad: {}".format(data))
loop = 1
NUM_WORKERS = cpu_count()/8
# Run tasks serially
start_time = timer()
for _ in range(NUM_WORKERS):
print_quad(25,loop)
end_time = timer()
print("Not use process time = %s" %(end_time - start_time))
# Run tasks using processes
start_time = timer()
processes = [Process(target=print_quad , args=(25,loop,)) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = timer()
print("Use process time = %s" %(end_time - start_time))
NUM_WORKERS = cpu_count()/8
# Run tasks serially
start_time = timer()
for _ in range(NUM_WORKERS):
print_quad(25,loop)
end_time = timer()
print("Not use process time = %s" %(end_time - start_time))
# Run tasks using processes
start_time = timer()
processes = [Process(target=print_quad , args=(25,loop,)) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = timer()
print("Use process time = %s" %(end_time - start_time))
โดยผลที่ได้มีดังนี้
ภาพผลการทำงานแบบใช้และไม่ใช้ Multiprocessing แบบ 1 process
ภาพผลการทำงานแบบใช้และไม่ใช้ Multiprocessing แบบ 4 processes
ภาพผลการทำงานแบบใช้และไม่ใช้ Multiprocessing แบบ 8 processes
จะพบว่าแบบ 1 process นั้นมีผลลัพธ์ที่ไม่แตกต่างกันเลยระหว่างการใช้และไม่ใช้ multiprocessing ซึ่งต่างจากแบบ 4 และ 8 processes ที่เวลาการทำงานของแบบใช้ multiprocessing นั้นลดลงไป 1 ใน 4 จากแบบที่ไม่ใช่้ multiprocessing
จึงทำการทดสอบเพิ่มเติมโดยใช้วิธีการเติมแต่เปลี่ยนเลขที่ใช้ในการคำนวณตามเลข index ของ loop ณ ขณะนั้น และเก็บผลรวมจากการคำนวณดังกล่าว เพื่อเพิ่มความซับซ้อนในการคำนวณตามโค้ดข้างต้น
def sum_quad(loop):
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
data = 0
for i in range(0,loop):
data = (i * i * i * i) + data
print("Sum of quad: {}".format(data))
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
data = 0
for i in range(0,loop):
data = (i * i * i * i) + data
print("Sum of quad: {}".format(data))
โดยผลที่ได้จากการปรับฟังค์ชันมีดังนี้
ภาพผลการทำงานแบบใช้และไม่ใช้ Multiprocessing แบบ 1 process ในการหาผลรวมกำลัง 4
ภาพผลการทำงานแบบใช้และไม่ใช้ Multiprocessing แบบ 4 processes ในการหาผลรวมกำลัง 4
ภาพผลการทำงานแบบใช้และไม่ใช้ Multiprocessing แบบ 8 processes ในการหาผลรวมกำลัง 4
จะเห็นได้ว่าผลลัพธ์ยังคงอยู่ในลักษณะเดิม ซึ่งจะได้ข้อสรุปจากผลดังกล่าวคือ
- การทำงานที่ภายในฟังค์ชันมีจำนวน loop น้อย หรือเป็นการเรียกใช้แบบ single function หรือ single process จะส่งผลให้เวลาการทำงานแบบใช้ Multiprocessing นั้นจะให้ผลที่ใกล้เคียงกันกับแบบไม่ใช้ Multiprocessing
- การทำงานที่ภายในฟังค์ชันมีจำนวน loop สูงมากๆ หรือเป็นการเรียกใช้แบบ multiple function หรือ multiple process จะส่งผลให้เวลาการทำงานแบบใช้ Multiprocessing ลดลงเมื่อเทียบกับแบบไม่ใช้ Multiprocessing
1. ตัวแปรที่ใช้ภายใน process นั้นจะมีค่า address เป็นของตัวเอง ทำให้ถึงแม้จะใช้ตัวแปรประเภท global variable เพื่อรับหรือส่งข้อมูลระหว่าง process
โดยตัวอย่างที่ทดสอบจะเป็นการคำนวณค่าจากตัวแปร array ที่ส่งไปและทำการบันทึกค่าใหม่ที่ได้ลงใน array ที่เป็นตัวแปร global variable
square_result = []
def print_square(number):
""" function to print square of given num """
global square_result
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for n in (number):
square = n * n
print("Square : %s" %(square))
square_result.append(square)
print("Inner process result : %s" % square_result)
if __name__ == "__main__":
array = [1,2,4,6,8]
# creating processes
p1 = Process(target=print_square, args=(array,))
# starting process 1
p1.start()
# wait until process 1 is finished
p1.join()
print("Outer process result : %s" % square_result)
print("Done!!")
def print_square(number):
""" function to print square of given num """
global square_result
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for n in (number):
square = n * n
print("Square : %s" %(square))
square_result.append(square)
print("Inner process result : %s" % square_result)
if __name__ == "__main__":
array = [1,2,4,6,8]
# creating processes
p1 = Process(target=print_square, args=(array,))
# starting process 1
p1.start()
# wait until process 1 is finished
p1.join()
print("Outer process result : %s" % square_result)
print("Done!!")
ซึ่งผลการทดลองที่ได้เป็นดังนี้
จะพบว่าเมื่อ process ทำงานเสร็จสิ้นแล้วทำการ print ผลลัพธ์ square_result ออกมาพบว่าไม่มีข้อมูลใน array เลยถึงแม้ภายใน process จะประมวลผลเสร็จแล้วและแสดงผลว่าตัวแปร square_result มีค่าจริงๆ เป็นข้อพิสูจน์ว่าไม่สามารถใช้ตัวแปร global ในการรับหรือส่งค่าภายนอก process ได้ หากต้องการจะส่งค่าจำเป็นต้องใช้ Interprocess Communication (IPC) เช่น การใช้ Queue ในการรับส่งค่าระหว่าง process หรือภายนอก process ดังนี้
square_result = []
def print_square(number,q):
""" function to print square of given num """
global square_result
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for n in (number):
square = n * n
print("Square : %s" %(square))
square_result.append(square)
# put array in queue
q.put(square_result)
print("Inner process result : %s" % square_result)
if __name__ == "__main__":
array = [1,2,4,6,8]
# creating process queue
q = Queue()
# creating processes
p1 = Process(target=print_square, args=(array,q,))
# starting process 1
p1.start()
# wait until process 1 is finished
p1.join()
# get array from queue
square_result = q.get()
print("Outer process result : %s" % square_result)
print("Done!!")
def print_square(number,q):
""" function to print square of given num """
global square_result
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for n in (number):
square = n * n
print("Square : %s" %(square))
square_result.append(square)
# put array in queue
q.put(square_result)
print("Inner process result : %s" % square_result)
if __name__ == "__main__":
array = [1,2,4,6,8]
# creating process queue
q = Queue()
# creating processes
p1 = Process(target=print_square, args=(array,q,))
# starting process 1
p1.start()
# wait until process 1 is finished
p1.join()
# get array from queue
square_result = q.get()
print("Outer process result : %s" % square_result)
print("Done!!")
ซึ่งได้ผลลัพธ์ดังนี้
ผลลัพธ์รอบนี้จะพบว่าทั้งภายในและภายนอก process จะให้ผลเดียวกันตามที่เราต้องการ
อีกข้อจำกัดของ Multiprocessing นั้นคือ หากมี process ที่ทำงานพร้อมกันมากกว่า 1 ตัว หากมี process ตัวใดตัวหนึ่งที่ยังทำงานไม่เสร็จ จะไม่สามารถนำค่าที่ต้องการออกมาใช้ได้ ถึงแม้ว่า process ที่คำนวณค่านั้นจะเสร็จสิ้นแล้วก็ตาม
ในการทดลองนี้จะใช้ only_sleep ซึ่งเป็นฟังค์ชันสำหรับหน่วงเวลา โดยจะหน่วงไว้ที่ 10 วินาที
def only_sleep():
# Do nothing, wait for a timer to expire
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
wait = 10
for i in range(wait):
print("Countdown : %s" %(wait-i))
time.sleep(1)
print("Countdown Complete")
# Do nothing, wait for a timer to expire
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
wait = 10
for i in range(wait):
print("Countdown : %s" %(wait-i))
time.sleep(1)
print("Countdown Complete")
ในส่วนของการสร้าง process จะเพิ่ม process ตัวที่ 2 สำหรับเรียกใช้ only_sleep เพื่อจำลองเวลาการทำงานที่แตกต่างกันดังนี้
if __name__ == "__main__":
array = [1,2,4,6,8]
# creating process queue
q = Queue()
# creating processes
p1 = Process(target=print_square, args=(array,q,))
p2 = Process(target=only_sleep)
# starting 2 processes
p1.start()
p2.start()
# wait until 2 processes are finished
p1.join()
p2.join()
# get array from queue
square_result = q.get()
print("Outer process result : %s" % square_result)
print("Done!!")
array = [1,2,4,6,8]
# creating process queue
q = Queue()
# creating processes
p1 = Process(target=print_square, args=(array,q,))
p2 = Process(target=only_sleep)
# starting 2 processes
p1.start()
p2.start()
# wait until 2 processes are finished
p1.join()
p2.join()
# get array from queue
square_result = q.get()
print("Outer process result : %s" % square_result)
print("Done!!")
ผลลัพธ์ที่ได้จากการทดลองมีดังนี้
จากผลจะพบว่า ถึงแม้ process ที่คำนวณผลจะเสร็จสิ้นไปแล้วแต่อีก process ที่เป็นตัวหน่วงเวลายังทำงานไม่เสร็จทำให้ถึงผลจะพร้อมใช้แสดงก็จะไม่สามารถนำไปใช้ได้จนกว่าทุก process จะทำงานเสร็จสิ้น ซึ่งเป็นผลมาจากการใช้ join() นั่นเอง ซึ่งเมื่อนำ join() ของ process 2 ออกจะทำให้สามารถนำข้อมูลของ process 1 ไปใช้ได้ทันทีดังรูป
จะพบว่า เมื่อ process 1 ทำงานเสร็จสิ้นแล้ว จะออกสู่การทำงานนอก process ทันทีทำให้สามารถ print ค่านอก process ได้ถึงแม้ว่า process 2 จะยังทำงานไม่เสร็จก็ตาม
ข้อจำกัดอีกจุดคือการทำงานของ process คือ หากเราต้องการแยกข้อมูลออกเป็นส่วนๆเพื่อช่วยประมวลผลและต้องการผลที่เป็นลำดับที่ถูกต้องเช่น เราส่งเลข 17, 23, 79, 87 เข้าไปประมวลผลคูณกำลัง 2 สิ่งที่ได้ออกมาควรเป็น 289, 529, 6241, 7569 ตามลำดับ
โดยโค้ดทดสอบจะทำการส่งเลขที่ต้องการคำนวณไปยัง process แต่ละตัวที่ใช้ฟังค์ชันกำลัง 2 เหมือนกัน และบันทึกผลลง queue เมื่อเสร็จสิ้นการทำงาน
def print_number(num,q):
""" function to print square of given num """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for i in range(4000000):
square = num * num
q.put(square)
print("Square : %s" %(square))
print("Process Name: %s Finish." % (current_process().name))
if __name__ == "__main__":
# creating process queue
q = Queue()
# creating processes
p1 = Process(target=print_number, args=(17,q,))
p2 = Process(target=print_number, args=(23,q,))
p3 = Process(target=print_number, args=(79,q,))
p4 = Process(target=print_number, args=(87,q,))
# starting 4 processes
p1.start()
p2.start()
p3.start()
p4.start()
# wait until 4 processes are finished
p1.join()
p2.join()
p3.join()
p4.join()
for i in range(q.qsize()):
square_result.append(q.get())
print("Outer process result : %s" % square_result)
print("Done!!")
""" function to print square of given num """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
for i in range(4000000):
square = num * num
q.put(square)
print("Square : %s" %(square))
print("Process Name: %s Finish." % (current_process().name))
if __name__ == "__main__":
# creating process queue
q = Queue()
# creating processes
p1 = Process(target=print_number, args=(17,q,))
p2 = Process(target=print_number, args=(23,q,))
p3 = Process(target=print_number, args=(79,q,))
p4 = Process(target=print_number, args=(87,q,))
# starting 4 processes
p1.start()
p2.start()
p3.start()
p4.start()
# wait until 4 processes are finished
p1.join()
p2.join()
p3.join()
p4.join()
for i in range(q.qsize()):
square_result.append(q.get())
print("Outer process result : %s" % square_result)
print("Done!!")
ซึ่งผลลัพธ์ที่ได้เป็นดังนี้
จะพบว่าในการทำงานครั้งแรกลำดับการทำงานยังคงปกติอยู่ แต่เนื่องจากในบางครั้ง process อาจใช้เวลาในการทำงานไม่เท่ากัน ทำให้อาจมีบาง process ที่ทำงานเสร็จก่อนทำให้ลำดับผิดเพี้ยน ซึ่งจากในรูปจะพบว่าลำดับของ process ที่ทำงานเสร็จก่อนนั้นคือ 2, 1, 3, 4 แทนที่จะเป็น 1, 2, 3, 4 ตามลำดับ ทำให้หากต้องนำข้อมูลไปคำนวณต่อและต้องเรียงลำดับอย่างถูกต้อง อาจก่อให้เกิดปัญหากับระบบได้
แนวทางแก้ไขปัญหานี้ทำได้โดย ส่งข้อมูลที่เราต้องการไปประมวลผลพร้อมกับลำดับของข้อมูลนั้นๆ เมื่อ process ทุกตัวทำงานเสร็จสิ้นแล้ว ให้นำข้อมูลที่ได้มา sort จะทำให้ลำดับของผลลัพธ์กลับมาถูกต้องอีกครั้ง
ในการทดสอบครั้งนี้ได้ทำการปรับปรุงฟังค์ชันการคำนวณกำลัง 2 ให้รับข้อมูล array ที่บรรจุ index ของ array และข้อมูลที่จะใช้คำนวณไว้ เมื่อทำการดึงข้อมูลมาคำนวณเสร็จสิ้นจะทำการบรรจุกลับไปใน array ก่อนใส่ลง queue และหลังจากได้ข้อมูลจากทุก process แล้วจะนำข้อมูลใน queue มาเก็บลง array ที่เตรียมไว้แล้วใช้ sort() เพื่อจัดเรียง
def print_number(array,q):
""" function to print square of given num """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
num = array.pop()
for i in range(4000000):
square = num * num
array.append(square)
q.put(array)
print("Square : %s" %(square))
print("Process Name: %s Finish." % (current_process().name))
if __name__ == "__main__":
# creating process queue
q = Queue()
array = [[1,17],[2,23],[3,79],[4,87]]
# creating processes
p1 = Process(target=print_number, args=(array[0],q,))
p2 = Process(target=print_number, args=(array[1],q,))
p3 = Process(target=print_number, args=(array[2],q,))
p4 = Process(target=print_number, args=(array[3],q,))
# starting 4 processes
p1.start()
p2.start()
p3.start()
p4.start()
# wait until 4 processes are finished
p1.join()
p2.join()
p3.join()
p4.join()
for i in range(q.qsize()):
square_result.append(q.get())
square_result.sort()
print("Outer process result : %s" % square_result)
print("Done!!")
ซึ่งผลการทดลองที่ได้มีดังนี้""" function to print square of given num """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
current_process().name,
threading.current_thread().name))
num = array.pop()
for i in range(4000000):
square = num * num
array.append(square)
q.put(array)
print("Square : %s" %(square))
print("Process Name: %s Finish." % (current_process().name))
if __name__ == "__main__":
# creating process queue
q = Queue()
array = [[1,17],[2,23],[3,79],[4,87]]
# creating processes
p1 = Process(target=print_number, args=(array[0],q,))
p2 = Process(target=print_number, args=(array[1],q,))
p3 = Process(target=print_number, args=(array[2],q,))
p4 = Process(target=print_number, args=(array[3],q,))
# starting 4 processes
p1.start()
p2.start()
p3.start()
p4.start()
# wait until 4 processes are finished
p1.join()
p2.join()
p3.join()
p4.join()
for i in range(q.qsize()):
square_result.append(q.get())
square_result.sort()
print("Outer process result : %s" % square_result)
print("Done!!")
จะพบว่าถึง process ในการคำนวณค่านั้นจะไม่เป็นลำดับ แต่ผลที่ได้ออกมานั้นเป็นลำดับตามที่เราต้องการแล้ว
อีกวิธีที่น่าสนใจในการแบ่งการประมวลผลการทำงานคือการใช้ Multiprocessing Pool ซึ่งจะนำเสนอในบทความถัดไป












ไม่มีความคิดเห็น:
แสดงความคิดเห็น