Ray only using two threads?

I wrote this

@ray.remote
def check(word, words):
    valid_ciphertexts = []
    for key in range(26):
        ciphertext = shift(word, key)
        if ciphertext in words:
            valid_ciphertexts.append(ciphertext)
        else:
            valid_ciphertexts.append(None)
    return valid_ciphertexts


if __name__ == '__main__':
    words = set()
    with open(sys.argv[1], 'r') as lexicon:
        for word in lexicon:
            words.add(word.strip())
    ray.init()
    results = ray.get([check.remote(word, words) for word in words])
    with open(sys.argv[2], 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([key for key in range(26)])
        writer.writerows(results)

I was expecting to see high usage across all my cores, due to parallelising the hundreds of thousands of calls to check, but the dashboard is showing this:

Why is it like this?

cc @Alex any idea why this happens?

Also @theonlygusti can you try adding things like time.sleep(10) within your check method and see if the issue is reproducible?

Can you try

    ray.init()
    words_ref = ray.put(words)
    results = ray.get([check.remote(word, words_ref) for word in words])

I’m wondering if this is because words it’s taking too long to keep re-serializing words

Yep! Have a look at this, totally different:

I’m confused as to why there’s a difference here to be honest, I thought Python was pass-by-reference anyway?

Yeah the issue is that when you pass an argument into a remote function, Ray needs to put the argument in the object store, since the task can run on any worker. The issue is that Ray doesn’t have a way of knowing if words has changed between calls, so it keeps re-putting it in the object store.

Adding on to Alex’s comment, yes it is right python is pass-by-reference by default, but when you pass objects to ray tasks, it is technically not python function (it is Ray’s remote methods), and it is pass-by-value by default, and if you’d like to pass a reference, you should use Object Reference, which can be created by ray.put (or as a return value of other tasks). Try

a = ray.put(obj)
print(type(a))