Java - Concurrently Pair Matches
I'm looking for a Java concurrency idiom to pair up matches among a large number of elements with the highest throughput.
Consider that I have "people" coming in on multiple threads. Each "person" is looking for a match. When it finds another waiting "person" that it matches, both are assigned to each other and removed from processing.
I don't want to lock a big structure to change states. Consider that Person has getMatch and setMatch. Before being submitted, each person's #getMatch is null. When a person is unblocked (or fished out), it has either expired because it waited too long for a match, or #getMatch is non-null.
There are some problems with keeping throughput high. Suppose PersonA is submitted at the same time as PersonB. They match each other, but PersonB also matches an already waiting PersonC. PersonB's state changes to "available" when it is submitted, so PersonA needs to not accidentally get PersonB while PersonB is being matched with PersonC. Make sense? Also, I want to do this in a way that works asynchronously. In other words, I don't want each submitter to have to hold on to a person on a thread with a waitForMatch type of thing.
Again, I don't want requests to have to run on separate threads, but it's okay if there is one additional matchmaker thread.
It seems like there should be some idioms for this, as it appears to be a pretty common thing. But my Google searches have come up dry (I may be using the wrong terms).
UPDATE
There are a couple of things that make this problem hard for me. One is that I don't want to have the objects in memory; I'd like to have all waiting candidates in Redis or Memcache or something like that. The other is that any person could have several possible matches. Consider an interface like the following:

    person.getId();        // let's call it an Integer
    person.getFriendIds(); // a collection of other person IDs

Then I have a server that looks something like this:

    MatchServer:
        submit( personId, expiration ) -> void  // non-blocking, returns immediately
        isDone( personId ) -> boolean           // either expired or found a match
        getMatch( personId ) -> matchId         // non-blocking

This is for a REST interface, and it would use redirects until you got to the result. My first thought was to just have a cache in MatchServer that was backed by something like Redis and had a concurrent weak-value hash map for the objects that were currently locked and being acted on. Each personId would be wrapped by a persistent state object with states like submitted, matched, and expired.
Following so far? Pretty simple: the submit code did the initial work, something like this:
    public void submit( Person p, long expiration ) {
        MatchStatus incoming = new MatchStatus( p.getId(), expiration );
        // Only park the newcomer in the cache if no waiting friend could be matched.
        if ( !tryMatch( incoming, p.getFriendIds() ) )
            cache.put( p.getId(), incoming );
    }

    public boolean isDone( Integer personId ) {
        MatchStatus status = cache.get( personId );
        status.lock();
        try {
            return status.isMatched() || status.isExpired();
        } finally {
            status.unlock();
        }
    }

    public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
        for ( Integer friend : friends ) {
            if ( match( incoming, friend ) )
                return true;
        }
        return false;
    }

    private boolean match( MatchStatus incoming, Integer waitingId ) {
        MatchStatus waiting = cache.get( waitingId );
        if ( waiting == null )
            return false;
        // Only the waiting side is locked; the incoming side is not yet in the cache.
        waiting.lock();
        try {
            if ( waiting.isMatched() )
                return false;
            waiting.setMatch( incoming.getId() );
            incoming.setMatch( waiting.getId() );
            return true;
        } finally {
            waiting.unlock();
        }
    }
So the problem here is that if two people come in at the same time and they are each other's only matches, they won't find each other. A race condition, right? The only way I could see to solve it was to synchronize tryMatch(), but that kills my throughput. I can't have tryMatch loop indefinitely because I need these to be very short calls.
So what's a better way to approach this? Every solution I come up with forces people in one at a time, which isn't great for throughput. For example, creating a background thread and using a blocking queue for putting and taking the incoming people one at a time.
Any guidance would be appreciated.
You might be able to use a ConcurrentHashMap. I'm assuming that your objects have keys that they can match on, e.g. PersonA and PersonB would both have a "Person" key.
    ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();

    void addMatch(Match match) {
        boolean success = false;
        while (!success) {
            // Atomically take a waiting match with the same key, if there is one.
            Match oldMatch = map.remove(match.key);
            if (oldMatch != null) {
                match.setMatch(oldMatch);
                success = true;
            } else if (map.putIfAbsent(match.key, match) == null) {
                // Nobody was waiting and nobody raced us, so this match now waits.
                success = true;
            }
        }
    }
You'll keep looping until you either add the match to the map, or until you've removed an existing match and paired it. remove and putIfAbsent are both atomic.
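As a rough, self-contained sketch of the loop above under contention: the `Match` class, its `partner` field, the single shared key "person", and the `runDemo` helper are all assumptions made for illustration, not part of the original answer. Unlike the snippet above, the sketch also sets the partner on the waiting side so the pairing can be checked from both directions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PairingSketch {
    // Hypothetical element type: a key to match on and a slot for the partner.
    public static final class Match {
        final String key;
        final int id;
        volatile Match partner;
        Match(String key, int id) { this.key = key; this.id = id; }
    }

    public static final ConcurrentHashMap<String, Match> waiting = new ConcurrentHashMap<>();

    // Either pair with a waiting element of the same key, or become the waiting element.
    public static void addMatch(Match m) {
        while (true) {
            Match old = waiting.remove(m.key);      // atomic: only one thread gets `old`
            if (old != null) {
                m.partner = old;
                old.partner = m;
                return;
            }
            if (waiting.putIfAbsent(m.key, m) == null) {
                return;                              // nobody was waiting; we wait now
            }
            // Another thread inserted between remove and putIfAbsent: retry.
        }
    }

    // Submits n elements with the same key from 8 threads and counts the paired ones.
    public static int runDemo(int n) {
        waiting.clear();
        List<Match> all = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < n; i++) {
            Match m = new Match("person", i);
            all.add(m);
            pool.execute(() -> addMatch(m));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int paired = 0;
        for (Match m : all) {
            if (m.partner != null && m.partner.partner == m) paired++;
        }
        return paired;
    }

    public static void main(String[] args) {
        System.out.println("paired: " + runDemo(1000));
    }
}
```

With an even number of submissions on one key, every element should end up paired and the map should be empty afterwards, because a waiting element can only be removed by one thread.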
Edit: Because you want to offload the data to disk, you can use e.g. MongoDB to this end, with its findAndModify method. If an object with the key already exists, then the command will remove and return it so that you can pair the old object with the new object and presumably store the pair associated with a new key; if an object with the key doesn't exist, then the command stores the object with the key. This is equivalent to the behavior of ConcurrentHashMap except that the data is stored on disk instead of in memory; you don't need to worry about two objects writing at the same time, because the findAndModify logic prevents them from inadvertently occupying the same key.
Use Jackson if you need to serialize your objects to JSON.
There are alternatives to Mongo, e.g. DynamoDB, although Dynamo is only free for small amounts of data.
Edit: Given that the friends lists are not reflexive, I think you can solve this with a combination of MongoDB (or another key-value database with atomic updates) and a ConcurrentHashMap.
- Persons in MongoDB are either "matched" or "unmatched." (If I say "remove a person from MongoDB", I mean "set the person's state to 'matched.'")
- When you add a new person, first create a ConcurrentHashMap<key, Boolean> for it, in a global ConcurrentHashMap<key, ConcurrentHashMap<key, Boolean>>.
- Iterate through the new person's friends:
  - If a friend is in MongoDB, then use findAndModify to atomically set it to "matched," then write the new person to MongoDB with a state of "matched," and finally add the pair to a "Pairs" collection in MongoDB that can be queried by the end user. Remove the person's ConcurrentHashMap from the global map.
  - If the friend isn't in MongoDB, then check to see if that friend has written to the current person's associated ConcurrentHashMap. If it has, then do nothing; if it has not, then check to see if the friend has a ConcurrentHashMap associated with it; if it does, then set the value associated with the current person's key to "true." (Note that it's still possible for two friends to have written to each other's hash maps, since the current person can't check its own map and modify the friend's map in one atomic operation, but the self hash map check reduces this possibility.)
- If the person hasn't been matched, then write it to MongoDB in the "unmatched" state, remove its ConcurrentHashMap from the global map, and create a delayed task that will iterate through the IDs of all of the friends that wrote to the person's ConcurrentHashMap (i.e. using ConcurrentHashMap#keySet()). The delay on this task should be random (e.g. Thread.sleep(500 * rand.nextInt(30))) so that two friends won't attempt to match at the same time. If the current person doesn't have any friends that it needs to re-check, then don't create a delayed task for it.
- When the delay is up, create a new ConcurrentHashMap for this person, remove the unmatched person from MongoDB, and loop back to the friend iteration above. If the person is already matched, then don't remove it from MongoDB and terminate the delayed task.
In the common case, a person either matches with a friend, or else fails to match without a friend having been added to the system while it was iterating through the list of friends (i.e. the person's ConcurrentHashMap will be empty). In the case of simultaneous writes of friends:
- Friend1 and Friend2 are added at the same time.
- Friend1 writes to Friend2's ConcurrentHashMap to indicate that they missed each other.
- Friend2 writes to Friend1's ConcurrentHashMap to indicate the same (this would only occur if Friend2 checked to see whether Friend1 wrote to its map at the same time that Friend1 was writing to it; ordinarily Friend2 would detect that Friend1 had written to its map and so it would not write to Friend1's map).
- Friend1 and Friend2 both write to MongoDB. Friend1 randomly gets a 5 second delay on its followup task, Friend2 randomly gets a 15 second delay.
- Friend1's task fires first, and matches with Friend2.
- Friend2's task fires second; Friend2 is no longer in MongoDB, so the task terminates.
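The firing order in this scenario can be sketched with a java.util.concurrent.DelayQueue. The `Retry` class and the `firingOrder` helper are hypothetical names for illustration, and the delays are scaled down from 5 s and 15 s to 50 ms and 150 ms so the example finishes quickly.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class RetryOrderSketch {
    // Hypothetical retry task: becomes available once its deadline has passed.
    public static final class Retry implements Delayed {
        final String name;
        final long fireAtNanos;

        Retry(String name, long delayMillis) {
            this.name = name;
            this.fireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the deadline; zero or negative means "due".
            return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Enqueues friend2 (longer delay) before friend1 (shorter delay) and
    // returns the order in which the queue releases them.
    public static String firingOrder() {
        DelayQueue<Retry> queue = new DelayQueue<>();
        queue.add(new Retry("friend2", 150));
        queue.add(new Retry("friend1", 50));
        try {
            return queue.take().name + "," + queue.take().name;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firingOrder());
    }
}
```

The queue releases elements by remaining delay rather than insertion order, so the friend with the shorter random delay gets the first chance to re-check.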
A few hiccups:
- It's possible that Friend1 and Friend2 don't both have ConcurrentHashMaps associated with them, e.g. if Friend2 is still initializing its hash map at the time that Friend1 checks to see if the map is in memory. This is fine, because Friend2 will write to Friend1's hash map, and so we're guaranteed that the match will eventually be attempted: at least one of them will have a hash map while the other is iterating, since hash map creation precedes iteration.
- The second iteration of a match may fail if both friends' tasks are somehow fired at the same time. In this case, a person should remove friends from its list if they're in MongoDB in the matched state; it should then take the union of the resulting list with the list of friends that wrote to its ConcurrentHashMap, and the next iteration should use this as the new friend list. Eventually the person will be matched, or else the person's "re-check" friends list will be emptied.
- You should increase the task delay on each subsequent iteration in order to increase the probability that two friends' tasks won't run simultaneously (e.g. Thread.sleep(500 * rand.nextInt(30)) on the first iteration, Thread.sleep(500 * rand.nextInt(60)) on the second iteration, Thread.sleep(500 * rand.nextInt(90)) on the third, etc).
- On subsequent iterations, you must create a person's new ConcurrentHashMap before removing the person from MongoDB, otherwise you'll have a data race. Likewise, you must remove a person from MongoDB while you're iterating through its potential matches, otherwise you might inadvertently match it twice.
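The growing random delay from the third point could be computed along these lines. This is a minimal sketch: `retryDelayMillis` and its 1-based `iteration` parameter are illustrative names, and ThreadLocalRandom is used here in place of a shared Random instance.

```java
import java.util.concurrent.ThreadLocalRandom;

public class RetryDelaySketch {
    // Delay in milliseconds for the given retry iteration (1 = first retry).
    // Mirrors 500 * rand.nextInt(30), 500 * rand.nextInt(60), 500 * rand.nextInt(90), ...
    public static long retryDelayMillis(int iteration) {
        if (iteration < 1) {
            throw new IllegalArgumentException("iteration must be >= 1");
        }
        // nextInt(bound) returns a value in [0, bound), so the delay is a
        // multiple of 500 ms between 0 and 500 * (30 * iteration - 1).
        return 500L * ThreadLocalRandom.current().nextInt(30 * iteration);
    }

    public static void main(String[] args) {
        for (int i = 1; i <= 3; i++) {
            System.out.println("iteration " + i + ": " + retryDelayMillis(i) + " ms");
        }
    }
}
```

Note that the formula can yield a delay of zero, so two tasks can still collide; widening the range on each iteration only makes repeated collisions less likely.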
Edit: Some code:
- The method addUnmatchedToMongo(person1) writes an "unmatched" person1 to MongoDB.
- setToMatched(friend1) uses findAndModify to atomically set friend1 to "matched"; the method will return false if friend1 is already matched or doesn't exist, or will return true if the update was successful.
- isMatched(friend1) returns true if friend1 exists and is matched, and returns false if it doesn't exist or exists and is "unmatched".
    import java.util.Collection;
    import java.util.Random;
    import java.util.concurrent.*;

    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap = new ConcurrentHashMap<>();
    private final DelayQueue<DelayedRetry> delayQueue = new DelayQueue<>();
    private ThreadPoolExecutor executor;

    // Dispatcher: hands each retry to the pool once its delay has elapsed.
    public void startRetryDispatcher() {
        executor.execute(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        Runnable runnable = delayQueue.take();
                        executor.execute(runnable);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    public void findMatch(Person person, Collection<Person> friends) {
        findMatch(person, friends, 1);
    }

    public void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
        globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
        for (Person friend : friends) {
            if (setToMatched(friend)) {
                // write person to MongoDB in "matched" state
                // write "Pair(person, friend)" to MongoDB so it can be queried by the end user
                globalMap.remove(person.id);
                return;
            } else {
                if (!isMatched(friend) && globalMap.get(person.id).get(friend.id) == null) {
                    // existence of "friendMap" indicates another thread is currently trying to match the friend
                    ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
                    if (friendMap != null) {
                        friendMap.put(person.id, person);
                    }
                }
            }
        }
        addUnmatchedToMongo(person);
        Collection<Person> retryFriends = globalMap.remove(person.id).values();
        if (retryFriends.size() > 0) {
            delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
        }
    }

    public class DelayedRetry implements Runnable, Delayed {
        private final long fireAtMillis; // absolute time at which the retry becomes due
        private final Person person;
        private final Collection<Person> friends;
        private final int delayMultiplier;

        public DelayedRetry(long delay, Person person, Collection<Person> friends, int delayMultiplier) {
            this.fireAtMillis = System.currentTimeMillis() + delay;
            this.person = person;
            this.friends = friends;
            this.delayMultiplier = delayMultiplier;
        }

        public long getDelay(TimeUnit unit) {
            // DelayQueue expects the remaining delay, which has to reach zero eventually.
            return unit.convert(fireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }

        public void run() {
            findMatch(person, friends, delayMultiplier + 1);
        }
    }
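The three MongoDB helpers are only described above, not shown. To experiment with the algorithm without a database, an in-memory stand-in with the same contract might look like the sketch below. It is an assumption for illustration only: it keys on a plain String id instead of a Person, and it uses ConcurrentHashMap.replace(key, oldValue, newValue) where the answer uses findAndModify.

```java
import java.util.concurrent.ConcurrentHashMap;

public class MatchStateSketch {
    public enum State { UNMATCHED, MATCHED }

    // In-memory substitute for the MongoDB collection of persons.
    private final ConcurrentHashMap<String, State> store = new ConcurrentHashMap<>();

    // Writes the person in the "unmatched" state (overwrites any earlier state).
    public void addUnmatched(String id) {
        store.put(id, State.UNMATCHED);
    }

    // Atomically flips UNMATCHED -> MATCHED. Returns false if the person
    // doesn't exist or is already matched, so only one caller can win.
    public boolean setToMatched(String id) {
        return store.replace(id, State.UNMATCHED, State.MATCHED);
    }

    // True only if the person exists and is matched.
    public boolean isMatched(String id) {
        return store.get(id) == State.MATCHED;
    }

    public static void main(String[] args) {
        MatchStateSketch states = new MatchStateSketch();
        states.addUnmatched("friend1");
        System.out.println(states.setToMatched("friend1")); // first claim
        System.out.println(states.setToMatched("friend1")); // second claim
    }
}
```

The conditional replace is what keeps two threads from both claiming the same waiting friend: the state check and the update happen as one atomic step.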