voile.sync

sync モジュール
よく使う同期用クラスのインターフェースを利用可能。
  • Light
  • NamedMutex
Date: July 29, 2009
Authors: P.Knowledge, SHOO
License: NYSL ( http://www.kmonos.net/nysl/ )
class SyncEvent;
同期イベントクラス
Windowsの CreateEvent や SetEvent のラッパー Windows以外の環境でも動作するが、最適な実装ではないかもしれない。
Example:
SyncEvent[3] ev;
int data;
void run1()
{
	data = 1;
	ev[0].signal = true;
}

void run2()
{
	data = 2;
	ev[1].signal = true;
}

void run3()
{
	data = 3;
	ev[2].signal = true;
}
void main()
{
	ev[] = [new Light, new Light, new Light];
	scope t = new ThreadGroup;
	data = 0;
	t.create(&run1);
	ev[0].wait;
	assert(data == 1);
	data = 0;
	t.create(&run2);
	ev[1].wait;
	assert(data == 2);
	data = 0;
	t.create(&run3);
	ev[2].wait;
	assert(data == 3);
}
@property Condition handle();
ハンドルを得る
ただしOS依存する処理をする場合にのみ使用すること
this(bool firstCondition = false);
コンストラクタ
Parameters:
bool firstCondition 初期状態
@property bool signaled();
シグナル状態を返す
Returns: trueならシグナル状態で、waitはすぐに制御を返す falseなら非シグナル状態で、waitしたらシグナル状態になるか、時間が 過ぎるまで制御を返さない状態であることを示す。
@property void signaled(bool cond);
シグナル状態を設定する
Parameters:
bool cond
trueならシグナル状態にし、waitしているスレッドの制御を返す。 falseなら非シグナル状態で、waitしたらシグナル状態になるまで制御を 返さない状態にする。
const void wait();
シグナル状態になるまで待つ
conditionがtrueならシグナル状態であり、すぐに制御が返る。 conditionがfalseなら非シグナル状態で、シグナル状態になるまで制御を 返さない。
bool wait(Duration dur);
シグナル状態になるまで待つ
conditionがtrueならシグナル状態であり、すぐに制御が返る。 conditionがfalseなら非シグナル状態で、シグナル状態になるか、時間が 過ぎるまで制御を返さない。
class NamedMutex: object.Object.Monitor;
名前付きミューテックス
プロセス間で共有される名前付きミューテックスの作成を行う。
this(string aName);
コンストラクタ
Parameters:
string aName 名前付きミューテックスの名前を指定する。名前は128文字以内。
const pure nothrow @nogc @property @safe string name();
名前を返す。
nothrow @nogc void lock();
ロックする
ロックが成功するまで制御は返らない
nothrow @nogc bool tryLock();
ロックの試行
即座に制御が返る。 trueが帰った場合ロックが成功している。 falseなら別のMutexにロックされているため、ロックされなかった。
nothrow @nogc void unlock();
ロック解除
class QueuedMutex: object.Object.Monitor;
this();
shared this();
@trusted void lock();
shared @trusted void lock();
@trusted bool tryLock();
shared @trusted bool tryLock();
@trusted void unlock();
shared @trusted void unlock();
class QueuedSemaphore;
this(size_t count = 0);
shared this(size_t count = 0);
@trusted void wait();
shared @trusted void wait();
@trusted bool tryWait();
shared @trusted bool tryWait();
@trusted void notify();
shared @trusted void notify();
class Future(Ret);
Examples: ditto
auto future = new Future!int;
future.perform(delegate (int a) => a + 10, 10);
assert(future.yieldForce() == 20);
future.perform(taskPool, delegate (int a) => a + 20, 10);
assert(future.yieldForce() == 30);
static int foo(int a) { return a + 30; }
future.perform!foo(10);
assert(future.yieldForce() == 40);
future.perform!foo(taskPool, 10);
assert(future.yieldForce() == 40);

auto future2 = future.perform(delegate (int a) => a + 10, 10)
	.then((int a) => cast(ulong)(a + 20))
	.then(a => a + 20)
	.then!((ulong a) => a + 60)()
	.then(taskPool, a => cast(int)(a + 20))
	.then!((ref int a) => a + 60)(taskPool);
auto future3 = future2
	.then((int a){ assert(a == 200); })
	.then(taskPool, (){  })
	.then((){  })
	.then!((){  })
	.then!((){  })(taskPool);
assert(future2.yieldForce() == 200);
future3.join();
Examples: ditto
Exception lastEx;
auto feature = async({
	throw new Exception("Ex1");
}).then({
	assert(0);
}, (Exception e){
	lastEx = e;
});
try
{
	feature.join(true);
}
catch (Exception e)
{
	assert(lastEx.msg == "Ex1");
	assert(lastEx is e);
}
Examples: ditto
import std.exception;
Exception e1, e2;
auto future1 = async({
	throw new Exception("Ex1");
});
auto future2 = future1.then({
	assert(0);
}, (Exception e)
{
	// future1の例外処理
	e1 = e;
});

// future1でEx1が投げられている
e2 = future1.join(true).collectException();
assert(e2.msg == "Ex1");
// future2もEx1が投げられたことになっている
e2 = future2.join(true).collectException();
assert(e2.msg == "Ex1");
assert(e1 is e2);
Examples: ditto
import std.exception;
Exception e1, e2;
auto future1 = async(
{
	// future1の処理
});
auto future2 = future1.then(
{
	// feature1の後続処理
	throw new Exception("Ex1");
}, (Exception e)
{
	// future1の例外処理
	e1 = e;
});
auto future3 = future2.then(
{
	// feature2の後続処理
	throw new Exception("Ex2");
}, (Exception e)
{
	// future2の例外処理
	e2 = e;
});

// future1では例外が投げられない
auto e3 = future1.join(true).collectException();
assert(e1 is null);
assert(e3 is null);
// future2ではEx1例外が投げられる
auto e4 = future2.join(true).collectException();
assert(e2 !is null);
assert(e2 is e4);
assert(e2.msg == "Ex1");
// (Ex2は投げられない)
this();
this(SyncEvent evStart);
this(SyncEvent evStart, TaskPool pool);
this(ResultType val, SyncEvent evStart = null);
コンストラクタ
auto perform(alias func, Args...)(TaskPool pool, Args args)
if(is(typeof(func(args)) == ResultType));
auto perform(alias func, Args...)(Args args)
if(is(typeof(func(args)) == ResultType));
auto perform(F, Args...)(TaskPool pool, F dg, Args args)
if(is(typeof(dg(args)) == ResultType));
auto perform(F, Args...)(F dg, Args args)
if(is(typeof(dg(args)) == ResultType));
終了したら呼ばれる
auto then(Ret2)(TaskPool pool, Ret2 delegate(ResultType) callbackFinished, void delegate(Exception e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(!is(Ret == void) && is(typeof(callbackFinished(_resultRaw))));
auto then(Ret2)(Ret2 delegate(ResultType) callbackFinished, void delegate(Exception e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(!is(Ret == void) && is(typeof(callbackFinished(_resultRaw))));
auto then(alias func, Ex = Exception)(TaskPool pool, void delegate(Ex e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(!is(Ret == void) && is(typeof(func(_resultRaw))) && is(Ex == Exception));
auto then(alias func, Ex = Exception)(void delegate(Ex e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(!is(Ret == void) && is(typeof(func(_resultRaw))) && is(Ex == Exception));
auto then(Ret2)(TaskPool pool, Ret2 delegate() callbackFinished, void delegate(Exception e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(is(Ret == void) && is(typeof(callbackFinished())));
auto then(Ret2)(Ret2 delegate() callbackFinished, void delegate(Exception e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(is(Ret == void) && is(typeof(callbackFinished())));
auto then(alias func, Ex = Exception)(TaskPool pool, void delegate(Ex e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(is(Ret == void) && is(typeof(func())) && is(Ex == Exception));
auto then(alias func, Ex = Exception)(void delegate(Ex e) nothrow callbackFailed = null, void delegate(Throwable e) nothrow callbackFatal = null)
if(is(Ret == void) && is(typeof(func())) && is(Ex == Exception));
チェーン
FinishedHandler.HandlerProcId addListenerFinished(CallbackType dg);
終了したら呼ばれるコールバックをハンドラに登録
指定されたコールバックは並列処理が正常終了したときにのみ呼び出される。 並列処理がまだ終了していない場合には並列処理を行っていたスレッドでコールバックが呼び出されるが、 すでに並列処理が終了していた場合には現在のスレッドで即座にコールバックが呼び出される。 すでに終了していて、かつコールバック内で例外が発生した場合には、Failed, Fatalのハンドラが呼び出され、 Futureの状態も各々の状態へと変化する。
Parameters:
CallbackType dg 設定するコールバックを指定する。nullを指定したらハンドラに登録されたすべてのコールバックをクリアする。
Returns: 登録したハンドラのIDを返す。登録されなかった場合はFinishedHandler.HandlerProcId.initが返る
FailedHandler.HandlerProcId addListenerFailed(CallbackFailedType dg);
例外が発生したら呼ばれる
Parameters:
CallbackFailedType dg 設定するコールバックを指定する。nullを指定したらすべてのコールバックをクリアする。
Returns: 登録したハンドラのIDを返す。登録されなかった場合はFailedHandler.HandlerProcId.initが返る
FatalHandler.HandlerProcId addListenerFatal(CallbackFatalType dg);
致命的エラーが発生したら呼ばれる
Parameters:
CallbackFatalType dg 設定するコールバックを指定する。nullを指定したらすべてのコールバックをクリアする。
Returns: 登録したハンドラのIDを返す。登録されなかった場合はFatalHandler.HandlerProcId.initが返る
void removeListenerFinished(FinishedHandler.HandlerProcId id);
void removeListenerFailed(FailedHandler.HandlerProcId id);
void removeListenerFatal(FatalHandler.HandlerProcId id);
登録していたハンドラを削除する
const bool done();
終了しているか(例外発生含む)
const void join(bool rethrow = false);
終了するまで待機する
inout ref auto yieldForce();
inout ref auto workForce();
inout ref auto spinForce();
inout @property ref inout(ResultType) result();
結果を受け取る
auto async();
auto async(F, Args...)(TaskPool pool, F dg, Args args)
if(isCallable!F);
auto async(F, Args...)(F dg, Args args)
if(isCallable!F);
auto async(alias func, Args...)(TaskPool pool, Args args)
if(is(typeof(func(args))));
auto async(alias func, Args...)(Args args)
if(!is(Args[0] == TaskPool) && is(typeof(func(args))));
非同期処理の開始
class ManagedShared(T): Object.Monitor;
管理された共有資源
@trusted this()();
shared @trusted this()();
コンストラクタ
sharedのコンストラクタを呼んだ場合の初期状態は共有資源(unlockされた状態) 非sharedのコンストラクタを呼んだ場合の初期状態は非共有資源(lockされた状態)
inout pure nothrow @nogc @property inout(Mutex) mutex();
shared inout pure nothrow @nogc @property shared(inout(Mutex)) mutex();
@property @safe auto locked();
shared inout @property @trusted auto locked();
ロックされたデータを得る
この戻り値が破棄されるときにRAIIで自動的にロックが解除される。 また、戻り値はロックされた共有資源へ、非共有資源としてアクセス可能な参照として使用できる。
@safe bool tryLock();
shared @trusted bool tryLock();
ロックを試行する。
Returns: すでにロックしているならtrue ロックされていなければロックしてtrue 別のスレッドにロックされていてロックできなければfalse
@safe void lock();
shared @trusted void lock();
ロックする。
@safe void unlock();
shared @trusted void unlock();
ロック解除する。
inout @property ref T asUnshared();
shared inout @property ref T asUnshared();
非共有資源としてアクセスする
inout @property ref shared(T) asShared();
shared inout @property ref shared(T) asShared();
共有資源としてアクセスする
ManagedShared!T managedShared(T)(T dat);
ManagedShared!T managedShared(T, Args...)(Args args);
class TaskData;
マルチタスクキューによって管理されるタスクデータ
enum State: int;
状態
waiting
初期状態で、タスクプールに追加される前
ready
タスクプールに追加された状態
running
実行中の状態
finished
実行が終了した状態
failed
実行の結果、異常終了した状態
dropped
実行されずにドロップされた状態
protected immutable string type;
タスクの種類
protected immutable void delegate() shared onCall;
タスクの本体
protected shared SysTime timCreate;
Queueに追加された時刻
protected shared SysTime timReady;
Poolに追加された時刻
protected shared SysTime timStart;
実行開始した時刻
protected shared SysTime timEnd;
実行終了した時刻
protected immutable UUID uuid;
一意なID
protected State state;
状態
protected shared void onReady();
Poolに追加されたタイミングでコールバック
protected shared void onStart();
実行開始したタイミングでコールバック
protected shared void onEnd();
実行終了したタイミングでコールバック
protected shared void onFailed(Throwable);
実行失敗したタイミングでコールバック
protected shared void onDropped();
実行されずにドロップしたタイミングでコールバック
this(string ty, void delegate() shared callback, UUID id = randomUUID());
class MultiTaskQueue;
マルチタスクキュー
タスクの待ち行列を作成する。 同じ種類のタスクは待ち行列によって順次実行し、違う種類のタスクはタスクプールで並列実行する。 コンストラクタでタスクプールの設定を行い、invoke関数によってタスクの種類と実行内容を指定する。 invokeにより指定されるタスクは、TaskDataクラスを継承することで細かく内容を調整することができる。 invokeによって待ち行列に追加された未だ実行されていないタスクを、dropによって実行取り消しすることができる。 informations関数により、各タスクの実行状況を調べることができる。
以下のようなことが可能
this(TaskPool pool, void delegate() callbackFinishPool = null);
shared this(TaskPool pool, void delegate() callbackFinishPool = null);
this(size_t worker = 8, bool daemon = false);
shared this(size_t worker = 8, bool daemon = false);
コンストラクタ
Parameters:
TaskPool pool 使用するタスクプールを指定できる
void delegate() callbackFinishPool タスクキューを破棄した際に全てのタスクが終了した際に呼ばれる。タスクプールを終了するために使用できる。
size_t worker ワーカースレッド数を指定して作成できる
bool daemon スレッドのデーモン化をする場合はtrue。disposeしない場合がある場合にtrueを指定する。
void dispose();
インスタンスを破棄する。
bool invoke(TaskData tsk);
shared bool invoke(TaskData tsk);
bool invoke(string type, void delegate() shared dg, UUID id = randomUUID());
shared bool invoke(string type, void delegate() shared dg, UUID id = randomUUID());
タスクを実行予約する
タスクを待ち行列に追加する。待ち行列に追加されると、順次実行される。 待ち行列はtype毎にあり、どの待ち行列に追加されるかはタスクのtypeにより決まる。 同じtypeでは、追加された順に順次実行される。 異なるtypeの場合は並行して実行される。並行数はコンストラクタで指定したタスクプールに依存する。
Parameters:
TaskData tsk タスクデータを指定する
string type タスク種別を指定する
void delegate() shared dg タスクの処理内容のデリゲートを指定する
UUID id タスクの識別用IDを指定する
Returns: タスクを追加することができたらtrueを、追加できなかったらfalseを返す。
bool drop(string type, UUID id);
タスク実行を取りやめる
タスクがまだ実行されていない場合は、実行を取りやめる。
Parameters:
string type タスク種別を指定する
UUID id タスクの識別用IDを指定する
Returns: 正常にドロップされた場合はtrueが返り、さもなくばfalseが返る。
shared(TaskData) peek(string type, UUID id);
タスクを取り出す
struct TaskInfo;
タスクの情報
string type;
タスクの種類
SysTime timCreate;
Queueに追加された時刻
SysTime timReady;
Poolに追加された時刻
SysTime timStart;
実行開始した時刻
SysTime timEnd;
実行終了した時刻
UUID uuid;
一意なID
alias State = .TaskData.State;
State state;
タスクの状態
const @property @safe TaskInfo[] informations();
情報取得
タスクの実行状況を調べる。
Returns: タスクの情報をTaskInfoの配列で返す。
class MessageQueue(T) if (!hasUnsharedAliasing!T);
nothrow @system this(return scope Condition cond);
nothrow @system this();
shared nothrow @trusted this(Condition cond);
shared nothrow @safe this(shared(Condition) cond);
shared nothrow @safe this();
コンストラクタ
nothrow void put(T dat);
shared nothrow @trusted void put(T dat);
nothrow void put(Range)(Range src)
if(isInputRange!Range && is(ForeachType!Range : T));
shared nothrow @trusted void put(Range)(Range src)
if(isInputRange!Range && is(ForeachType!Range : T));
供給
nothrow T consume();
shared nothrow @trusted T consume();
nothrow void consumeAll(OutputRange)(ref OutputRange dst)
if(isOutputRange!(OutputRange, T));
shared nothrow @trusted void consumeAll(OutputRange)(ref OutputRange dst)
if(isOutputRange!(OutputRange, T));
nothrow bool tryConsume(ref T dst, Duration timeout = 0.msecs);
shared nothrow @trusted bool tryConsume(ref T dst, Duration timeout = 0.msecs);
nothrow bool tryConsumeAll(OutputRange)(ref OutputRange dst, Duration timeout = 0.msecs)
if(isOutputRange!(OutputRange, T));
shared nothrow @trusted bool tryConsumeAll(OutputRange)(ref OutputRange dst, Duration timeout = 0.msecs)
if(isOutputRange!(OutputRange, T));
消費
nothrow bool waitForData();
shared nothrow @trusted bool waitForData();
nothrow bool waitForData(Duration timeout);
shared nothrow @trusted bool waitForData(Duration timeout);
データ供給待ち
const nothrow @property @trusted size_t length();
shared const nothrow @property @trusted size_t length();
現在の待ち行列の長さ
nothrow void close(bool waitForConsume = false);
shared nothrow @trusted void close(bool waitForConsume = false);
とじる
class MessageBox(T, Key = string) if (!hasUnsharedAliasing!T);
shared this();
@system this();
コンストラクタ
shared nothrow void put(Key key, T dat);
shared nothrow void put()(T dat)
if(_hasDefaultKey);
shared nothrow void put(Range)(Key key, Range dat)
if(isInputRange!Range && is(ForeachType!Range : T));
shared nothrow void put(Range)(Range src)
if(_hasDefaultKey && isInputRange!Range && is(ForeachType!Range : T));
供給
shared nothrow T consume(Key key);
shared nothrow T consume()()
if(_hasDefaultKey);
shared nothrow void consumeAll(OutputRange)(Key key, ref OutputRange dst)
if(isOutputRange!(OutputRange, T));
shared nothrow void consumeAll(OutputRange)(ref OutputRange dst)
if(_hasDefaultKey && isOutputRange!(OutputRange, T));
shared nothrow bool tryConsume(Key key, ref T dst, Duration timeout = 0.msecs);
shared nothrow bool tryConsume()(ref T dst, Duration timeout = 0.msecs)
if(_hasDefaultKey);
shared nothrow bool tryConsumeAll(OutputRange)(Key key, ref OutputRange dst, Duration timeout = 0.msecs)
if(isOutputRange!(OutputRange, T));
shared nothrow bool tryConsumeAll(OutputRange)(ref OutputRange dst, Duration timeout = 0.msecs)
if(_hasDefaultKey && isOutputRange!(OutputRange, T));
消費
shared nothrow bool waitForData(Key key);
shared nothrow bool waitForData()()
if(_hasDefaultKey);
shared nothrow bool waitForData(Key key, Duration timeout);
shared nothrow bool waitForData()(Duration timeout)
if(_hasDefaultKey);
データ供給待ち
shared nothrow void close(Key key, bool waitForConsume = false);
shared nothrow void close()(bool waitForConsume = false)
if(_hasDefaultKey);
shared nothrow void closeAll(bool waitForConsume = false);
とじる
shared nothrow Queue opIndex(Key key);
shared nothrow Queue opBinaryRight(string op : "in")(Key key);
Queueにアクセス