futuretask源码分析(推荐)

2025-05-29 0 40

FutureTask只实现RunnableFuture接口:

该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性。

1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了( Thread(Runnable) )。

2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。

如: 1. 取消任务执行

2. 查询任务是否执行完成

3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)
FutureTask支持执行两种任务, Callable 或者 Runnable的实现类。且可把FutureTask实例交由Executor执行。

源码部分(很简单):

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405
public class FutureTask<V> implements RunnableFuture<V> {

/*

* Revision notes: This differs from previous versions of this

* class that relied on AbstractQueuedSynchronizer, mainly to

* avoid surprising users about retaining interrupt status during

* cancellation races. Sync control in the current design relies

* on a "state" field updated via CAS to track completion, along

* with a simple Treiber stack to hold waiting threads.

*

* Style note: As usual, we bypass overhead of using

* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.

*/

/**

* The run state of this task, initially NEW. The run state

* transitions to a terminal state only in methods set,

* setException, and cancel. During completion, state may take on

* transient values of COMPLETING (while outcome is being set) or

* INTERRUPTING (only while interrupting the runner to satisfy a

* cancel(true)). Transitions from these intermediate to final

* states use cheaper ordered/lazy writes because values are unique

* and cannot be further modified.

*

* Possible state transitions:

* NEW -> COMPLETING -> NORMAL

* NEW -> COMPLETING -> EXCEPTIONAL

* NEW -> CANCELLED

* NEW -> INTERRUPTING -> INTERRUPTED

*/

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */

private Callable<V> callable;

/** 用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常 */

private Object outcome; // non-volatile, protected by state reads/writes

/** 当前运行Run方法的线程 */

private volatile Thread runner;

/** Treiber stack of waiting threads */

private volatile WaitNode waiters;

/**

* Returns result or throws exception for completed task.

*

* @param s completed state value

*/

@SuppressWarnings("unchecked")

private V report(int s) throws ExecutionException {

Object x = outcome;

if (s == NORMAL)

return (V)x;

if (s >= CANCELLED)

throw new CancellationException();

throw new ExecutionException((Throwable)x);

}

/**

* Creates a {@code FutureTask} that will, upon running, execute the

* given {@code Callable}.

*

* @param callable the callable task

* @throws NullPointerException if the callable is null

*/

public FutureTask(Callable<V> callable) {

if (callable == null)

throw new NullPointerException();

this.callable = callable;

this.state = NEW; // ensure visibility of callable

}

/**

* Creates a {@code FutureTask} that will, upon running, execute the

* given {@code Runnable}, and arrange that {@code get} will return the

* given result on successful completion.

*

* @param runnable the runnable task

* @param result the result to return on successful completion. If

* you don't need a particular result, consider using

* constructions of the form:

* {@code Future<?> f = new FutureTask<Void>(runnable, null)}

* @throws NullPointerException if the runnable is null

*/

public FutureTask(Runnable runnable, V result) {

this.callable = Executors.callable(runnable, result);

this.state = NEW; // ensure visibility of callable

}

//判断任务是否已取消(异常中断、取消等)

public boolean isCancelled() {

return state >= CANCELLED;

}

/**

判断任务是否已结束(取消、异常、完成、NORMAL都等于结束)

**

public boolean isDone() {

return state != NEW;

}

/**

mayInterruptIfRunning用来决定任务的状态。

true : 任务状态= INTERRUPTING = 5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行

false:CANCELLED = 4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行

**/

public boolean cancel(boolean mayInterruptIfRunning) {

if (state != NEW)

return false;

if (mayInterruptIfRunning) {

if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))

return false;

Thread t = runner;

if (t != null)

t.interrupt();

UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state

}

else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))

return false;

finishCompletion();

return true;

}

/**

* @throws CancellationException {@inheritDoc}

*/

public V get() throws InterruptedException, ExecutionException {

int s = state;

//如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程

if (s <= COMPLETING)

s = awaitDone(false, 0L);

return report(s);

}

/**

* @throws CancellationException {@inheritDoc}

*/

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

throw new TimeoutException();

return report(s);

}

/**

* Protected method invoked when this task transitions to state

* {@code isDone} (whether normally or via cancellation). The

* default implementation does nothing. Subclasses may override

* this method to invoke completion callbacks or perform

* bookkeeping. Note that you can query status inside the

* implementation of this method to determine whether this task

* has been cancelled.

*/

protected void done() { }

/**

该方法在FutureTask里只有run方法在任务完成后调用。

主要保存任务执行结果到成员变量outcome 中,和切换任务执行状态。

由该方法可以得知:

COMPLETING : 任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get

NORMAL : 任务彻底执行完成

**/

protected void set(V v) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = v;

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

finishCompletion();

}

}

/**

* Causes this future to report an {@link ExecutionException}

* with the given throwable as its cause, unless this future has

* already been set or has been cancelled.

*

* <p>This method is invoked internally by the {@link #run} method

* upon failure of the computation.

*

* @param t the cause of failure

*/

protected void setException(Throwable t) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t;

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

finishCompletion();

}

}

/**

由于实现了Runnable接口的缘故,该方法可由执行线程所调用。

**/

public void run() {

//只有当任务状态=new时才被运行继续执行

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

try {

Callable<V> c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

//调用Callable的Call方法

result = c.call();

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

setException(ex);

}

if (ran)

set(result);

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

int s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

/**

如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。

所以该方法可以重复执行N次。不过不能直接调用,因为是protected权限。

**/

protected boolean runAndReset() {

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return false;

boolean ran = false;

int s = state;

try {

Callable<V> c = callable;

if (c != null && s == NEW) {

try {

c.call(); // don't set result

ran = true;

} catch (Throwable ex) {

setException(ex);

}

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

return ran && s == NEW;

}

/**

* Ensures that any interrupt from a possible cancel(true) is only

* delivered to a task while in run or runAndReset.

*/

private void handlePossibleCancellationInterrupt(int s) {

// It is possible for our interrupter to stall before getting a

// chance to interrupt us. Let's spin-wait patiently.

if (s == INTERRUPTING)

while (state == INTERRUPTING)

Thread.yield(); // wait out pending interrupt

// assert state == INTERRUPTED;

// We want to clear any interrupt we may have received from

// cancel(true). However, it is permissible to use interrupts

// as an independent mechanism for a task to communicate with

// its caller, and there is no way to clear only the

// cancellation interrupt.

//

// Thread.interrupted();

}

/**

* Simple linked list nodes to record waiting threads in a Treiber

* stack. See other classes such as Phaser and SynchronousQueue

* for more detailed explanation.

*/

static final class WaitNode {

volatile Thread thread;

volatile WaitNode next;

WaitNode() { thread = Thread.currentThread(); }

}

/**

该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable.

**/

private void finishCompletion() {

// assert state > COMPLETING;

for (WaitNode q; (q = waiters) != null;) {

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

for (;;) {

Thread t = q.thread;

if (t != null) {

q.thread = null;

LockSupport.unpark(t);

}

WaitNode next = q.next;

if (next == null)

break;

q.next = null; // unlink to help gc

q = next;

}

break;

}

}

done();

callable = null; // to reduce footprint

}

/**

阻塞等待任务执行完成(中断、正常完成、超时)

**/

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

/**

这里的if else的顺序也是有讲究的。

1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中)

2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。

3.如果任务状态=COMPLETING,证明该任务处于已执行完成,正在切换任务执行状态,CPU让出片刻即可

4.q==null,则证明还未创建节点,则创建节点

5.q节点入队

6和7.阻塞

**/

if (Thread.interrupted()) {

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

if (s > COMPLETING) {

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

else if (q == null)

q = new WaitNode();

else if (!queued)

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

else

LockSupport.park(this);

}

}

/**

* Tries to unlink a timed-out or interrupted wait node to avoid

* accumulating garbage. Internal nodes are simply unspliced

* without CAS since it is harmless if they are traversed anyway

* by releasers. To avoid effects of unsplicing from already

* removed nodes, the list is retraversed in case of an apparent

* race. This is slow when there are a lot of nodes, but we don't

* expect lists to be long enough to outweigh higher-overhead

* schemes.

*/

private void removeWaiter(WaitNode node) {

if (node != null) {

node.thread = null;

retry:

for (;;) { // restart on removeWaiter race

for (WaitNode pred = null, q = waiters, s; q != null; q = s) {

s = q.next;

if (q.thread != null)

pred = q;

else if (pred != null) {

pred.next = s;

if (pred.thread == null) // check for race

continue retry;

}

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,

q, s))

continue retry;

}

break;

}

}

}

// Unsafe mechanics

private static final sun.misc.Unsafe UNSAFE;

private static final long stateOffset;

private static final long runnerOffset;

private static final long waitersOffset;

static {

try {

UNSAFE = sun.misc.Unsafe.getUnsafe();

Class<?> k = FutureTask.class;

stateOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("state"));

runnerOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("runner"));

waitersOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("waiters"));

} catch (Exception e) {

throw new Error(e);

}

}

}

总结

以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。有什么问题可以随时留言,欢迎大家一起交流讨论。

原文链接:http://blog.csdn.net/wojiaolinaaa/article/details/50434817

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 futuretask源码分析(推荐) https://www.kuaiidc.com/114549.html

相关文章

发表评论
暂无评论